1. Create a Lambda function in AWS Lambda.
2. Add an S3 trigger to the Lambda function and configure the bucket name, folder (prefix), and file type (suffix), as sketched below.
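The same trigger can also be attached programmatically. A minimal sketch using boto3, assuming a placeholder bucket name, a bank/ folder, .csv files, and a placeholder Lambda function ARN (replace all of these with your own values):

import boto3

s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
    Bucket="my-input-bucket",  # placeholder bucket name
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                # placeholder ARN of the Lambda function created in step 1
                "LambdaFunctionArn": "arn:aws:lambda:ap-south-1:123456789012:function:start-glue-job",
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": "bank/"},  # folder name
                            {"Name": "suffix", "Value": ".csv"},   # file type
                        ]
                    }
                },
            }
        ]
    },
)

Note that S3 also needs permission to invoke the function (the console adds this automatically when the trigger is created there; otherwise it has to be granted with lambda add-permission).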
Lambda function code:
import json
import boto3

def lambda_handler(event, context):
    glue = boto3.client("glue")

    # Read the uploaded object's key and bucket from the S3 event record
    file_name = event['Records'][0]['s3']['object']['key']
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    print("File Name : ", file_name)
    print("Bucket Name : ", bucket_name)

    # Start the Glue job, passing the file and bucket as job arguments.
    # The argument names must match the ones the Glue job resolves below.
    response = glue.start_job_run(
        JobName="jsonjob",
        Arguments={"--s3path": file_name, "--bucket": bucket_name}
    )
    print("Glue job started : ", response['JobRunId'])
AWS Glue job code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col

## @params: [s3path, bucket] -- passed in by the Lambda function via start_job_run
args = getResolvedOptions(sys.argv, ["s3path", "bucket"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

file_name = args['s3path']
bucket_name = args['bucket']
print("Bucket Name : ", bucket_name)
print("File Name : ", file_name)

# Build the full S3 path of the uploaded file, e.g. s3://ravi2023poc/bank/bank-full.csv
data = "s3://{}/{}".format(bucket_name, file_name)
print("Input File Path : ", data)

# Read the semicolon-separated CSV file with a header row
adf = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("sep", ";").load(data)
adf.show()

# Keep only the customers older than 60
res = adf.where(col("age") > 60)

# (Alternatively, the result could be written back to S3 as CSV:
#  res.write.format("csv").option("header", "true").save("s3://ravi2023poc/output/bank"))

# Write the filtered rows to a MySQL table on Amazon RDS over JDBC
host = "jdbc:mysql://mysqldb.cwkqaojgxfrd.ap-south-1.rds.amazonaws.com:3306/newdb"
res.write.mode("append").format("jdbc").option("url", host).option("user", "myuser").option("password", "mypassword").option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "liveinc").save()
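Once the job has finished, the write can be verified by reading the table back over the same JDBC connection. A quick sketch, reusing the host, credentials, and table name from the job above:

# Read the MySQL table back and print a few rows to confirm the append worked
verify = spark.read.format("jdbc") \
    .option("url", host) \
    .option("user", "myuser") \
    .option("password", "mypassword") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "liveinc") \
    .load()
verify.show()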