A decorator is a function that takes another function and extends its behavior without explicitly modifying it. It's a powerful tool that allows you to add functionality to an existing code in a clean and readable way. #Define a decorator function def my_decorator(func): def wrapper(): print("Something is happening before the function is called.") func() print("Something is happening after the function is called.") return wrapper #Apply the decorator to a function @my_decorator def say_hello(): print("Hello!") #Call the decorated function say_hello() Explanation: 1. The my_decorator function takes another function (func) as an argument. 2. Inside my_decorator, a new function (wrapper) is defined. This function calls the original function (func) and also does some additional work before and after calling func. 3. The wrapper function is returned by my_decorator. 4. The @my_decorator syntax before the say_hello function definition applies the my_decorator decorator to say_hello. This means that when say_hello is called, it will actually call the wrapper function returned by my_decorator. 5. When say_hello() is called, it will print: Explanation of Parameters 1. decorator_name(func): decorator_name: This is the name of the decorator function. func: This parameter represents the function being decorated. When you use a decorator, the decorated function is passed to this parameter. 2. wrapper(*args, **kwargs): wrapper: This is a nested function inside the decorator. It wraps the original function, adding additional functionality. *args: This collects any positional arguments passed to the decorated function into a tuple. **kwargs: This collects any keyword arguments passed to the decorated function into a dictionary. The wrapper function allows the decorator to handle functions with any number and types of arguments. 3. @decorator_name: This syntax applies the decorator to the function_to_decorate function. It is equivalent to writing function_to_decorate = decorator_name(function_to_decorate). Something is happening before the function is called. Hello! Something is happening after the function is called. Without Decorator Function: ########################## def say_hello(): print("Hello!") #Call the decorated function say_hello() Output: Hello! Ex-2 : User Authentication: def requires_authentication(func): def wrapper(user, *args, **kwargs): if not user.get('is_authenticated', False): print("User is not authenticated. Access denied.") return None return func(user, *args, **kwargs) return wrapper # Apply the decorator @requires_authentication def view_profile(user): print(f"Viewing profile of {user['username']}") # Test the decorated function user1 = {'username': 'Alice', 'is_authenticated': True} user2 = {'username': 'Bob', 'is_authenticated': False} view_profile(user1) view_profile(user2) #Output: Viewing profile of Alice User is not authenticated. Access denied. #Without Decorator function def view_profile(user): print(f"Viewing profile of {user['username']}") # Test the decorated function user1 = {'username': 'Alice', 'is_authenticated': True} user2 = {'username': 'Bob', 'is_authenticated': False} view_profile(user1) view_profile(user2) #output Viewing profile of Alice Viewing profile of Bob
To live a creative life we must lose the fear of being wrong.
Tuesday, January 14, 2025
Decorator Function in python
Exploding nested JSON in Pyspark (Struct and Array)
Exploding nested JSON in Pyspark (Struct and Array)
'''
{
"company": {
"name": "Tech Innovators Inc.",
"founded": 2010,
"isPublic": true,
"departments": [
{
"name": "Engineering",
"manager": {
"name": "Alice Johnson",
"email": "alice.johnson@techinnovators.com"
},
"employees": [
{
"name": "John Doe",
"position": "Software Engineer",
"skills": ["JavaScript", "Python", "AWS"]
},
{
"name": "Emily Smith",
"position": "DevOps Engineer",
"skills": ["Docker", "Kubernetes", "Linux"]
}
]
},
{
"name": "Marketing",
"manager": {
"name": "Robert Brown",
"email": "robert.brown@techinnovators.com"
},
"employees": [
{
"name": "Laura Wilson",
"position": "Marketing Specialist",
"skills": ["SEO", "Content Writing", "Analytics"]
}
]
}
],
"contact": {
"address": {
"street": "123 Innovation Lane",
"city": "Tech City",
"state": "CA",
"zip": "94000"
},
"phone": "+1-800-555-1234",
"email": "info@techinnovators.com"
}
}
}''' Explanation: Top-level object: The company key holds the main structure.
Nested objects: departments is an array of objects, each representing a department. Each department object has a manager (another object) and an employees array. Arrays: employees contains an array of objects, with each object describing an employee.
Each employee has an array of skills. Contact information: contact contains another nested object for the address and other contact details. This JSON structure can be easily parsed and used in programming languages like JavaScript, Python, or any other that supports JSON. '''
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Initialize Spark session
spark = SparkSession.builder.appName("NestedJSON").getOrCreate()
# Load the JSON file
df = spark.read.format('json').option('multiline','true').load('E:\\source_files\\nested_json_explode_example.json')
#df.show(truncate=False)
df.printSchema()
'''
root
|-- company: struct (nullable = true)
| |-- contact: struct (nullable = true)
| | |-- address: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- state: string (nullable = true)
| | | |-- street: string (nullable = true)
| | | |-- zip: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- phone: string (nullable = true)
| |-- departments: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- employees: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- name: string (nullable = true)
| | | | | |-- position: string (nullable = true)
| | | | | |-- skills: array (nullable = true)
| | | | | | |-- element: string (containsNull = true)
| | | |-- manager: struct (nullable = true)
| | | | |-- email: string (nullable = true)
| | | | |-- name: string (nullable = true)
| | | |-- name: string (nullable = true)
| |-- founded: long (nullable = true)
| |-- isPublic: boolean (nullable = true)
| |-- name: string (nullable = true)
'''
# Flatten the nested fields
flattened_df = df.select(col("company.name").alias("company_name"),
col("company.founded").alias("company_founded"),
col("company.isPublic").alias("company_isPublic"),
col("company.contact.address.street").alias("contact_street"),
col("company.contact.address.city").alias("contact_city"),
col("company.contact.address.state").alias("contact_state"),
col("company.contact.address.zip").alias("contact_zip"),
col("company.contact.phone").alias("contact_phone"),
col("company.contact.email").alias("contact_email"),
explode("company.departments").alias("department")) \
.select("company_name", "company_founded", "company_isPublic", "contact_street", "contact_city",
"contact_state", "contact_zip", "contact_phone", "contact_email",
col("department.name").alias("department_name"), col("department.manager.name").alias("manager_name"),
col("department.manager.email").alias("manager_email"),
explode("department.employees").alias("employee")) \
.select("company_name", "company_founded", "company_isPublic", "contact_street", "contact_city",
"contact_state", "contact_zip", "contact_phone", "contact_email", "department_name",
"manager_name", "manager_email",
col("employee.name").alias("employee_name"),
col("employee.position").alias("employee_position"),
explode(col("employee.skills").alias("employee_skills"))
)
# Show the flattened DataFrame
flattened_df.show(truncate=False)
###############################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Initialize Spark session
spark = SparkSession.builder.appName("NestedJSON").getOrCreate()
# Load the JSON file
df = spark.read.format('json').option('Multiline','True').load('E:\\source_files\\nested_json_explode_example.json')
#df.show(truncate=False)
df.printSchema()
# Select and display top-level fields
df.select("company.name", "company.founded", "company.isPublic").show()
#Explode the nested departments array
departments_df = df.select(explode("company.departments").alias("department"))
departments_df.printSchema()
# Explode the nested employees array within departments
expanded_df = departments_df.select(
col("department.name").alias("department_name"),
col("department.manager.name").alias("manager_name"),
col("department.manager.email").alias("manager_email"),
explode(col("department.employees")).alias("employee")
)
# Further expand employee details
final_df = expanded_df.select(
col("department_name"),
col("manager_name"),
col("manager_email"),
col("employee.name").alias("employee_name"),
col("employee.position").alias("employee_position"),
explode(col("employee.skills")).alias("employee_skill")
)
# Show the final exploded DataFrame
final_df.show(truncate=False)
# Select and display contact information
contact_df = df.select("company.contact.*")
contact_df.select(
"address.street", "address.city", "address.state", "address.zip", "phone", "email" ).show()
# Stop Spark session
spark.stop()
Saturday, January 4, 2025
lambda function and its limitations in AWS
In AWS, Lambda functions are serverless compute services that allow you to run code in response to events or HTTP requests without provisioning or managing servers. While AWS Lambda is a powerful tool, it has certain limitations you should be aware of when designing applications.
Key Features of AWS Lambda:
- Supports multiple runtimes (e.g., Python, Node.js, Java, etc.).
- Scales automatically based on traffic.
- Pay-as-you-go model, billed for execution time and requests.
Common Limitations of AWS Lambda:
1. Execution Timeout
- Limit: Maximum execution time for a Lambda function is 15 minutes.
- Impact: Long-running tasks such as batch processing, video encoding, or large database operations may fail.
- Solution: Use AWS Step Functions for workflows or break tasks into smaller chunks.
2. Memory and CPU
- Limit: Memory allocation ranges from 128 MB to 10,240 MB.
- CPU is proportional to memory, with no option to configure CPU directly.
- Impact: Computationally intensive tasks may require higher memory settings.
- Solution: Optimize code, offload heavy processing to ECS/EKS, or use purpose-built services like AWS SageMaker for ML tasks.
3. Ephemeral Storage
- Limit: Each Lambda function gets 512 MB of temporary storage in the
/tmp
directory. - Impact: Insufficient for storing large files during execution.
- Solution: Use S3 for temporary storage or increase storage with ephemeral storage (up to 10 GB as of 2023).
4. Deployment Package Size
- Limit:
- 50 MB for direct upload as a ZIP file.
- 250 MB unzipped (including layers).
- Impact: Large libraries or dependencies may exceed this limit.
- Solution: Use Lambda Layers to share dependencies or container images (up to 10 GB).
5. Concurrent Execution
- Limit: Default concurrency limit is 1,000 simultaneous executions per region, adjustable by AWS support.
- Impact: Exceeding this limit leads to throttling, which may affect user experience.
- Solution: Request a limit increase or use reserved concurrency to allocate resources to critical functions.
6. Cold Starts
- Limit: When a function is invoked after being idle, a cold start occurs, adding latency.
- Impact: Affects real-time or low-latency applications.
- Solution: Use provisioned concurrency or optimize function initialization.
7. VPC Networking
- Limit: Lambda functions inside a VPC may experience additional latency when establishing ENI (Elastic Network Interface) connections.
- Impact: Slower execution when accessing VPC resources like RDS or Elasticsearch.
- Solution: Use AWS PrivateLink, reduce VPC subnets, or optimize ENI setup.
8. Supported Runtimes
- Limit: Only supports specific runtimes (e.g., Python, Node.js, Java, Go).
- Impact: Custom runtimes need to be built using AWS Lambda Runtime API.
- Solution: Use custom runtimes or container images for unsupported languages.
9. Statefulness
- Limit: AWS Lambda is stateless, meaning the function does not retain state between invocations.
- Impact: Complex applications requiring persistent state need additional storage.
- Solution: Use DynamoDB, S3, or external databases for state management.
10. Execution Environment
- Limit: Functions run in a sandboxed environment with restrictions on OS access, thread counts, and system libraries.
- Impact: Limited control over the underlying environment.
- Solution: Use container-based Lambdas for more control over the runtime.
11. IAM Permissions
- Limit: Misconfigured IAM roles or excessive permissions can lead to security issues.
- Impact: Potential data leaks or unauthorized access.
- Solution: Follow the principle of least privilege for IAM roles.
12. Cost
- Limit: While Lambda is cost-effective for infrequent tasks, high-frequency or long-running tasks can become expensive.
- Impact: Unexpected costs for poorly optimized or high-throughput applications.
- Solution: Monitor costs using AWS Cost Explorer or switch to alternative compute services (e.g., ECS, Fargate).
Conclusion
AWS Lambda is a versatile and efficient solution for event-driven and serverless architectures, but its limitations require careful design and planning. Understanding and working around these constraints ensures optimal performance and cost-efficiency. For complex applications, consider hybrid approaches using other AWS services.
Map vs flatmap in Pyspark
The difference between flatMap
and map
in PySpark lies in the output structure they produce after applying a transformation function to each element of an RDD. Both are transformations, but they behave differently based on the results of the applied function.
Key Differences Between map
and flatMap
Feature | map |
flatMap |
---|---|---|
Output | Transforms each input element into exactly one output element. | Transforms each input element into zero, one, or multiple output elements. |
Flattening | Does not flatten the output; results remain nested if the function returns a list or collection. | Flattens the output; all elements from lists or collections are returned as a single, flattened sequence. |
Use Case | Use when the function produces one-to-one mapping or transformation. | Use when the function may produce multiple outputs or a collection for each input. |
Examples
1. map
Example
Each element of the RDD is transformed into exactly one element in the result.
from pyspark import SparkContext
sc = SparkContext("local", "Map vs FlatMap")
# Input RDD
rdd = sc.parallelize([1, 2, 3])
# Apply map to double each number
mapped_rdd = rdd.map(lambda x: [x, x * 2])
print(mapped_rdd.collect())
# Output: [[1, 2], [2, 4], [3, 6]]
2. flatMap
Example
Each element can be transformed into multiple outputs, and the result is flattened.
# Apply flatMap to produce multiple outputs for each element
flat_mapped_rdd = rdd.flatMap(lambda x: [x, x * 2])
print(flat_mapped_rdd.collect())
# Output: [1, 2, 2, 4, 3, 6]
Key Points in Behavior
-
Nested Output with
map
:- The
map
transformation retains the structure of the function's output, even if it is a list or collection. - Example: A single list
[1, 2]
remains as[1, 2]
inside the RDD.
- The
-
Flattened Output with
flatMap
:- The
flatMap
transformation flattens the output of the function. - Example: A list
[1, 2]
is split into separate elements1
and2
in the final RDD.
- The
When to Use Which?
-
Use
map
:- When you want a one-to-one transformation (e.g., applying a function to each element).
- When the transformation doesn't produce lists or collections as output.
-
Use
flatMap
:- When you need a one-to-many transformation or need to flatten the output.
- When the function produces lists, collections, or even empty outputs for some elements.
Advanced Example
Splitting Sentences into Words (flatMap
vs. map
)
# Input RDD of sentences
rdd = sc.parallelize(["Hello world", "PySpark map and flatMap"])
# Using map
mapped_rdd = rdd.map(lambda sentence: sentence.split(" "))
print(mapped_rdd.collect())
# Output: [['Hello', 'world'], ['PySpark', 'map', 'and', 'flatMap']]
# Using flatMap
flat_mapped_rdd = rdd.flatMap(lambda sentence: sentence.split(" "))
print(flat_mapped_rdd.collect())
# Output: ['Hello', 'world', 'PySpark', 'map', 'and', 'flatMap']
Summary
- Use
map
for transformations where the output is exactly one element per input. - Use
flatMap
for transformations where the output may be multiple elements per input or where the result needs to be flattened into a single list.
difference between reducebykey and groupbykey in pyspark
In PySpark, both reduceByKey
and groupByKey
are operations used on paired RDDs (key-value RDDs) for aggregating data by keys. However, they differ in terms of functionality, performance, and when you should use them.
Key Differences Between reduceByKey
and groupByKey
:
Feature | reduceByKey |
groupByKey |
---|---|---|
Purpose | Combines values for each key using a binary function (e.g., sum, max). | Groups all values for each key into an iterable. |
Performance | More efficient, as it performs aggregation locally (on each partition) before shuffling data. | Less efficient, as it involves a full shuffle of data before grouping. |
Shuffle Behavior | Reduces the amount of data shuffled across the network. | Transfers all values to the same partition, which can be costly. |
Output | Returns an RDD with one value per key (e.g., (key, aggregated_value) ). |
Returns an RDD with all values for each key (e.g., (key, [value1, value2, ...]) ). |
Use Case | Use when you need to aggregate values (e.g., sum, max). | Use when you need all the values for a key. |
Examples
1. reduceByKey
Example
Use reduceByKey
for aggregation, such as summing up values for each key.
from pyspark import SparkContext
sc = SparkContext("local", "reduceByKey Example")
# Example RDD
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 2), ("b", 3)])
# Sum values for each key
result = rdd.reduceByKey(lambda x, y: x + y)
print(result.collect()) # Output: [('a', 3), ('b', 5)]
- Aggregation happens locally on each partition first (e.g., summing values for "a" and "b" separately in each partition), reducing the amount of data shuffled across the network.
2. groupByKey
Example
Use groupByKey
when you need all values for each key as a collection.
# Group values for each key
result = rdd.groupByKey()
# Convert the result to a list for inspection
print([(key, list(values)) for key, values in result.collect()])
# Output: [('a', [1, 2]), ('b', [2, 3])]
- All values for each key are shuffled across the network to the same partition.
Performance Comparison
-
reduceByKey
is more efficient:- Combines values within each partition before shuffling, reducing the amount of data transferred across the network.
-
groupByKey
can be expensive:- Transfers all values for each key across the network, which can lead to out-of-memory errors if one key has many values (skewed data).
When to Use Which?
-
Use
reduceByKey
:- When performing aggregation operations (e.g., sum, average, max, etc.).
- Preferred due to its better performance and reduced shuffling.
-
Use
groupByKey
:- When you need to process all the values for a key at once (e.g., custom processing like sorting values or performing non-reducible operations).
Pro Tip: Replace groupByKey
with combineByKey
or reduceByKey
whenever possible for better performance. For example, if you want to calculate the average per key, use combineByKey
instead of grouping all values and computing the average manually.
what is lambda function in python and spark
A lambda function, also known as an anonymous function, is a small and unnamed function defined using the `lambda` keyword. It is often used for short-term tasks, such as in functional programming operations like `map`, `filter`, and `reduce`. Here's a quick overview of how lambda functions work in both Python and PySpark:
### Python Lambda Function
A lambda function in Python can take any number of arguments but can only have one expression. The syntax is as follows:
```python
lambda arguments: expression
```
Here’s an example of using a lambda function to add two numbers:
```python
add = lambda x, y: x + y
print(add(2, 3)) # Output: 5
```
Lambda functions are often used with functions like `map()`, `filter()`, and `reduce()`:
```python
# Using lambda with map
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x ** 2, numbers))
print(squared) # Output: [1, 4, 9, 16, 25]
# Using lambda with filter
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
print(even_numbers) # Output: [2, 4]
# Using lambda with reduce
from functools import reduce
product = reduce(lambda x, y: x * y, numbers)
print(product) # Output: 120
```
### Lambda Function in PySpark
In PySpark, lambda functions are used in similar ways, especially with operations on RDDs. Here are some examples:
```python
from pyspark import SparkContext
sc = SparkContext("local", "example")
# Creating an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Using lambda with map
squared_rdd = rdd.map(lambda x: x ** 2)
print(squared_rdd.collect()) # Output: [1, 4, 9, 16, 25]
# Using lambda with filter
even_rdd = rdd.filter(lambda x: x % 2 == 0)
print(even_rdd.collect()) # Output: [2, 4]
# Using lambda with reduce
product_rdd = rdd.reduce(lambda x, y: x * y)
print(product_rdd) # Output: 120
```
In both Python and PySpark, lambda functions provide a concise and powerful way to perform operations on data, especially in contexts where defining a full function would be overkill.
Factorial of a number
Recursive Approach
def factorial(n):
if n == 0 or n == 1:
return 1 ;
else:
return n * factorial(n - 1)
print(factorial(5)) # Output: 120
Iterative Approach
result = 1
for i in range(1, n + 1): -- for and return statements are in same indent
result *= i
return result
print(factorial(5)) # Output: 120
sort in python
The sort()
method in Python is used to sort a list in place, meaning the list itself is modified and reordered. It can sort the list in ascending or descending order based on the values or a custom key function.
Syntax:
list.sort(key=None, reverse=False)
Parameters:
key
(optional):- A function that specifies a sorting criterion. By default, elements are sorted based on their natural order.
- Example:
key=len
sorts the list by the length of each element.
reverse
(optional):- If
True
, the list is sorted in descending order. Default isFalse
(ascending order).
- If
Examples:
1. Basic Sorting (Ascending Order):
numbers = [3, 1, 4, 1, 5, 9]
numbers.sort()
print(numbers) # Output: [1, 1, 3, 4, 5, 9]
2. Sorting in Descending Order:
numbers.sort(reverse=True)
print(numbers) # Output: [9, 5, 4, 3, 1, 1]
3. Custom Sorting Using key
:
# Sort strings by their length
words = ["apple", "banana", "cherry", "date"]
words.sort(key=len)
print(words) # Output: ['date', 'apple', 'banana', 'cherry']
4. Sorting with a Custom Function:
# Sort numbers by their distance from 5
numbers = [10, 2, 8, 3, 6]
numbers.sort(key=lambda x: abs(x - 5))
print(numbers) # Output: [6, 3, 8, 2, 10]
Key Points:
-
In-place Sorting:
sort()
modifies the original list.- If you need a new sorted list without changing the original, use the
sorted()
function instead.
original = [3, 1, 4] sorted_list = sorted(original) # New sorted list print(original) # Output: [3, 1, 4]
-
Non-comparable Elements:
- Sorting a list with incompatible types (e.g., numbers and strings) will raise a
TypeError
.
- Sorting a list with incompatible types (e.g., numbers and strings) will raise a
-
Efficient Sorting:
sort()
uses the Timsort algorithm, which is highly optimized and stable.
The sort()
method is ideal for in-place sorting, while sorted()
is more versatile for generating new sorted lists.
Map, Filter, and Reduce in PySpark
In PySpark, map, filter, and reduce are operations applied to Resilient Distributed Datasets (RDDs) to perform transformations and actions. These operations are foundational for distributed data processing in PySpark.
1. Map
The map
transformation applies a function to each element of the RDD and returns a new RDD with transformed elements.
from pyspark import SparkContext
sc = SparkContext("local", "Map Example")
# Create an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Use map to square each element
mapped_rdd = rdd.map(lambda x: x ** 2)
print(mapped_rdd.collect()) # Output: [1, 4, 9, 16, 25]
2. Filter
The filter
transformation selects elements from the RDD that satisfy a given condition and returns a new RDD.
# Filter RDD to select only even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect()) # Output: [2, 4]
# Filter RDD to select only even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect()) # Output: [2, 4]
3. Reduce
The reduce
action aggregates the elements of the RDD using a binary operator, returning a single value.
Example:
# Reduce RDD to compute the sum of elements
sum_result = rdd.reduce(lambda x, y: x + y)
print(sum_result) # Output: 15
Combining Map, Filter, and Reduce
These operations can be combined to perform complex computations in a distributed manner.
Example:
# Combine map, filter, and reduce
result = rdd.map(lambda x: x ** 2) \
.filter(lambda x: x > 10) \
.reduce(lambda x, y: x + y)
print(result) # Output: 41 (16 + 25 from squares greater than 10)
Key Points:
Lazy Evaluation:
map
andfilter
are transformations, so they are lazily evaluated and executed only when an action (e.g.,reduce
,collect
,count
) is called.
Distributed Nature:
- These operations are performed in a distributed manner, with
map
andfilter
transforming partitions independently, whilereduce
requires shuffling data across partitions.
- These operations are performed in a distributed manner, with
RDD Focus:
- These operations work on RDDs. For DataFrames, equivalent operations like
select
,filter
, andagg
are more commonly used.
If you're working on large-scale data, consider using PySpark DataFrame APIs for better performance and easier optimization by the Spark Catalyst opti
- These operations work on RDDs. For DataFrames, equivalent operations like
Where do we use pandas in Data Engg
Pandas is extensively used in Data Engineering for various tasks related to data manipulation, cleaning, transformation, and analysis. While Pandas is more commonly associated with Data Analysis, its capabilities make it an essential tool for small-to-medium-scale Data Engineering tasks or prototyping.
Here are specific areas where Pandas is used in Data Engineering:
Data Extraction (ETL Process)
- Use Case: Extracting data from various sources such as CSV files, Excel spreadsheets, databases, APIs, and more.
-
- import pandas as pd
# Load data from CSV
df = pd.read_csv("data.csv")
# Load data from a SQL database
import sqlite3
conn = sqlite3.connect('database.db')
df = pd.read_sql("SELECT * FROM table_name", conn)
2. Data Cleaning
Handling missing values, duplicates, and inconsistent data types.
# Drop rows with missing values
df.dropna(inplace=True)
# Fill missing values with default
df.fillna(value={"column": 0}, inplace=True)
# Remove duplicate rows
df.drop_duplicates(inplace=True)
3. Data Transformation
Aggregation, filtering, sorting, and feature engineering.
# Filter rows based on a condition
df_filtered = df[df["age"] > 30]
# Add a new column
df["salary_with_bonus"] = df["salary"] * 1.1
# Grouping and aggregation
df_grouped = df.groupby("department").agg({"salary": "mean"})
4. Data Validation and Profiling
Ensuring data consistency and profiling the dataset.
# Check for data types
print(df.dtypes)
# Get summary statistics
print(df.describe())
# Validate uniqueness of a column
assert df["id"].is_unique
5. Small-scale Data Pipelines
# Extract data
df = pd.read_csv("input.csv")
# Transform data
df["total_cost"] = df["quantity"] * df["price"]
df = df[df["total_cost"] > 100]
# Load data to a new file
df.to_csv("output.csv", index=False)
6. Data Integration
Merging and joining datasets from different sources.
# Merge two datasets
df_merged = pd.merge(df1, df2, on="id", how="inner")
. Prototyping and Testing
- Rapidly prototyping ETL logic before scaling it to frameworks like PySpark, Dask, or Apache Beam.
# Prototype logic using Pandas
df_transformed = df.groupby("category").sum()
# Scale up the logic in PySpark later
# Prototype logic using Pandas
df_transformed = df.groupby("category").sum()
# Scale up the logic in PySpark later
When to Use Pandas in Data Engineering:
- Small to Medium Datasets: When data fits in memory (up to a few GBs).
- Prototyping: Quick tests before scaling with distributed systems like PySpark or Dask.
- Ad-hoc Analysis: One-off or exploratory analysis.
- Preprocessing: Cleaning and transforming data before loading it into a database or data warehouse.
For larger datasets, distributed computing frameworks like PySpark, Dask, or Apache Beam are often more appropriate.
How to check class of another child class in python
In Python, you can use the isinstance()
function to check if an object is an instance of a specific class or any class derived from it. You can also use the issubclass()
function to check if a class is a subclass of another class.
class Parent:
pass
class Child(Parent):
pass
class Grandchild(Child):
pass
# Creating instances
p = Parent()
c = Child()
g = Grandchild()
# Checking instances
print(isinstance(c, Parent)) # Output: True, because Child is a subclass of Parent
print(isinstance(g, Child)) # Output: True, because Grandchild is a subclass of Child
print(isinstance(g, Parent)) # Output: True, because Grandchild is a subclass of Parent
# Checking subclasses
print(issubclass(Child, Parent)) # Output: True
print(issubclass(Grandchild, Parent)) # Output: True
print(issubclass(Grandchild, Child)) # Output: True
print(issubclass(Parent, Child)) # Output: False
isinstance(object, classinfo)
returnsTrue
ifobject
is an instance ofclassinfo
or a subclass thereof.issubclass(class, classinfo)
returnsTrue
ifclass
is a subclass ofclassinfo
.
These functions are handy for ensuring your objects and classes inherit from the expected parent classes.
isinstance(object, class)
:
- Checks if the object is an instance of the given class or its subclass.
- Useful when you have an object and want to verify its type.
issubclass(subclass, class)
:
- Checks if the given class is a subclass of another class.
- Useful when you’re dealing with class relationships and not specific instances.
If you want to check if an object belongs to a child class (and not the parent class directly):
if isinstance(c, Parent) and not isinstance(g, Child):
print("Object belongs to ChildA but not Parent directly.")
False
Shallow copy vs deep copy in python
'''
Shallow Copy (copy.copy):
A shallow copy creates a new object and then (to the extent possible) inserts references into it to the
objects found in the original object. This means that if you modify a sub-object of the original object,
the same modification will be reflected in the copied object.
Creates a new object and copies the references of the original object's internal data.
If the original object contains references to other objects, the shallow copy will share those same references.
Changes made to the original object or the nested objects will also affect the shallow copy.
'''
import copy
original_list = [[1, 2], [3, 4]]
# Create a shallow copy
shallow_copy_list = copy.copy(original_list)
# Modify the original list
original_list.append([5, 6])
original_list[0][0] = 'X'
print("Original List:", original_list)
print("Shallow Copy List:", shallow_copy_list)
'''
output:
Original List: [['X', 2], [3, 4], [5, 6]]
Shallow Copy List: [['X', 2], [3, 4]]
As you can see, when we modified the original list by changing the first element of the first sublist,
the same change was reflected in the shallow copy.
However, when we appended a new sublist to the original list, the shallow copy remained unchanged.
2nd example:
A shallow copy creates a new object, but it "inserts references" into it to the objects found in the original.
This means that if the original object contains references to other objects,
these are shared between the original and the copied object.
You can create a shallow copy using the copy module's copy() method, the copy method of a list, or slicing.
import copy
original = [[1, 2, 3], [4, 5, 6]]
shallow_copy = copy.copy(original)
shallow_copy[0][0] = 'X' -- Suppose we have modified the shallow copied object element , it will update in original
print(original) # Output: [['X', 2, 3], [4, 5, 6]]
print(shallow_copy) # Output: [['X', 2, 3], [4, 5, 6]]
'''
'''
Deep Copy
A deep copy creates a new compound object and then, recursively, "inserts copies"
into it of the objects found in the original.
A deep copy creates a new object and recursively adds the copies of nested objects found in the original,
meaning that it does not share references with the original object.
'''
import copy
original_list = [[1, 2], [3, 4]]
# Create a deep copy
deep_copy_list = copy.deepcopy(original_list)
# Modify the original list
original_list.append([5, 6])
original_list[0][0] = 'X'
print("Original List:", original_list)
print("Deep Copy List:", deep_copy_list)
''' output
Original List: [['X', 2], [3, 4], [5, 6]]
Deep Copy List: [[1, 2], [3, 4]]
In this case, the deep copy remained completely unchanged, even when we modified the original list.
This is because the deep copy created entirely new objects, rather than just referencing the original objects.
2nd example:
import copy
original = [[1, 2, 3], [4, 5, 6]]
deep_copy = copy.deepcopy(original)
deep_copy[0][0] = 'X' ---- Suppose we have modified the deep copied object element , it will not change the original obj.
print(original) # Output: [[1, 2, 3], [4, 5, 6]]
print(deep_copy) # Output: [['X', 2, 3], [4, 5, 6]]
use a shallow copy when you need to copy the object structure without copying the elements inside, and
use a deep copy when you need to clone the object along with all the objects it references.
'''