Thursday, November 30, 2023

Predicate Pushdown in Spark

 Predicate pushdown is a technique in Apache Spark where filters (predicates) are pushed down to the data source level, reducing the amount of data that needs to be processed. This can significantly improve query performance by filtering out unnecessary data early in the data retrieval process.

Let's consider an example with Spark SQL and a DataFrame reading data from a source like Apache Parquet:


from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PredicatePushdownExample").getOrCreate()

# Assume you have a Parquet file with a schema containing columns: id, name, age, and city
parquet_path = "/path/to/your/parquet/data"

# Read data from the Parquet file
df = spark.read.parquet(parquet_path)

# Example of predicate pushdown
filtered_df = df.filter(df["age"] > 25)

# Show the resulting DataFrame
filtered_df.show()

# Stop the Spark session
spark.stop()


In this example, the filter operation applies a condition (age > 25) on the DataFrame df. When Spark encounters this filter, it can push it down to the Parquet data source, meaning that the Parquet reader will only read and process the data that satisfies the condition. This is an optimization because it reduces the amount of data read from disk and processed in memory.

Behind the scenes, Spark's Catalyst optimizer analyzes the DataFrame operations and tries to push down predicates to the data source whenever possible. The specific capabilities depend on the data source. For example, Parquet supports predicate pushdown, as do many other data sources like ORC and Delta Lake.

Always keep in mind that the effectiveness of predicate pushdown depends on the underlying data source and the type of operations being performed. It's a powerful optimization technique, especially when dealing with large datasets, as it minimizes the amount of data that needs to be processed in a Spark job.
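One quick way to verify the optimization is to look at the physical plan. The sketch below continues with the filtered_df from the example above (before spark.stop() is called); for a Parquet scan, pushed filters appear in the PushedFilters list.

# Print the extended plan; the Parquet scan node lists the pushed
# predicate, e.g. PushedFilters: [IsNotNull(age), GreaterThan(age,25)]
filtered_df.explain(True)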

Spark Optimization Techniques

1. Partitioning:

Ensure that your data is well-partitioned. Adjust the number of partitions using repartition and coalesce to optimize data distribution across nodes.
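A minimal sketch of both calls (the partition counts are placeholder values, not tuned recommendations, and df is assumed to be an existing DataFrame):

# repartition triggers a full shuffle and can increase the partition count
df_repart = df.repartition(200)

# coalesce merges existing partitions without a full shuffle; useful before writing output
df_small = df_repart.coalesce(10)

print(df_small.rdd.getNumPartitions())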

2. Broadcast Hash Join:

Use a broadcast hash join for small tables that fit in memory. This avoids expensive shuffling of the large table in join operations.
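A hedged sketch, assuming large_df is a big fact table and small_df is a small lookup table sharing an id column:

from pyspark.sql.functions import broadcast

# The broadcast hint ships the small table to every executor,
# so the large table is joined locally without being shuffled
joined_df = large_df.join(broadcast(small_df), on="id", how="inner")
joined_df.explain(True)  # the plan should show a BroadcastHashJoin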

3. Caching and Persistence:

Cache intermediate results or frequently used datasets using persist or cache to avoid recomputation.
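For example (a sketch; the storage levels shown are only two of the available options):

from pyspark import StorageLevel

# Cache with the default storage level (MEMORY_AND_DISK for DataFrames in recent Spark versions)
df.cache()

# Alternatively, persist with an explicit storage level
# (a DataFrame can only hold one storage level at a time):
# df.persist(StorageLevel.MEMORY_ONLY)

# Free the cached data once it is no longer needed
df.unpersist()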

4. Memory Tuning:

Adjust the memory configurations for executors and the driver to optimize memory usage. Balance executor memory, storage memory, and shuffle memory.
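A sketch of the relevant configuration keys when building the session; the values are placeholders to show the knobs, not recommendations, and driver memory normally has to be set before the driver JVM starts (e.g. via spark-submit):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("MemoryTuningExample")
         .config("spark.executor.memory", "4g")          # heap per executor
         .config("spark.driver.memory", "2g")            # heap for the driver (usually set at submit time)
         .config("spark.memory.fraction", "0.6")         # share of heap for execution + storage
         .config("spark.memory.storageFraction", "0.5")  # part of that share protected for storage
         .getOrCreate())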

5. Data Serialization:

Choose efficient serialization formats (e.g., Kryo, Avro) to reduce the size of data transmitted over the network.
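For example, switching the serializer to Kryo when creating the session (a sketch; registering your own classes is optional but reduces the serialized size further):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("KryoSerializationExample")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         # Optional: fail fast on unregistered classes while testing
         # .config("spark.kryo.registrationRequired", "true")
         .getOrCreate())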

6. Shuffling Optimization:

Minimize data shuffling by using operations like reduceByKey instead of groupByKey. Optimize joins, aggregations, and group-by operations.

Adjust the number of shuffle partitions using spark.sql.shuffle.partitions.
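A brief sketch of both points, assuming words_rdd is an RDD of words; the partition count is a placeholder:

# reduceByKey combines values on each partition before the shuffle
counts = words_rdd.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

# groupByKey would shuffle every (word, 1) pair before combining - avoid it for aggregations
# counts = words_rdd.map(lambda w: (w, 1)).groupByKey().mapValues(sum)

# Tune the number of partitions used by DataFrame/SQL shuffles (default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "100")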

7. Dynamic Allocation:

Enable dynamic allocation (spark.dynamicAllocation.enabled) to adjust the number of executors dynamically based on workload. This can improve resource utilization.
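A configuration sketch; the executor bounds are placeholders, and whether you need shuffle tracking or the external shuffle service depends on your cluster manager:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("DynamicAllocationExample")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.maxExecutors", "10")
         # Needed so shuffle data outlives removed executors (or use the external shuffle service)
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .getOrCreate())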

8. Code Optimization:

Write efficient Spark transformations and actions. Minimize unnecessary collect and repartition operations. Minimize custom UDFs and prefer built-in Spark functions when possible.
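For instance, a built-in function stays inside the JVM and the Catalyst optimizer, while a Python UDF sends every row to a Python worker. A sketch, assuming df has a name column:

from pyspark.sql.functions import upper, udf
from pyspark.sql.types import StringType

# Preferred: built-in function, optimized by Catalyst
df_builtin = df.withColumn("name_upper", upper(df["name"]))

# Equivalent custom UDF: works, but every row crosses the JVM/Python boundary
to_upper = udf(lambda s: s.upper() if s is not None else None, StringType())
df_udf = df.withColumn("name_upper", to_upper(df["name"]))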

9. Hardware Considerations:

Provision hardware resources properly, considering factors like CPU and memory, to match the workload and data processing requirements.

10. Column Pruning:

When reading data, select only the columns needed for processing to reduce I/O and memory requirements.
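For example, reusing the Parquet path from the first section (the scan then reads only the requested columns from the columnar files):

# Read only the columns the job actually needs
pruned_df = spark.read.parquet(parquet_path).select("id", "age")

# The ReadSchema in the physical plan reflects the pruned columns
pruned_df.explain(True)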

11. Predicate Pushdown:

As covered in detail in the first section above, predicate pushdown pushes filters (predicates) down to the data source level, so unnecessary data is filtered out early in the retrieval process and far less data has to be read and processed.

# Example of predicate pushdown
filtered_df = df.filter(df["age"] > 25)

Here the age > 25 condition is pushed to the Parquet reader, which reads and processes only the rows that satisfy it, reducing the data read from disk and processed in memory.

12. Parquet File Format:

Choose efficient file formats like Parquet, which provides columnar storage and can significantly reduce I/O and improve query performance.
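A small sketch of writing and reading Parquet; the output path is a placeholder, and partitioning by city is just an illustration that pairs well with pushdown on that column:

# Write the DataFrame as Parquet, partitioned by city (placeholder path)
df.write.mode("overwrite").partitionBy("city").parquet("/path/to/output/parquet")

# Reading it back benefits from columnar storage, column pruning, and predicate pushdown
parquet_df = spark.read.parquet("/path/to/output/parquet")
parquet_df.filter(parquet_df["city"] == "Bangalore").show()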

13. Task Parallelism:

Increase the level of parallelism by optimizing the number of tasks. This involves adjusting the number of partitions and the degree of parallelism in operations.
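A brief sketch (the counts are placeholders; this overlaps with the partitioning point above, since each partition becomes one task):

# How many tasks a stage over this DataFrame would run
print(df.rdd.getNumPartitions())

# Raise the partition count if stages run with too few tasks to keep the cluster busy
df = df.repartition(100)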

14. Monitor and Tune with the Spark UI:

Monitor jobs in the Spark UI and identify bottlenecks. Tune configurations based on the insights gained from monitoring.

Python to MySQL Connectivity

import mysql.connector

# Connect to the MySQL database
connection = mysql.connector.connect(
    host="localhost",
    user="yourusername",
    password="yourpassword",
    database="mydatabase"
)

# Run a query and print every row of the result
mycursor = connection.cursor()
mycursor.execute("SELECT name, address FROM customers")
myresult = mycursor.fetchall()
for x in myresult:
    print(x)
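As a small follow-up to the snippet above (a sketch reusing the same connection): parameterized queries let the connector handle quoting safely, and the cursor and connection should be closed when done. The city column and value are assumptions for illustration.

# Parameterized query: the connector escapes the value safely
mycursor.execute("SELECT name, address FROM customers WHERE city = %s", ("Bangalore",))
for row in mycursor.fetchall():
    print(row)

# Release database resources
mycursor.close()
connection.close()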

Count words in a file in a Python program

# Read the entire file
with open('E:\\source_files\\file1.txt', 'r') as file:
    content = file.read()

print(content)
# Bangalore is Karnataka
# Chennai is Tamilnadu

# Split the content into words
words = content.split()
print(f'output after split is :{words}')
# output after split is :['Bangalore', 'is', 'Karnataka', 'Chennai', 'is', 'Tamilnadu']

# Count the number of words
word_count = len(words)
print(f'final output count is :{word_count}')
# final output count is :6
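A natural extension is counting how often each individual word occurs, for example with collections.Counter:

from collections import Counter

# Frequency of each word from the same content
word_freq = Counter(words)
print(word_freq)
# e.g. Counter({'is': 2, 'Bangalore': 1, 'Karnataka': 1, 'Chennai': 1, 'Tamilnadu': 1})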

Tuesday, November 14, 2023

#1st Simple Glue Job

#1st Simple Glue Job

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

data = "s3://rishita2023poc/input/bank-full.csv"

# The \ line-continuation symbol should be given after the closing parenthesis but before the '.' symbol.
adf = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("sep", ";")\
    .load(data)
adf.show()

# Keep only customers older than 60
res = adf.where(col("age") > 60)

op = "s3://rishita2023poc/output"
res.write.format("csv")\
    .option("header", "true")\
    .mode("overwrite")\
    .save(op)

'''
### To store the output to a MySQL RDS database, add the dependency jar file from the S3 bucket path in the Dependent JARs section.
# Dependent JARs path: s3://rishita2023poc/drivers/mysql-connector-java-8.0.12.jar

host = "jdbc:mysql://mysqldb.cwkqaojgxfrd.ap-south-1.rds.amazonaws.com:3306/newdb"
res.write.format("jdbc")\
    .option("url", host)\
    .option("user", "myuser")\
    .option("password", "mypassword")\
    .option("driver", "com.mysql.cj.jdbc.Driver")\
    .option("dbtable", "gluebank60")\
    .save()
'''

job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()