Fix for Ray fully_executed deprecation, fix ibis compatibility issues (#487)

* Fix for Ray fully_executed deprecation

* update

* Fix ibis

* Fix ibis
goodwanghan authored Jul 13, 2023
1 parent 5f6d6f1 commit 19e1b26
Showing 4 changed files with 48 additions and 40 deletions.
31 changes: 12 additions & 19 deletions fugue_ibis/execution_engine.py
@@ -92,20 +92,19 @@ def join(
         _df2 = self.to_df(df2)
         key_schema, end_schema = get_join_schemas(_df1, _df2, how=how, on=on)
         on_fields = [_df1.native[k] == _df2.native[k] for k in key_schema]
+        if ibis.__version__ < "6":
+            suffixes: Dict[str, Any] = dict(suffixes=("", _JOIN_RIGHT_SUFFIX))
+        else:  # pragma: no cover
+            # breaking change in ibis 6.0
+            suffixes = dict(lname="", rname=_JOIN_RIGHT_SUFFIX)
         if how.lower() == "cross":
-            tb = _df1.native.cross_join(_df2.native, suffixes=("", _JOIN_RIGHT_SUFFIX))
+            tb = _df1.native.cross_join(_df2.native, **suffixes)
         elif how.lower() == "right_outer":
-            tb = _df2.native.left_join(
-                _df1.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX)
-            )
+            tb = _df2.native.left_join(_df1.native, on_fields, **suffixes)
         elif how.lower() == "left_outer":
-            tb = _df1.native.left_join(
-                _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX)
-            )
+            tb = _df1.native.left_join(_df2.native, on_fields, **suffixes)
         elif how.lower() == "full_outer":
-            tb = _df1.native.outer_join(
-                _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX)
-            )
+            tb = _df1.native.outer_join(_df2.native, on_fields, **suffixes)
             cols: List[Any] = []
             for k in end_schema.names:
                 if k not in key_schema:
@@ -116,17 +115,11 @@ def join(
                     )
             tb = tb[cols]
         elif how.lower() in ["semi", "left_semi"]:
-            tb = _df1.native.semi_join(
-                _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX)
-            )
+            tb = _df1.native.semi_join(_df2.native, on_fields, **suffixes)
         elif how.lower() in ["anti", "left_anti"]:
-            tb = _df1.native.anti_join(
-                _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX)
-            )
+            tb = _df1.native.anti_join(_df2.native, on_fields, **suffixes)
         else:
-            tb = _df1.native.inner_join(
-                _df2.native, on_fields, suffixes=("", _JOIN_RIGHT_SUFFIX)
-            )
+            tb = _df1.native.inner_join(_df2.native, on_fields, **suffixes)
         return self.to_df(tb[end_schema.names], schema=end_schema)

     def union(self, df1: DataFrame, df2: DataFrame, distinct: bool = True) -> DataFrame:
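The version gate at the top of the hunk is the heart of the ibis fix: ibis 6.0 renamed the join-suffix arguments, so the engine now builds the kwargs once and splats them into every join call. A minimal sketch of the same pattern in isolation (the helper name and the "_y" suffix are illustrative, not part of the patch):

    from typing import Any, Dict

    import ibis

    def join_suffix_kwargs(right_suffix: str) -> Dict[str, Any]:
        # ibis < 6 takes one (left, right) tuple via `suffixes=`;
        # ibis >= 6 replaced it with separate `lname=` / `rname=` arguments
        if ibis.__version__ < "6":
            return dict(suffixes=("", right_suffix))
        return dict(lname="", rname=right_suffix)

    # usage sketch: tb = left.inner_join(right, predicates, **join_suffix_kwargs("_y"))

Note the lexicographic string comparison mirrors the patch; it is only safe because setup.py (below) now caps ibis-framework below 6, so versions like "10.0" never reach this code.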
31 changes: 23 additions & 8 deletions fugue_ray/_utils/dataframe.py
@@ -14,23 +14,38 @@
 _RAY_NULL_REPR = "__RAY_NULL__"


-def get_dataset_format(df: rd.Dataset) -> Optional[str]:
-    df.fully_executed()
+def is_materialized(df: rd.Dataset) -> bool:
+    if hasattr(rd.dataset, "MaterializedDataset"):
+        return isinstance(df, rd.dataset.MaterializedDataset)
+    return df.is_fully_executed()  # pragma: no cover
+
+
+def materialize(df: rd.Dataset) -> rd.Dataset:
+    if not is_materialized(df):
+        if hasattr(df, "materialize"):
+            df = df.materialize()
+        else:  # pragma: no cover
+            df = df.fully_executed()
+    return df
+
+
+def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
+    df = materialize(df)
     if df.count() == 0:
-        return None
+        return None, df
     if ray.__version__ < "2.5.0":  # pragma: no cover
         if hasattr(df, "_dataset_format"):  # pragma: no cover
-            return df._dataset_format()  # ray<2.2
+            return df._dataset_format(), df  # ray<2.2
         ctx = rd.context.DatasetContext.get_current()
         ctx.use_streaming_executor = False
-        return df.dataset_format()  # ray>=2.2
+        return df.dataset_format(), df  # ray>=2.2
     else:
         schema = df.schema(fetch_if_missing=True)
         if schema is None:  # pragma: no cover
-            return None
+            return None, df
         if isinstance(schema.base_schema, pa.Schema):
-            return "arrow"
-        return "pandas"
+            return "arrow", df
+        return "pandas", df


 def to_schema(schema: Any) -> Schema:  # pragma: no cover
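This is the core of the Ray fix: fully_executed()/is_fully_executed() are deprecated, so materialization is feature-detected (MaterializedDataset and Dataset.materialize() on newer Ray, the old calls as fallback), and get_dataset_format() now returns the materialized dataset alongside the format so callers keep it instead of triggering execution twice. A hedged usage sketch (assumes ray[data] is installed; ray.data.from_items and the column name are illustrative only):

    import ray.data as rd

    ds = rd.from_items([{"a": 1}, {"a": 2}])
    fmt, ds = get_dataset_format(ds)  # rebind: ds is now materialized
    if fmt == "arrow":
        ...  # downstream code may rely on ds being fully executed

The tuple return matters because on newer Ray materialize() returns a new MaterializedDataset rather than mutating in place; returning only the format string would silently drop the materialized copy.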
12 changes: 6 additions & 6 deletions fugue_ray/dataframe.py
@@ -18,7 +18,7 @@
 )

 from ._constants import _ZERO_COPY
-from ._utils.dataframe import build_empty, get_dataset_format, to_schema
+from ._utils.dataframe import build_empty, get_dataset_format, materialize, to_schema


 class RayDataFrame(DataFrame):
@@ -52,7 +52,7 @@ def __init__(  # noqa: C901
             self._native = build_empty(schema)
             return
         if isinstance(df, rd.Dataset):
-            fmt = get_dataset_format(df)
+            fmt, df = get_dataset_format(df)
             if fmt is None:  # empty:
                 schema = _input_schema(schema).assert_not_empty()
                 super().__init__(schema)
@@ -156,8 +156,7 @@ def peek_array(self) -> List[Any]:

     def persist(self, **kwargs: Any) -> "RayDataFrame":
         # TODO: it mutates the dataframe, is this a good bahavior
-        if not self.native.is_fully_executed():  # pragma: no cover
-            self.native.fully_executed()
+        self._native = materialize(self._native)
         return self

     def count(self) -> int:
@@ -226,7 +225,8 @@ def _apply_schema(
     ) -> Tuple[rd.Dataset, Schema]:
         if internal_schema:
             return rdf, schema
-        if get_dataset_format(rdf) is None:  # empty
+        fmt, rdf = get_dataset_format(rdf)
+        if fmt is None:  # empty
             schema = _input_schema(schema).assert_not_empty()
             return build_empty(schema), schema
         if schema is None or schema == to_schema(rdf.schema(fetch_if_missing=True)):
@@ -266,7 +266,7 @@ def _get_ray_dataframe_columns(df: rd.Dataset) -> List[Any]:
if hasattr(df, "columns"): # higher version of ray
return df.columns(fetch_if_missing=True)
else: # pragma: no cover
fmt = get_dataset_format(df)
fmt, _ = get_dataset_format(df)
if fmt == "pandas":
return list(df.schema(True).names)
elif fmt == "arrow":
Expand Down
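persist() now routes through materialize() and rebinds self._native instead of calling the removed fully_executed(). A hedged usage sketch (the input data and the triad-style schema string are illustrative; it assumes the fugue[ray] extra is installed):

    import ray.data as rd
    from fugue_ray import RayDataFrame

    rdf = RayDataFrame(rd.from_items([{"a": 1}]), schema="a:long")
    assert rdf.persist() is rdf  # mutates in place and returns self,
                                 # the behavior the TODO above questions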
14 changes: 7 additions & 7 deletions setup.py
@@ -31,19 +31,19 @@ def get_version() -> str:
     keywords="distributed spark dask sql dsl domain specific language",
     url="http://github.com/fugue-project/fugue",
     install_requires=[
-        "triad>=0.9.0",
+        "triad>=0.9.1",
         "adagio>=0.2.4",
         "pyarrow>=0.15.1",
         "pandas>=1.2.0",
         # sql dependencies
-        "qpd>=0.4.3",
+        "qpd>=0.4.4",
         "fugue-sql-antlr>=0.1.6",
         "sqlglot",
         "jinja2",
     ],
     extras_require={
         "sql": [
-            "qpd>=0.4.3",
+            "qpd>=0.4.4",
             "fugue-sql-antlr>=0.1.6",
             "sqlglot",
             "jinja2",
@@ -53,7 +53,7 @@ def get_version() -> str:
"dask": [
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"qpd[dask]>=0.4.3",
"qpd[dask]>=0.4.4",
],
"ray": ["ray[data]>=2.1.0", "duckdb>=0.5.0", "pyarrow>=6.0.1"],
"duckdb": [
@@ -64,7 +64,7 @@ def get_version() -> str:
"polars": ["polars"],
"ibis": [
"ibis-framework>=2.1.1; python_version < '3.8'",
"ibis-framework>=3.2.0; python_version >= '3.8'",
"ibis-framework>=3.2.0,<6; python_version >= '3.8'",
],
"notebook": ["notebook", "jupyterlab", "ipython>=7.10.0"],
"all": [
@@ -75,14 +75,14 @@ def get_version() -> str:
"dask[distributed,dataframe]; python_version < '3.8'",
"dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
"ray[data]>=2.1.0",
"qpd[dask]>=0.4.3",
"qpd[dask]>=0.4.4",
"notebook",
"jupyterlab",
"ipython>=7.10.0",
"duckdb>=0.5.0",
"pyarrow>=6.0.1",
"ibis-framework>=2.1.1; python_version < '3.8'",
"ibis-framework>=3.2.0; python_version >= '3.8'",
"ibis-framework>=3.2.0,<6; python_version >= '3.8'",
"polars",
],
},
Expand Down
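The new upper bound on ibis-framework is the other half of the ibis fix: the join code carries an (untested) branch for the ibis 6 kwargs, but the pin keeps resolved installs below the breaking release. A quick check of what the new specifier admits, using the packaging library (an assumption for illustration; it is not a dependency declared here):

    from packaging.specifiers import SpecifierSet

    spec = SpecifierSet(">=3.2.0,<6")
    assert "5.1.0" in spec      # allowed: pre-6 ibis, tuple-style suffixes kwarg
    assert "6.0.0" not in spec  # excluded: ibis 6 renamed the join-suffix kwargs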
