Friday, December 1, 2023

AWS Lambda Function to trigger the Glue job if new file arrives in S3

1. Create a Lambda function in AWS Lambda.

2. Create an S3 trigger for the Lambda function, configured with the bucket name, the folder (key prefix), and the file type (key suffix).

Lambda function code:

import json
import boto3


def lambda_handler(event, context):
    """Start the Glue job "jsonjob" for a file newly uploaded to S3.

    Triggered by an S3 ObjectCreated notification. The event payload carries
    the bucket name and object key, which are forwarded to the Glue job.

    Parameters:
        event: S3 notification event (dict) with at least one record.
        context: Lambda context object (unused).

    Returns:
        dict with the started Glue job run id.
    """
    glue = boto3.client("glue")

    # S3 notifications deliver one record per invocation here; read the first.
    s3_info = event["Records"][0]["s3"]
    file_name = s3_info["object"]["key"]
    bucket_name = s3_info["bucket"]["name"]
    print("File Name : ", file_name)
    print("Bucket Name : ", bucket_name)

    # Argument keys must match what the Glue script resolves with
    # getResolvedOptions(sys.argv, ["s3path", "bucket"]) -> pass
    # "--s3path" / "--bucket" (the original "--file" key was never read).
    response = glue.start_job_run(
        JobName="jsonjob",
        Arguments={"--s3path": file_name, "--bucket": bucket_name},
    )
    print("Lambda invoke")
    return {
        "statusCode": 200,
        "body": json.dumps({"jobRunId": response["JobRunId"]}),
    }















AWS Glue job Code:

import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col

# Glue job: load a semicolon-delimited CSV from S3, filter rows with
# age > 60, and append the result to a MySQL table over JDBC.

# Resolve the arguments the triggering Lambda passes as --s3path / --bucket.
args = getResolvedOptions(sys.argv, ["s3path", "bucket"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

file_name = args["s3path"]
bucket_name = args["bucket"]
print("Bucket Name", bucket_name)
print("File Name", file_name)

# Full S3 path of the newly arrived file.
data = "s3://{}/{}".format(bucket_name, file_name)
print("Input File Path : ", data)

# Source file is a semicolon-separated CSV with a header row.
adf = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ";")
    .load(data)
)
adf.show()

# Keep only records where the customer is older than 60.
res = adf.where(col("age") > 60)

# NOTE(review): database credentials are hard-coded below; move them to
# AWS Secrets Manager or a Glue connection before production use.
host = "jdbc:mysql://mysqldb.cwkqaojgxfrd.ap-south-1.rds.amazonaws.com:3306/newdb"
(
    res.write.mode("append")
    .format("jdbc")
    .option("url", host)
    .option("user", "myuser")
    .option("password", "mypassword")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "liveinc")
    .save()
)






No comments:

Post a Comment