Fix pa.Table output bug, add support for AnyDataFrame, improve Spark Pandas UDF partitioning (#491)

* Fix pa.Table output bug, add support for AnyDataFrame

* fix test

* fix

* Improve partitioning logic for Spark pandas udf

* update

* update

* fix

* update

* fix
goodwanghan authored Jul 22, 2023
1 parent 19e1b26 commit 0198c46
Showing 17 changed files with 346 additions and 91 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -115,6 +115,7 @@ testpolars:

testnotebook:
pip install .
jupyter contrib nbextension install --user
jupyter nbextension install --user --py fugue_notebook
jupyter nbextension enable fugue_notebook --py
jupyter nbconvert --execute --clear-output tests/fugue_notebook/test_notebook.ipynb
7 changes: 7 additions & 0 deletions fugue/dataframe/dataframe.py
@@ -113,6 +113,13 @@ def peek_dict(self) -> Dict[str, Any]:
def as_pandas(self) -> pd.DataFrame:
"""Convert to pandas DataFrame"""
pdf = pd.DataFrame(self.as_array(), columns=self.columns)
if len(pdf) == 0: # TODO: move to triad
return pd.DataFrame(
{
k: pd.Series(dtype=v.type.to_pandas_dtype())
for k, v in self.schema.items()
}
)
return PD_UTILS.enforce_type(pdf, self.schema.pa_schema, null_safe=True)

def as_arrow(self, type_safe: bool = False) -> pa.Table:
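The empty-frame branch above builds a zero-row pandas DataFrame whose per-column dtypes come from the pyarrow schema, instead of falling back to an all-object frame. A minimal standalone sketch of that pattern (illustrative only, not Fugue code):

```python
import pandas as pd
import pyarrow as pa

# Sketch of the empty-frame handling above: derive per-column pandas dtypes
# from a pyarrow schema so a zero-row frame still carries the right types.
schema = pa.schema([("a", pa.int64()), ("b", pa.int32())])

empty = pd.DataFrame(
    {f.name: pd.Series(dtype=f.type.to_pandas_dtype()) for f in schema}
)
print(empty.dtypes)  # a -> int64, b -> int32, instead of object columns
```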
2 changes: 1 addition & 1 deletion fugue/dataframe/dataframe_iterable_dataframe.py
@@ -165,7 +165,7 @@ def as_array_iterable(

def as_pandas(self) -> pd.DataFrame:
if self.empty:
return ArrayDataFrame([], self.schema).as_pandas()
return PandasDataFrame(schema=self.schema).as_pandas()

return pd.concat(df.as_pandas() for df in self.native)

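With this one-line change, converting an empty iterable-backed DataFrame to pandas goes through PandasDataFrame(schema=...), which keeps the schema's dtypes. A small usage sketch mirroring the test added later in this commit:

```python
import numpy as np
from fugue.dataframe import IterablePandasDataFrame

# An empty iterable-of-pandas DataFrame should now convert to a typed,
# zero-row pandas frame rather than losing the column dtypes.
df = IterablePandasDataFrame([], schema="a:long,b:int")
pdf = df.as_pandas()
assert pdf.dtypes["a"] == np.int64
assert pdf.dtypes["b"] == np.int32
```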
45 changes: 38 additions & 7 deletions fugue/dataframe/function_wrapper.py
@@ -23,9 +23,10 @@
from triad.utils.iter import EmptyAwareIterable, make_empty_aware

from ..constants import FUGUE_ENTRYPOINT
from ..dataset.api import count as df_count
from .array_dataframe import ArrayDataFrame
from .arrow_dataframe import ArrowDataFrame
from .dataframe import DataFrame, LocalDataFrame
from .dataframe import AnyDataFrame, DataFrame, LocalDataFrame, as_fugue_df
from .dataframe_iterable_dataframe import (
IterableArrowDataFrame,
IterablePandasDataFrame,
@@ -172,6 +173,19 @@ def count(self, df: Any) -> int:
return sum(1 for _ in df.as_array_iterable())


@fugue_annotated_param(AnyDataFrame)
class _AnyDataFrameParam(DataFrameParam):
def to_output_df(self, output: AnyDataFrame, schema: Any, ctx: Any) -> DataFrame:
return (
as_fugue_df(output)
if schema is None
else as_fugue_df(output, schema=schema)
)

def count(self, df: Any) -> int:
return df_count(df)


@fugue_annotated_param(LocalDataFrame, "l", child_can_reuse_code=True)
class LocalDataFrameParam(DataFrameParam):
def to_input_data(self, df: DataFrame, ctx: Any) -> LocalDataFrame:
Expand Down Expand Up @@ -333,6 +347,9 @@ def to_input_data(self, df: DataFrame, ctx: Any) -> pd.DataFrame:

@no_type_check
def to_output_df(self, output: pd.DataFrame, schema: Any, ctx: Any) -> DataFrame:
_schema: Optional[Schema] = None if schema is None else Schema(schema)
if _schema is not None and _schema.names != list(output.columns):
output = output[_schema.names]
return PandasDataFrame(output, schema)

@no_type_check
@@ -361,8 +378,15 @@ def to_output_df(
self, output: Iterable[pd.DataFrame], schema: Any, ctx: Any
) -> DataFrame:
def dfs():
_schema: Optional[Schema] = None if schema is None else Schema(schema)
has_return = False
for df in output:
yield PandasDataFrame(df, schema)
if _schema is not None and _schema.names != list(df.columns):
df = df[_schema.names]
yield PandasDataFrame(df, _schema)
has_return = True
if not has_return and _schema is not None:
yield PandasDataFrame(schema=_schema)

return IterablePandasDataFrame(dfs())

@@ -381,7 +405,12 @@ def to_input_data(self, df: DataFrame, ctx: Any) -> Any:

def to_output_df(self, output: Any, schema: Any, ctx: Any) -> DataFrame:
assert isinstance(output, pa.Table)
return ArrowDataFrame(output, schema=schema)
adf: DataFrame = ArrowDataFrame(output)
if schema is not None:
_schema = Schema(schema)
if adf.schema != _schema:
adf = adf[_schema.names].alter_columns(_schema)
return adf

def count(self, df: Any) -> int: # pragma: no cover
return df.count()
@@ -409,13 +438,15 @@ def to_output_df(
) -> DataFrame:
def dfs():
_schema: Optional[Schema] = None if schema is None else Schema(schema)
has_return = False
for df in output:
adf = ArrowDataFrame(df)
if _schema is not None and not ( # pylint: disable-all
adf.schema == schema
):
adf: DataFrame = ArrowDataFrame(df)
if _schema is not None and adf.schema != _schema:
adf = adf[_schema.names].alter_columns(_schema)
yield adf
has_return = True
if not has_return and _schema is not None:
yield ArrowDataFrame(schema=_schema)

return IterableArrowDataFrame(dfs())

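The wrapper changes above align pd.DataFrame and pa.Table outputs with the declared schema (reordering columns and altering types when they differ) and register AnyDataFrame as an annotated parameter type. A hedged sketch of what the alignment enables; the function and data here are illustrative, not part of the commit:

```python
import pandas as pd
import fugue.api as fa

# The transformer below deliberately returns its columns in a different order
# than the declared schema hint; the output-alignment logic added above is
# what reorders them back to a, b, c.

# schema: a:int,b:int,c:int
def add_c(df: pd.DataFrame) -> pd.DataFrame:
    out = df.assign(c=2)
    return out[["c", "a", "b"]]

result = fa.transform(pd.DataFrame({"a": [1, 3], "b": [2, 4]}), add_c)
print(list(result.columns))  # expected: ['a', 'b', 'c'] after alignment
```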
18 changes: 14 additions & 4 deletions fugue_spark/execution_engine.py
@@ -134,9 +134,8 @@ def map_dataframe(
) -> DataFrame:
output_schema = Schema(output_schema)
if self._should_use_pandas_udf(output_schema):
# pandas udf can only be used for pyspark > 3
if len(partition_spec.partition_by) > 0:
if partition_spec.algo == "coarse":
if partition_spec.algo in ["coarse", "even"]:
return self._map_by_pandas_udf(
df,
map_func=map_func,
@@ -145,7 +144,18 @@
on_init=on_init,
map_func_format_hint=map_func_format_hint,
)
elif partition_spec.algo != "even" or self.is_spark_connect:
else:
if ( # not simple partitioning
partition_spec.algo != "hash"
or partition_spec.num_partitions != "0"
):
# TODO: not sure if presort should be done
# on physical partition level
df = self.to_df(
self.execution_engine.repartition(
df, PartitionSpec(partition_spec, presort=[])
)
)
return self._group_map_by_pandas_udf(
df,
map_func=map_func,
@@ -154,7 +164,7 @@
on_init=on_init,
map_func_format_hint=map_func_format_hint,
)
elif len(partition_spec.partition_by) == 0:
else:
return self._map_by_pandas_udf(
df,
map_func=map_func,
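If I read the new branching correctly, when pandas UDFs are enabled and the data is partitioned by keys, the `coarse` and `even` algorithms now take the mapInPandas-style path, while other specs fall back to the groupBy-based pandas UDF, repartitioning first (with presort stripped) when the spec is not a plain hash partition. A hedged usage sketch; the SparkSession setup and the config key here are assumptions, not part of the commit:

```python
import pandas as pd
from pyspark.sql import SparkSession
import fugue.api as fa

spark = SparkSession.builder.master("local[*]").getOrCreate()

# schema: *,n:int
def size(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(n=df.shape[0])

# With algo="coarse" (or "even") the partitioned transform should take the
# mapInPandas-style route added above rather than groupBy + applyInPandas.
res = fa.transform(
    pd.DataFrame({"a": [0, 0, 1, 1], "b": range(4)}),
    size,
    partition={"by": ["a"], "algo": "coarse"},
    engine=spark,
    engine_conf={"fugue.spark.use_pandas_udf": True},  # assumed config key
)
res.show()
```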
60 changes: 55 additions & 5 deletions fugue_test/builtin_suite.py
@@ -22,6 +22,7 @@

import fugue.api as fa
from fugue import (
AnyDataFrame,
ArrayDataFrame,
CoTransformer,
DataFrame,
@@ -365,6 +366,12 @@ def test_create_process_output(self):
dag.output(dict(df=a), using=mock_outputter2)
a.partition(num=3).output(MockOutputter3)
dag.output(dict(aa=a, bb=b), using=MockOutputter4)

a = dag.create(mock_creator2, params=dict(p=2))
b = dag.create(mock_creator2, params=dict(p=2))
c = dag.process(a, b, using=mock_processor4)
c.assert_eq(ArrayDataFrame([[2]], "a:int"))
dag.output(a, b, using=mock_outputter4)
dag.run(self.engine)

def test_zip(self):
@@ -435,20 +442,40 @@ def test_transform_iterable_dfs(self):
# this test is important for using mapInPandas in spark

# schema: *,c:int
def mt_pandas(dfs: Iterable[pd.DataFrame]) -> Iterator[pd.DataFrame]:
def mt_pandas(
dfs: Iterable[pd.DataFrame], empty: bool = False
) -> Iterator[pd.DataFrame]:
for df in dfs:
yield df.assign(c=2)
if not empty:
df = df.assign(c=2)
df = df[reversed(list(df.columns))]
yield df

with FugueWorkflow() as dag:
a = dag.df([[1, 2], [3, 4]], "a:int,b:int")
b = a.transform(mt_pandas)
dag.df([[1, 2, 2], [3, 4, 2]], "a:int,b:int,c:int").assert_eq(b)
dag.run(self.engine)

# when iterable returns nothing
with FugueWorkflow() as dag:
a = dag.df([[1, 2], [3, 4]], "a:int,b:int")
# without partitioning
b = a.transform(mt_pandas, params=dict(empty=True))
dag.df([], "a:int,b:int,c:int").assert_eq(b)
# with partitioning
b = a.partition_by("a").transform(mt_pandas, params=dict(empty=True))
dag.df([], "a:int,b:int,c:int").assert_eq(b)
dag.run(self.engine)

# schema: *
def mt_arrow(dfs: Iterable[pa.Table]) -> Iterator[pa.Table]:
def mt_arrow(
dfs: Iterable[pa.Table], empty: bool = False
) -> Iterator[pa.Table]:
for df in dfs:
yield df
if not empty:
df = df.select(reversed(df.schema.names))
yield df

# schema: a:long
def mt_arrow_2(dfs: Iterable[pa.Table]) -> Iterator[pa.Table]:
@@ -463,6 +490,17 @@ def mt_arrow_2(dfs: Iterable[pa.Table]) -> Iterator[pa.Table]:
dag.df([[1], [3]], "a:long").assert_eq(b)
dag.run(self.engine)

# when iterable returns nothing
with FugueWorkflow() as dag:
a = dag.df([[1, 2], [3, 4]], "a:int,b:int")
# without partitioning
b = a.transform(mt_arrow, params=dict(empty=True))
dag.df([], "a:int,b:int").assert_eq(b)
# with partitioning
b = a.partition_by("a").transform(mt_arrow, params=dict(empty=True))
dag.df([], "a:int,b:int").assert_eq(b)
dag.run(self.engine)

def test_transform_binary(self):
with FugueWorkflow() as dag:
a = dag.df([[1, pickle.dumps([0, "a"])]], "a:int,b:bytes")
@@ -1829,6 +1867,10 @@ def mock_creator(p: int) -> DataFrame:
return ArrayDataFrame([[p]], "a:int")


def mock_creator2(p: int) -> AnyDataFrame:
return fa.as_fugue_df([[p]], schema="a:int")


def mock_processor(df1: List[List[Any]], df2: List[List[Any]]) -> DataFrame:
return ArrayDataFrame([[len(df1) + len(df2)]], "a:int")

@@ -1844,6 +1886,10 @@ def process(self, dfs):
return ArrayDataFrame([[sum(s.count() for s in dfs.values())]], "a:int")


def mock_processor4(df1: AnyDataFrame, df2: AnyDataFrame) -> AnyDataFrame:
return ArrayDataFrame([[fa.count(df1) + fa.count(df2)]], "a:int")


def mock_outputter(df1: List[List[Any]], df2: List[List[Any]]) -> None:
assert len(df1) == len(df2)

@@ -1857,6 +1903,10 @@ def process(self, dfs):
assert "3" == self.partition_spec.num_partitions


def mock_outputter4(df1: AnyDataFrame, df2: AnyDataFrame) -> None:
assert fa.count(df1) == fa.count(df2)


class MockOutputter4(Outputter):
def process(self, dfs):
for k, v in dfs.items():
Expand Down Expand Up @@ -1895,8 +1945,8 @@ def mock_tf0(df: pd.DataFrame, p=1, col="p") -> pd.DataFrame:

# schema: *,ct:int,p:int
def mock_tf1(df: pd.DataFrame, p=1) -> pd.DataFrame:
df["ct"] = df.shape[0]
df["p"] = p
df["ct"] = df.shape[0]
return df


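The new mock_creator2, mock_processor4, and mock_outputter4 exercise the AnyDataFrame annotation end to end in the suite. A hedged standalone sketch of the same pattern, runnable on the default native engine:

```python
import fugue.api as fa
from fugue import AnyDataFrame, ArrayDataFrame, FugueWorkflow

# A creator and a processor annotated with AnyDataFrame, in the spirit of the
# new mock_creator2 / mock_processor4 helpers above.
def make_df(p: int) -> AnyDataFrame:
    return fa.as_fugue_df([[p]], schema="a:int")

def total_rows(df1: AnyDataFrame, df2: AnyDataFrame) -> AnyDataFrame:
    return ArrayDataFrame([[fa.count(df1) + fa.count(df2)]], "a:int")

dag = FugueWorkflow()
a = dag.create(make_df, params=dict(p=2))
b = dag.create(make_df, params=dict(p=2))
dag.process(a, b, using=total_rows).show()
dag.run()  # native engine; the suite runs the same shape against each backend
```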
4 changes: 4 additions & 0 deletions requirements.txt
@@ -22,8 +22,12 @@ psutil
matplotlib
seaborn

notebook<7
jupyter_contrib_nbextensions

pyspark[connect]
duckdb-engine>=0.6.4
duckdb!=0.8.1
sqlalchemy==2.0.10 # 2.0.11 has a bug
ray[data]>=2.5.0
# pyarrow==7.0.0
4 changes: 2 additions & 2 deletions setup.py
@@ -52,7 +52,7 @@ def get_version() -> str:
"spark": ["pyspark>=3.1.1"],
"dask": [
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"dask[distributed,dataframe]>=2022.9.0,<2023.7.1; python_version >= '3.8'",
"qpd[dask]>=0.4.4",
],
"ray": ["ray[data]>=2.1.0", "duckdb>=0.5.0", "pyarrow>=6.0.1"],
@@ -73,7 +73,7 @@ def get_version() -> str:
"fugue-sql-antlr[cpp]>=0.1.6",
"pyspark>=3.1.1",
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"dask[distributed,dataframe]>=2022.9.0,<2023.7.1; python_version >= '3.8'",
"ray[data]>=2.1.0",
"qpd[dask]>=0.4.4",
"notebook",
2 changes: 2 additions & 0 deletions tests/fugue/collections/test_partition.py
@@ -169,6 +169,8 @@ def test_partition_spec():
assert a.presort == c.presort
c = PartitionSpec(b, presort=[("b", False), ("c", True)])
assert a.presort == c.presort
d = PartitionSpec(b, presort=[])
assert len(d.presort) == 0

a = PartitionSpec(by=["a"], presort="b DESC, c")
b = PartitionSpec(by=["a"], presort=[("c", True), ("b", False)])
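The new case pins down that passing presort=[] overrides an inherited presort rather than being ignored, which the Spark engine change above relies on when it repartitions before the groupBy-based pandas UDF. A tiny sketch of the same behavior:

```python
from fugue import PartitionSpec

# Overriding with an empty presort drops the inherited sort keys.
base = PartitionSpec(by=["a"], presort="b DESC, c")
no_sort = PartitionSpec(base, presort=[])
assert list(base.presort.keys()) == ["b", "c"]
assert len(no_sort.presort) == 0
```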
41 changes: 32 additions & 9 deletions tests/fugue/dataframe/test_dataframe_iterable_dataframe.py
@@ -1,20 +1,19 @@
import json
from datetime import datetime
from typing import Any

import numpy as np
import pandas as pd
from pytest import raises

import fugue.api as fa
from fugue.dataframe import (
IterableDataFrame,
PandasDataFrame,
ArrayDataFrame,
ArrowDataFrame,
IterableArrowDataFrame,
IterableDataFrame,
IterablePandasDataFrame,
LocalDataFrameIterableDataFrame,
PandasDataFrame,
)
from fugue.dataframe.utils import _df_eq as df_eq
from fugue_test.dataframe_suite import DataFrameTests
from pytest import raises
from triad.collections.schema import Schema, SchemaError
from triad.exceptions import InvalidOperationError


class LocalDataFrameIterableDataFrameTests(DataFrameTests.Tests):
@@ -31,6 +30,30 @@ def get_dfs():

return LocalDataFrameIterableDataFrame(get_dfs(), schema)

def test_empty_dataframes(self):
df = IterablePandasDataFrame([], schema="a:long,b:int")
assert df.empty
pdf = df.as_pandas()
assert pdf.dtypes["a"] == np.int64
assert pdf.dtypes["b"] == np.int32
assert fa.get_schema(df.as_arrow()) == "a:long,b:int"

dfs = [PandasDataFrame(schema="a:long,b:int")]
df = IterablePandasDataFrame(dfs)
assert df.empty
pdf = df.as_pandas()
assert pdf.dtypes["a"] == np.int64
assert pdf.dtypes["b"] == np.int32
assert fa.get_schema(df.as_arrow()) == "a:long,b:int"

dfs = [ArrowDataFrame(schema="a:long,b:int")]
df = IterableArrowDataFrame(dfs)
assert df.empty
pdf = df.as_pandas()
assert pdf.dtypes["a"] == np.int64
assert pdf.dtypes["b"] == np.int32
assert fa.get_schema(df.as_arrow()) == "a:long,b:int"


def test_init():
raises(Exception, lambda: LocalDataFrameIterableDataFrame(1))
(Diffs for the remaining changed files are not shown.)
