
[BUG] Aggregations on Spark dataframes fail intermittently #392

Closed
jstammers opened this issue Dec 4, 2022 · 5 comments · Fixed by #396

@jstammers

Minimal Code To Reproduce

from fugue import DataFrame, FugueWorkflow
from fugue.column import lit, col
import pandas as pd

def aggregate_prices(
    df: DataFrame,
    rollup: DataFrame,
) -> DataFrame:
    # join one rollup level per iteration, from coarsest to finest grouping
    agg_order = [["a"], ["a", "b"], ["a", "b", "c"]]
    agg_levels = [1, 2, 3]
    for i, (cols, gid) in enumerate(zip(agg_order, agg_levels)):
        levels = ",".join(cols)
        price = rollup.filter(col("group_id") == gid).select(
            *(cols + [lit(levels).alias(f"level_{i}")])
        )
        df = df.join(price, how="left_outer", on=cols)
    return df

prices = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3], "c": [1, 1, 2], "price": [0.1, 0.2, 0.3], "group_id": [1, 2, 3]})
df = pd.DataFrame({"a": [1, 1, 2, 3], "b": [1, 2, 3, 4], "c": [1, 2, 3, 4]})

dag = FugueWorkflow()
df_f = dag.df(df)
prices_f = dag.df(prices)
agg = aggregate_prices(df_f, prices_f)
dag.run("spark")

Describe the bug
When running the above code, I intermittently encounter the following error:

ERROR:root:_5 _State.RUNNING -> _State.FAILED  Table or view not found: _a2656; line 1 pos 14;
'Project [*]
+- 'Filter ('group_id = 2)
   +- 'UnresolvedRelation [_a2656], [], false

AnalysisException: Table or view not found: _a2656; line 1 pos 14;
'Project [*]
+- 'Filter ('group_id = 2)
   +- 'UnresolvedRelation [_a2656], [], false

I have observed this issue in a few of my pipelines. From what I have seen, the error seems to occur during inline transformations,
e.g.

agg = df.filter(...).partition_by(...).aggregate(...)
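
For example, a chain roughly of this shape (the columns and aggregation here are hypothetical, just to illustrate the pattern):

import fugue.column.functions as f

agg = (
    df_f.filter(col("price") > 0)
    .partition_by("a")
    .aggregate(max_price=f.max(col("price")))  # hypothetical aggregation
)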

Expected behavior
This transformation should execute successfully every time the workflow is run.

Environment (please complete the following information):

  • Backend: spark
  • Backend version: pyspark - 3.3.0
  • Python version: 3.9
  • OS: linux
@goodwanghan
Collaborator

Hi, thanks for reporting. I remember Spark had an issue with diamond joins: if B and C are simple selected results from A, joining B with C would throw an error saying C is not found when you use Spark SQL.

I think that bug was resolved in later versions of Spark (>2.4), but this feels very similar.
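
For context, the diamond shape looks roughly like this (a hypothetical PySpark sketch; the view name and columns are made up, and this snippet may not reproduce the bug on a fixed Spark version):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A is a single source; B and C are both simple selections from A
spark.createDataFrame([(1, 10), (2, 20)], ["k", "v"]).createOrReplaceTempView("A")
B = spark.sql("SELECT k, v FROM A WHERE v > 5")
C = spark.sql("SELECT k, v * 2 AS v2 FROM A")

# joining the two branches of the diamond back together is where older
# Spark SQL versions could fail to resolve one side
B.join(C, on="k").show()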

There are two things you can try:

You can persist the rollup dataframe (breaking the lineage may help):

prices_f = dag.df(prices).persist()

You can also call join once, since join can take multiple dataframes together (see the link and sketch below). This will change the structure of the generated Spark SQL. I don't believe it should change the underlying execution plan, but since we are dealing with an unknown bug, who knows.

https://github.com/fugue-project/fugue/blob/master/fugue/workflow/workflow.py#L587
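
For example, a sketch of the loop rewritten as one call (this assumes the multi-dataframe form of join linked above, and that join keys are inferred from common column names when on is not given):

level_dfs = []
for i, (cols, gid) in enumerate(zip(agg_order, agg_levels)):
    levels = ",".join(cols)
    level_dfs.append(
        rollup.filter(col("group_id") == gid).select(
            *(cols + [lit(levels).alias(f"level_{i}")])
        )
    )
# one join call over all levels instead of one call per level
df = df.join(*level_dfs, how="left_outer")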

@jstammers
Author

Hi, thanks for the reply. I will try persisting the dataframes and see if that resolves the issue.
I have just encountered the problem in the following:

    449     df_f = dag.df(df)
    450     keys = df_f.select("CGAIdent", "SingleBuyProductItemId", "State").distinct()
--> 451     date_keys = df_f.select("USPeriod").distinct()
    452     keys = date_keys.join(dates_f, on=["USPeriod"], how="inner").join(keys, how="cross")

which I can't explain. If it helps, I am using DBR 11.4 (Databricks Runtime).

@goodwanghan
Collaborator

@jstammers I apologize for the delay. I have a theory about how it happened. I am in the middle of a very big code change (almost finished); I will include the fix in it, and if possible, please help us test.

@goodwanghan goodwanghan linked a pull request Dec 29, 2022 that will close this issue
@goodwanghan
Collaborator

@jstammers I think the problem is resolved in the latest pre-release. I was able to reproduce the issue once on 0.7.3 but was not able to reproduce it on 0.8.0.dev3.

Also, from 0.8.0 you no longer need to use FugueWorkflow; here is the modified version:

import fugue.api as fa
from fugue.column import lit, col
from fugue import AnyDataFrame
import pandas as pd

def aggregate_prices(
    df: AnyDataFrame,
    rollup: AnyDataFrame,
) -> AnyDataFrame:
    agg_order = [["a"], ["a", "b"], ["a", "b", "c"]]
    agg_levels = [1, 2, 3]
    for i, (cols, gid) in enumerate(zip(agg_order, agg_levels)):
        levels = ",".join(cols)
        price = fa.select(
            rollup,
            *(cols + [lit(levels).alias(f"level_{i}")]),
            where=col("group_id") == gid,
        )
        df = fa.left_outer_join(df, price)  # join keys are inferred from common columns
    return df

prices = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3], "c": [1, 1, 2], "price": [0.1, 0.2, 0.3], "group_id": [1, 2, 3]})
df = pd.DataFrame({"a": [1, 1, 2, 3], "b": [1, 2, 3, 4], "c": [1, 2, 3, 4]})

# `spark` is a SparkSession, e.g. spark = SparkSession.builder.getOrCreate()
with fa.engine_context(spark):
    agg = aggregate_prices(df, prices)
    fa.show(agg)

You can pass None or "duckdb" to fa.engine_context to verify locally without Spark.
agg in this code is just a PySpark DataFrame.
AnyDataFrame is just a readable type annotation; you can use Any instead.
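
For example, to verify locally:

with fa.engine_context("duckdb"):
    fa.show(aggregate_prices(df, prices))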

@goodwanghan goodwanghan added this to the 0.8.0 milestone Dec 30, 2022
@goodwanghan
Collaborator

goodwanghan commented Dec 30, 2022

Also, if you want to iterate in a notebook, you can run

fa.set_global_engine(spark)

in a cell; after that you don't need to specify the engine again, and you won't need the with statement.
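
So a notebook session could look like this (a sketch; spark is again a SparkSession):

# cell 1: set the engine once
fa.set_global_engine(spark)

# later cells: no engine argument or with statement needed
agg = aggregate_prices(df, prices)
fa.show(agg)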
