Thursday, June 18, 2026

LEAD() and LAG() are window functions

LEAD() and LAG() are window functions used to access values from the next row or previous row without using a self-join.

Sample Employees Table

EmployeeID  EmployeeName  Salary
----------  ------------  ------
1           Alice         3000
2           Bob           4000
3           Charlie       2500
4           David         5000
5           Emma          3500

1. LAG() Example

Get the previous employee's salary:

SELECT
    EmployeeID,
    EmployeeName,
    Salary,
    LAG(Salary) OVER (
        ORDER BY EmployeeID
    ) AS PreviousSalary
FROM Employees;

Output

EmployeeID  EmployeeName  Salary  PreviousSalary
----------  ------------  ------  --------------
1           Alice         3000    NULL
2           Bob           4000    3000
3           Charlie       2500    4000
4           David         5000    2500
5           Emma          3500    5000

LAG() looks backward.


2. LEAD() Example

Get the next employee's salary:

SELECT
    EmployeeID,
    EmployeeName,
    Salary,
    LEAD(Salary) OVER (
        ORDER BY EmployeeID
    ) AS NextSalary
FROM Employees;

Output

EmployeeID  EmployeeName  Salary  NextSalary
----------  ------------  ------  ----------
1           Alice         3000    4000
2           Bob           4000    2500
3           Charlie       2500    5000
4           David         5000    3500
5           Emma          3500    NULL

LEAD() looks forward.
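You can check both outputs above without a database server by using Python's built-in sqlite3 module. This is a minimal sketch that assumes the bundled SQLite is version 3.25 or newer, the first release with window functions. SQL NULL comes back as Python None.

```python
import sqlite3

# Build the sample Employees table in an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Employees (EmployeeID INTEGER, EmployeeName TEXT, Salary INTEGER)")
conn.executemany(
    "INSERT INTO Employees VALUES (?, ?, ?)",
    [(1, "Alice", 3000), (2, "Bob", 4000), (3, "Charlie", 2500),
     (4, "David", 5000), (5, "Emma", 3500)],
)

# LAG reads the previous row and LEAD the next one, both in EmployeeID order.
rows = conn.execute("""
    SELECT EmployeeName,
           Salary,
           LAG(Salary)  OVER (ORDER BY EmployeeID) AS PreviousSalary,
           LEAD(Salary) OVER (ORDER BY EmployeeID) AS NextSalary
    FROM Employees
    ORDER BY EmployeeID
""").fetchall()

for row in rows:
    print(row)
# First row: ('Alice', 3000, None, 4000)
```

The first and last rows get None for LAG and LEAD respectively, because no row exists before Alice or after Emma.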


3. Compare Salary with Previous Employee

SELECT
    EmployeeID,
    EmployeeName,
    Salary,
    LAG(Salary) OVER (ORDER BY EmployeeID) AS PrevSalary,
    Salary - LAG(Salary) OVER (ORDER BY EmployeeID) AS Difference
FROM Employees;

Output

EmployeeName  Salary  PrevSalary  Difference
------------  ------  ----------  ----------
Alice         3000    NULL        NULL
Bob           4000    3000        1000
Charlie       2500    4000        -1500
David         5000    2500        2500
Emma          3500    5000        -1500

Useful for salary growth/decline analysis.


4. Using Offset

Get the salary from two rows back:

SELECT
    EmployeeID,
    EmployeeName,
    Salary,
    LAG(Salary, 2) OVER (
        ORDER BY EmployeeID
    ) AS Salary2RowsBack
FROM Employees;

Output:

EmployeeName  Salary  Salary2RowsBack
------------  ------  ---------------
Alice         3000    NULL
Bob           4000    NULL
Charlie       2500    3000
David         5000    4000
Emma          3500    2500

5. Default Value Example

Return 0 instead of NULL when there is no previous row:

SELECT
    EmployeeID,
    EmployeeName,
    Salary,
    LAG(Salary, 1, 0) OVER (
        ORDER BY EmployeeID
    ) AS PreviousSalary
FROM Employees;

Output:

EmployeeName  Salary  PreviousSalary
------------  ------  --------------
Alice         3000    0
Bob           4000    3000
Charlie       2500    4000
David         5000    2500
Emma          3500    5000

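Syntax

LAG

LAG(column_name [, offset [, default_value]])
OVER ([PARTITION BY partition_column] ORDER BY order_column)

LEAD

LEAD(column_name [, offset [, default_value]])
OVER ([PARTITION BY partition_column] ORDER BY order_column)

offset defaults to 1 and default_value defaults to NULL. PARTITION BY is optional and restarts the lookup for each group.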

Interview Use Cases

  • Compare current row with previous row.

  • Find salary increase/decrease.

  • Calculate month-over-month sales growth.

  • Detect gaps in dates or sequences.

  • Compare employee performance with previous/next employee.

  • Find consecutive records (attendance, login history, transactions).
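As an illustration of the last two use cases, here is a minimal sqlite3 sketch that detects gaps in a login history. The Logins table, its columns and its rows are hypothetical. The sketch assumes SQLite 3.25+ and ISO-8601 date strings, and it uses SQLite's julianday() to turn dates into day numbers. LAG with PARTITION BY UserID compares each login only with the same user's previous login.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical login history; ISO-8601 text dates sort chronologically.
conn.execute("CREATE TABLE Logins (UserID INTEGER, LoginDate TEXT)")
conn.executemany("INSERT INTO Logins VALUES (?, ?)", [
    (1, "2025-01-01"), (1, "2025-01-02"), (1, "2025-01-05"),
    (2, "2025-01-01"), (2, "2025-01-02"),
])

gaps = conn.execute("""
    WITH Ordered AS (
        SELECT UserID,
               LoginDate,
               LAG(LoginDate) OVER (PARTITION BY UserID ORDER BY LoginDate) AS PrevLogin
        FROM Logins
    )
    SELECT UserID,
           PrevLogin,
           LoginDate,
           CAST(julianday(LoginDate) - julianday(PrevLogin) AS INTEGER) AS DaysBetween
    FROM Ordered
    -- A first login has PrevLogin = NULL, so the comparison is NULL and the row is dropped.
    WHERE julianday(LoginDate) - julianday(PrevLogin) > 1
    ORDER BY UserID, LoginDate
""").fetchall()

print(gaps)  # [(1, '2025-01-02', '2025-01-05', 3)]
```

Changing the filter to `= 1` would return the consecutive-day logins instead of the gaps.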

SUM() OVER() is a window function often used to calculate a running total without grouping rows.

Sample Employees Table

EmployeeID  EmployeeName  Salary
----------  ------------  ------
1           Alice         3000
2           Bob           4000
3           Charlie       2500
4           David         5000
5           Emma          3500

Running Total of Salaries

SELECT
    EmployeeID,
    EmployeeName,
    Salary,
    SUM(Salary) OVER (
        ORDER BY EmployeeID
    ) AS RunningTotal
FROM Employees;

Output

EmployeeID  EmployeeName  Salary  RunningTotal
----------  ------------  ------  ------------
1           Alice         3000    3000
2           Bob           4000    7000
3           Charlie       2500    9500
4           David         5000    14500
5           Emma          3500    18000

How it works

For each row:

  • Alice → 3000

  • Alice + Bob → 7000

  • Alice + Bob + Charlie → 9500

  • Alice + Bob + Charlie + David → 14500

  • Alice + Bob + Charlie + David + Emma → 18000

Because EmployeeID values are unique, the window function is effectively doing:

SUM(Salary) OVER (
    ORDER BY EmployeeID
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)

which means:

  • Start from the first row (UNBOUNDED PRECEDING)

  • Keep adding values up to the current row (CURRENT ROW)
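When ORDER BY is given but no frame clause is written, standard SQL (and SQLite) uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Under that frame, every row that ties with the current row on the ORDER BY value is included. With a unique EmployeeID, this matches the ROWS frame above. The minimal sqlite3 sketch below uses a hypothetical Sales table with two rows on the same SaleDay to show where the two frames differ. It assumes SQLite 3.25+.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical sales where SaleDay 2 appears twice (a tie in the ORDER BY column).
conn.execute("CREATE TABLE Sales (SaleDay INTEGER, Amount INTEGER)")
conn.executemany("INSERT INTO Sales VALUES (?, ?)", [(1, 100), (2, 50), (2, 70), (3, 30)])

rows = conn.execute("""
    SELECT SaleDay,
           Amount,
           -- Default frame: RANGE, so tied rows are added together as peers.
           SUM(Amount) OVER (ORDER BY SaleDay) AS DefaultFrame,
           -- Explicit ROWS frame: adds one physical row at a time.
           SUM(Amount) OVER (ORDER BY SaleDay
                             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS RowsFrame
    FROM Sales
    ORDER BY SaleDay, Amount
""").fetchall()

for row in rows:
    print(row)
```

Under the default frame, both day-2 rows show 220. Under the ROWS frame, they show two different running values, and which one gets the smaller value depends on the arbitrary order of the tied rows. That is why a unique ordering column (or an explicit ROWS frame) matters for a predictable running total.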

Running Total by Department

Suppose the table has a department column:

EmployeeID  EmployeeName  Department  Salary
----------  ------------  ----------  ------
1           Alice         HR          3000
2           Bob           HR          4000
3           Charlie       IT          2500
4           David         IT          5000
5           Emma          HR          3500

SELECT
    EmployeeID,
    EmployeeName,
    Department,
    Salary,
    SUM(Salary) OVER (
        PARTITION BY Department
        ORDER BY EmployeeID
    ) AS DeptRunningTotal
FROM Employees;

Output

EmployeeID  EmployeeName  Department  Salary  DeptRunningTotal
----------  ------------  ----------  ------  ----------------
1           Alice         HR          3000    3000
2           Bob           HR          4000    7000
5           Emma          HR          3500    10500
3           Charlie       IT          2500    2500
4           David         IT          5000    7500

Here, PARTITION BY Department restarts the running total for each department.
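To make the per-department restart concrete, here is a plain-Python sketch that mimics the query above. It is a conceptual model, not how a database engine executes it. Each department keeps its own accumulator, and rows are processed in (Department, EmployeeID) order.

```python
from collections import defaultdict

employees = [
    (1, "Alice", "HR", 3000),
    (2, "Bob", "HR", 4000),
    (3, "Charlie", "IT", 2500),
    (4, "David", "IT", 5000),
    (5, "Emma", "HR", 3500),
]


def dept_running_total(rows):
    """Mimic SUM(Salary) OVER (PARTITION BY Department ORDER BY EmployeeID)."""
    totals = defaultdict(int)  # one running sum per department (partition)
    result = []
    for emp_id, name, dept, salary in sorted(rows, key=lambda r: (r[2], r[0])):
        totals[dept] += salary  # a new department starts again from 0
        result.append((emp_id, name, dept, salary, totals[dept]))
    return result


for row in dept_running_total(employees):
    print(row)
```

The printed rows match the DeptRunningTotal output above, for example Emma's row ends with 10500 and David's with 7500.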

General syntax:

SUM(column_name) OVER (
    [PARTITION BY partition_column]
    ORDER BY order_column
)

This is one of the most common SQL interview questions involving window functions.

Tuesday, January 14, 2025

Decorator Function in Python

A decorator is a function that takes another function and extends its behavior without modifying the original function's code.
It is a powerful tool that lets you add functionality to existing code in a clean and readable way.


#Define a decorator function
def my_decorator(func):
    def wrapper():
        print("Something is happening before the function is called.")
        func()
        print("Something is happening after the function is called.")
    return wrapper

#Apply the decorator to a function
@my_decorator
def say_hello():
    print("Hello!")

#Call the decorated function
say_hello()


Explanation:

1. The my_decorator function takes another function (func) as an argument.
2. Inside my_decorator, a new function (wrapper) is defined. 
   This function calls the original function (func) and also does some additional work before and after calling func.
3. The wrapper function is returned by my_decorator.
4. The @my_decorator syntax before the say_hello function definition applies the my_decorator decorator to say_hello. 
   This means that when say_hello is called, it will actually call the wrapper function returned by my_decorator.
5. When say_hello() is called, it will print:

Something is happening before the function is called.
Hello!
Something is happening after the function is called.


Explanation of Parameters

The parameters below refer to the general form of a decorator, decorator_name(func), whose inner wrapper accepts any arguments.

1. decorator_name(func):

decorator_name: The name of the decorator function.
func: The function being decorated. When you apply a decorator, the decorated function is passed in as this argument.

2. wrapper(*args, **kwargs):

wrapper: A nested function inside the decorator. It wraps the original function and adds extra behavior around it.
*args: Collects any positional arguments passed to the decorated function into a tuple.
**kwargs: Collects any keyword arguments passed to the decorated function into a dictionary.
Together, *args and **kwargs let the decorator handle functions with any number and kind of arguments.

3. @decorator_name:

Placing @decorator_name above a function definition (here, function_to_decorate) applies the decorator to it. It is equivalent to writing function_to_decorate = decorator_name(function_to_decorate).



Without Decorator Function:
##########################
def say_hello():
    print("Hello!")

# Call the function (no decorator applied)
say_hello()


Output: 
    
Hello!


Ex-2: User Authentication

def requires_authentication(func):
    def wrapper(user, *args, **kwargs):
        # Block the call unless the user dict is marked as authenticated
        if not user.get('is_authenticated', False):
            print("User is not authenticated. Access denied.")
            return None
        return func(user, *args, **kwargs)
    return wrapper


# Apply the decorator
@requires_authentication
def view_profile(user):
    print(f"Viewing profile of {user['username']}")

# Test the decorated function
user1 = {'username': 'Alice', 'is_authenticated': True}
user2 = {'username': 'Bob', 'is_authenticated': False}

view_profile(user1)
view_profile(user2)

# Output:
Viewing profile of Alice
User is not authenticated. Access denied.



# Without decorator function

def view_profile(user):
    print(f"Viewing profile of {user['username']}")

# Test the undecorated function
user1 = {'username': 'Alice', 'is_authenticated': True}
user2 = {'username': 'Bob', 'is_authenticated': False}

view_profile(user1)
view_profile(user2)

#output
Viewing profile of Alice
Viewing profile of Bob
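The parameter explanation in this post describes a general template (decorator_name, wrapper(*args, **kwargs), function_to_decorate) that the two examples only partly show. Below is a minimal sketch of that template. functools.wraps is an addition not used in the examples above: it copies the wrapped function's name and docstring onto wrapper. The body of function_to_decorate is made up for illustration.

```python
import functools


def decorator_name(func):
    @functools.wraps(func)  # keep func's __name__ and __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        print(f"Before calling {func.__name__}")
        result = func(*args, **kwargs)  # forward every argument unchanged
        print(f"After calling {func.__name__}")
        return result  # pass the original return value through
    return wrapper


@decorator_name
def function_to_decorate(a, b, scale=1):
    """Add two numbers and scale the result."""
    return (a + b) * scale


print(function_to_decorate(2, 3, scale=10))
# Before calling function_to_decorate
# After calling function_to_decorate
# 50
print(function_to_decorate.__name__)  # function_to_decorate
```

Without functools.wraps, function_to_decorate.__name__ would report "wrapper", which makes logs and debugging harder to read.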

Exploding nested JSON in PySpark (Struct and Array)


Sample JSON file (nested_json_explode_example.json):

{
  "company": {
    "name": "Tech Innovators Inc.",
    "founded": 2010,
    "isPublic": true,
    "departments": [
      {
        "name": "Engineering",
        "manager": {
          "name": "Alice Johnson",
          "email": "alice.johnson@techinnovators.com"
        },
        "employees": [
          {
            "name": "John Doe",
            "position": "Software Engineer",
            "skills": ["JavaScript", "Python", "AWS"]
          },
          {
            "name": "Emily Smith",
            "position": "DevOps Engineer",
            "skills": ["Docker", "Kubernetes", "Linux"]
          }
        ]
      },
      {
        "name": "Marketing",
        "manager": {
          "name": "Robert Brown",
          "email": "robert.brown@techinnovators.com"
        },
        "employees": [
          {
            "name": "Laura Wilson",
            "position": "Marketing Specialist",
            "skills": ["SEO", "Content Writing", "Analytics"]
          }
        ]
      }
    ],
    "contact": {
      "address": {
        "street": "123 Innovation Lane",
        "city": "Tech City",
        "state": "CA",
        "zip": "94000"
      },
      "phone": "+1-800-555-1234",
      "email": "info@techinnovators.com"
    }
  }
}

Explanation:
Top-level object: The company key holds the main structure.

Nested objects:
departments is an array of objects, each representing a department.
Each department object has a manager (another object) and an employees array.

Arrays:
employees contains an array of objects, with each object describing an employee.

Each employee has an array of skills.

Contact information: contact contains another nested object for the address and other contact details.

This JSON structure can be easily parsed and used in programming languages like JavaScript,
 Python, or any other that supports JSON. 
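Each explode() turns one row holding an array into one row per array element. Chaining three explodes (departments, then employees, then skills) therefore multiplies the rows. Below is a plain-Python sketch of that row expansion on a trimmed copy of the sample JSON. It is a conceptual model only, not PySpark.

```python
# Trimmed copy of the sample JSON (only the fields needed here).
company = {
    "name": "Tech Innovators Inc.",
    "departments": [
        {"name": "Engineering", "employees": [
            {"name": "John Doe", "skills": ["JavaScript", "Python", "AWS"]},
            {"name": "Emily Smith", "skills": ["Docker", "Kubernetes", "Linux"]},
        ]},
        {"name": "Marketing", "employees": [
            {"name": "Laura Wilson", "skills": ["SEO", "Content Writing", "Analytics"]},
        ]},
    ],
}

# Each nested loop plays the role of one explode(): one output row per array element.
rows = [
    (company["name"], dept["name"], emp["name"], skill)
    for dept in company["departments"]   # explode("company.departments")
    for emp in dept["employees"]         # explode("department.employees")
    for skill in emp["skills"]           # explode("employee.skills")
]

print(len(rows))  # 9
print(rows[0])    # ('Tech Innovators Inc.', 'Engineering', 'John Doe', 'JavaScript')
```

A department with an empty employees list would produce no rows here. Spark's explode() behaves the same way, while explode_outer() keeps such rows and fills the exploded column with null.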
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode  # explicit imports avoid shadowing built-ins like sum()

# Initialize Spark session
spark = SparkSession.builder.appName("NestedJSON").getOrCreate()

# Load the JSON file
df = spark.read.format('json').option('multiline','true').load('E:\\source_files\\nested_json_explode_example.json')
#df.show(truncate=False)
df.printSchema()
'''
root
|-- company: struct (nullable = true)
| |-- contact: struct (nullable = true)
| | |-- address: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- state: string (nullable = true)
| | | |-- street: string (nullable = true)
| | | |-- zip: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- phone: string (nullable = true)
| |-- departments: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- employees: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- name: string (nullable = true)
| | | | | |-- position: string (nullable = true)
| | | | | |-- skills: array (nullable = true)
| | | | | | |-- element: string (containsNull = true)
| | | |-- manager: struct (nullable = true)
| | | | |-- email: string (nullable = true)
| | | | |-- name: string (nullable = true)
| | | |-- name: string (nullable = true)
| |-- founded: long (nullable = true)
| |-- isPublic: boolean (nullable = true)
| |-- name: string (nullable = true)
'''
# Flatten the nested fields
flattened_df = df.select(
    col("company.name").alias("company_name"),
    col("company.founded").alias("company_founded"),
    col("company.isPublic").alias("company_isPublic"),
    col("company.contact.address.street").alias("contact_street"),
    col("company.contact.address.city").alias("contact_city"),
    col("company.contact.address.state").alias("contact_state"),
    col("company.contact.address.zip").alias("contact_zip"),
    col("company.contact.phone").alias("contact_phone"),
    col("company.contact.email").alias("contact_email"),
    # One row per department
    explode("company.departments").alias("department")
).select(
    "company_name", "company_founded", "company_isPublic", "contact_street", "contact_city",
    "contact_state", "contact_zip", "contact_phone", "contact_email",
    col("department.name").alias("department_name"),
    col("department.manager.name").alias("manager_name"),
    col("department.manager.email").alias("manager_email"),
    # One row per employee within each department
    explode("department.employees").alias("employee")
).select(
    "company_name", "company_founded", "company_isPublic", "contact_street", "contact_city",
    "contact_state", "contact_zip", "contact_phone", "contact_email", "department_name",
    "manager_name", "manager_email",
    col("employee.name").alias("employee_name"),
    col("employee.position").alias("employee_position"),
    # One row per skill; alias the exploded column, not the array passed into explode()
    explode(col("employee.skills")).alias("employee_skill")
)

# Show the flattened DataFrame
flattened_df.show(truncate=False)

###############################################

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode  # explicit imports avoid shadowing built-ins like sum()

# Initialize Spark session
spark = SparkSession.builder.appName("NestedJSON").getOrCreate()

# Load the JSON file
df = spark.read.format('json').option('multiline','true').load('E:\\source_files\\nested_json_explode_example.json')
#df.show(truncate=False)
df.printSchema()

# Select and display top-level fields
df.select("company.name", "company.founded", "company.isPublic").show()

#Explode the nested departments array
departments_df = df.select(explode("company.departments").alias("department"))

departments_df.printSchema()

# Explode the nested employees array within departments
expanded_df = departments_df.select(
    col("department.name").alias("department_name"),
    col("department.manager.name").alias("manager_name"),
    col("department.manager.email").alias("manager_email"),
    explode(col("department.employees")).alias("employee")
)

# Further expand employee details
final_df = expanded_df.select(
    col("department_name"),
    col("manager_name"),
    col("manager_email"),
    col("employee.name").alias("employee_name"),
    col("employee.position").alias("employee_position"),
    explode(col("employee.skills")).alias("employee_skill")
)

# Show the final exploded DataFrame
final_df.show(truncate=False)

# Select and display contact information
contact_df = df.select("company.contact.*")
contact_df.select(
"address.street", "address.city", "address.state", "address.zip", "phone", "email" ).show()
# Stop Spark session
spark.stop()





Saturday, January 4, 2025

Lambda functions and their limitations in AWS

AWS Lambda is a serverless compute service that lets you run code in response to events (including HTTP requests, for example through Amazon API Gateway) without provisioning or managing servers. While AWS Lambda is a powerful tool, it has limitations you should be aware of when designing applications.


Key Features of AWS Lambda:

  • Supports multiple runtimes (e.g., Python, Node.js, Java, etc.).
  • Scales automatically based on traffic.
  • Pay-as-you-go model, billed per request and for execution duration (GB-seconds, based on the allocated memory).

Common Limitations of AWS Lambda:

1. Execution Timeout

  • Limit: Maximum execution time for a Lambda function is 15 minutes.
  • Impact: Long-running tasks such as batch processing, video encoding, or large database operations may fail.
  • Solution: Use AWS Step Functions for workflows or break tasks into smaller chunks.
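One way to "break tasks into smaller chunks" is a handler that watches its own deadline and hands back unfinished work. Below is a minimal sketch under several assumptions. The event carries a list under a hypothetical "items" key, process_item and SAFETY_MARGIN_MS are illustrative, and something outside the function (for example a Step Functions loop) re-invokes it with the leftovers. context.get_remaining_time_in_millis() is a method of the context object that Lambda passes to Python handlers. FakeContext exists only for local testing.

```python
SAFETY_MARGIN_MS = 30_000  # assumption: stop with 30 seconds to spare


def process_item(item):
    """Hypothetical unit of work; replace with real processing."""
    return item * 2


def handler(event, context):
    items = list(event["items"])  # copy so the incoming event is not mutated
    results = []
    while items:
        # Stop early rather than let Lambda kill the invocation at the timeout.
        if context.get_remaining_time_in_millis() < SAFETY_MARGIN_MS:
            break
        results.append(process_item(items.pop(0)))
    # Leftover items can be passed to the next invocation.
    return {"results": results, "remaining": items}


class FakeContext:
    """Stand-in for the real Lambda context, returning preset remaining times."""

    def __init__(self, remaining_ms):
        self._remaining = list(remaining_ms)

    def get_remaining_time_in_millis(self):
        return self._remaining.pop(0)


out = handler({"items": [1, 2, 3, 4]}, FakeContext([900_000, 600_000, 10_000]))
print(out)  # {'results': [2, 4], 'remaining': [3, 4]}
```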

2. Memory and CPU

  • Limit: Memory allocation ranges from 128 MB to 10,240 MB.
    • CPU is proportional to memory, with no option to configure CPU directly.
  • Impact: Computationally intensive tasks may require higher memory settings.
  • Solution: Optimize code, offload heavy processing to ECS/EKS, or use purpose-built services like Amazon SageMaker for ML tasks.

3. Ephemeral Storage

  • Limit: By default, each Lambda function gets 512 MB of temporary storage in the /tmp directory.
  • Impact: This may be insufficient for large files processed during execution.
  • Solution: Use S3 for larger intermediate files, or configure more ephemeral storage (up to 10,240 MB, i.e. 10 GB).

4. Deployment Package Size

  • Limit:
    • 50 MB for direct upload as a ZIP file.
    • 250 MB unzipped (including layers).
  • Impact: Large libraries or dependencies may exceed this limit.
  • Solution: Use Lambda Layers to share dependencies across functions (layers still count toward the 250 MB unzipped limit), or package the function as a container image (up to 10 GB).

5. Concurrent Execution

  • Limit: The default concurrency limit is 1,000 simultaneous executions per account per Region; it can be raised by requesting a quota increase.
  • Impact: Exceeding this limit leads to throttling, which may affect user experience.
  • Solution: Request a limit increase or use reserved concurrency to allocate resources to critical functions.

6. Cold Starts

  • Limit: When Lambda has to create a new execution environment (after a period of inactivity or when scaling out), a cold start occurs, adding latency.
  • Impact: Affects real-time or low-latency applications.
  • Solution: Use provisioned concurrency or optimize function initialization.
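A common initialization pattern is to do expensive setup once at module level, outside the handler. Only the cold start pays for it, and warm invocations in the same execution environment reuse it. Keeping that setup lean (fewer dependencies, lazy imports) is what shortens the cold start itself. Below is a minimal sketch; load_config and INIT_CALLS are hypothetical stand-ins for real setup such as SDK clients or database connections.

```python
import json

INIT_CALLS = 0  # counts how often the expensive setup runs (for demonstration only)


def load_config():
    """Hypothetical expensive setup (SDK clients, DB connections, models)."""
    global INIT_CALLS
    INIT_CALLS += 1
    return {"greeting": "hello"}


# Module-level code runs once per execution environment, during initialization,
# so warm invocations in that environment reuse CONFIG.
CONFIG = load_config()


def handler(event, context):
    # Per-invocation work only; the setup is not repeated here.
    return {"statusCode": 200, "body": json.dumps({"message": CONFIG["greeting"]})}


# Local simulation: three invocations in one warm environment.
responses = [handler({}, None) for _ in range(3)]
print(INIT_CALLS)  # 1
```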

7. VPC Networking

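  • Limit: Functions attached to a VPC need ENIs (Elastic Network Interfaces). Since 2019, Lambda uses shared Hyperplane ENIs created when the function is configured rather than per invocation, so the old per-invocation ENI latency is much smaller. However, a VPC-attached function has no internet access by default.
  • Impact: Reaching the internet or AWS services from a VPC-attached function needs extra networking setup, and creating or updating the function can take longer while ENIs are provisioned.
  • Solution: Use a NAT gateway for internet access and VPC endpoints (AWS PrivateLink) for AWS services; attach functions to a VPC only when they need private resources such as RDS or Elasticsearch.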

8. Supported Runtimes

  • Limit: Only specific managed runtimes are supported (e.g., Python, Node.js, Java, .NET, Ruby); languages such as Go or Rust run on the OS-only runtimes.
  • Impact: Custom runtimes need to be built using AWS Lambda Runtime API.
  • Solution: Use custom runtimes or container images for unsupported languages.

9. Statefulness

  • Limit: AWS Lambda is stateless, meaning the function does not retain state between invocations.
  • Impact: Complex applications requiring persistent state need additional storage.
  • Solution: Use DynamoDB, S3, or external databases for state management.

10. Execution Environment

  • Limit: Functions run in a sandboxed environment with restrictions on OS access, thread counts, and system libraries.
  • Impact: Limited control over the underlying environment.
  • Solution: Use container-based Lambdas for more control over the runtime.

11. IAM Permissions

  • Limit: Misconfigured IAM roles or excessive permissions can lead to security issues.
  • Impact: Potential data leaks or unauthorized access.
  • Solution: Follow the principle of least privilege for IAM roles.

12. Cost

  • Limit: While Lambda is cost-effective for infrequent tasks, high-frequency or long-running tasks can become expensive.
  • Impact: Unexpected costs for poorly optimized or high-throughput applications.
  • Solution: Monitor costs using AWS Cost Explorer or switch to alternative compute services (e.g., ECS, Fargate).

Conclusion

AWS Lambda is a versatile and efficient solution for event-driven and serverless architectures, but its limitations require careful design and planning. Understanding and working around these constraints ensures optimal performance and cost-efficiency. For complex applications, consider hybrid approaches using other AWS services.

map vs flatMap in PySpark

The difference between flatMap and map in PySpark lies in the output structure they produce after applying a transformation function to each element of an RDD. Both are transformations, but they behave differently based on the results of the applied function.


Key Differences Between map and flatMap

  • Output
    • map: Transforms each input element into exactly one output element.
    • flatMap: Transforms each input element into zero, one, or multiple output elements.
  • Flattening
    • map: Does not flatten the output; results remain nested if the function returns a list or collection.
    • flatMap: Flattens the output; all elements from lists or collections are returned as a single, flattened sequence.
  • Use Case
    • map: Use when the function produces a one-to-one mapping or transformation.
    • flatMap: Use when the function may produce multiple outputs or a collection for each input.

Examples

1. map Example

Each element of the RDD is transformed into exactly one element in the result.

from pyspark import SparkContext

sc = SparkContext("local", "Map vs FlatMap")

# Input RDD
rdd = sc.parallelize([1, 2, 3])

# Apply map: each number becomes one list [x, x * 2]
mapped_rdd = rdd.map(lambda x: [x, x * 2])

print(mapped_rdd.collect())  
# Output: [[1, 2], [2, 4], [3, 6]]

2. flatMap Example

Each element can be transformed into multiple outputs, and the result is flattened.

# Apply flatMap to produce multiple outputs for each element
flat_mapped_rdd = rdd.flatMap(lambda x: [x, x * 2])

print(flat_mapped_rdd.collect())  
# Output: [1, 2, 2, 4, 3, 6]

Key Points in Behavior

  1. Nested Output with map:

    • The map transformation retains the structure of the function's output, even if it is a list or collection.
    • Example: The list [1, 2] stays a single element of the resulting RDD.
  2. Flattened Output with flatMap:

    • The flatMap transformation flattens the output of the function.
    • Example: A list [1, 2] is split into separate elements 1 and 2 in the final RDD.

When to Use Which?

  • Use map:

    • When you want a one-to-one transformation (e.g., applying a function to each element).
    • When the transformation doesn't produce lists or collections as output.
  • Use flatMap:

    • When you need a one-to-many transformation or need to flatten the output.
    • When the function produces lists, collections, or even empty outputs for some elements.
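The difference can be modeled in plain Python, which also shows the "empty output" case. This is an analogy, not PySpark: rdd_map and rdd_flat_map are hypothetical helpers that follow the same contracts as RDD.map and RDD.flatMap on a local list.

```python
from itertools import chain


def rdd_map(func, data):
    """Plain-Python analogue of RDD.map: exactly one output per input."""
    return [func(x) for x in data]


def rdd_flat_map(func, data):
    """Plain-Python analogue of RDD.flatMap: concatenate the iterables func returns."""
    return list(chain.from_iterable(func(x) for x in data))


sentences = ["Hello world", "", "PySpark map and flatMap"]

print(rdd_map(str.split, sentences))
# [['Hello', 'world'], [], ['PySpark', 'map', 'and', 'flatMap']]
print(rdd_flat_map(str.split, sentences))
# ['Hello', 'world', 'PySpark', 'map', 'and', 'flatMap']
```

The empty sentence becomes an empty list under map but contributes nothing under flatMap. That makes flatMap a natural fit when some inputs should yield zero outputs.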

Advanced Example

Splitting Sentences into Words (flatMap vs. map)

# Input RDD of sentences
rdd = sc.parallelize(["Hello world", "PySpark map and flatMap"])

# Using map
mapped_rdd = rdd.map(lambda sentence: sentence.split(" "))
print(mapped_rdd.collect())
# Output: [['Hello', 'world'], ['PySpark', 'map', 'and', 'flatMap']]

# Using flatMap
flat_mapped_rdd = rdd.flatMap(lambda sentence: sentence.split(" "))
print(flat_mapped_rdd.collect())
# Output: ['Hello', 'world', 'PySpark', 'map', 'and', 'flatMap']

Summary

  • Use map for transformations where the output is exactly one element per input.
  • Use flatMap for transformations where the output may be multiple elements per input or where the result needs to be flattened into a single list.

Difference between reduceByKey and groupByKey in PySpark

In PySpark, both reduceByKey and groupByKey are operations used on paired RDDs (key-value RDDs) for aggregating data by keys. However, they differ in terms of functionality, performance, and when you should use them.


Key Differences Between reduceByKey and groupByKey:

  • Purpose
    • reduceByKey: Combines values for each key using a binary function (e.g., sum, max).
    • groupByKey: Groups all values for each key into an iterable.
  • Performance
    • reduceByKey: More efficient, as it performs aggregation locally (on each partition) before shuffling data.
    • groupByKey: Less efficient, as it involves a full shuffle of data before grouping.
  • Shuffle Behavior
    • reduceByKey: Reduces the amount of data shuffled across the network.
    • groupByKey: Transfers all values to the same partition, which can be costly.
  • Output
    • reduceByKey: Returns an RDD with one value per key (e.g., (key, aggregated_value)).
    • groupByKey: Returns an RDD with all values for each key (e.g., (key, [value1, value2, ...])).
  • Use Case
    • reduceByKey: Use when you need to aggregate values (e.g., sum, max).
    • groupByKey: Use when you need all the values for a key.

Examples

1. reduceByKey Example

Use reduceByKey for aggregation, such as summing up values for each key.

from pyspark import SparkContext

sc = SparkContext("local", "reduceByKey Example")

# Example RDD
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 2), ("b", 3)])

# Sum values for each key
result = rdd.reduceByKey(lambda x, y: x + y)

print(result.collect())  # Output: [('a', 3), ('b', 5)] (key order may vary)
  • Aggregation happens locally on each partition first (e.g., summing values for "a" and "b" separately in each partition), reducing the amount of data shuffled across the network.

2. groupByKey Example

Use groupByKey when you need all values for each key as a collection.

# Group values for each key
result = rdd.groupByKey()

# Convert the result to a list for inspection
print([(key, list(values)) for key, values in result.collect()])
# Output: [('a', [1, 2]), ('b', [2, 3])] (key and value order may vary)
  • All values for each key are shuffled across the network to the same partition.

Performance Comparison

  1. reduceByKey is more efficient:

    • Combines values within each partition before shuffling, reducing the amount of data transferred across the network.
  2. groupByKey can be expensive:

    • Transfers all values for each key across the network, which can lead to out-of-memory errors if one key has many values (skewed data).
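To see why the shuffle sizes differ, here is a rough plain-Python simulation over two hypothetical partitions. It is a model of the idea, not Spark's implementation. reduce_by_key_sim combines values inside each partition before "sending" them; group_by_key_sim sends every pair. Each function returns its result together with the number of records that would cross the network.

```python
# Hypothetical data, already split into two partitions.
partitions = [
    [("a", 1), ("b", 2), ("a", 2), ("a", 5)],
    [("b", 3), ("a", 4), ("b", 1)],
]


def reduce_by_key_sim(parts, func):
    # Map-side combine: each partition keeps one partial result per key.
    local_results = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        local_results.append(local)
    shuffled = sum(len(local) for local in local_results)  # one record per key per partition
    final = {}
    for local in local_results:
        for key, value in local.items():
            final[key] = func(final[key], value) if key in final else value
    return final, shuffled


def group_by_key_sim(parts):
    # No local combine: every (key, value) pair is shuffled.
    grouped = {}
    for part in parts:
        for key, value in part:
            grouped.setdefault(key, []).append(value)
    shuffled = sum(len(part) for part in parts)
    return grouped, shuffled


print(reduce_by_key_sim(partitions, lambda x, y: x + y))  # ({'a': 12, 'b': 6}, 4)
print(group_by_key_sim(partitions))  # ({'a': [1, 2, 5, 4], 'b': [2, 3, 1]}, 7)
```

Here reduceByKey-style combining shuffles 4 records instead of 7. The gap grows with the number of values per key.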

When to Use Which?

  • Use reduceByKey:

    • When performing aggregation operations (e.g., sum, average, max, etc.).
    • Preferred due to its better performance and reduced shuffling.
  • Use groupByKey:

    • When you need to process all the values for a key at once (e.g., custom processing like sorting values or performing non-reducible operations).

Pro Tip: Replace groupByKey with combineByKey or reduceByKey whenever possible for better performance. For example, if you want to calculate the average per key, use combineByKey instead of grouping all values and computing the average manually.
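For the average-per-key case, combineByKey takes three functions: createCombiner, mergeValue and mergeCombiners. Below is a minimal sketch of those three functions for a (sum, count) accumulator. Because running it would need a SparkContext, the block simulates the same logic in plain Python over two hypothetical partitions. The equivalent PySpark calls are shown only as comments.

```python
def create_combiner(value):
    # First value of a key seen in a partition -> (running_sum, count)
    return (value, 1)


def merge_value(acc, value):
    # Fold another value into the partition-local accumulator
    return (acc[0] + value, acc[1] + 1)


def merge_combiners(acc1, acc2):
    # Merge accumulators for the same key coming from different partitions
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


# With a SparkContext, these would be used as:
#   sums = rdd.combineByKey(create_combiner, merge_value, merge_combiners)
#   averages = sums.mapValues(lambda acc: acc[0] / acc[1])

partitions = [[("a", 1), ("b", 2)], [("a", 2), ("b", 3), ("b", 4)]]


def simulate_combine_by_key(parts):
    merged = {}
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = merge_value(local[key], value) if key in local else create_combiner(value)
        for key, acc in local.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return {key: total / count for key, (total, count) in merged.items()}


print(simulate_combine_by_key(partitions))  # {'a': 1.5, 'b': 3.0}
```

Only one small (sum, count) pair per key per partition has to be shuffled, instead of every individual value.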