Handling Nulls in Pyspark:
*************************
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('duplicate').master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
path = "E:\\source_files\\null_example.csv"
null_example_df = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(path)
null_example_df.show()
+----+--------+------------------+---------+
| ID| NAME| JOB_PROFILE| CITY|
+----+--------+------------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 2| null|Software Developer| null|
| 3| Swati| Data Analyst|Hyderabad|
| 4| null| null| Noida|
| 5| Arpit| Android Developer| Banglore|
| 6| Ritik| null| null|
| 7| null| null| null|
|null| null| null| null|
+----+--------+------------------+---------+
Parameter:
how: This parameter is used to determine if the row or column has to remove or not.
‘any’ – If any of the value in Dataframe is NULL then drop that row or column.
‘all’ – If all the values of particular row or column is NULL then drop.
thresh: only display rows with at least (i) Not Null values in rows
subset: If the given subset of column(s) contains any of the null value then drop that row or column.
# Example 1 : Cleaning data with dropna using "any" parameter in PySpark.
#if any row having any Null value we are dropping that rows
# Nulls are in row number 2,4,6,7 & 8. Hence Row number 1,3 & 5 will be retrieved.
df = null_example_df.dropna(how="any")
df.show()
+---+--------+-----------------+---------+
| ID| NAME| JOB_PROFILE| CITY|
+---+--------+-----------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 3| Swati| Data Analyst|Hyderabad|
| 5| Arpit|Android Developer| Banglore|
+---+--------+-----------------+---------+
# Example 2 : Cleaning data with dropna using "all" parameter in PySpark.
#if any row having all values are null then we are dropping that row.
#only row number 8 will be dropped.
df = null_example_df.dropna(how="all")
df.show()
+---+--------+------------------+---------+
| ID| NAME| JOB_PROFILE| CITY|
+---+--------+------------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 2| null|Software Developer| null|
| 3| Swati| Data Analyst|Hyderabad|
| 4| null| null| Noida|
| 5| Arpit| Android Developer| Banglore|
| 6| Ritik| null| null|
| 7| null| null| null|
+---+--------+------------------+---------+
# Example 3: Cleaning data with dropna using thresh parameter in PySpark.
'''
In the below code, we have passed the thresh=1 parameter in the dropna() function which means that
if there are any rows or columns which is having at least one not null column values , show those records only
drop rest of records .
'''
#thresh: only keep rows with at least (i) Not Null values
df = null_example_df.dropna(thresh=1)
df.show()
+---+--------+------------------+---------+
| ID| NAME| JOB_PROFILE| CITY|
+---+--------+------------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 2| null|Software Developer| null|
| 3| Swati| Data Analyst|Hyderabad|
| 4| null| null| Noida|
| 5| Arpit| Android Developer| Banglore|
| 6| Ritik| null| null|
| 7| null| null| null|
+---+--------+------------------+---------+
#thres=3 : only display rows with at least (3) Not Null values in rows
df = null_example_df.dropna(thresh=3)
df.show()
+---+--------+-----------------+---------+
| ID| NAME| JOB_PROFILE| CITY|
+---+--------+-----------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 3| Swati| Data Analyst|Hyderabad|
| 5| Arpit|Android Developer| Banglore|
+---+--------+-----------------+---------+
#thresh: only display rows with at least (5) column Not Null values.
# As we don't have 5 columns , the blank schema retrived with no data .
#df = null_example_df.dropna(thresh=5)
#df.show()
+---+----+-----------+----+
| ID|NAME|JOB_PROFILE|CITY|
+---+----+-----------+----+
+---+----+-----------+----+
#Example 4: Cleaning data with dropna using subset parameter in PySpark.
'''
In the below code, we have passed the parameter subset=’City’ which is the column name in the dropna() function.
If any rows that holds City as NULL , it will be dropped from dataframe. Rest of the records will be shown.
Row number 2,6,7,8 are having Null in City Column , hence these records will be dropped.
'''
df = null_example_df.dropna(subset="City")
df.show()
+---+--------+-----------------+---------+
| ID| NAME| JOB_PROFILE| CITY|
+---+--------+-----------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 3| Swati| Data Analyst|Hyderabad|
| 4| null| null| Noida|
| 5| Arpit|Android Developer| Banglore|
+---+--------+-----------------+---------+
#Example 5: Cleaning data with dropna using thresh and subset combination parameter in PySpark.
# if thresh value is satisfied with subset column then dropping that row
#thres=2 : only display rows with at least (2) Not Null values in rows
# subset=(“Id”,”Name”,”City”) : Consider on these 3 columns out of total 4 columns
# and only display rows with at least (2) Not Null values in rows
df = null_example_df.dropna(thresh=2,subset=("Id","Name","City"))
df.show()
| ID| NAME| JOB_PROFILE| CITY|
+---+--------+-----------------+---------+
| 1|Shivansh| Data Scientist| Noida|
| 3| Swati| Data Analyst|Hyderabad|
| 4| null| null| Noida|
| 5| Arpit|Android Developer| Banglore|
| 6| Ritik| null| null|
+---+--------+-----------------+---------+
df = null_example_df.dropna(thresh=4,subset=("Id","Name","City"))
df.show()
+---+----+-----------+----+
| ID|NAME|JOB_PROFILE|CITY|
+---+----+-----------+----+
+---+----+-----------+----+
# In the above example (ID, Name, City) are not null in 3 columns , hence no data displayed.
# Joins in Dataframe
# Joins in Dataframe
#dataframe_name.join(other, on, how)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Joins_On_Dataframe').master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
'''Jupyter Notebook commands
import findspark
findspark.init()
findspark.find()
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName('Joins_On_Dataframe').master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
emppath = "E:\\source_files\\employees.csv"
deptpath = "E:\\source_files\\departments.csv"
locpath = "E:\\source_files\\locations.csv"
empdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(emppath)
deptdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(deptpath)
locdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(locpath)
# employees DF left table and Departments DF is right table.
# Show 107 rows
#df1.show(107)
# Show 20 rows with full column value
#df1.show(truncate=False)
# Show 107 rows & full column value
empdf.show (107,truncate=False)
deptdf.show(107,truncate=False)
#1.INNER JOIN : It will retrive the matching records from both the dataframes.
#injdf = empdf.join (deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID)
injdf = empdf.join (deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"inner")
injdf.show()
#2.LEFT OUTER JOIN: It will retrive all the records from Left side table including matching records from right side table.
#lftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"left")
#lftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"leftouter")
lftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"left_outer")
lftdf.show()
#2.LEFT OUTER JOIN: It will retrive all the records from Left side table including matching records from right side table.
#lftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"left")
#lftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"leftouter")
lftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"left_outer")
lftdf.show()
#3.RIGHT OUTER JOIN: It will retrive all the records from right side table including matching records from left side table.
#rftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"right")
#rftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"rightouter")
rftdf= empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"right_outer")
rftdf.show()
#4.FULL OUTER JOIN : It will retrive all the matching or non-matching records from both the dataframes.
fojdf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"full")
#fojdf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"fullouter")
#fojdf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"full_outer")
fojdf.show()
#5.leftsemi/semi/left_semi JOIN (It will perform an equijoin on both the DF but retrive the matching/common record from
# left table only )
lftsemidf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"semi")
lftsemidf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"leftsemi")
lftsemidf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"left_semi")
lftsemidf.show()
empdf.createOrReplaceTempView("employees")
deptdf.createOrReplaceTempView("departments")
spark.sql('''
SELECT count(*)
FROM employees AS e
LEFT SEMI JOIN departments AS d
ON e.department_id = d.department_id
''').show()
'''
select e.* from employees e, departments d
where e.department_id=d.department_id
and e.department_id is not null;
'''
#6.leftanti/anti/left_anti JOIN (It will not perform any equijoin on both the DF
# but It will retrive the uncommon(Joining Condition Column value Null) record from left table only )
#This join returns rows in the left DataFrame that have no matching rows in the right DataFrame
#lftantidf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"anti")
#lftantidf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"leftanti")
lftantidf = empdf.join(deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"left_anti")
lftantidf.show()
spark.sql('''
SELECT *
FROM employees AS e
LEFT ANTI JOIN departments AS d
ON e.department_id = d.department_id
''').show()
#or
'''
select e.* from employees e, departments d
where e.department_id=d.department_id
and e.department_id is null;
'''
#7: Joining multiple Dataframe tables .
multdf = empdf.join (deptdf,empdf.DEPARTMENT_ID==deptdf.DEPARTMENT_ID,"inner").join(locdf,deptdf.LOCATION_ID==locdf.LOCATION_ID,"inner")
injdf.show()
#8: Self Join
'''
Returns all possible combinations of rows from both the dataframes.
In other words, it takes every row from one dataframe and matches it with every row in the other dataframe.
The result is a new dataframe with all possible combinations of the rows from the two input dataframes.
'''
# Perform the self join
empdf.alias("worker").join(empdf.alias("manager"),\
col("worker.MANAGER_ID") == col("manager.EMPLOYEE_ID"),"inner") \
.select(col("worker.EMPLOYEE_ID"),col("worker.FIRST_NAME"), \
col("manager.EMPLOYEE_ID").alias("superior_emp_id"), \
col("manager.FIRST_NAME").alias("superior_emp_name")) \
.show(truncate=False)
or
#Whenever we are giving .join keyword , the 2nd table columns will be added to the right of joined dataframe.
#Only the number of columns will increase in joined DF , there will be no immediate change to number of rows.
selfjoin = empdf.alias("worker").join(empdf.alias("manager"), col("worker.MANAGER_ID") == col("manager.EMPLOYEE_ID"),"inner")
selfjoin.show(300)
selfjoin2 = selfjoin.select(col("worker.EMPLOYEE_ID"), col("worker.FIRST_NAME"),
col("manager.EMPLOYEE_ID").alias("superior_emp_id"),
col("manager.FIRST_NAME").alias("superior_emp_name"))
selfjoin2.show(300)
#9: Cross Join
'''
Returns all possible combinations of rows from both the dataframes.
In other words, it takes every row from one dataframe and matches it with every row in the other dataframe.
The result is a new dataframe with all possible combinations of the rows from the two input dataframes.
'''
# Perform the cross join
#cross_join = df1.crossJoin(df2)
cross_join = empdf.crossJoin(deptdf)
cross_join.show(3000)
Dataframe Functions
******************
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, asc,desc
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, cume_dist, ntile
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.types import StringType, BooleanType, DateType, IntegerType
spark = SparkSession.builder.appName('Joins_On_Dataframe').master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
emppath = "E:\\source_files\\employees.csv"
empdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(emppath)
deptpath = "E:\\source_files\\departments.csv"
deptdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(deptpath)
empdf.show(110)
empdf.select("*").show(truncate=False)
#1.select():
empdf.select(col("employee_id"),col("first_name"), col("salary")).show(truncate=False)
empdf.select("*").show(truncate=False)
'''
Inlay Hints disable in Pycharm to avoid schema = , data = ... etc
DataFrame.count()
pyspark.sql.DataFrame.count() function is used to get the number of rows present in the DataFrame. count() is an action operation
that triggers the transformations to execute. Since transformations are lazy in nature they do not get executed until we call an action().
In the below example, empDF is a DataFrame object, and below is the detailed explanation.
DataFrame.count() -Returns the number of records in a DataFrame.
DataFrame.columns – Returns all column names of a DataFrame as a list.
len() – len() is a Python function that returns a number of elements present in a list.
len(DataFrame.columns) – Returns the number of columns in a DataFrame.
'''
# Get row count
rows = empdf.count()
# Get columns count
cols = len(empdf.columns)
print(f"DataFrame Dimensions : {(rows,cols)}")
print(f"DataFrame Rows count : {rows}")
print(f"DataFrame Columns count : {cols}")
#2.distinct():
empdf.select( col("first_name"), col ("last_name") ).distinct().show()
distinctDF = empdf.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)
#3.where() equivalent WHERE in SQL:
empdf.where(col("department_id")==10).show(truncate=False)
empdf.where(col("FIRST_NAME")=="Jennifer").show(truncate=False)
#4.filter() equivalent WHERE in SQL:
empdf.filter ( col("department_id")=="30" ).select ( col("employee_id"),col("First_name"),col("department_id") ).show(truncate=False)
empdf.filter ( (col("department_id")=="30") & (col("first_name").like ("Kar%")) ) \
.select( col("employee_id"),col("First_name"),col("department_id") ).show(truncate=False)
#5.sort() equivalent ORDER BY in SQL :
empdf.sort(col("department_id"),col("FirsT_name")).show(truncate=False)
empdf.sort(col("department_id").asc(),col("first_name").desc())\
.select(col("employee_id"),col("email")).show(truncate=False)
#6.count() equivalent count(*) in SQL
'''
DataFrame.count() -Returns the number of records in a DataFrame.
#DataFrame.columns – Returns all column names of a DataFrame as a list.
#len() – len() is a Python function that returns a number of elements present in a list.
#len(DataFrame.columns) – Returns the number of columns in a DataFrame.
'''
empdf.select(count(empdf.FIRST_NAME)).show()
empdf.select(count(empdf.FIRST_NAME), count(empdf.EMAIL)).show()
#or
# Get row count
rows = empdf.count()
# Get columns count
cols = len(empdf.columns)
print(f"DataFrame Dimensions : {(rows,cols)}")
print(f"DataFrame Rows count : {rows}")
print(f"DataFrame Columns count : {cols}")
#7.sum() equivalent SUM() in SQL
empdf.agg(sum("salary")).show()
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
#8.avg() equivalent AVG() in SQL
empdf.agg(avg("salary")).show()
empdf.groupBy(col("department_id")).agg(avg("salary")).show(truncate=False)
#9.max() equivalent MAX() in SQL
empdf.agg(max("salary")).show()
empdf.groupBy(col("department_id")).agg(max("salary")).show(truncate=False)
#10.min() equivalent MIN() in SQL
empdf.agg(min("salary")).show()
empdf.groupBy(col("department_id")).agg(min("salary")).show(truncate=False)
#11.len() equivalent LEN() in SQL
empdf.select( length( col("FIRST_NAME") ) ).show()
#12.upper() equivalent UPPER() in SQL
empdf.select(upper(col("FIRST_NAME"))).show()
#13.lower() equivalent LOWER() in SQL
empdf.select(lower(col("FIRST_NAME"))).show()
#14.initcap() equivalent INITCAP() in SQL
empdf.select(initcap(col("FIRST_NAME"))).show()
#15.concat() equivalent CONCAT() in SQL
empdf.select( concat ( col("FIRST_NAME"),col("LAST_NAME") ) ).alias("FullName").show()
empdf.select( concat_ws ('_',col("FIRST_NAME"),col("LAST_NAME") ) ).alias("FullName").show()
#16.trim() equivalent TRIM() in SQL
empdf.select(trim(col("first_name"))).show()
#17.substring() equivalent SUBSTR() in SQL
'''
we can extract a substring or slice of a string from the DataFrame column by providing the position
#and length of the string you wanted to slice.
#substring(str, starting position, length)
#Note: Please note that the position is not zero based, but 1 based index.
'''
empdf.select(substring(col("first_name"),2,4)).show()
empdf.withColumn("CopiedColumn",col("salary")* -1).show()
data = [(1,"20200828"),(2,"20180525")]
columns=["id","date"]
df=spark.createDataFrame(data,columns)
df.show()
'''
+---+--------+
| id| date|
+---+--------+
| 1|20200828|
| 2|20180525|
+---+--------+
'''
df.printSchema()
df.withColumn("year", substring(col("date"), 1,4))\
.withColumn("month", substring( col("date"), 5,2))\
.withColumn("day", substring(col("date"), 7,2)).show()
'''
+---+--------+----+-----+---+
| id| date|year|month|day|
+---+--------+----+-----+---+
| 1|20200828|2020| 08| 28|
| 2|20180525|2018| 05| 25|
+---+--------+----+-----+---+
'''
data = [ (1, 'John', 'Doe', 'john.doe@example.com', 'New York'), (2, 'Jane', 'Doe', 'jane.doe@example.com', 'Los Angeles'),
(3, 'James', 'Smith', 'james.smith@example.com', 'Chicago'),
(4, 'mary', 'Johnson', 'mary.johnson@example.com', 'Houston')]
schema = StructType([
StructField('id', IntegerType(), True) ,
StructField('firstname', StringType(), True) ,
StructField('lastname', StringType(), False) ,
StructField('email', StringType(), True) ,
StructField('address', StringType(), False)
])
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show(truncate=False)
'''
+---+---------+--------+------------------------+-----------+
|id |firstname|lastname|email |address |
+---+---------+--------+------------------------+-----------+
|1 |John |Doe |john.doe@example.com |New York |
|2 |Jane |Doe |jane.doe@example.com |Los Angeles|
|3 |James |Smith |james.smith@example.com |Chicago |
|4 |mary |Johnson |mary.johnson@example.com|Houston |
+---+---------+--------+------------------------+-----------+
'''
#17.contains() :It checks whether a string column contains a specified substring or not.
df.filter(col("address").contains("New York")).show(truncate=False)
#18.startswith(): It checks whether a string column starts with a specified substring or not.
df.filter(col("firstname").startswith("J")).show(truncate=False)
#19.endswith(): It checks whether a string column ends with a specified substring or not.
df.filter(col("email").endswith(".com")).show(truncate=False)
#20.lpad(): It checks whether a string column ends with a specified substring or not.
df.select(lpad("id", 3, "0").alias('padded_id')).show(truncate=False)
#21.trim(): It removes leading and trailing whitespace from a string column.
df.select(trim(col("email")).alias('trimmed_email')).show(truncate=False)
#22.ltrim(): It removes leading whitespace from a string column.
df.select(ltrim(col("email")).alias('trimmed_email')).show(truncate=False)
#23.rtrim(): It removes trailing whitespace from a string column.
df.select(rtrim(col("email")).alias('trimmed_email')).show(truncate=False)
#24: regex_extract(): It extracts substrings from a string column based on a regular expression pattern.
'''
using the regexp_extract function to extract the domain name from each email address in the email column.
The regular expression r'([a-zA-Z]+)\.com$' is used to match the domain name at the end of the email address (after the "@" symbol).
The third argument to the regexp_extract function specifies which group to extract (1 for the domain name).
The resulting DataFrame will have a single column containing the domain name of each email address.
'''
df.select( regexp_extract( col("email") , r"([a-zA-Z]+)\.com$", 1 ) ).show(truncate=False)
#25:regex_replace(): It replaces substrings in a string column based on a regular expression pattern.
df.select( regexp_replace( col("email"), r"\.com$", ".net" ).alias('domain') ).show(truncate=False)
#26:rpad(): It pads a string column on the right with a specified character to a specified length.
df.select(rpad(col("id"), 5, '0') ).show(truncate=False)
#26:split(): It splits a string column into an array of substrings based on a delimiter.
df.select( split( col("address"), ' ' ) ).show()
#27: To See current date
empdf.select(current_date()).show()
#Change DataType using PySpark withColumn()
empdf.withColumn("salary",col("salary").cast("Integer")).show(truncate=False)
empdf.select(col("salary").cast('long')).printSchema()
data = [("James","M",60000),("Michael","M",70000),
("Robert",None,400000),("Maria","F",500000),
("Jen","",None)]
columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()
df2 = df.withColumn("new_gender", when(df.gender == "M","Male")\
.when(df.gender == "F","Female")\
.when(df.gender.isNull() ," ")\
.otherwise("N/A"))
df2.show()
# or
df2=df.select( col("*"),when(df.gender == "M","Male")\
.when(df.gender == "F","Female")\
.when(df.gender.isNull() ,"")\
.otherwise("N/A").alias("new_gender") )
#or
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ' '" +
"ELSE 'N/A' END"))
df3.show(truncate=False)
#or
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
"ELSE 'N/A' END").alias("new_gender"))
df4.show()
df.createOrReplaceTempView("EMP")
spark.sql("select name, gender,salary,CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
"ELSE 'N/A' END as new_gender from EMP").show()
########### Additional *******************################################################
#*******************************************************************************************
#empdf.agg({'FIRST_NAME':'count','EMAIL':'count'}).show()
empdf.select(length(col("FIRST_NAME"))).show()
empdf.select(upper(col("FIRST_NAME"))).show()
empdf.select(lower(col("FIRST_NAME"))).show()
empdf.select( concat_ws ('_',col("FIRST_NAME"),col("LAST_NAME") ) ).alias("FullName").show()
empdf.select( concat ( col("FIRST_NAME"),col("LAST_NAME") ) ).alias("FullName").show()
empdf.limit(5).select(col("employee_id"),col("first_name"),col("email")).show(truncate=False)
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
empdf.printSchema()
empdf.select( col("first_name"), col ("last_name") ).distinct().show()
empdf.select(trim(col("first_name"))).show()
empdf.select(substring(col("first_name"),2,4)).show()
empdf.sort("department_id","first_name").show(truncate=False)
empdf.sort(col("department_id").asc(),col("first_name").desc()).select(col("employee_id"),col("email")).show(truncate=False)
empdf.sort(col("department_id"),col("FirsT_name")).show(truncate=False)
empdf.orderBy("department_id","FirsT_name").show(truncate=False)
empdf.orderBy(col("department_id"),col("FirsT_name")).show(truncate=False)
empdf.sort(col("department_id").asc(),col("first_name").desc()).show(truncate=False)
empdf.orderBy(col("department_id").asc(),col("first_name").desc()).show(truncate=False)
empdf.select( coalesce ( col("employee_id"),col("salary"),col("manager_id") ) ).orderBy(col("employee_id").desc()).show()
empdf.select(current_date()).show()
print("Separatror #####################################################")
empdf.createOrReplaceTempView("EMP")
spark.sql("select employee_id, first_name,last_name, salary from EMP ORDER BY department_id asc").show(truncate=False)
print("Separatror ***************************")
empdf.groupBy(col("department_id")).sum("salary").show(truncate=False)
print("Separatror ??????????????????????????????")
empdf.groupBy(col("department_id")).agg (sum("salary")).show(truncate=False)
'''
>>> empdf.groupBy().min("salary").show()
+-----------+
|min(salary)|
+-----------+
| 2100|
+-----------+
>>> empdf.groupBy().agg(min("salary")).show()
+-----------+
|min(salary)|
+-----------+
| 2100|
+-----------+
'''
'''
(empdf.groupBy(col("department_id")).agg( count("department_id").alias("count"),
sum("salary").alias("sum"),
max("Salary").alias("max"),
min("SAlary").alias("min"),
avg("SALARY").alias("avg")
).show()
)
'''
empdf.groupBy("department_id") \
.agg(sum("salary").alias("sum_salary"), \
avg("salary").alias("avg_salary"), \
sum("commission_pct").alias("sum_commission_pct"), \
max("commission_pct").alias("max_bonus") \
) \
.show(truncate=False)
empdf.groupBy( col("department_id") ).agg(count("*").alias("cnt")).orderBy( col("cnt").desc() ).show(truncate=False)
empdf.where(col("department_id")==10).show(truncate=False)
empdf.where(col("FIRST_NAME")=="Jennifer").show(truncate=False)
print("Separatror #####################################################")
empdf.filter(col("FIRST_NAME")=="Kimberely").show(truncate=False)
empdf.filter(col("FIRST_NAME")=="Kimberely").select(col("employee_id"),col("email")).show(truncate=False)
empdf.filter ( col("first_name").like("Kar%") ).select(col("First_name"),col("Last_name")).show()
empdf.filter ( (col("department_id")=="30") & (col("first_name").like ("Kar%")) ) \
.select( col("employee_id"),col("First_name"),col("department_id"),col("salary") ).show(truncate=False)
#Change DataType using PySpark withColumn()
empdf.withColumn("salary",col("salary").cast("Integer")).show()
#Update The Value of an Existing Column
empdf.withColumn("salary",col("salary")*10000).show()
#Add a New Column using withColumn()
empdf.withColumn("CopiedColumn",col("salary")* -1).show()
#Add a New Column using withColumn()
'''
In order to create a new column, pass the column name you wanted to the first argument of withColumn() transformation function.
Make sure this new column not already present on DataFrame,
if it presents it updates the value of that column.
On below snippet, PySpark lit() function is used to add a constant value to a DataFrame column.
We can also chain in order to add multiple columns.
'''
#empdf.withColumn("Country", lit("USA")).show()
empdf.withColumn("State", lit("California"))\
.withColumn("District",lit("Bonita"))\
.show()
#Rename Column Name To rename an existing column use withColumnRenamed() function on DataFrame.
empdf.withColumnRenamed("LAST_NAME","EMPLOYEE_LAST_NAME") \
.show(truncate=False)
#Drop Column From PySpark DataFrame
empdf.drop("salary") \
.show()
df2 = empdf.withColumn("PHONE_NUMBER",col("PHONE_NUMBER").cast(StringType() ) ) \
.withColumn("COMMISSION_PCT",col("COMMISSION_PCT").cast(BooleanType() ) ) \
.withColumn("HIRE_DATE",col("HIRE_DATE").cast(DateType() ) ) \
.withColumn("EMPLOYEE_ID",col("HIRE_DATE").cast(IntegerType()))
df2.printSchema()
empdf.select( col("first_name"),col("last_name") ).show()
empdf.select("first_name","last_name").show()
empdf.agg(min("salary")).show()
empdf.agg(max("salary")).show()
#empdf.avg("salary").show()
empdf.select(trim(col("FIRST_NAME"))).show()
empdf.select(col("salary").cast('long')).printSchema()
empdf.filter ( col("first_name").like("Kar%") ).select(col("First_name"),col("Last_name")).show()
empdf.filter ( col("department_id")=="30" ).select ( col("employee_id"),col("First_name"),col("department_id") ).show(truncate=False)
empdf.filter ( (col("department_id")=="30") & (col("first_name").like ("Kar%")) ) \
.select( col("employee_id"),col("First_name"),col("department_id"),col("salary") ).show(truncate=False)
empdf.filter(col("commission_pct").isNull() ).select( col("employee_id"),col("First_name"),col("department_id"),col("salary"),col("commission_pct")).show(truncate=False)
empdf.filter(col("commission_pct").isNull() ).select("*").show(truncate=False)
empdf.filter(col("commission_pct").isNotNull() ).select("*").show(truncate=False)
empdf.filter( col("department_id").isin(20,30,40) ).select( col("employee_id"),col("First_name"),col("department_id"),col("salary"),col("commission_pct")).show(truncate=False)
empdf.filter( col("department_id").isin(20,30,40) ).select("*").show(truncate=False)
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('Union_UnionAll').master("local[*]").getOrCreate()
simpleData = [("James","Sales","NY",90000,34,10000), \
("Michael","Sales","NY",86000,56,20000), \
("Robert","Sales","CA",81000,30,23000), \
("Maria","Finance","CA",90000,24,23000) \
]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
#df.printSchema()
#df.show(truncate=False)
simpleData2 = [("James","Sales","NY",90000,34,10000), \
("Maria","Finance","CA",90000,24,23000), \
("Jen","Finance","NY",79000,53,15000), \
("Jeff","Marketing","CA",80000,25,18000), \
("Kumar","Marketing","NY",91000,50,21000) \
]
columns2= ["employee_name","department","state","salary","age","bonus"]
df2 = spark.createDataFrame(data = simpleData2, schema = columns2)
'''
'''
DataFrame union() method merges two DataFrames and returns the new DataFrame with all rows
from two Dataframes regardless of duplicate data. Both union and unionAll will give data including duplicates.
'''
#df2.printSchema()
#df2.show(truncate=False)
unionDF = df.union(df2)
unionDF.show(truncate=False)
'''
Since the union() method returns all rows without distinct records, we will use the distinct() function
to return just one record when duplicate exists.
'''
disDF = df.union(df2).distinct()
disDF.show(truncate=False)
empdf.createOrReplaceTempView("cte1")
df_cte1 = spark.sql("select * from cte1 where salary > 15000")
df_cte1.show()
df_cte1.write.format("parquet").saveAsTable("test")
df_cte1.write.format("parquet").save("E:\\source_files\\out.parquet")
df_cte1.write.option("header","False").format("parquet").save("E:\\source_files\\out.parquet")
df_cte1.write.mode("append").option("header","False").option("delimeter","|").format("csv").save("E:\\source_files\\out.csv")
windowSpec = Window.partitionBy("department_id").orderBy("salary")
empdf.withColumn("row_number",row_number().over(windowSpec)) \
.show(truncate=False)
empdf.withColumn("rank",rank().over(windowSpec)) \
.show()
empdf.withColumn("dense_rank",dense_rank().over(windowSpec)) \
.show()
empdf.withColumn("lag",lag("salary",2).over(windowSpec)) \
.show()
empdf.withColumn("lead",lead("salary",2).over(windowSpec)) \
.show()
empdf.withColumn("cume_dist",cume_dist().over(windowSpec)) \
.show()
empdf.withColumn("ntile",ntile(2).over(windowSpec)) \
.show()
windowSpec = Window.partitionBy("department_id").orderBy("salary")
windowSpecAgg = Window.partitionBy("department_id")
from pyspark.sql.functions import col,avg,sum,min,max,row_number
empdf.withColumn("row",row_number().over(windowSpec)) \
.withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
.withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
.withColumn("min", min(col("salary")).over(windowSpecAgg)) \
.withColumn("max", max(col("salary")).over(windowSpecAgg)) \
.where(col("row")==1).select("department_id","avg","sum","min","max") \
.show()
emppath = "E:\\source_files\\emp_duplicate.csv"
empdf = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", ",").load(emppath)
empdf.show()
#distinctDF = empdf.distinct()
#print("Distinct count: "+str(distinctDF.count()))
#distinctDF.show(truncate=False)
#df2 = empdf.dropDuplicates()
#print("Distinct count: "+str(df2.count()))
#df2.show(truncate=False)
# PySpark Distinct of Selected Multiple Columns
#Note that calling dropDuplicates() on DataFrame returns a new DataFrame with duplicate rows removed on specific columns.
dropDisDF = empdf.dropDuplicates(["department","salary"])
print("Distinct count of department & salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Prepare Data
data = [("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000), \
("Saif", "Sales", 4100) \
]
# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
'''
No comments:
Post a Comment