From c6a7f7c7613f13a224356e9b95b8a7ebd49a5a6d Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 27 Jun 2024 22:27:15 -0700 Subject: [PATCH] Add dict[str,Any] as supported input and output (#547) * Add dict[str,Any] as supported input and output * update * fix --- fugue/dataframe/function_wrapper.py | 91 ++++++++++++++++++- fugue/extensions/transformer/convert.py | 8 +- fugue_ray/_utils/io.py | 37 +++++--- fugue_test/builtin_suite.py | 37 +++++++- fugue_version/__init__.py | 2 +- .../transformer/test_convert_cotransformer.py | 8 ++ .../test_convert_output_cotransformer.py | 8 ++ .../test_convert_output_transformer.py | 6 ++ .../transformer/test_convert_transformer.py | 14 +++ 9 files changed, 188 insertions(+), 23 deletions(-) diff --git a/fugue/dataframe/function_wrapper.py b/fugue/dataframe/function_wrapper.py index f6571f22..8e15324b 100644 --- a/fugue/dataframe/function_wrapper.py +++ b/fugue/dataframe/function_wrapper.py @@ -80,6 +80,7 @@ def run( # noqa: C901 p.update(kwargs) has_kw = False rargs: Dict[str, Any] = {} + row_param_info: Any = None for k, v in self._params.items(): if isinstance(v, (PositionalParam, KeywordParam)): if isinstance(v, KeywordParam): @@ -90,7 +91,14 @@ def run( # noqa: C901 isinstance(p[k], DataFrame), lambda: TypeError(f"{p[k]} is not a DataFrame"), ) - rargs[k] = v.to_input_data(p[k], ctx=ctx) + if v.is_per_row: + assert_or_throw( + row_param_info is None, + lambda: ValueError("only one row parameter is allowed"), + ) + row_param_info = (k, v, p[k]) + else: + rargs[k] = v.to_input_data(p[k], ctx=ctx) else: rargs[k] = p[k] # TODO: should we do auto type conversion? del p[k] @@ -100,12 +108,38 @@ def run( # noqa: C901 rargs.update(p) elif not ignore_unknown and len(p) > 0: raise ValueError(f"{p} are not acceptable parameters") + if row_param_info is None: + return self._run_func(rargs, output, output_schema, ctx, raw=False) + else: # input contains row parameter + + def _dfs() -> Iterable[Any]: + k, v, df = row_param_info + for row in v.to_input_rows(df, ctx): + rargs[k] = None + _rargs = rargs.copy() + _rargs[k] = row + yield self._run_func(_rargs, output, output_schema, ctx, raw=True) + + if not output: + sum(1 for _ in _dfs()) + return + else: + return self._rt.iterable_to_output_df(_dfs(), output_schema, ctx) + + def _run_func( + self, + rargs: Dict[str, Any], + output: bool, + output_schema: Any, + ctx: Any, + raw: bool, + ) -> Any: rt = self._func(**rargs) if not output: if isinstance(self._rt, _DataFrameParamBase): self._rt.count(rt) return - if isinstance(self._rt, _DataFrameParamBase): + if not raw and isinstance(self._rt, _DataFrameParamBase): return self._rt.to_output_df(rt, output_schema, ctx=ctx) return rt @@ -145,14 +179,30 @@ def __init__(self, param: Optional[inspect.Parameter]): super().__init__(param) assert_or_throw(self.required, lambda: TypeError(f"{self} must be required")) + @property + def is_per_row(self) -> bool: + return False + def to_input_data(self, df: DataFrame, ctx: Any) -> Any: # pragma: no cover raise NotImplementedError + def to_input_rows( + self, + df: DataFrame, + ctx: Any, + ) -> Iterable[Any]: + raise NotImplementedError # pragma: no cover + def to_output_df( self, df: Any, schema: Any, ctx: Any ) -> DataFrame: # pragma: no cover raise NotImplementedError + def iterable_to_output_df( + self, dfs: Iterable[Any], schema: Any, ctx: Any + ) -> DataFrame: # pragma: no cover + raise NotImplementedError + def count(self, df: Any) -> int: # pragma: no cover raise NotImplementedError @@ -182,6 +232,34 @@ def count(self, df: Any) -> int: return sum(1 for _ in df.as_array_iterable()) +@fugue_annotated_param(DataFrame, "r", child_can_reuse_code=True) +class RowParam(_DataFrameParamBase): + @property + def is_per_row(self) -> bool: + return True + + def count(self, df: Any) -> int: + return 1 + + +@fugue_annotated_param(Dict[str, Any]) +class DictParam(RowParam): + def to_input_rows(self, df: DataFrame, ctx: Any) -> Iterable[Any]: + yield from df.as_dict_iterable() + + def to_output_df(self, output: Dict[str, Any], schema: Any, ctx: Any) -> DataFrame: + return ArrayDataFrame([list(output.values())], schema) + + def iterable_to_output_df( + self, dfs: Iterable[Dict[str, Any]], schema: Any, ctx: Any + ) -> DataFrame: # pragma: no cover + params: Dict[str, Any] = {} + if schema is not None: + params["schema"] = Schema(schema).pa_schema + adf = pa.Table.from_pylist(list(dfs), **params) + return ArrowDataFrame(adf) + + @fugue_annotated_param(AnyDataFrame) class _AnyDataFrameParam(DataFrameParam): def to_output_df(self, output: AnyDataFrame, schema: Any, ctx: Any) -> DataFrame: @@ -207,6 +285,15 @@ def to_output_df(self, output: LocalDataFrame, schema: Any, ctx: Any) -> DataFra ) return output + def iterable_to_output_df( + self, dfs: Iterable[Any], schema: Any, ctx: Any + ) -> DataFrame: # pragma: no cover + def _dfs() -> Iterable[DataFrame]: + for df in dfs: + yield self.to_output_df(df, schema, ctx) + + return LocalDataFrameIterableDataFrame(_dfs(), schema=schema) + def count(self, df: LocalDataFrame) -> int: if df.is_bounded: return df.count() diff --git a/fugue/extensions/transformer/convert.py b/fugue/extensions/transformer/convert.py index e3156248..1ad7dbd3 100644 --- a/fugue/extensions/transformer/convert.py +++ b/fugue/extensions/transformer/convert.py @@ -375,7 +375,7 @@ def from_func( assert_arg_not_none(schema, "schema") tr = _FuncAsTransformer() tr._wrapper = DataFrameFunctionWrapper( # type: ignore - func, "^[lspq][fF]?x*z?$", "^[lspq]$" + func, "^[lspqr][fF]?x*z?$", "^[lspqr]$" ) tr._output_schema_arg = schema # type: ignore tr._validation_rules = validation_rules # type: ignore @@ -410,7 +410,7 @@ def from_func( validation_rules.update(parse_validation_rules_from_comment(func)) tr = _FuncAsOutputTransformer() tr._wrapper = DataFrameFunctionWrapper( # type: ignore - func, "^[lspq][fF]?x*z?$", "^[lspnq]$" + func, "^[lspqr][fF]?x*z?$", "^[lspnqr]$" ) tr._output_schema_arg = None # type: ignore tr._validation_rules = validation_rules # type: ignore @@ -503,7 +503,7 @@ def from_func( assert_arg_not_none(schema, "schema") tr = _FuncAsCoTransformer() tr._wrapper = DataFrameFunctionWrapper( # type: ignore - func, "^(c|[lspq]+)[fF]?x*z?$", "^[lspq]$" + func, "^(c|[lspq]+)[fF]?x*z?$", "^[lspqr]$" ) tr._dfs_input = tr._wrapper.input_code[0] == "c" # type: ignore tr._output_schema_arg = schema # type: ignore @@ -562,7 +562,7 @@ def from_func( tr = _FuncAsOutputCoTransformer() tr._wrapper = DataFrameFunctionWrapper( # type: ignore - func, "^(c|[lspq]+)[fF]?x*z?$", "^[lspnq]$" + func, "^(c|[lspq]+)[fF]?x*z?$", "^[lspnqr]$" ) tr._dfs_input = tr._wrapper.input_code[0] == "c" # type: ignore tr._output_schema_arg = None # type: ignore diff --git a/fugue_ray/_utils/io.py b/fugue_ray/_utils/io.py index c9b35108..5f125975 100644 --- a/fugue_ray/_utils/io.py +++ b/fugue_ray/_utils/io.py @@ -7,7 +7,7 @@ from packaging import version from pyarrow import csv as pacsv from pyarrow import json as pajson -from ray.data.datasource import FileExtensionFilter + from triad.collections import Schema from triad.collections.dict import ParamDict from triad.utils.assertion import assert_or_throw @@ -21,6 +21,27 @@ from .._constants import RAY_VERSION +try: + from ray.data.datasource import FileExtensionFilter + + class _FileFiler(FileExtensionFilter): # pragma: no cover + def __init__( + self, file_extensions: Union[str, List[str]], exclude: Iterable[str] + ): + super().__init__(file_extensions, allow_if_no_extension=True) + self._exclude = set(exclude) + + def _is_valid(self, path: str) -> bool: + return pathlib.Path( + path + ).name not in self._exclude and self._file_has_extension(path) + + def __call__(self, paths: List[str]) -> List[str]: + return [path for path in paths if self._is_valid(path)] + +except ImportError: # pragma: no cover + pass # ray >=2.10 + class RayIO(object): def __init__(self, engine: ExecutionEngine): @@ -248,17 +269,3 @@ def _read_json() -> RayDataFrame: # pragma: no cover def _remote_args(self) -> Dict[str, Any]: return {"num_cpus": 1} - - -class _FileFiler(FileExtensionFilter): # pragma: no cover - def __init__(self, file_extensions: Union[str, List[str]], exclude: Iterable[str]): - super().__init__(file_extensions, allow_if_no_extension=True) - self._exclude = set(exclude) - - def _is_valid(self, path: str) -> bool: - return pathlib.Path( - path - ).name not in self._exclude and self._file_has_extension(path) - - def __call__(self, paths: List[str]) -> List[str]: - return [path for path in paths if self._is_valid(path)] diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index 15d435d8..0665912a 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -486,6 +486,23 @@ def mt_arrow_2(dfs: Iterable[pa.Table]) -> Iterator[pa.Table]: dag.df([], "a:int,b:int").assert_eq(b) dag.run(self.engine) + def test_transform_row_wise(self): + def t1(row: Dict[str, Any]) -> Dict[str, Any]: + row["b"] = 1 + return row + + def t2(rows: List[Dict[str, Any]]) -> Dict[str, Any]: + return rows[0] + + with fa.engine_context(self.engine): + a = pd.DataFrame([[3, 4], [1, 2], [3, 5]], columns=["a", "b"]) + b = fa.transform(a, t1, schema="*") + assert sorted(fa.as_array(b)) == [[1, 1], [3, 1], [3, 1]] + b = fa.transform( + a, t2, schema="*", partition={"by": "a", "presort": "b"} + ) + assert sorted(fa.as_array(b)) == [[1, 2], [3, 4]] + def test_transform_binary(self): with FugueWorkflow() as dag: a = dag.df([[1, pickle.dumps([0, "a"])]], "a:int,b:bytes") @@ -548,6 +565,8 @@ def test_cotransform(self): e = dag.df([[1, 2, 1, 10]], "a:int,ct1:int,ct2:int,x:int") e.assert_eq(c) + a.zip(b).transform(mock_co_tf1_d, params=dict(p=10)).assert_eq(e) + # interfaceless c = dag.transform( a.zip(b), @@ -676,6 +695,13 @@ def t10(df: pd.DataFrame) -> Iterable[pa.Table]: incr() yield pa.Table.from_pandas(df) + def t11(row: Dict[str, Any]) -> Dict[str, Any]: + incr() + return row + + def t12(row: Dict[str, Any]) -> None: + incr() + with FugueWorkflow() as dag: a = dag.df([[1, 2], [3, 4]], "a:double,b:int") a.out_transform(t1) # +2 @@ -688,6 +714,8 @@ def t10(df: pd.DataFrame) -> Iterable[pa.Table]: a.out_transform(t8, ignore_errors=[NotImplementedError]) # +1 a.out_transform(t9) # +1 a.out_transform(t10) # +1 + a.out_transform(t11) # +2 + a.out_transform(t12) # +2 raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t2)) raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t3)) raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t4)) @@ -695,7 +723,7 @@ def t10(df: pd.DataFrame) -> Iterable[pa.Table]: raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(T7)) dag.run(self.engine) - assert 13 <= incr() + assert 17 <= incr() def test_out_cotransform(self): # noqa: C901 tmpdir = str(self.tmpdir) @@ -2001,6 +2029,13 @@ def mock_co_tf1( return [[df1[0]["a"], len(df1), len(df2), p]] +@cotransformer(lambda dfs, **kwargs: "a:int,ct1:int,ct2:int,x:int") +def mock_co_tf1_d( + df1: List[Dict[str, Any]], df2: List[List[Any]], p=1 +) -> Dict[str, Any]: + return dict(a=df1[0]["a"], ct1=len(df1), ct2=len(df2), x=p) + + def mock_co_tf2(dfs: DataFrames, p=1) -> List[List[Any]]: return [[dfs[0].peek_dict()["a"], dfs[0].count(), dfs[1].count(), p]] diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py index d69d16e9..a2fecb45 100644 --- a/fugue_version/__init__.py +++ b/fugue_version/__init__.py @@ -1 +1 @@ -__version__ = "0.9.1" +__version__ = "0.9.2" diff --git a/tests/fugue/extensions/transformer/test_convert_cotransformer.py b/tests/fugue/extensions/transformer/test_convert_cotransformer.py index 056597c7..85e87864 100644 --- a/tests/fugue/extensions/transformer/test_convert_cotransformer.py +++ b/tests/fugue/extensions/transformer/test_convert_cotransformer.py @@ -58,6 +58,8 @@ def test__to_transformer(): assert isinstance(g, CoTransformer) i = _to_transformer("t7", "a:int,b:int") assert isinstance(i, CoTransformer) + j = _to_transformer("t8", "a:int,b:int") + assert isinstance(j, CoTransformer) def test__register(): @@ -135,6 +137,12 @@ def t7( yield df +def t8( + df1: pd.DataFrame, df2: pd.DataFrame, c: callable, **kwargs +) -> Dict[str, Any]: + return {} + + class MockTransformer(CoTransformer): def get_output_schema(self, dfs): pass diff --git a/tests/fugue/extensions/transformer/test_convert_output_cotransformer.py b/tests/fugue/extensions/transformer/test_convert_output_cotransformer.py index 2e03e4d4..9aa646b5 100644 --- a/tests/fugue/extensions/transformer/test_convert_output_cotransformer.py +++ b/tests/fugue/extensions/transformer/test_convert_output_cotransformer.py @@ -50,6 +50,8 @@ def test__to_output_transformer(): assert isinstance(g, CoTransformer) i = _to_output_transformer("t7") assert isinstance(i, CoTransformer) + j = _to_output_transformer("t8") + assert isinstance(j, CoTransformer) def test__register(): @@ -106,6 +108,12 @@ def t7( pass +def t8( + df1: Iterable[List[Any]], df2: pd.DataFrame, c: Callable +) -> Dict[str, Any]: + pass + + class MockTransformer(CoTransformer): def get_output_schema(self, dfs): pass diff --git a/tests/fugue/extensions/transformer/test_convert_output_transformer.py b/tests/fugue/extensions/transformer/test_convert_output_transformer.py index 84b4c117..dc319ad8 100644 --- a/tests/fugue/extensions/transformer/test_convert_output_transformer.py +++ b/tests/fugue/extensions/transformer/test_convert_output_transformer.py @@ -48,6 +48,8 @@ def test__to_output_transformer(): assert isinstance(h, Transformer) i = _to_output_transformer("t8") assert isinstance(i, Transformer) + j = _to_output_transformer("t9") + assert isinstance(j, Transformer) def test__register(): @@ -156,6 +158,10 @@ def t8(df: pd.DataFrame, c: Callable[[str], str]) -> Iterable[pd.DataFrame]: pass +def t9(df: Dict[str, Any], c: Callable[[str], str]) -> Dict[str, Any]: + pass + + class MockTransformer(Transformer): def __init__(self, x=""): self._x = x diff --git a/tests/fugue/extensions/transformer/test_convert_transformer.py b/tests/fugue/extensions/transformer/test_convert_transformer.py index a2b10528..6e4a7393 100644 --- a/tests/fugue/extensions/transformer/test_convert_transformer.py +++ b/tests/fugue/extensions/transformer/test_convert_transformer.py @@ -66,6 +66,10 @@ def test__to_transformer(): assert isinstance(j, Transformer) k = _to_transformer("t10") assert isinstance(k, Transformer) + l = _to_transformer("t11") + assert isinstance(l, Transformer) + m = _to_transformer("t12") + assert isinstance(m, Transformer) def test__register(): @@ -253,6 +257,16 @@ def t10(df: pd.DataFrame, c: callable) -> pd.DataFrame: yield df +# schema: * +def t11(row: Dict[str, Any]) -> pd.DataFrame: + yield pd.DataFrame([row]) + + +# schema: * +def t12(df: pd.DataFrame) -> Dict[str, Any]: + return df.to_dict("records")[0] + + class MockTransformer(Transformer): def __init__(self, x=""): self._x = x