[QUESTION] How to use a CoTransformer on data frames with shared non-key columns #358
Comments
You can use either df1.partition_by("a").zip(df2).transform(scd2_merge).show() or dag.zip(df1, df2, partition={"by":"a"}).transform(scd2_merge).show().
I also modified your code a little to follow good practices:

import pandas as pd
from fugue import FugueWorkflow

_df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
_df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})

# schema: a:int,b:float,c:int
def scd2_merge(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    # isin returns booleans; cast to int so 'c' matches the declared schema
    df1 = df1.assign(c=(~df1[ix].isin(df2[ix])).astype(int))
    df2 = df2.assign(c=1)
    return pd.concat([df1, df2])

dag = FugueWorkflow()
df1 = dag.df(_df1)
df2 = dag.df(_df2)
# partition both frames by the key column "a" only, then zip them
df1.partition_by("a").zip(df2).transform(scd2_merge).show()
dag.run()
Hi @goodwanghan, thanks for the suggestions. I've been able to get my code working as you suggested, using .zip(df2, how="full_outer") so that non-intersecting keys are kept in the output. Also, I intend to run this in production using the [...]
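To make the effect of how="full_outer" concrete, here is a minimal plain-Python sketch of the idea. It is only a model, not Fugue code or Fugue's implementation: the helper names (group_by_key, full_outer_zip, scd2_merge_rows) are hypothetical, and it assumes that a key missing on one side arrives at the merge function as an empty group.

```python
from collections import defaultdict

# Same sample data as in the thread, written as lists of row dicts.
rows1 = [{"a": 1, "b": 1.0, "c": 1}, {"a": 2, "b": None, "c": 1}, {"a": 3, "b": 3.0, "c": 1}]
rows2 = [{"a": 2, "b": 2.0}, {"a": 3, "b": 3.0}, {"a": 4, "b": None}]


def group_by_key(rows, key):
    """Group rows into lists by the value of the key column."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def full_outer_zip(left, right, key):
    """Yield (key, left_group, right_group) for every key found on either side."""
    g1, g2 = group_by_key(left, key), group_by_key(right, key)
    for k in sorted(set(g1) | set(g2)):
        # A key missing on one side is modelled as an empty group (assumption).
        yield k, g1.get(k, []), g2.get(k, [])


def scd2_merge_rows(part1, part2):
    """Expire part1 rows when part2 has the same key, then append part2 rows as current."""
    expired = [{**r, "c": 0 if part2 else r["c"]} for r in part1]
    current = [{**r, "c": 1} for r in part2]
    return expired + current


merged = []
for _, part1, part2 in full_outer_zip(rows1, rows2, "a"):
    merged.extend(scd2_merge_rows(part1, part2))
```

In this model keys 1 and 4, which exist on only one side, still reach the merge step. An inner join on "a" would have dropped both.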
Wonderful! When you test on Spark, don't use dag.run(spark_session)
You can enable [...]
I have a function that aims to implement an SCD2 merge on two dataframes.
In my example, I am attempting to merge two dataframes using a single column as the key. The transformation should modify rows with a matching key and insert all rows from the second dataframe.
When I execute this code, the zip method by default performs an inner join on the columns that are shared across the dataframes. This drops rows with missing values in the shared columns, so the input dataframes passed to scd2_merge are not what is expected.
Is there a correct way to implement this type of transformation?
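The behaviour described above can be illustrated with a small plain-Python sketch. It is a model of the key matching only, not Fugue's implementation, and the helper name inner_keys is hypothetical. It compares which keys survive an inner join when every shared column ("a" and "b") is used as the key versus when only "a" is used.

```python
# Sample data from the question, written as lists of row dicts.
rows1 = [{"a": 1, "b": 1.0, "c": 1}, {"a": 2, "b": None, "c": 1}, {"a": 3, "b": 3.0, "c": 1}]
rows2 = [{"a": 2, "b": 2.0}, {"a": 3, "b": 3.0}, {"a": 4, "b": None}]


def inner_keys(left, right, cols):
    """Return the sorted key tuples that appear on both sides for the given columns."""
    left_keys = {tuple(r[c] for c in cols) for r in left}
    right_keys = {tuple(r[c] for c in cols) for r in right}
    return sorted(left_keys & right_keys)


# Joining on every shared column: only rows that agree on both "a" and "b" match.
keys_shared = inner_keys(rows1, rows2, ["a", "b"])

# Joining on the intended key only: rows match whenever "a" agrees.
keys_a = inner_keys(rows1, rows2, ["a"])
```

With this data, joining on both shared columns leaves a single matching key, while joining on "a" alone keeps keys 2 and 3. That is why naming the partition key explicitly matters here.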