Tuesday, January 14, 2025

Exploding nested JSON in PySpark (Struct and Array)


'''

{
"company": {
"name": "Tech Innovators Inc.",
"founded": 2010,
"isPublic": true,
"departments": [
{
"name": "Engineering",
"manager": {
"name": "Alice Johnson",
"email": "alice.johnson@techinnovators.com"
},
"employees": [
{
"name": "John Doe",
"position": "Software Engineer",
"skills": ["JavaScript", "Python", "AWS"]
},
{
"name": "Emily Smith",
"position": "DevOps Engineer",
"skills": ["Docker", "Kubernetes", "Linux"]
}
]
},
{
"name": "Marketing",
"manager": {
"name": "Robert Brown",
"email": "robert.brown@techinnovators.com"
},
"employees": [
{
"name": "Laura Wilson",
"position": "Marketing Specialist",
"skills": ["SEO", "Content Writing", "Analytics"]
}
]
}
],
"contact": {
"address": {
"street": "123 Innovation Lane",
"city": "Tech City",
"state": "CA",
"zip": "94000"
},
"phone": "+1-800-555-1234",
"email": "info@techinnovators.com"
}
}
}



'''
Explanation:
Top-level object: the company key holds the main structure.

Nested objects:
departments is an array of objects, each representing one department.
Each department object has a manager (a nested object) and an employees array.

Arrays:
employees is an array of objects, each describing one employee.

Each employee has a skills array of strings.

Contact information: contact is a nested object holding an address object plus phone and email fields.

This JSON structure can be parsed by any language with JSON support, such as JavaScript or Python.
Because the whole document spans many lines rather than holding one record per line, Spark must read it with the multiLine option enabled.
 '''



'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Initialize Spark session
spark = SparkSession.builder.appName("NestedJSON").getOrCreate()

# Location of the sample document. A raw string keeps the Windows backslashes
# literal instead of relying on doubled escapes.
json_path = r'E:\source_files\nested_json_explode_example.json'

# The file is a single JSON document spread over many lines, so multiLine must
# be enabled; by default Spark expects one JSON record per line.
df = spark.read.format('json').option('multiLine', 'true').load(json_path)
# df.show(truncate=False)
df.printSchema()
# Printed schema (the inferred fields are listed alphabetically, not in the
# order they appear in the file):
# root
# |-- company: struct (nullable = true)
# | |-- contact: struct (nullable = true)
# | | |-- address: struct (nullable = true)
# | | | |-- city: string (nullable = true)
# | | | |-- state: string (nullable = true)
# | | | |-- street: string (nullable = true)
# | | | |-- zip: string (nullable = true)
# | | |-- email: string (nullable = true)
# | | |-- phone: string (nullable = true)
# | |-- departments: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- employees: array (nullable = true)
# | | | | |-- element: struct (containsNull = true)
# | | | | | |-- name: string (nullable = true)
# | | | | | |-- position: string (nullable = true)
# | | | | | |-- skills: array (nullable = true)
# | | | | | | |-- element: string (containsNull = true)
# | | | |-- manager: struct (nullable = true)
# | | | | |-- email: string (nullable = true)
# | | | | |-- name: string (nullable = true)
# | | | |-- name: string (nullable = true)
# | |-- founded: long (nullable = true)
# | |-- isPublic: boolean (nullable = true)
# | |-- name: string (nullable = true)

# Scalar company/contact fields: output column name -> source path. They are
# selected once and then carried unchanged through every explode level below.
company_fields = {
    "company_name": "company.name",
    "company_founded": "company.founded",
    "company_isPublic": "company.isPublic",
    "contact_street": "company.contact.address.street",
    "contact_city": "company.contact.address.city",
    "contact_state": "company.contact.address.state",
    "contact_zip": "company.contact.address.zip",
    "contact_phone": "company.contact.phone",
    "contact_email": "company.contact.email",
}
company_cols = list(company_fields)

# Level 1: one row per element of company.departments.
department_df = df.select(
    *[col(path).alias(name) for name, path in company_fields.items()],
    explode("company.departments").alias("department"),
)

# Level 2: one row per element of department.employees.
department_cols = ["department_name", "manager_name", "manager_email"]
employee_df = department_df.select(
    *company_cols,
    col("department.name").alias("department_name"),
    col("department.manager.name").alias("manager_name"),
    col("department.manager.email").alias("manager_email"),
    explode("department.employees").alias("employee"),
)

# Level 3: one row per element of employee.skills.
flattened_df = employee_df.select(
    *company_cols,
    *department_cols,
    col("employee.name").alias("employee_name"),
    col("employee.position").alias("employee_position"),
    # .alias() must wrap the explode() call itself: an alias placed on
    # explode's input is ignored and the output column keeps explode's
    # default name ("col").
    explode(col("employee.skills")).alias("employee_skill"),
)
# NOTE: explode() drops rows whose array is null or empty; use explode_outer()
# instead if such departments/employees must be kept in the output.

# Show the flattened DataFrame
flattened_df.show(truncate=False)

###############################################

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Initialize Spark session
spark = SparkSession.builder.appName("NestedJSON").getOrCreate()

# Location of the sample document. A raw string keeps the Windows backslashes
# literal instead of relying on doubled escapes.
json_path = r'E:\source_files\nested_json_explode_example.json'

try:
    # The file is one JSON document spread over many lines, so multiLine must
    # be enabled (Spark treats option keys case-insensitively; this matches
    # the spelling used in the first script).
    df = spark.read.format('json').option('multiLine', 'true').load(json_path)
    df.printSchema()

    # Select and display top-level scalar fields
    df.select("company.name", "company.founded", "company.isPublic").show()

    # Level 1: one row per element of the company.departments array
    departments_df = df.select(explode("company.departments").alias("department"))
    departments_df.printSchema()

    # Level 2: one row per element of each department's employees array;
    # department/manager fields are copied onto every employee row.
    expanded_df = departments_df.select(
        col("department.name").alias("department_name"),
        col("department.manager.name").alias("manager_name"),
        col("department.manager.email").alias("manager_email"),
        explode(col("department.employees")).alias("employee"),
    )

    # Level 3: one row per skill of each employee.
    # NOTE: explode() drops rows whose array is null or empty; use
    # explode_outer() if those must be kept.
    final_df = expanded_df.select(
        col("department_name"),
        col("manager_name"),
        col("manager_email"),
        col("employee.name").alias("employee_name"),
        col("employee.position").alias("employee_position"),
        explode(col("employee.skills")).alias("employee_skill"),
    )

    # Show the final exploded DataFrame
    final_df.show(truncate=False)

    # "company.contact.*" expands the contact struct into one column per
    # field (address, email, phone); address is still a struct, so its
    # members are reached with dotted paths.
    contact_df = df.select("company.contact.*")
    contact_df.select(
        "address.street", "address.city", "address.state", "address.zip",
        "phone", "email",
    ).show()
finally:
    # Stop the Spark session even if reading or a transformation fails.
    spark.stop()





No comments:

Post a Comment