from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, asc,desc
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, cume_dist, ntile
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, BooleanType, DateType, IntegerType
# Start (or reuse) a local Spark session; only ERROR-level logs are printed.
spark = (
    SparkSession.builder
    .appName('Joins_On_Dataframe')
    .master("local[*]")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# Source CSVs: header row present, comma separated, column types inferred by Spark.
emppath = "E:\\source_files\\employees.csv"
empdf = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .load(emppath)
)
deptpath = "E:\\source_files\\departments.csv"
deptdf = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .load(deptpath)
)

# Preview up to 110 rows, then the default 20 rows with untruncated cell values.
empdf.show(n=110)
empdf.select("*").show(truncate=False)
# --- DataFrame shape ---------------------------------------------------------
# DataFrame.count()  -> number of rows. It is an *action*: Spark transformations
#                       are lazy and only execute when an action is called.
# DataFrame.columns  -> Python list with every column name.
# len(df.columns)    -> number of columns.
# (Editor tip: disable PyCharm "Inlay Hints" to hide parameter-name hints.)
rows, cols = empdf.count(), len(empdf.columns)
print(f"DataFrame Dimensions : {(rows,cols)}")
print(f"DataFrame Rows count : {rows}")
print(f"DataFrame Columns count : {cols}")
# --- Column functions and basic transformations ------------------------------
# functions.count(col) counts the non-null values of a column.
empdf.select(count(empdf.FIRST_NAME)).show()
empdf.select(count(empdf.FIRST_NAME), count(empdf.EMAIL)).show()
#empdf.agg({'FIRST_NAME':'count','EMAIL':'count'}).show()
empdf.select(length(col("FIRST_NAME"))).show()
empdf.select(upper(col("FIRST_NAME"))).show()
empdf.select(lower(col("FIRST_NAME"))).show()
# Alias the Column expression inside select(). Calling .alias() on the
# DataFrame returned by select() only sets a DataFrame alias, so the output
# column kept its generated name instead of "FullName".
# concat_ws() skips null inputs; concat() returns null if any input is null.
empdf.select(concat_ws('_', col("FIRST_NAME"), col("LAST_NAME")).alias("FullName")).show()
empdf.select(concat(col("FIRST_NAME"), col("LAST_NAME")).alias("FullName")).show()
# limit() without an ordering does not guarantee which 5 rows are returned.
empdf.limit(5).select(col("employee_id"),col("first_name"),col("email")).show(truncate=False)
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
empdf.printSchema()
empdf.select( col("first_name"), col ("last_name") ).distinct().show()
empdf.select(trim(col("first_name"))).show()
# substring(str, pos, len): pos is 1-based.
empdf.select(substring(col("first_name"),2,4)).show()
# sort() and orderBy() are aliases. Column names resolve case-insensitively
# under Spark's default settings, which is why "FirsT_name" still works.
empdf.sort("department_id","first_name").show(truncate=False)
empdf.sort(col("department_id").asc(),col("first_name").desc()).select(col("employee_id"),col("email")).show(truncate=False)
empdf.sort(col("department_id"),col("FirsT_name")).show(truncate=False)
empdf.orderBy("department_id","FirsT_name").show(truncate=False)
empdf.orderBy(col("department_id"),col("FirsT_name")).show(truncate=False)
empdf.sort(col("department_id").asc(),col("first_name").desc()).show(truncate=False)
empdf.orderBy(col("department_id").asc(),col("first_name").desc()).show(truncate=False)
# coalesce(): first non-null value among the arguments, evaluated per row.
# NOTE(review): employee_id comes first; if it is never null this simply returns employee_id.
empdf.select( coalesce ( col("employee_id"),col("salary"),col("manager_id") ) ).orderBy(col("employee_id").desc()).show()
empdf.select(current_date()).show()
# Fixed the "Separatror" typo in the printed separator lines.
print("Separator #####################################################")
# Same data through Spark SQL: expose empdf as the temporary view "EMP".
empdf.createOrReplaceTempView("EMP")
spark.sql("select employee_id, first_name,last_name, salary from EMP ORDER BY department_id asc").show(truncate=False)
print("Separator ***************************")
# GroupedData.sum() and agg(sum()) give the same result; `sum` here is
# pyspark.sql.functions.sum (the star import shadows the builtin).
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
print("Separator ??????????????????????????????")
empdf.groupBy(col("department_id")).agg(sum("salary")).show(truncate=False)
# groupBy() with no grouping columns aggregates the whole DataFrame. The
# shortcut method and the agg() form are equivalent (sample REPL output):
# >>> empdf.groupBy().min("salary").show()
# +-----------+
# |min(salary)|
# +-----------+
# | 2100|
# +-----------+
# >>> empdf.groupBy().agg(min("salary")).show()
# +-----------+
# |min(salary)|
# +-----------+
# | 2100|
# +-----------+
#
# Disabled: several named aggregates per department in one agg() call.
# (empdf.groupBy(col("department_id")).agg( count("department_id").alias("count"),
#     sum("salary").alias("sum"),
#     max("Salary").alias("max"),
#     min("SAlary").alias("min"),
#     avg("SALARY").alias("avg")
#     ).show()
# )
department_stats = [
    sum("salary").alias("sum_salary"),
    avg("salary").alias("avg_salary"),
    sum("commission_pct").alias("sum_commission_pct"),
    max("commission_pct").alias("max_bonus"),
]
empdf.groupBy("department_id").agg(*department_stats).show(truncate=False)
# Head-count per department, largest department first.
head_count = count("*").alias("cnt")
(
    empdf.groupBy(col("department_id"))
    .agg(head_count)
    .orderBy(col("cnt").desc())
    .show(truncate=False)
)
# where() and filter() are aliases; both take a boolean Column condition.
empdf.where(col("department_id")==10).show(truncate=False)
empdf.where(col("FIRST_NAME")=="Jennifer").show(truncate=False)
# Fixed the "Separatror" typo in the printed separator line.
print("Separator #####################################################")
empdf.filter(col("FIRST_NAME")=="Kimberely").show(truncate=False)
empdf.filter(col("FIRST_NAME")=="Kimberely").select(col("employee_id"),col("email")).show(truncate=False)
# like() uses SQL wildcards: % matches any run of characters.
empdf.filter ( col("first_name").like("Kar%") ).select(col("First_name"),col("Last_name")).show()
# Combine conditions with & (each side parenthesised). department_id is
# compared with an int literal, as in the where() above, rather than "30".
empdf.filter ( (col("department_id")==30) & (col("first_name").like ("Kar%")) ) \
.select( col("employee_id"),col("First_name"),col("department_id"),col("salary") ).show(truncate=False)
# --- withColumn / withColumnRenamed / drop ------------------------------------
# withColumn(name, expr) overwrites `name` if it already exists, otherwise it
# appends a new column. Each call returns a new DataFrame; empdf is unchanged.
column_demos = [
    ("salary", col("salary").cast("Integer")),   # change the data type
    ("salary", col("salary") * 10000),           # update the existing values
    ("CopiedColumn", col("salary") * -1),        # add a new derived column
]
for target_name, value_expr in column_demos:
    empdf.withColumn(target_name, value_expr).show()
# lit() turns a Python constant into a Column; withColumn() calls can be
# chained to add several columns in one expression.
#empdf.withColumn("Country", lit("USA")).show()
with_location = (
    empdf
    .withColumn("State", lit("California"))
    .withColumn("District", lit("Bonita"))
)
with_location.show()
# Rename an existing column.
empdf.withColumnRenamed("LAST_NAME", "EMPLOYEE_LAST_NAME").show(truncate=False)
# Drop a column (returns a new DataFrame without it).
empdf.drop("salary").show()
# Cast several columns in one chain and inspect the resulting schema.
# EMPLOYEE_ID is now cast from itself. The previous code cast HIRE_DATE (already a
# date at that point) to int and stored it in EMPLOYEE_ID, which replaces the IDs
# with nulls or fails analysis, depending on Spark version / ANSI settings.
# NOTE(review): casting COMMISSION_PCT to boolean maps non-zero values to true
# and 0 to false — confirm that is the intended demonstration.
df2 = empdf.withColumn("PHONE_NUMBER",col("PHONE_NUMBER").cast(StringType() ) ) \
.withColumn("COMMISSION_PCT",col("COMMISSION_PCT").cast(BooleanType() ) ) \
.withColumn("HIRE_DATE",col("HIRE_DATE").cast(DateType() ) ) \
.withColumn("EMPLOYEE_ID",col("EMPLOYEE_ID").cast(IntegerType()))
df2.printSchema()
# Two equivalent projections: Column objects vs. plain column-name strings.
for projection in ((col("first_name"), col("last_name")), ("first_name", "last_name")):
    empdf.select(*projection).show()
# Whole-table MIN then MAX of salary (pyspark min/max via the star import).
for salary_extreme in (min("salary"), max("salary")):
    empdf.agg(salary_extreme).show()
# DataFrame has no avg() method; use empdf.agg(avg("salary")) instead.
#empdf.avg("salary").show()
empdf.select(trim(col("FIRST_NAME"))).show()
# cast('long') changes only the projected column's type; printSchema shows it.
empdf.select(col("salary").cast('long')).printSchema()
# Disabled: CASE WHEN walkthrough kept as an inert string literal.
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
spark = SparkSession.builder.appName('casewhen').master("local[*]").getOrCreate()
data = [("James","M",60000),("Michael","M",70000),
("Robert",None,400000),("Maria","F",500000),
("Jen","",None)]
columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()
df2 = df.withColumn("new_gender", when(df.gender == "M","Male")\
.when(df.gender == "F","Female")\
.when(df.gender.isNull() ," ")\
.otherwise("N/A"))
df2.show()
or
df2=df.select( col("*"),when(df.gender == "M","Male")\
.when(df.gender == "F","Female")\
.when(df.gender.isNull() ,"")\
.otherwise("N/A").alias("new_gender") )
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ' '" +
"ELSE 'N/A' END"))
df3.show(truncate=False)
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
"ELSE 'N/A' END").alias("new_gender"))
df4.show()
df.createOrReplaceTempView("EMP")
spark.sql("select name, gender,salary,CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
"ELSE 'N/A' END as new_gender from EMP").show()
'''
# Filters: LIKE patterns, null checks and IN-lists.
starts_with_kar = col("first_name").like("Kar%")
empdf.filter(starts_with_kar).select(col("First_name"), col("Last_name")).show()
(
    empdf.filter((col("department_id") == "30") & starts_with_kar)
    .select(col("employee_id"), col("First_name"), col("department_id"), col("salary"))
    .show(truncate=False)
)
detail_cols = [col("employee_id"), col("First_name"), col("department_id"), col("salary"), col("commission_pct")]
no_commission = col("commission_pct").isNull()
empdf.filter(no_commission).select(*detail_cols).show(truncate=False)
empdf.filter(no_commission).select("*").show(truncate=False)
empdf.filter(col("commission_pct").isNotNull()).select("*").show(truncate=False)
in_depts = col("department_id").isin(20, 30, 40)
empdf.filter(in_depts).select(*detail_cols).show(truncate=False)
empdf.filter(in_depts).select("*").show(truncate=False)
# --- union() / distinct() demo (disabled) -------------------------------------
# DataFrame.union() merges two DataFrames and returns a new DataFrame with all
# rows from both, regardless of duplicates: union() and unionAll() both keep
# duplicate rows. Because union() does not de-duplicate, chain distinct() to
# keep a single copy of each duplicated row.
# This explanation must stay in comments: bare prose between the triple-quoted
# strings is a SyntaxError that prevents the whole script from running.
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('Union_UnionAll').master("local[*]").getOrCreate()
simpleData = [("James","Sales","NY",90000,34,10000), \
("Michael","Sales","NY",86000,56,20000), \
("Robert","Sales","CA",81000,30,23000), \
("Maria","Finance","CA",90000,24,23000) \
]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
#df.printSchema()
#df.show(truncate=False)
simpleData2 = [("James","Sales","NY",90000,34,10000), \
("Maria","Finance","CA",90000,24,23000), \
("Jen","Finance","NY",79000,53,15000), \
("Jeff","Marketing","CA",80000,25,18000), \
("Kumar","Marketing","NY",91000,50,21000) \
]
columns2= ["employee_name","department","state","salary","age","bonus"]
df2 = spark.createDataFrame(data = simpleData2, schema = columns2)
#df2.printSchema()
#df2.show(truncate=False)
unionDF = df.union(df2)
unionDF.show(truncate=False)
disDF = df.union(df2).distinct()
disDF.show(truncate=False)
'''
# Register empdf as a temporary view and query it with SQL.
# (Named "cte1", but this is a temp view, not a SQL common table expression.)
empdf.createOrReplaceTempView("cte1")
df_cte1 = spark.sql("select * from cte1 where salary > 15000")
df_cte1.show()
#df_cte1.write.format("parquet").saveAsTable("test")
#df_cte1.write.format("parquet").save("E:\\source_files\\out.parquet")
#df_cte1.write.option("header","False").format("parquet").save("E:\\source_files\\out.parquet")
# Write the result as pipe-delimited CSV without a header row. The option key
# is "delimiter" (or "sep"). The previous misspelling "delimeter" was silently
# ignored, so the output was comma-separated.
# mode("append") adds new part files on every run instead of replacing them.
df_cte1.write.mode("append").option("header","False").option("delimiter","|").format("csv").save("E:\\source_files\\out.csv")
# --- Window functions ----------------------------------------------------------
# One window per department, rows ordered by ascending salary.
windowSpec = Window.partitionBy("department_id").orderBy("salary")
ranking_demos = (
    ("row_number", row_number(), {"truncate": False}),
    ("rank", rank(), {}),
    ("dense_rank", dense_rank(), {}),
    ("lag", lag("salary", 2), {}),     # salary 2 rows earlier in the window (null if none)
    ("lead", lead("salary", 2), {}),   # salary 2 rows later in the window (null if none)
    ("cume_dist", cume_dist(), {}),
    ("ntile", ntile(2), {}),
)
for result_col, window_fn, show_kwargs in ranking_demos:
    empdf.withColumn(result_col, window_fn.over(windowSpec)).show(**show_kwargs)
# An unordered window spanning the whole department, for plain aggregates.
windowSpecAgg = Window.partitionBy("department_id")
from pyspark.sql.functions import col,avg,sum,min,max,row_number
# Department salary stats via windows: compute avg/sum/min/max over the whole
# department window, then keep only the first-ranked row of each department.
# (avg/sum/min/max are the pyspark.sql.functions versions, not the builtins.)
dept_stats = empdf.withColumn("row", row_number().over(windowSpec))
for stat_name, stat_fn in (("avg", avg), ("sum", sum), ("min", min), ("max", max)):
    dept_stats = dept_stats.withColumn(stat_name, stat_fn(col("salary")).over(windowSpecAgg))
(
    dept_stats
    .where(col("row") == 1)
    .select("department_id", "avg", "sum", "min", "max")
    .show()
)
# --- De-duplication ------------------------------------------------------------
# Re-point empdf at a CSV that contains duplicate rows.
emppath = "E:\\source_files\\emp_duplicate.csv"
empdf = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .load(emppath)
)
empdf.show()
# distinct() and argument-less dropDuplicates() both compare entire rows:
#distinctDF = empdf.distinct()
#print("Distinct count: "+str(distinctDF.count()))
#distinctDF.show(truncate=False)
#df2 = empdf.dropDuplicates()
#print("Distinct count: "+str(df2.count()))
#df2.show(truncate=False)
# dropDuplicates(subset) keeps one row per distinct combination of the listed
# columns and returns a new DataFrame.
# NOTE(review): assumes emp_duplicate.csv has "department" and "salary"
# columns (employees.csv uses department_id) — confirm against the file.
dropDisDF = empdf.dropDuplicates(["department", "salary"])
print("Distinct count of department & salary : " + str(dropDisDF.count()))
dropDisDF.show(truncate=False)
# Disabled: small in-memory sample data set, kept as an inert string literal.
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Prepare Data
data = [("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000), \
("Saif", "Sales", 4100) \
]
# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
'''
######################################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, asc,desc
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, cume_dist, ntile
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.types import StringType, BooleanType, DateType, IntegerType
# Restart point: (re)acquire the session and reload the source CSVs.
spark = (
    SparkSession.builder
    .appName('Joins_On_Dataframe')
    .master("local[*]")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
emppath = "E:\\source_files\\employees.csv"
empdf = (
    spark.read.format("csv")
    .option("header", "true").option("inferSchema", "true").option("sep", ",")
    .load(emppath)
)
deptpath = "E:\\source_files\\departments.csv"
deptdf = (
    spark.read.format("csv")
    .option("header", "true").option("inferSchema", "true").option("sep", ",")
    .load(deptpath)
)
# Disabled previews and select() examples:
#empdf.show(110)
#empdf.select("*").show(truncate=False)
#1.select():
#empdf.select(col("employee_id"),col("first_name"), col("salary")).show(truncate=False)
#empdf.select("*").show(truncate=False)
#2.distinct():
# Disabled: sections 2-17 of this walkthrough, kept as an inert string literal.
'''
empdf.select( col("first_name"), col ("last_name") ).distinct().show()
distinctDF = empdf.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)
#3.where() equivalent WHERE in SQL:
empdf.where(col("department_id")==10).show(truncate=False)
empdf.where(col("FIRST_NAME")=="Jennifer").show(truncate=False)
#4.filter() equivalent WHERE in SQL:
#empdf.filter ( col("department_id")=="30" ).select ( col("employee_id"),col("First_name"),col("department_id") ).show(truncate=False)
empdf.filter ( (col("department_id")=="30") & (col("first_name").like ("Kar%")) ) \
.select( col("employee_id"),col("First_name"),col("department_id") ).show(truncate=False)
#5.sort() equivalent ORDER BY in SQL :
empdf.sort(col("department_id"),col("FirsT_name")).show(truncate=False)
empdf.sort(col("department_id").asc(),col("first_name").desc())\
.select(col("employee_id"),col("email")).show(truncate=False)
#6.count() equivalent count(*) in SQL
#DataFrame.count() -Returns the number of records in a DataFrame.
#DataFrame.columns – Returns all column names of a DataFrame as a list.
#len() – len() is a Python function that returns a number of elements present in a list.
#len(DataFrame.columns) – Returns the number of columns in a DataFrame.
empdf.select(count(empdf.FIRST_NAME)).show()
empdf.select(count(empdf.FIRST_NAME), count(empdf.EMAIL)).show()
# Get row count
rows = empdf.count()
# Get columns count
cols = len(empdf.columns)
print(f"DataFrame Dimensions : {(rows,cols)}")
print(f"DataFrame Rows count : {rows}")
print(f"DataFrame Columns count : {cols}")
#7.sum() equivalent SUM() in SQL
#empdf.agg(sum("salary")).show()
#empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
#8.avg() equivalent AVG() in SQL
#empdf.agg(avg("salary")).show()
#empdf.groupBy(col("department_id")).agg(avg("salary")).show(truncate=False)
#9.max() equivalent MAX() in SQL
empdf.agg(max("salary")).show()
empdf.groupBy(col("department_id")).agg(max("salary")).show(truncate=False)
#10.min() equivalent MIN() in SQL
empdf.agg(min("salary")).show()
empdf.groupBy(col("department_id")).agg(min("salary")).show(truncate=False)
#11.len() equivalent LEN() in SQL
empdf.select( length( col("FIRST_NAME") ) ).show()
#12.upper() equivalent UPPER() in SQL
empdf.select(upper(col("FIRST_NAME"))).show()
#13.lower() equivalent LOWER() in SQL
empdf.select(lower(col("FIRST_NAME"))).show()
#14.initcap() equivalent INITCAP() in SQL
empdf.select(initcap(col("FIRST_NAME"))).show()
#15.concat() equivalent CONCAT() in SQL
empdf.select( concat ( col("FIRST_NAME"),col("LAST_NAME") ) ).alias("FullName").show()
empdf.select( concat_ws ('_',col("FIRST_NAME"),col("LAST_NAME") ) ).alias("FullName").show()
#16.trim() equivalent TRIM() in SQL
empdf.select(trim(col("first_name"))).show()
#17.substring() equivalent SUBSTR() in SQL
#we can extract a substring or slice of a string from the DataFrame column by providing the position
#and length of the string you wanted to slice.
#substring(str, starting position, length)
#Note: Please note that the position is not zero based, but 1 based index.
#empdf.select(substring(col("first_name"),2,4)).show()
empdf.withColumn("CopiedColumn",col("salary")* -1).show()
data = [(1,"20200828"),(2,"20180525")]
columns=["id","date"]
df=spark.createDataFrame(data,columns)
df.printSchema()
df.withColumn("year", substring(col("date"), 1,4))\
.withColumn("month", substring( col("date"), 5,2))\
.withColumn("day", substring(col("date"), 7,2)).show()
'''
# Small in-memory sample used by the string-function examples below.
data = [ (1, 'John', 'Doe', 'john.doe@example.com', 'New York'), (2, 'Jane', 'Doe', 'jane.doe@example.com', 'Los Angeles'),
(3, 'James', 'Smith', 'james.smith@example.com', 'Chicago'),
(4, 'mary', 'Johnson', 'mary.johnson@example.com', 'Houston')]
schema = StructType([
StructField('id', IntegerType(), True) ,
StructField('firstname', StringType(), True) ,
StructField('lastname', StringType(), False) ,
StructField('email', StringType(), True) ,
StructField('address', StringType(), False)
])
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show()
#17.contains() :It checks whether a string column contains a specified substring or not.
#df.filter(col("address").contains("New York")).show()
#18.startswith(): It checks whether a string column starts with a specified substring or not.
#df.filter(col("firstname").startswith("J")).show()
#19.endswith(): It checks whether a string column ends with a specified substring or not.
#df.filter(col("email").endswith(".com")).show()
#19.lpad(): It left-pads a column with a pad string up to a given length (id 1 -> "001").
#df.select(lpad("id", 3, "0").alias('padded_id')).show()
#20.trim(): It removes leading and trailing whitespace from a string column.
#df.select(trim(col("email")).alias('trimmed_email')).show()
#21.ltrim(): It removes leading whitespace from a string column.
#df.select(ltrim(col("email")).alias('trimmed_email')).show()
#22.rtrim(): It removes trailing whitespace from a string column.
#df.select(rtrim(col("email")).alias('trimmed_email')).show()
#23: regexp_extract(): It extracts substrings from a string column based on a regular expression pattern.
df.select( regexp_extract( col("email") , r"([a-zA-Z]+)\.com$", 1 ) ).show()
# The pattern r"([a-zA-Z]+)\.com$" matches the letters immediately before a
# trailing ".com". The third argument selects capture group 1, so for
# "john.doe@example.com" the result is "example" (the domain label, not the
# full domain "example.com"). Rows that do not match yield an empty string.
#24:regexp_replace(): It replaces substrings in a string column based on a regular expression pattern.
#df.select( regexp_replace( col("email"), r"\.com$", ".net" ).alias('replaced_email') ).show()
#25:rpad(): It pads a string column on the right with a specified character to a specified length.
#df.select(rpad(col("id"), 5, '0') ).show()
#26:split(): It splits a string column into an array of substrings based on a delimiter.
#df.select( split( col("address"), ' ' ) ).show()
#27: To See current date
empdf.select(current_date()).show()
#Change DataType using PySpark withColumn()
empdf.withColumn("salary",col("salary").cast("Integer")).show()
empdf.select(col("salary").cast('long')).printSchema()
# Disabled: CASE WHEN walkthrough kept as an inert string literal.
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
spark = SparkSession.builder.appName('casewhen').master("local[*]").getOrCreate()
data = [("James","M",60000),("Michael","M",70000),
("Robert",None,400000),("Maria","F",500000),
("Jen","",None)]
columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()
df2 = df.withColumn("new_gender", when(df.gender == "M","Male")\
.when(df.gender == "F","Female")\
.when(df.gender.isNull() ," ")\
.otherwise("N/A"))
df2.show()
or
df2=df.select( col("*"),when(df.gender == "M","Male")\
.when(df.gender == "F","Female")\
.when(df.gender.isNull() ,"")\
.otherwise("N/A").alias("new_gender") )
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ' '" +
"ELSE 'N/A' END"))
df3.show(truncate=False)
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
"ELSE 'N/A' END").alias("new_gender"))
df4.show()
df.createOrReplaceTempView("EMP")
spark.sql("select name, gender,salary,CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
"ELSE 'N/A' END as new_gender from EMP").show()
'''
#28:coalesce : returns the first non-null value among its arguments, per row.
# NOTE(review): employee_id is listed first; if it is never null this always returns employee_id.
first_non_null = coalesce(col("employee_id"), col("salary"), col("manager_id"))
empdf.select(first_non_null).orderBy(col("employee_id").desc()).show()
#29:groupBy()
# groupBy() with no keys aggregates the whole DataFrame.
empdf.groupBy().agg(min("salary")).show()
per_department = empdf.groupBy(col("department_id"))
per_department.agg(sum("salary")).show(truncate=False)
############################################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, asc,desc
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, cume_dist, ntile
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.types import StringType, BooleanType, DateType, IntegerType
spark = SparkSession.builder.appName('Joins_On_Dataframe').master("local[*]").getOrCreate()
# Silence INFO/WARN logs before any reads, as the other setup sections do.
spark.sparkContext.setLogLevel("ERROR")
deptpath = "E:\\source_files\\departments.csv"
deptdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(deptpath)
emppath = "E:\\source_files\\employees.csv"
# Spark's CSV reader has no "skip" option, and unknown reader options are
# silently ignored, so the former .option("skip",5) had no effect and was
# removed. To drop leading rows, filter them out after loading.
empdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(emppath)
empdf.show(110)
empdf.select("*").show(truncate=False)
#1.select():
empdf.select(col("employee_id"),col("first_name"), col("salary")).show(truncate=False)
empdf.select("*").show(truncate=False)
# --- DataFrame shape -----------------------------------------------------------
# (Editor tip: disable PyCharm "Inlay Hints" to hide parameter-name hints such
#  as `schema =` and `data =`.)
# DataFrame.count() returns the number of rows. It is an action, so it
# triggers execution of the lazy transformations queued on empdf.
# DataFrame.columns is a Python list of column names; len() of it is the
# number of columns.
rows, cols = empdf.count(), len(empdf.columns)
print(f"DataFrame Dimensions : {(rows,cols)}")
print(f"DataFrame Rows count : {rows}")
print(f"DataFrame Columns count : {cols}")
#2.distinct():
# distinct() after select() de-duplicates only the projected columns;
# distinct() on the whole DataFrame compares complete rows.
name_pairs = empdf.select(col("first_name"), col("last_name"))
name_pairs.distinct().show()
distinctDF = empdf.distinct()
print("Distinct count: " + str(distinctDF.count()))
distinctDF.show(truncate=False)
#3.where() equivalent WHERE in SQL:
for where_condition in (col("department_id") == 10, col("FIRST_NAME") == "Jennifer"):
    empdf.where(where_condition).show(truncate=False)
#4.filter() equivalent WHERE in SQL:
in_dept_30 = col("department_id") == "30"
id_name_dept = [col("employee_id"), col("First_name"), col("department_id")]
empdf.filter(in_dept_30).select(*id_name_dept).show(truncate=False)
empdf.filter(in_dept_30 & col("first_name").like("Kar%")).select(*id_name_dept).show(truncate=False)
#5.sort() equivalent ORDER BY in SQL :
empdf.sort(col("department_id"), col("FirsT_name")).show(truncate=False)
(
    empdf.sort(col("department_id").asc(), col("first_name").desc())
    .select(col("employee_id"), col("email"))
    .show(truncate=False)
)
#6.count() equivalent count(*) in SQL
# functions.count(column) counts the non-null values of that column, while
# DataFrame.count() counts rows. DataFrame.columns is the list of column
# names, so len(DataFrame.columns) is the number of columns.
first_name_cnt = count(empdf.FIRST_NAME)
empdf.select(first_name_cnt).show()
empdf.select(first_name_cnt, count(empdf.EMAIL)).show()
#or: row / column counts taken directly from the DataFrame
rows, cols = empdf.count(), len(empdf.columns)
print(f"DataFrame Dimensions : {(rows,cols)}")
print(f"DataFrame Rows count : {rows}")
print(f"DataFrame Columns count : {cols}")
# --- Aggregates (SUM / AVG / MAX / MIN in SQL) ----------------------------------
# sum/avg/max/min are the pyspark.sql.functions versions here (the star import
# shadows the builtins). Each is shown over the whole table, then per department.
by_department = empdf.groupBy(col("department_id"))
#7.sum() equivalent SUM() in SQL
empdf.agg(sum("salary")).show()
by_department.sum("salary").show(truncate=False)
#8-#10: avg() / max() / min() equivalent AVG() / MAX() / MIN() in SQL
for salary_agg in (avg, max, min):
    empdf.agg(salary_agg("salary")).show()
    by_department.agg(salary_agg("salary")).show(truncate=False)
#11-#14: length() / upper() / lower() / initcap() equivalent LEN() / UPPER() / LOWER() / INITCAP() in SQL
for string_fn in (length, upper, lower, initcap):
    empdf.select(string_fn(col("FIRST_NAME"))).show()
#15.concat() equivalent CONCAT() in SQL
# Alias the Column inside select(). DataFrame.alias() (the previous placement)
# only names the DataFrame, so the output column was never called "FullName".
# concat() returns null when any input is null; concat_ws() skips null inputs.
empdf.select(concat(col("FIRST_NAME"), col("LAST_NAME")).alias("FullName")).show()
empdf.select(concat_ws('_', col("FIRST_NAME"), col("LAST_NAME")).alias("FullName")).show()
#16.trim() equivalent TRIM() in SQL
empdf.select(trim(col("first_name"))).show()
#17.substring() equivalent SUBSTR() in SQL
# substring(str, pos, len) returns `len` characters starting at `pos`.
# NOTE: `pos` is 1-based, not 0-based.
empdf.select(substring(col("first_name"), 2, 4)).show()
negated_salary = col("salary") * -1
empdf.withColumn("CopiedColumn", negated_salary).show()
# Split a yyyyMMdd string into year / month / day with substring().
data = [(1, "20200828"), (2, "20180525")]
columns = ["id", "date"]
df = spark.createDataFrame(data, columns)
df.show()
# Expected output:
# +---+--------+
# | id| date|
# +---+--------+
# | 1|20200828|
# | 2|20180525|
# +---+--------+
df.printSchema()
date_slices = (("year", 1, 4), ("month", 5, 2), ("day", 7, 2))
dated_df = df
for part_name, start_pos, part_len in date_slices:
    dated_df = dated_df.withColumn(part_name, substring(col("date"), start_pos, part_len))
dated_df.show()
# Expected output:
# +---+--------+----+-----+---+
# | id| date|year|month|day|
# +---+--------+----+-----+---+
# | 1|20200828|2020| 08| 28|
# | 2|20180525|2018| 05| 25|
# +---+--------+----+-----+---+
# In-memory sample for the string-function examples.
data = [
    (1, 'John', 'Doe', 'john.doe@example.com', 'New York'),
    (2, 'Jane', 'Doe', 'jane.doe@example.com', 'Los Angeles'),
    (3, 'James', 'Smith', 'james.smith@example.com', 'Chicago'),
    (4, 'mary', 'Johnson', 'mary.johnson@example.com', 'Houston'),
]
# Explicit schema built from (name, type, nullable) triples.
schema = StructType([
    StructField(field_name, field_type, field_nullable)
    for field_name, field_type, field_nullable in (
        ('id', IntegerType(), True),
        ('firstname', StringType(), True),
        ('lastname', StringType(), False),
        ('email', StringType(), True),
        ('address', StringType(), False),
    )
])
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show(truncate=False)
# Expected output:
# +---+---------+--------+------------------------+-----------+
# |id |firstname|lastname|email |address |
# +---+---------+--------+------------------------+-----------+
# |1 |John |Doe |john.doe@example.com |New York |
# |2 |Jane |Doe |jane.doe@example.com |Los Angeles|
# |3 |James |Smith |james.smith@example.com |Chicago |
# |4 |mary |Johnson |mary.johnson@example.com|Houston |
# +---+---------+--------+------------------------+-----------+
#17.contains() :It checks whether a string column contains a specified substring or not.
df.filter(col("address").contains("New York")).show(truncate=False)
#18.startswith(): It checks whether a string column starts with a specified substring or not.
df.filter(col("firstname").startswith("J")).show(truncate=False)
#19.endswith(): It checks whether a string column ends with a specified substring or not.
df.filter(col("email").endswith(".com")).show(truncate=False)
#20.lpad(): It left-pads a column with a pad string up to a given length (id 1 -> "001").
df.select(lpad("id", 3, "0").alias('padded_id')).show(truncate=False)
#21.trim(): It removes leading and trailing whitespace from a string column.
df.select(trim(col("email")).alias('trimmed_email')).show(truncate=False)
#22.ltrim(): It removes leading whitespace from a string column.
df.select(ltrim(col("email")).alias('trimmed_email')).show(truncate=False)
#23.rtrim(): It removes trailing whitespace from a string column.
df.select(rtrim(col("email")).alias('trimmed_email')).show(truncate=False)
#24: regexp_extract(): It extracts substrings from a string column based on a regular expression pattern.
# The pattern r"([a-zA-Z]+)\.com$" matches the letters immediately before a
# trailing ".com". The third argument selects capture group 1, so for
# "john.doe@example.com" the result is "example" (the domain label, not the
# full domain "example.com"). Rows that do not match yield an empty string.
# (Kept as comments: inside a normal triple-quoted string, "\." is an invalid
# escape sequence that newer Pythons warn about.)
df.select( regexp_extract( col("email") , r"([a-zA-Z]+)\.com$", 1 ) ).show(truncate=False)
#25:regexp_replace(): It replaces substrings in a string column based on a regular expression pattern.
# The result is the whole email with ".com" swapped for ".net" (not a domain),
# so the column is aliased accordingly.
df.select( regexp_replace( col("email"), r"\.com$", ".net" ).alias('replaced_email') ).show(truncate=False)
#26:rpad(): It pads a string column on the right with a specified character to a specified length.
df.select(rpad(col("id"), 5, '0') ).show(truncate=False)
#26:split(): It splits a string column into an array of substrings based on a delimiter.
df.select( split( col("address"), ' ' ) ).show()
#27: To See current date
empdf.select(current_date()).show()
#Change DataType using PySpark withColumn()
empdf.withColumn("salary",col("salary").cast("Integer")).show(truncate=False)
empdf.select(col("salary").cast('long')).printSchema()
# Sample data for the conditional-column examples (when/otherwise and CASE WHEN).
data = [("James", "M", 60000), ("Michael", "M", 70000),
        ("Robert", None, 400000), ("Maria", "F", 500000),
        ("Jen", "", None)]
columns = ["name", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
# Derive new_gender: "M" -> "Male", "F" -> "Female", null -> "" and any other value
# (e.g. Jen's empty string) -> "N/A". All five variants below build the same column.
df2 = df.withColumn("new_gender", when(df.gender == "M", "Male")
                                  .when(df.gender == "F", "Female")
                                  .when(df.gender.isNull(), "")
                                  .otherwise("N/A"))
df2.show()
# or: the same expression added through select()
df2 = df.select(col("*"), when(df.gender == "M", "Male")
                          .when(df.gender == "F", "Female")
                          .when(df.gender.isNull(), "")
                          .otherwise("N/A").alias("new_gender"))
df2.show()
# or: the same logic as a SQL expression. Each fragment ends with a space because Python
# joins the pieces verbatim; without it the tokens run together (e.g. "THEN ''ELSE").
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " +
                                       "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN '' " +
                                       "ELSE 'N/A' END"))
df3.show(truncate=False)
# or: the SQL expression added through select()
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
                               "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN '' " +
                               "ELSE 'N/A' END").alias("new_gender"))
df4.show()
# or: plain Spark SQL against a temporary view
df.createOrReplaceTempView("EMP")
spark.sql("select name, gender,salary,CASE WHEN gender = 'M' THEN 'Male' " +
          "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN '' " +
          "ELSE 'N/A' END as new_gender from EMP").show()
########### Additional *******************################################################
#*******************************************************************************************
#empdf.agg({'FIRST_NAME':'count','EMAIL':'count'}).show()
empdf.select(length(col("FIRST_NAME"))).show()
empdf.select(upper(col("FIRST_NAME"))).show()
empdf.select(lower(col("FIRST_NAME"))).show()
# alias() must be applied to the column expression: DataFrame.alias() only names the
# DataFrame itself (e.g. for self-joins) and leaves the generated column name unchanged.
empdf.select(concat_ws('_', col("FIRST_NAME"), col("LAST_NAME")).alias("FullName")).show()
empdf.select(concat(col("FIRST_NAME"), col("LAST_NAME")).alias("FullName")).show()
empdf.limit(5).select(col("employee_id"), col("first_name"), col("email")).show(truncate=False)
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
empdf.printSchema()
empdf.select(col("first_name"), col("last_name")).distinct().show()
empdf.select(trim(col("first_name"))).show()
# substring(str, pos, len): pos is 1-based, so this takes 4 characters starting at the 2nd.
empdf.select(substring(col("first_name"), 2, 4)).show()
empdf.sort("department_id", "first_name").show(truncate=False)
empdf.sort(col("department_id").asc(), col("first_name").desc()).select(col("employee_id"), col("email")).show(truncate=False)
# "FirsT_name" still resolves: column names are case-insensitive by default (spark.sql.caseSensitive=false).
empdf.sort(col("department_id"), col("FirsT_name")).show(truncate=False)
empdf.orderBy("department_id", "FirsT_name").show(truncate=False)
empdf.orderBy(col("department_id"), col("FirsT_name")).show(truncate=False)
empdf.sort(col("department_id").asc(), col("first_name").desc()).show(truncate=False)
empdf.orderBy(col("department_id").asc(), col("first_name").desc()).show(truncate=False)
# coalesce() returns the first non-null value among its arguments, row by row.
empdf.select(coalesce(col("employee_id"), col("salary"), col("manager_id"))).orderBy(col("employee_id").desc()).show()
empdf.select(current_date()).show()
print("Separator #####################################################")
# Register empdf as the SQL view "EMP" (replacing any earlier view of that name) and query it.
empdf.createOrReplaceTempView("EMP")
spark.sql("select employee_id, first_name,last_name, salary from EMP ORDER BY department_id asc").show(truncate=False)
print("Separator ***************************")
# GroupedData.sum() shortcut ...
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
print("Separator ??????????????????????????????")
# ... and the equivalent agg() form. Here sum is pyspark.sql.functions.sum, because the
# star import at the top of the script shadows Python's builtin sum.
empdf.groupBy(col("department_id")).agg(sum("salary")).show(truncate=False)
'''
>>> empdf.groupBy().min("salary").show()
+-----------+
|min(salary)|
+-----------+
| 2100|
+-----------+
>>> empdf.groupBy().agg(min("salary")).show()
+-----------+
|min(salary)|
+-----------+
| 2100|
+-----------+
'''
'''
(empdf.groupBy(col("department_id")).agg( count("department_id").alias("count"),
sum("salary").alias("sum"),
max("Salary").alias("max"),
min("SAlary").alias("min"),
avg("SALARY").alias("avg")
).show()
)
'''
# Several aggregates per department in a single agg() call.
(empdf.groupBy("department_id")
    .agg(sum("salary").alias("sum_salary"),
         avg("salary").alias("avg_salary"),
         sum("commission_pct").alias("sum_commission_pct"),
         max("commission_pct").alias("max_bonus"))
    .show(truncate=False))
# Head-count per department, largest first.
empdf.groupBy(col("department_id")).agg(count("*").alias("cnt")).orderBy(col("cnt").desc()).show(truncate=False)
# where() is an alias for filter().
empdf.where(col("department_id") == 10).show(truncate=False)
empdf.where(col("FIRST_NAME") == "Jennifer").show(truncate=False)
print("Separator #####################################################")
empdf.filter(col("FIRST_NAME") == "Kimberely").show(truncate=False)
empdf.filter(col("FIRST_NAME") == "Kimberely").select(col("employee_id"), col("email")).show(truncate=False)
empdf.filter(col("first_name").like("Kar%")).select(col("First_name"), col("Last_name")).show()
# Each condition needs its own parentheses: & binds tighter than == in Python.
(empdf.filter((col("department_id") == "30") & (col("first_name").like("Kar%")))
    .select(col("employee_id"), col("First_name"), col("department_id"), col("salary"))
    .show(truncate=False))
# withColumn() + cast(): change the data type of an existing column.
empdf.withColumn("salary", col("salary").cast("Integer")).show()
# withColumn() on an existing name overwrites that column with the new expression.
empdf.withColumn("salary", col("salary") * 10000).show()
# withColumn() on a new name appends a derived column.
empdf.withColumn("CopiedColumn", col("salary") * -1).show()
# withColumn() with lit(): add new constant-valued columns.
'''
To create a new column, pass the desired column name as the first argument of the withColumn() transformation.
Make sure the column is not already present in the DataFrame;
if it is, withColumn() replaces the values of that existing column instead.
In the snippet below, PySpark's lit() function is used to add a constant value as a DataFrame column.
Calls can also be chained to add multiple columns.
'''
#empdf.withColumn("Country", lit("USA")).show()
# Chain withColumn() calls to add several constant columns at once.
(empdf
    .withColumn("State", lit("California"))
    .withColumn("District", lit("Bonita"))
    .show())
# withColumnRenamed(existing, new) renames a single column.
empdf.withColumnRenamed("LAST_NAME", "EMPLOYEE_LAST_NAME").show(truncate=False)
# drop() returns a DataFrame without the named column.
empdf.drop("salary").show()
# Cast several columns by chaining withColumn(); each call replaces the named column in place.
df2 = (empdf
       .withColumn("PHONE_NUMBER", col("PHONE_NUMBER").cast(StringType()))
       # numeric -> boolean: non-zero becomes true, 0 becomes false, null stays null
       .withColumn("COMMISSION_PCT", col("COMMISSION_PCT").cast(BooleanType()))
       .withColumn("HIRE_DATE", col("HIRE_DATE").cast(DateType()))
       # cast EMPLOYEE_ID's own values (it previously read HIRE_DATE by mistake)
       .withColumn("EMPLOYEE_ID", col("EMPLOYEE_ID").cast(IntegerType())))
df2.printSchema()
# In select(), Column objects and plain column-name strings are interchangeable.
empdf.select(col("first_name"), col("last_name")).show()
empdf.select("first_name", "last_name").show()
# Aggregates over the whole DataFrame (no grouping key).
empdf.agg(min("salary")).show()
empdf.agg(max("salary")).show()
# DataFrame has no avg() method; the working equivalent is empdf.agg(avg("salary")).
#empdf.avg("salary").show()
empdf.select(trim(col("FIRST_NAME"))).show()
empdf.select(col("salary").cast('long')).printSchema()

# Reusable predicates and projections: col() builds unresolved expressions,
# so the same Column objects can be shared by several queries.
_is_kar = col("first_name").like("Kar%")
_in_dept_30 = col("department_id") == "30"
_emp_basic = [col("employee_id"), col("First_name"), col("department_id")]
_emp_pay = _emp_basic + [col("salary")]
_emp_full = _emp_pay + [col("commission_pct")]

empdf.filter(_is_kar).select(col("First_name"), col("Last_name")).show()
empdf.filter(_in_dept_30).select(*_emp_basic).show(truncate=False)
empdf.filter(_in_dept_30 & _is_kar).select(*_emp_pay).show(truncate=False)
# Null checks on commission_pct.
empdf.filter(col("commission_pct").isNull()).select(*_emp_full).show(truncate=False)
empdf.filter(col("commission_pct").isNull()).select("*").show(truncate=False)
empdf.filter(col("commission_pct").isNotNull()).select("*").show(truncate=False)
# Membership test with isin().
empdf.filter(col("department_id").isin(20, 30, 40)).select(*_emp_full).show(truncate=False)
empdf.filter(col("department_id").isin(20, 30, 40)).select("*").show(truncate=False)
# ---------------------------------------------------------------------------
# union() / unionAll()
# The two sample DataFrames below share the same six columns, which union()
# requires: it combines rows by column position, not by column name.
# The SparkSession created at the top of the script is reused here.
# ---------------------------------------------------------------------------
simpleData = [("James", "Sales", "NY", 90000, 34, 10000),
              ("Michael", "Sales", "NY", 86000, 56, 20000),
              ("Robert", "Sales", "CA", 81000, 30, 23000),
              ("Maria", "Finance", "CA", 90000, 24, 23000)]
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(data=simpleData, schema=columns)
#df.printSchema()
#df.show(truncate=False)
simpleData2 = [("James", "Sales", "NY", 90000, 34, 10000),
               ("Maria", "Finance", "CA", 90000, 24, 23000),
               ("Jen", "Finance", "NY", 79000, 53, 15000),
               ("Jeff", "Marketing", "CA", 80000, 25, 18000),
               ("Kumar", "Marketing", "NY", 91000, 50, 21000)]
columns2 = ["employee_name", "department", "state", "salary", "age", "bonus"]
df2 = spark.createDataFrame(data=simpleData2, schema=columns2)
'''
DataFrame union() method merges two DataFrames and returns a new DataFrame with all rows
from both DataFrames, regardless of duplicates. Both union and unionAll return data including duplicates.
'''
#df2.printSchema()
#df2.show(truncate=False)
unionDF = df.union(df2)
unionDF.show(truncate=False)
'''
Since union() returns all rows without removing duplicates, use the distinct() function
to keep just one record when duplicates exist.
'''
disDF = df.union(df2).distinct()
disDF.show(truncate=False)
# Use a temp view as a simple "CTE" and persist the filtered result.
empdf.createOrReplaceTempView("cte1")
df_cte1 = spark.sql("select * from cte1 where salary > 15000")
df_cte1.show()
# The default save mode is "errorifexists", which fails when the table or path already
# exists (including on a re-run), so "overwrite" is used for the Parquet outputs.
# Parquet is self-describing: CSV-only options such as "header" do not apply to it.
df_cte1.write.mode("overwrite").format("parquet").saveAsTable("test")
df_cte1.write.mode("overwrite").format("parquet").save("E:\\source_files\\out.parquet")
# For CSV, "header" and "delimiter" (an alias of "sep") are real options; a misspelled
# option name is silently ignored and the default "," would be used instead.
df_cte1.write.mode("append").option("header", "false").option("delimiter", "|").format("csv").save("E:\\source_files\\out.csv")
# One window per department, rows ordered by ascending salary.
windowSpec = Window.partitionBy("department_id").orderBy("salary")

# Ranking functions.
(empdf.withColumn("row_number", row_number().over(windowSpec))
      .show(truncate=False))
(empdf.withColumn("rank", rank().over(windowSpec))
      .show())
(empdf.withColumn("dense_rank", dense_rank().over(windowSpec))
      .show())

# Offset functions: salary two rows before / after the current row (null near partition edges).
(empdf.withColumn("lag", lag("salary", 2).over(windowSpec))
      .show())
(empdf.withColumn("lead", lead("salary", 2).over(windowSpec))
      .show())

# Distribution functions.
(empdf.withColumn("cume_dist", cume_dist().over(windowSpec))
      .show())
(empdf.withColumn("ntile", ntile(2).over(windowSpec))
      .show())
# Ordered window, used only to pick a single row per department.
windowSpec = Window.partitionBy("department_id").orderBy("salary")
# Unordered window: aggregates over it cover the whole department partition.
windowSpecAgg = Window.partitionBy("department_id")
from pyspark.sql.functions import col,avg,sum,min,max,row_number
(empdf
    .withColumn("row", row_number().over(windowSpec))
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .withColumn("min", min(col("salary")).over(windowSpecAgg))
    .withColumn("max", max(col("salary")).over(windowSpecAgg))
    # Aggregates are identical on every row of a partition, so keep just the first row.
    .where(col("row") == 1)
    .select("department_id", "avg", "sum", "min", "max")
    .show())
# Reload the employee data from a file that contains duplicate rows.
emppath = "E:\\source_files\\emp_duplicate.csv"
empdf = (spark.read.format("csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .option("sep", ",")
         .load(emppath))
empdf.show()
#distinctDF = empdf.distinct()
#print("Distinct count: "+str(distinctDF.count()))
#distinctDF.show(truncate=False)
#df2 = empdf.dropDuplicates()
#print("Distinct count: "+str(df2.count()))
#df2.show(truncate=False)
# PySpark distinct over a subset of columns:
# dropDuplicates(cols) returns a new DataFrame keeping one row per distinct (department, salary) pair.
dropDisDF = empdf.dropDuplicates(["department", "salary"])
print(f"Distinct count of department & salary : {dropDisDF.count()}")
dropDisDF.show(truncate=False)
# Reference-only sample: this is a string literal, so none of it is executed.
# It shows a self-contained setup (own imports, SparkSession and DataFrame) in which
# the row ("James", "Sales", 3000) appears twice — presumably as input for the
# distinct()/dropDuplicates() examples above; verify before reusing.
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Prepare Data
data = [("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000), \
("Saif", "Sales", 4100) \
]
# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
'''
# No comments:
# Post a Comment