Friday, December 1, 2023

Slowly Changing Dimension (SCD) Type 2 merge into Delta tables

# SCD Type 2 upsert of customer address changes into a Delta table.
#
# For every row in `updatesDF`:
#   * existing customer, address unchanged -> no change to the target;
#   * existing customer, address changed   -> the customer's current row is
#     closed (current = false, endDate = the update's effectiveDate) and a new
#     current row holding the new address is inserted;
#   * new customer                          -> a new current row is inserted.
#
# Replace the two `...` placeholders before running.
# NOTE(review): Delta merge rejects several source rows matching the same
# target row, so updatesDF presumably must hold at most one row per
# customerId - confirm upstream or deduplicate first.

customersTable = ...  # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)

updatesDF = ...  # DataFrame with schema (customerId, address, effectiveDate)

# Rows to INSERT new addresses of existing customers: updates whose customer
# already has a current row with a different address.
# NOTE(review): `<>` yields NULL (treated as false) when either address is NULL,
# so a change to/from a NULL address is not detected here or in the merge
# condition below; use `NOT (a <=> b)` if that case matters.
newAddressesToInsert = (
    updatesDF.alias("updates")
    .join(customersTable.toDF().alias("customers"), "customerId")
    .where("customers.current = true AND updates.address <> customers.address")
)

# Stage the update by unioning two sets of rows:
# 1. Rows with a NULL mergeKey. NULL never satisfies the merge condition, so
#    these always reach whenNotMatchedInsert and insert the new address of an
#    existing customer.
# 2. Every update keyed by its customerId. These either match (and close) the
#    current row of an existing customer, or - for a new customer - match
#    nothing and are inserted.
# Both sides expand updatesDF's columns in the same order, so the positional
# union lines up.
stagedUpdates = (
    newAddressesToInsert
    .selectExpr("NULL as mergeKey", "updates.*")  # Rows for 1
    # Only the joined copy above carries the "updates" alias; updatesDF itself
    # does not, so the key column must be referenced unqualified here.
    .union(updatesDF.selectExpr("customerId as mergeKey", "*"))  # Rows for 2
)

# Apply the SCD Type 2 operation using merge.
(
    customersTable.alias("customers")
    .merge(stagedUpdates.alias("staged_updates"), "customers.customerId = mergeKey")
    .whenMatchedUpdate(
        # Close out only the customer's current row, and only if the address changed.
        condition="customers.current = true AND customers.address <> staged_updates.address",
        # Set current to false and endDate to the source's effective date.
        set={
            "current": "false",
            "endDate": "staged_updates.effectiveDate",
        },
    )
    .whenNotMatchedInsert(
        # Set current to true along with the new address and its effective
        # date; the new row has no end date yet.
        values={
            "customerId": "staged_updates.customerId",
            "address": "staged_updates.address",
            "current": "true",
            "effectiveDate": "staged_updates.effectiveDate",
            "endDate": "null",
        },
    )
    .execute()
)

No comments:

Post a Comment