[WIP] Fix issue #285: save hive partitioned dataset using NativeExecutionEngine and DaskExecutionEngine (#306)

* Work in progress to fix issue #285

* Use the partition_on option in the Dask execution engine to write hive partitioned datasets

* Add handling for Spark array type (#307)

* Add ecosystem section to README

* Resolve merge conflict

* Fugue plugin (#311)

* plugin

* update

* Upgrade black version

* Fix black version

* Handle hive partitioning with the DuckDB execution engine

* Clean code with pylint

* Use ArrowDataFrame(df.as_arrow()) instead of ArrowDataFrame(df.native.arrow())

Co-authored-by: WangCHX <[email protected]>
Co-authored-by: Kevin Kho <[email protected]>
Co-authored-by: Han Wang <[email protected]>
4 people authored Apr 3, 2022
1 parent 1c4a7d1 commit 8a16603
Showing 5 changed files with 25 additions and 10 deletions.
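
Taken together, these changes make `save` honor a partition spec by writing a hive-style directory layout (one `col=value` folder per key) instead of logging a warning and ignoring it. Below is a minimal sketch of the user-facing behavior, modeled on the new test in fugue_test/builtin_suite.py; the output path is illustrative.

```python
# Sketch of the behavior this commit enables, modeled on the new test in
# fugue_test/builtin_suite.py; the output path is illustrative.
import pandas as pd
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[6, 1], [2, 7]], "c:int,a:long")
    # Partitioning by "c" now produces <path>/c=2/... and <path>/c=6/...
    df.partition(by="c").save("/tmp/c.partition", fmt="parquet", single=False)

# The partition column round-trips when reading the directory back.
print(pd.read_parquet("/tmp/c.partition"))
```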
6 changes: 2 additions & 4 deletions fugue/execution/native_execution_engine.py
@@ -376,10 +376,8 @@ def save_df(
         force_single: bool = False,
         **kwargs: Any,
     ) -> None:
-        if not partition_spec.empty:
-            self.log.warning(  # pragma: no cover
-                "partition_spec is not respected in %s.save_df", self
-            )
+        if not force_single and not partition_spec.empty:
+            kwargs["partition_cols"] = partition_spec.partition_by
         self.fs.makedirs(os.path.dirname(path), recreate=True)
         df = self.to_df(df)
         save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)
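For the native engine, the forwarded `partition_cols` ends up in the pandas/pyarrow Parquet writer. A standalone sketch of the equivalent call, assuming pandas with the pyarrow engine installed (this is plain pandas, not Fugue's `save_df` wrapper):

```python
# Standalone sketch of what the forwarded partition_cols kwarg does;
# plain pandas/pyarrow, not Fugue's save_df wrapper.
import pandas as pd

df = pd.DataFrame({"c": [6, 2], "a": [1, 7]})
# pandas hands partition_cols to pyarrow.parquet.write_to_dataset,
# creating /tmp/out_native/c=2/... and /tmp/out_native/c=6/...
df.to_parquet("/tmp/out_native", partition_cols=["c"])
```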
5 changes: 2 additions & 3 deletions fugue_dask/execution_engine.py
@@ -459,13 +459,12 @@ def save_df(
                 format_hint=format_hint,
                 mode=mode,
                 partition_spec=partition_spec,
+                force_single=force_single,
                 **kwargs,
             )
         else:
             if not partition_spec.empty:
-                self.log.warning(  # pragma: no cover
-                    "partition_spec is not respected in %s.save_df", self
-                )
+                kwargs["partition_on"] = partition_spec.partition_by
             self.fs.makedirs(os.path.dirname(path), recreate=True)
             df = self.to_df(df)
             save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)
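
Dask's Parquet writer spells the same option `partition_on`, which is why this engine renames the kwarg. A standalone sketch with plain dask.dataframe (paths illustrative):

```python
# Standalone sketch of Dask's equivalent option: dask.dataframe.to_parquet
# uses partition_on where pandas uses partition_cols.
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"c": [6, 2], "a": [1, 7]}), npartitions=1)
ddf.to_parquet("/tmp/out_dask", partition_on=["c"])  # hive-style layout
```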
4 changes: 2 additions & 2 deletions fugue_duckdb/_io.py
@@ -70,9 +70,9 @@ def save_df(
             NotImplementedError(f"{mode} is not supported"),
         )
         p = FileParser(uri, format_hint).assert_no_glob()
-        if p.file_format not in self._format_save:
+        if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
             self._fs.makedirs(os.path.dirname(uri), recreate=True)
-            ldf = ArrowDataFrame(df.native.arrow())
+            ldf = ArrowDataFrame(df.as_arrow())
             return save_df(
                 ldf, uri=uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs
             )
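When `partition_cols` is requested, DuckDB's own writer is bypassed: the result is materialized as an Arrow table and routed through the generic save path. A sketch of the Arrow-level write this falls back to, assuming pyarrow (path illustrative):

```python
# Sketch of the pyarrow fallback used when DuckDB does not write the
# partitioned dataset itself.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"c": [6, 2], "a": [1, 7]})
pq.write_to_dataset(table, root_path="/tmp/out_duckdb", partition_cols=["c"])
```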
2 changes: 2 additions & 0 deletions fugue_duckdb/execution_engine.py
@@ -465,6 +465,8 @@ def save_df(
         force_single: bool = False,
         **kwargs: Any,
     ) -> None:
+        if not partition_spec.empty and not force_single:
+            kwargs["partition_cols"] = partition_spec.partition_by
         dio = DuckDBIO(self.fs, self.connection)
         dio.save_df(self.to_df(df), path, format_hint, mode, **kwargs)

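All three engines read the partition columns from the same `PartitionSpec` object. A small illustration of the two properties the new code relies on:

```python
# Illustration of the PartitionSpec properties used in the changes above.
from fugue import PartitionSpec

spec = PartitionSpec(by=["c"])
assert not spec.empty              # a partition spec was provided
assert spec.partition_by == ["c"]  # the columns forwarded to the writers
```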
18 changes: 17 additions & 1 deletion fugue_test/builtin_suite.py
@@ -589,7 +589,7 @@ def test_out_cotransform(self):  # noqa: C901
         def incr():
             fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True)
             fs.writetext(str(uuid4()) + ".txt", "")
-            return fs.glob("*.txt").count().files
+            return fs.glob("*.tx" "t").count().files

         def t1(
             df: Iterable[Dict[str, Any]], df2: pd.DataFrame
@@ -1175,6 +1175,7 @@ def init_tmpdir(self, tmpdir):
     def test_io(self):
         path = os.path.join(self.tmpdir, "a")
         path2 = os.path.join(self.tmpdir, "b.test.csv")
+        path3 = os.path.join(self.tmpdir, "c.partition")
         with self.dag() as dag:
             b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
             b.partition(num=3).save(path, fmt="parquet", single=True)
@@ -1185,6 +1186,21 @@
             a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int"))
             a = dag.load(path2, header=True, columns="c:int,a:long")
             a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long"))
+        with self.dag() as dag:
+            b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
+            b.partition(by="c").save(path3, fmt="parquet", single=False)
+        assert FileSystem().isdir(path3)
+        assert FileSystem().isdir(os.path.join(path3, "c=6"))
+        assert FileSystem().isdir(os.path.join(path3, "c=2"))
+        # TODO: in test below, once issue #288 is fixed, use dag.load
+        # instead of pd.read_parquet
+        pd.testing.assert_frame_equal(
+            pd.read_parquet(path3).sort_values("a").reset_index(drop=True),
+            pd.DataFrame({"c": pd.Categorical([6, 2]), "a": [1, 7]}).reset_index(
+                drop=True
+            ),
+            check_like=True,
+        )

     def test_save_and_use(self):
         path = os.path.join(self.tmpdir, "a")
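Note: the expected frame in the new test uses `pd.Categorical` because pyarrow reads hive partition columns back as dictionary-encoded (categorical) values rather than their original integer dtype.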
