Add type three scd upserts functionality #127
- `delta_table` (`DeltaTable`): An object representing the delta table to be upserted.
- `updates_df` (`DataFrame`): The data to be used in order to upsert the target delta table.
- `primary_key` (`str`): The primary key (i.e. business key) that uniquely identifies each row in the target delta table.
- `curr_prev_col_names` (`dict[str,str]`): A dictionary of column names to store current and previous values.
Can we separate this into two parameters: `current_col_name` and `previous_col_name`? I generally prefer exposing public interfaces with full words instead of abbreviations. Is there any advantage to grouping these two arguments in a dictionary?
I think it depends on whether we want to:
- Populate multiple columns at a time when applying the type 3 scd upsert:
Suppose that we have a delta table with this structure:
+----+----+---+--------+--------+------------+-------------+--------------+
|pkey|name|job|prev_job| country|prev_country|    continent|prev_continent|
+----+----+---+--------+--------+------------+-------------+--------------+
The columns `prev_job`, `prev_country` and `prev_continent` will store the previous values of the columns `job`, `country` and `continent` after applying the type 3 scd.
Using a dictionary as a parameter to store the column names (for current and previous values) is less error-prone than storing them in two separate lists.
Option #1: Using a dictionary --> Triggering the type_3_scd_upsert once
col_names = {"country": "prev_country", "job": "prev_job", "continent": "prev_continent"}
mack.type_3_scd_upsert(delta_table, updates_df, "pkey", col_names)
Option #2: Using two parameters --> Triggering the type_3_scd_upsert multiple times
current_col_names = ["country", "job", "continent"]
previous_col_names = ["prev_country", "prev_job", "prev_continent"]
mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)
Here, an error in the column order can cause a catastrophe, such as storing the previous values of the column `job` in the column `prev_country` (or the reverse):
current_col_names = ["country", "job", "continent"]
previous_col_names = ["prev_job", "prev_country", "prev_continent"]
- Populate one column at a time when applying the type 3 scd upsert:
Let's use the same delta table structure mentioned above. In this case we will have something like this:
# type 3 scd upsert for column job
current_col_names = "job"
previous_col_names = "prev_job"
mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)
# type 3 scd upsert for column country
current_col_names = "country"
previous_col_names = "prev_country"
mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)
# type 3 scd upsert for column continent
current_col_names = "continent"
previous_col_names = "prev_continent"
mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)
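To make the ordering risk concrete, here is a minimal plain-Python sketch (no Spark involved). The helper `pair_columns` is hypothetical and not part of mack; it only illustrates that two parallel lists are paired by position, so a reordering goes unnoticed, whereas a dictionary keeps each current/previous pair together.

```python
# Hypothetical illustration only; `pair_columns` is not part of mack.

def pair_columns(current_col_names, previous_col_names):
    """Pair current/previous column names by position, as two parallel lists would be."""
    if len(current_col_names) != len(previous_col_names):
        raise ValueError("both lists must have the same length")
    return dict(zip(current_col_names, previous_col_names))


# Option #1: a dictionary keeps every current/previous pair together.
col_names = {"country": "prev_country", "job": "prev_job", "continent": "prev_continent"}

# Option #2: two lists are paired by position, so a misordered list is accepted silently.
current_col_names = ["country", "job", "continent"]
misordered_previous = ["prev_job", "prev_country", "prev_continent"]
paired = pair_columns(current_col_names, misordered_previous)

# Columns whose pairing differs from the intended mapping.
mismatched = {cur: prev for cur, prev in paired.items() if col_names[cur] != prev}
print(mismatched)
```

The length check catches lists of different sizes, but nothing in the two-list form can detect a swap; that is the case the dictionary avoids by construction.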
+----+----+---+--------+--------+------------+-------------+--------------+
|pkey|name|job|prev_job| country|prev_country|    continent|prev_continent|
+----+----+---+--------+--------+------------+-------------+--------------+
|   1|   A| AA|    null|   Japan|        null|         Asia|          null|
Let's add some examples where the `previous_job` column is not null.
okey-dokey
+----+----+---+------------+-------------+
|   1|  A1| AA|       Japan|         Asia| // update on name
|   2|  B1|BBB|        Peru|South America| // updates on name, job, country, continent --> storing previous values in prev_job, prev_country, prev_continent
|   3|   C| CC| New Zealand|      Oceania| // updates on country, continent --> storing previous values in prev_country, prev_continent
We should demonstrate these two cases:
- a row that's in source, but not in target
- a row that's identical in source and target
Yes.
Just to be sure: by source, do you mean the updates DataFrame, and by target, the delta table?
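The cases requested above can be sketched with a plain-Python model of a type 3 scd upsert. This is not mack's Spark implementation: `type_3_scd_upsert_model` is a hypothetical stand-in that works on lists of dicts, the sample rows are made up for illustration, and it assumes that source means the updates data and target means the delta table.

```python
# Plain-Python model of a type 3 scd upsert; a sketch, not mack's Spark implementation.

def type_3_scd_upsert_model(target_rows, update_rows, primary_key, curr_prev_col_names):
    """Return the target rows (list of dicts) after applying the update rows."""
    result = {row[primary_key]: dict(row) for row in target_rows}
    for update in update_rows:
        key = update[primary_key]
        if key not in result:
            # Row in source but not in target: insert it with empty previous values.
            new_row = dict(update)
            for prev_col in curr_prev_col_names.values():
                new_row[prev_col] = None
            result[key] = new_row
            continue
        existing = result[key]
        for col, new_value in update.items():
            if col in curr_prev_col_names and existing[col] != new_value:
                # Tracked column changed: keep the old value in its "previous" column.
                existing[curr_prev_col_names[col]] = existing[col]
            existing[col] = new_value
    return list(result.values())


# Made-up sample data (assumption: source = updates, target = delta table).
target = [
    {"pkey": 1, "name": "A", "job": "AA", "prev_job": None},
    {"pkey": 2, "name": "B", "job": "BB", "prev_job": None},
    {"pkey": 3, "name": "C", "job": "CC", "prev_job": None},
]
updates = [
    {"pkey": 1, "name": "A1", "job": "AA"},  # change on an untracked column only
    {"pkey": 2, "name": "B", "job": "BBB"},  # change on a tracked column
    {"pkey": 3, "name": "C", "job": "CC"},   # identical in source and target
    {"pkey": 4, "name": "D", "job": "DD"},   # in source, but not in target
]
result = type_3_scd_upsert_model(target, updates, "pkey", {"job": "prev_job"})
for row in result:
    print(row)
```

In this model the identical row is left untouched and the new row is inserted with a null previous value; whether the real function should behave the same way is exactly what the README examples would need to show.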
@@ -735,3 +736,119 @@ def rename_delta_table(
    delta_table.toDF().write.format("delta").mode("overwrite").saveAsTable(
        new_table_name
    )


def type_3_scd_upsert(
How would the user use this function with an `effective_date` column?
I recommend making it an optional parameter.
If it is provided, the function has to update the `effective_date` each time a real update occurs on the current and previous columns.
The structure of a delta table with an `effective_date` could be something like this:
+----+----+---+--------+------------------+
|pkey|name|job|prev_job|job_effective_date|
+----+----+---+--------+------------------+
or much wider:
+----+----+---+--------+------------------+-------+------------+-----------------+---------+--------------+
|pkey|name|job|prev_job|job_effective_date|country|prev_country|country_effective|continent|prev_continent|
+----+----+---+--------+------------------+-------+------------+-----------------+---------+--------------+
Now, how will we provide the column names representing the `effective_date` to the function?
A parameter as a list of lists? --> param = ((`job`, `prev_job`, `job_effective_date`), (`country`, `prev_country`, `country_effective`), (`continent`, `prev_continent`))
Three parameters: `current_col_names`, `previous_col_names` and `effective_date_col_names`?
I don't like the second option unless we consider that the function cannot handle multiple scd columns at a time.
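As one way to picture the first option, here is a minimal plain-Python sketch in which each tracked attribute carries its own optional effective-date column. The `ScdColumn` class and `apply_change` helper are hypothetical names, not part of mack or of this PR, and the dates are made up for illustration.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass(frozen=True)
class ScdColumn:
    """Hypothetical grouping of the columns that belong to one tracked attribute."""
    current: str
    previous: str
    effective_date: Optional[str] = None  # None: no effective-date column for this attribute


def apply_change(row, spec, new_value, as_of):
    """Return a copy of `row` with `new_value` applied under type 3 scd rules."""
    updated = dict(row)
    if updated[spec.current] == new_value:
        return updated  # no real change: previous value and effective date stay as-is
    updated[spec.previous] = updated[spec.current]
    updated[spec.current] = new_value
    if spec.effective_date is not None:
        updated[spec.effective_date] = as_of
    return updated


# Same grouping as the list-of-lists idea; the third attribute has no effective date.
scd_columns = [
    ScdColumn("job", "prev_job", "job_effective_date"),
    ScdColumn("country", "prev_country", "country_effective"),
    ScdColumn("continent", "prev_continent"),
]

# Made-up row and dates, for illustration only.
row = {"pkey": 1, "job": "AA", "prev_job": None, "job_effective_date": date(2022, 1, 1)}
changed = apply_change(row, scd_columns[0], "BB", date(2023, 1, 1))
unchanged = apply_change(row, scd_columns[0], "AA", date(2023, 1, 1))
print(changed)
print(unchanged)
```

Keeping the three names in one object has the same benefit as the dictionary discussed earlier: the current, previous and effective-date columns of an attribute cannot drift out of alignment.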
@CommanderWahid - thanks for submitting this PR. I don't know much about Type 3 SCD, but I am trying to learn here. I added some comments. Thanks for your patience here. I am going to have to learn as we go and we'll have to collaborate to make sure we get the right abstraction!
Add a new functionality to perform a type 3 scd upsert on a target delta table:
- `type_3_scd_upsert` function.
- `type_3_scd_upsert` function.
- `README` to include the new feature.