Sunday, December 10, 2023

Q: Find out the most experienced employee per project (using SQL and the DataFrame window function method)

Q: Findout the most experienced employee per project. create table Project ( project_id integer, employee_id integer ); create table Employee ( employee_id integer, name varchar(3200), experience_years integer ); insert into Project values (1,1); insert into Project values (1,2); insert into Project values (1,3); insert into Project values (1,4); insert into Project values (2,5); insert into Project values (2,6); insert into Project values (2,7); insert into Project values (2,8); insert into Project values (3,9); insert into Project values (3,10); insert into Project values (3,11); insert into Project values (3,12); insert into Project values (4,13); insert into Project values (4,14); insert into Project values (4,15); insert into Project values (4,16); -- insert into Employee values (1,'san1',2); insert into Employee values (2,'san2',5); insert into Employee values (3,'san3',8); insert into Employee values (4,'san4',10); insert into Employee values (5,'san5',2); insert into Employee values (6,'san6',5); insert into Employee values (7,'san7',10); insert into Employee values (8,'san8',10); insert into Employee values (9,'san9',2); insert into Employee values (10,'san10',5); insert into Employee values (11,'san11',8); insert into Employee values (12,'san12',10); insert into Employee values (13,'san13',2); insert into Employee values (14,'san14',5); insert into Employee values (15,'san15',8); insert into Employee values (16,'san16',8); select * from project ; PROJECT_ID EMPLOYEE_ID ---------- ----------- 1 1 1 2 1 3 1 4 2 5 2 6 2 7 2 8 3 9 3 10 3 11 3 12 4 13 4 14 4 15 4 16 select * from Employee ; EMPLOYEE_ID NAME EXPERIENCE_YEARS ----------- -------------------- ---------------- 1 san1 2 2 san2 5 3 san3 8 4 san4 10 5 san5 2 6 san6 5 7 san7 10 8 san8 10 9 san9 2 10 san10 5 11 san11 8 12 san12 10 13 san13 2 14 san14 5 15 san15 8 16 san16 8 Q: Findout the most experienced employee per project. 
Step 1: Group (partition) the data at the project level. Step 2: Find the highest experience_years within each group. A single project may have several employees who share the same, highest number of years of experience, so we need to return every top-ranked employee per project; RANK() assigns tied rows the same rank, so filtering on rank 1 keeps all of them. set lines 100 SELECT * FROM (SELECT p.project_id, e.employee_id, e.experience_years, RANK () OVER (PARTITION BY project_id ORDER BY experience_years DESC) AS rank_experience FROM Project p JOIN employee e ON p.employee_id = e.employee_id) WHERE rank_experience = 1; PROJECT_ID EMPLOYEE_ID EXPERIENCE_YEARS RANK_EXPERIENCE ---------- ----------- ---------------- --------------- 1 4 10 1 2 7 10 1 2 8 10 1 3 12 10 1 4 15 8 1 4 16 8 1 As the output shows, project_id = 1 and project_id = 3 each have only one employee with the highest experience, which is 10 years. For project_id = 2 and project_id = 4, however, two employees per project share the highest experience (10 and 8 years respectively). 
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('employees_highestexp').getOrCreate() project_dtls = [(1,1), (1,2), (1,3), (1,4), (2,5), (2,6), (2,7), (2,8), (3,9), (3,10), (3,11), (3,12), (4,13), (4,14), (4,15), (4,16) ] schema = ["project_id ", "employee_id "] project_df = spark.createDataFrame(data=project_dtls,schema=schema) project_df.show() +-----------+------------+ |project_id |employee_id | +-----------+------------+ | 1| 1| | 1| 2| | 1| 3| | 1| 4| | 2| 5| | 2| 6| | 2| 7| | 2| 8| | 3| 9| | 3| 10| | 3| 11| | 3| 12| | 4| 13| | 4| 14| | 4| 15| | 4| 16| +-----------+------------+ employee_dtls = [(1,'san1',2), (2,'san2',5), (3,'san3',8), (4,'san4',10), (5,'san5',2), (6,'san6',5), (7,'san7',10), (8,'san8',10), (9,'san9',2), (10,'san10',5), (11,'san11',8), (12,'san12',10), (13,'san13',2), (14,'san14',5), (15,'san15',8), (16,'san16',8) ] schema = ["employee_id","name","experience_years"] employee_df = spark.createDataFrame(data=employee_dtls,schema=schema) employee_df.show() +-----------+-----+----------------+ |employee_id| name|experience_years| +-----------+-----+----------------+ | 1| san1| 2| | 2| san2| 5| | 3| san3| 8| | 4| san4| 10| | 5| san5| 2| | 6| san6| 5| | 7| san7| 10| | 8| san8| 10| | 9| san9| 2| | 10|san10| 5| | 11|san11| 8| | 12|san12| 10| | 13|san13| 2| | 14|san14| 5| | 15|san15| 8| | 16|san16| 8| +-----------+-----+----------------+ Now that both data frames are ready, we will create an intermediate data frame by joining these two data frames so that we can apply the window rank() function on top of it. #create an intermediate data frame by joining these two data frames # so that we can apply the window rank() function on top of it. project_expyears_df = project_df.join(employee_df,project_df.employee_id == employee_df.employee_id,"inner") # The above join returns two employee_id columns (one from each data frame), hence in the select statement # we have to remove one. 
+-----------+-----------+-----------+-----+----------------+ |project_id |employee_id|employee_id| name|experience_years| +-----------+-----------+-----------+-----+----------------+ | 2| 7| 7| san7| 10| | 2| 6| 6| san6| 5| | 3| 9| 9| san9| 2| | 2| 5| 5| san5| 2| | 1| 1| 1| san1| 2| | 3| 10| 10|san10| 5| | 1| 3| 3| san3| 8| | 3| 12| 12|san12| 10| | 2| 8| 8| san8| 10| | 3| 11| 11|san11| 8| | 1| 2| 2| san2| 5| | 1| 4| 4| san4| 10| | 4| 13| 13|san13| 2| | 4| 14| 14|san14| 5| | 4| 15| 15|san15| 8| | 4| 16| 16|san16| 8| +-----------+-----------+-----------+-----+----------------+ #NB: We can give '\' character before select statement and after '.' character. # We can give '\' character before .show() method # We can give '\' character before .withColumn() method project_expyears_df = project_df.join(employee_df,project_df.employee_id == employee_df.employee_id,"inner").\ select(project_df.project_id,project_df.employee_id,employee_df.name,employee_df.experience_years) project_expyears_df.show() +----------+-----------+-----+----------------+ |project_id|employee_id| name|experience_years| +----------+-----------+-----+----------------+ | 2| 7| san7| 10| | 2| 6| san6| 5| | 3| 9| san9| 2| | 2| 5| san5| 2| | 1| 1| san1| 2| | 3| 10|san10| 5| | 1| 3| san3| 8| | 3| 12|san12| 10| | 2| 8| san8| 10| | 3| 11|san11| 8| | 1| 2| san2| 5| | 1| 4| san4| 10| | 4| 13|san13| 2| | 4| 14|san14| 5| | 4| 15|san15| 8| | 4| 16|san16| 8| +----------+-----------+-----+----------------+ project_expyears_df.sort("project_id","employee_id").show() +----------+-----------+-----+----------------+ |project_id|employee_id| name|experience_years| +----------+-----------+-----+----------------+ | 1| 1| san1| 2| | 1| 2| san2| 5| | 1| 3| san3| 8| | 1| 4| san4| 10| | 2| 5| san5| 2| | 2| 6| san6| 5| | 2| 7| san7| 10| | 2| 8| san8| 10| | 3| 9| san9| 2| | 3| 10|san10| 5| | 3| 11|san11| 8| | 3| 12|san12| 10| | 4| 13|san13| 2| | 4| 14|san14| 5| | 4| 15|san15| 8| | 4| 16|san16| 8| 
+----------+-----------+-----+----------------+ We will first create a window specification and then apply that window specification to the joined data frame, as below. Note that Window, col and rank must be imported first (from pyspark.sql.window import Window and from pyspark.sql.functions import col, rank). #Step1 : Create Window Spec windowSpec = Window.partitionBy(col("project_id")).orderBy(col("experience_years").desc()) #Step2 : Apply Window Spec on Joined Data Frame for Rank() function as below - project_expyears_df.withColumn("myrank",rank().over(windowSpec)).where(col("myrank")==1).sort(col("project_id")).\ select("project_id","employee_id","name","experience_years").show() +----------+-----------+-----+----------------+ |project_id|employee_id| name|experience_years| +----------+-----------+-----+----------------+ | 1| 4| san4| 10| | 2| 8| san8| 10| | 2| 7| san7| 10| | 3| 12|san12| 10| | 4| 15|san15| 8| | 4| 16|san16| 8| +----------+-----------+-----+----------------+

No comments:

Post a Comment