Thursday, August 17, 2023

Word Count in PySpark


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("word_count").master("local[*]").getOrCreate()

# 1. Read the text file into an RDD (one element per line)
text_rdd = spark.sparkContext.textFile("E:\\source_files\\file1.txt")
x = text_rdd.collect()
print("base RDD reading the file contents line by line", x)
# base RDD reading the file contents line by line ['Bangalore is Karnataka ', 'Chennai is Tamilnadu']

# 2. Split the lines into words
words = text_rdd.flatMap(lambda line: line.split())
y = words.collect()
print("flatMap transformation Output", y)
# flatMap transformation Output ['Bangalore', 'is', 'Karnataka', 'Chennai', 'is', 'Tamilnadu']

# 3. Map each word to a key-value pair with a count of 1
#    (turn every word into the tuple (word, 1))
word_counts = words.map(lambda word: (word, 1))
z = word_counts.collect()
print("map transformation Output", z)
# map transformation Output [('Bangalore', 1), ('is', 1), ('Karnataka', 1), ('Chennai', 1), ('is', 1), ('Tamilnadu', 1)]

# 4. Use reduceByKey to count the occurrences of each word
word_count_result = word_counts.reduceByKey(lambda a, b: a + b)
k = word_count_result.collect()
print("reduceByKey transformation Output", k)
# reduceByKey transformation Output [('is', 2), ('Karnataka', 1), ('Chennai', 1), ('Bangalore', 1), ('Tamilnadu', 1)]

word_count_result.toDF().show()
# word_count_result.saveAsTextFile("D:\\word_count_outputnew")

# Output in columnar structure, but with column headers _1 and _2:
# +---------+---+
# |       _1| _2|
# +---------+---+
# |       is|  2|
# |Karnataka|  1|
# |  Chennai|  1|
# |Bangalore|  1|
# |Tamilnadu|  1|
# +---------+---+
#
# D:\word_count_outputnew\part-00000
# ('is', 2)
# ('Karnataka', 1)
# ('Chennai', 1)
#
# D:\word_count_outputnew\part-00001
# ('Bangalore', 1)
# ('Tamilnadu', 1)

# Print each word with its count
for p, q in word_count_result.collect():
    print(f"{p}: {q}")
# is: 2
# Karnataka: 1
# Chennai: 1
# Bangalore: 1
# Tamilnadu: 1

# One-liner version of the above analysis
text_rdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).toDF().show()
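To see what flatMap, map and reduceByKey each compute, it can help to mimic the three steps in plain Python on the same two lines. This is only a rough single-machine sketch, not how Spark runs them: `reduce_by_key` is a hypothetical helper written for this illustration (it is not part of PySpark), and the real reduceByKey combines values per partition and then shuffles, which is why Spark's output order differs.

```python
from itertools import chain

# The same two lines that the text file contains in the example above
lines = ["Bangalore is Karnataka ", "Chennai is Tamilnadu"]

# flatMap: split every line, then flatten the per-line lists into one list
words = list(chain.from_iterable(line.split() for line in lines))

# map: pair each word with a count of 1
pairs = [(word, 1) for word in words]


def reduce_by_key(pairs, func):
    """Hypothetical helper: merge the values of equal keys using func."""
    result = {}
    for key, value in pairs:
        # First time we see a key, keep its value; afterwards combine
        result[key] = func(result[key], value) if key in result else value
    return result


# reduceByKey: add up the 1s of identical words
counts = reduce_by_key(pairs, lambda a, b: a + b)

for word, count in counts.items():
    print(f"{word}: {count}")
```

The dictionary keeps the words in first-seen order, so "Bangalore" is printed first here, while the Spark output above starts with "is" because of how keys are distributed across partitions.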

