Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Save partitioned parquet dataset using NativeExecutionEngine #285

Closed
LaurentErreca opened this issue Jan 7, 2022 · 7 comments · Fixed by #306
Closed

[BUG] Save partitioned parquet dataset using NativeExecutionEngine #285

LaurentErreca opened this issue Jan 7, 2022 · 7 comments · Fixed by #306
Assignees
Labels

Comments

@LaurentErreca
Copy link
Contributor

I failed to execute following command with NativeExecutionEngine:
SAVE PREPARTITION BY PRODUCT OVERWRITE PARQUET '{{output_path}}'

I received the following message:

partition_spec is not respected in NativeExecutionEngine.save_df

This should be possible using pandas as function to_parquet when engine='pyarrow' has an argument partition_cols to do the job.
I have quickly tested this:
In file native_execution_engine.py, in function save_df (line 369):

 if not partition_spec.empty:
     kwargs['partition_cols'] = partition_spec.partition_by

This worked for me.

Cheers,
Laurent

Environment :

  • Backend: pandas
  • Backend version: 1.3.5
  • Python version: 3.9.9
  • OS: linux
@kvnkho
Copy link
Collaborator

kvnkho commented Jan 9, 2022

@LaurentErreca mentioned on Slack that the above modification fails on the test_io test for NativeExecutionEngine. The test looks like this:

    def test_io(self):
        path = os.path.join(self.tmpdir, "a")
        path2 = os.path.join(self.tmpdir, "b.test.csv")
        with self.dag() as dag:
            b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
            b.partition(num=3).save(path, fmt="parquet", single=True)
            b.save(path2, header=True)
        assert FileSystem().isfile(path)

And it fails in the assert statement.

What does the assert do?

In this line:
b.partition(num=3).save(path, fmt="parquet", single=True)
the single=True makes the output a single file. The assert then checks that the output is a single file.

Why does it fail?

Tracing the code a bit, the code path goes like this (but you can ignore the first 2):

The issue here is that passing partition_cols to the pandas to_parquet method created multiple files so it fails the test in the case where single=True is passed to the NativeExecutionEngine. It creates multiple files even if you pass an empty list.

What is the fix?

From what it looks like, pandas to_parquet cannot be forced to a single file if partition_cols is provided. It makes sense, because why even provide partition_cols if you want a single file? I think in order to pass this test, we need to handle the single=True at the Fugue level. If single=True when saving the file, ignore the partitioning columns provided. If single=False, then we actually use them to output the file. The NativeExecutionEngine save_df already takes in the force_single here and it is currently unused so we just need an if-else around it.

Dask actually doesn't have this functionality as well (but Spark does), so the DaskExecutionEngine is a good reference here because it has that conditional logic based on if force_single is True. I think the solution might look similar.

Other considerations

The PartitionSpec takes in by, but it can also be defined by using num. For example:

from fugue.collections.partition import PartitionSpec
p = PartitionSpec(num=3)
SparkExecutionEngine().save_df(sdf, path, "parquet", "overwrite", p, True)
print(p)

and more info can be found here. It will be quite hard and probably unnecessary to support the use case where the PartitionSpec just contains num. In this event, I think we should just make an error like this? or maybe you can even ignore this case for now and just see if we can do the if-else and get those test cases passing.

@goodwanghan
Copy link
Collaborator

We will try to solve it in #296

@goodwanghan goodwanghan added the IO label Jan 25, 2022
LaurentErreca added a commit to LaurentErreca/fugue that referenced this issue Feb 27, 2022
@LaurentErreca
Copy link
Contributor Author

LaurentErreca commented Feb 27, 2022

Hi,
I have been working on it, but I'm still having issues with Dask engine as I'm not able to save a hive partitioned parquet dataset. Work in progress in my fork, here : https://github.com/LaurentErreca/fugue/tree/fix/issue_285

@LaurentErreca
Copy link
Contributor Author

When using Dask execution engine, we can use parameter partition_on to write hive partitioned dataset. Still working on it in https://github.com/LaurentErreca/fugue/tree/fix/issue_285

@goodwanghan
Copy link
Collaborator

Hey, @LaurentErreca it seems I have no access to your fork. But it is exciting to see you are working on it. I am looking forward to it!!

@goodwanghan
Copy link
Collaborator

goodwanghan commented Mar 1, 2022

Oh I can see it now, never mind let me take a look. I will get back to you tomorrow.

@LaurentErreca
Copy link
Contributor Author

LaurentErreca commented Mar 2, 2022 via email

LaurentErreca added a commit to LaurentErreca/fugue that referenced this issue Apr 3, 2022
goodwanghan added a commit that referenced this issue Apr 3, 2022
…utionEngine and DaskExecutionEngine (#306)

* Work in progress to fix issue 285 reported here #285

* Use option partition_on in Dask execution engine to write hive partitioned dataset

* Add handling for spark array type (#307)

* adding ecosystem to README

* adding ecosystem to README

* merge conflict

* Fugue plugin (#311)

* plugin

* update

* upgrading black version

* fixing black version

* Work in progress to fix issue 285 reported here #285

* Use option partition_on in Dask execution engine to write hive partitioned dataset

* Handle hive partitioning with Duckdb execution engine

* Clean code with pylint

* Use ArrowDataFrame(df.as_arrow()) instead of ArrowDataFrame(df.native.arrow())

Co-authored-by: WangCHX <[email protected]>
Co-authored-by: Kevin Kho <[email protected]>
Co-authored-by: Han Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants