Fix bugs (#468)
* Fix bugs

* Change Ray union all to use Ray Dataset's native union

* Fix dask enforce type bug
goodwanghan authored May 8, 2023
1 parent 63e1f2d commit 5598175
Showing 5 changed files with 47 additions and 15 deletions.
14 changes: 10 additions & 4 deletions fugue_dask/_utils.py
@@ -86,10 +86,14 @@ def enforce_type(  # noqa: C901
         return df.astype(dtype=to_pandas_dtype(schema))
     for v in schema:
         s = df[v.name]
-        if pa.types.is_string(v.type):
+        if pa.types.is_string(v.type) and not pandas.api.types.is_string_dtype(
+            s.dtype
+        ):
             ns = s.isnull()
             s = s.astype(str).mask(ns, None)
-        elif pa.types.is_boolean(v.type):
+        elif pa.types.is_boolean(v.type) and not pandas.api.types.is_bool_dtype(
+            s.dtype
+        ):
             ns = s.isnull()
             if pandas.api.types.is_string_dtype(s.dtype):
                 try:
@@ -98,8 +102,10 @@
                     s = s.fillna(0).astype(bool)
             else:
                 s = s.fillna(0).astype(bool)
-            s = s.mask(ns, None)
-        elif pa.types.is_integer(v.type):
+            s = s.mask(ns, None).astype("boolean")
+        elif pa.types.is_integer(v.type) and not pandas.api.types.is_integer_dtype(
+            s.dtype
+        ):
             ns = s.isnull()
             s = s.fillna(0).astype(v.type.to_pandas_dtype()).mask(ns, None)
         elif not pa.types.is_struct(v.type) and not pa.types.is_list(v.type):
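For context (not part of the commit): the new `not pandas.api.types.is_*_dtype` guards skip coercion for columns whose pandas dtype already matches the pyarrow target, and the boolean branch now ends on pandas' nullable `boolean` dtype so nulls survive the conversion. A minimal, self-contained sketch of that guard pattern, with made-up data:

```python
import pandas as pd
import pandas.api.types as pdt
import pyarrow as pa

s = pd.Series([True, False, True])  # already a plain bool column
v_type = pa.bool_()                 # target type from the schema

# Guard pattern from the patch: only coerce when the dtype does not already match.
if pa.types.is_boolean(v_type) and not pdt.is_bool_dtype(s.dtype):
    ns = s.isnull()
    s = s.fillna(0).astype(bool).mask(ns, None).astype("boolean")

print(s.dtype)  # bool -- an already-boolean column is left untouched
```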
18 changes: 9 additions & 9 deletions fugue_dask/execution_engine.py
@@ -279,7 +279,7 @@ def join(
             join_type=how,
             on=key_schema.names,
         )
-        return DaskDataFrame(d, output_schema)
+        return DaskDataFrame(d, output_schema, type_safe=False)
 
     def union(
         self,
@@ -294,7 +294,7 @@ def union(
         d = self.pl_utils.union(
             self.to_df(df1).native, self.to_df(df2).native, unique=distinct
         )
-        return DaskDataFrame(d, df1.schema)
+        return DaskDataFrame(d, df1.schema, type_safe=False)
 
     def subtract(
         self,
@@ -312,7 +312,7 @@ def subtract(
         d = self.pl_utils.except_df(
             self.to_df(df1).native, self.to_df(df2).native, unique=distinct
         )
-        return DaskDataFrame(d, df1.schema)
+        return DaskDataFrame(d, df1.schema, type_safe=False)
 
     def intersect(
         self,
@@ -330,11 +330,11 @@ def intersect(
         d = self.pl_utils.intersect(
             self.to_df(df1).native, self.to_df(df2).native, unique=distinct
         )
-        return DaskDataFrame(d, df1.schema)
+        return DaskDataFrame(d, df1.schema, type_safe=False)
 
     def distinct(self, df: DataFrame) -> DataFrame:
         d = self.pl_utils.drop_duplicates(self.to_df(df).native)
-        return DaskDataFrame(d, df.schema)
+        return DaskDataFrame(d, df.schema, type_safe=False)
 
     def dropna(
         self,
@@ -351,7 +351,7 @@ def dropna(
         if how == "any" and thresh is not None:
             del kw["how"]  # to deal with a dask logic flaw
         d = self.to_df(df).native.dropna(**kw)
-        return DaskDataFrame(d, df.schema)
+        return DaskDataFrame(d, df.schema, type_safe=False)
 
     def fillna(self, df: DataFrame, value: Any, subset: List[str] = None) -> DataFrame:
         assert_or_throw(
@@ -371,7 +371,7 @@ def fillna(self, df: DataFrame, value: Any, subset: List[str] = None) -> DataFrame:
         subset = subset or df.columns
         mapping = {col: value for col in subset}
         d = self.to_df(df).native.fillna(mapping)
-        return DaskDataFrame(d, df.schema)
+        return DaskDataFrame(d, df.schema, type_safe=False)
 
     def sample(
         self,
@@ -389,7 +389,7 @@ def sample(
         d = self.to_df(df).native.sample(
             n=n, frac=frac, replace=replace, random_state=seed
         )
-        return DaskDataFrame(d, df.schema)
+        return DaskDataFrame(d, df.schema, type_safe=False)
 
     def take(
         self,
@@ -445,7 +445,7 @@ def _partition_take(partition, n, presort):
             .reset_index(drop=True)
         )
 
-        return DaskDataFrame(d, df.schema)
+        return DaskDataFrame(d, df.schema, type_safe=False)
 
     def load_df(
         self,
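A note on the repeated `type_safe=False` change: each of these engine operations produces Dask data that already conforms to the known output schema, so constructing the `DaskDataFrame` with `type_safe=False` trusts that schema and skips another pass through type enforcement. A minimal usage sketch (not from the repo, assuming `fugue[dask]` is installed):

```python
import dask.dataframe as dd
import pandas as pd
from fugue_dask import DaskDataFrame

# Data that already matches the declared schema exactly.
ddf = dd.from_pandas(pd.DataFrame({"a": [True, False], "b": [1, 2]}), npartitions=1)

# type_safe=False trusts the given schema and skips type enforcement,
# avoiding redundant coercion of data that is already the right type.
df = DaskDataFrame(ddf, "a:bool,b:long", type_safe=False)
print(df.schema)  # a:bool,b:long
```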
10 changes: 10 additions & 0 deletions fugue_ray/execution_engine.py
@@ -281,6 +281,16 @@ def convert_yield_dataframe(self, df: DataFrame, as_local: bool) -> DataFrame:
             return df if not as_local else df.as_local()
         return super().convert_yield_dataframe(df, as_local)
 
+    def union(self, df1: DataFrame, df2: DataFrame, distinct: bool = True) -> DataFrame:
+        if distinct:
+            return super().union(df1, df2, distinct)
+        assert_or_throw(
+            df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}")
+        )
+        tdf1 = self._to_ray_df(df1)
+        tdf2 = self._to_ray_df(df2)
+        return RayDataFrame(tdf1.native.union(tdf2.native), df1.schema)
+
     def load_df(  # type:ignore
         self,
         path: Union[str, List[str]],
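For reference, the override above relies on Ray Data's `Dataset.union`, which concatenates datasets without deduplication -- i.e. UNION ALL semantics -- so the `distinct=False` path can stay native to Ray instead of deferring to the parent engine. A minimal sketch of that Ray behavior (assuming Ray 2.x; data is made up):

```python
import ray
import ray.data

ray.init(ignore_reinit_error=True)

ds1 = ray.data.from_items([{"a": 1}, {"a": 2}])
ds2 = ray.data.from_items([{"a": 2}, {"a": 3}])

# Dataset.union concatenates blocks; duplicates are kept and no shuffle is needed.
combined = ds1.union(ds2)
print(combined.take_all())  # [{'a': 1}, {'a': 2}, {'a': 2}, {'a': 3}]
```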
6 changes: 4 additions & 2 deletions fugue_spark/registry.py
@@ -24,8 +24,10 @@
 
 
 @infer_execution_engine.candidate(
-    lambda objs: is_pandas_or(
-        objs, (ps.DataFrame, SparkConnectDataFrame, SparkDataFrame)
+    lambda objs: (
+        is_pandas_or(objs, (ps.DataFrame, SparkConnectDataFrame, SparkDataFrame))
+        if SparkConnectDataFrame is not None
+        else is_pandas_or(objs, (ps.DataFrame, SparkDataFrame))
     )
     or any(_is_sparksql(obj) for obj in objs)
 )
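For context: `SparkConnectDataFrame` is bound to `None` when the installed pyspark has no Spark Connect support, and a `None` entry in an isinstance-style type tuple would fail, hence the branch above. A sketch of that optional-import pattern (module path per pyspark >= 3.4; not the repo's exact code):

```python
from pyspark.sql import DataFrame as SparkDataFrame

try:
    # Spark Connect ships with pyspark>=3.4; older versions lack this module.
    from pyspark.sql.connect.dataframe import DataFrame as SparkConnectDataFrame
except ImportError:
    SparkConnectDataFrame = None

def spark_df_types() -> tuple:
    # Only advertise the Spark Connect class when it is importable; a None
    # entry in the tuple would break isinstance-based inference.
    if SparkConnectDataFrame is None:
        return (SparkDataFrame,)
    return (SparkDataFrame, SparkConnectDataFrame)
```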
14 changes: 14 additions & 0 deletions tests/fugue_dask/test_execution_engine.py
@@ -121,9 +121,11 @@ class DaskExecutionEngineBuiltInTests(BuiltInTests.Tests):
     @classmethod
     def setUpClass(cls):
         cls._engine = cls.make_engine(cls)
+        fa.set_global_engine(cls._engine)
 
     @classmethod
     def tearDownClass(cls):
+        fa.clear_global_engine()
         cls._engine.dask_client.close()
 
     def make_engine(self):
@@ -153,6 +155,18 @@ def m_o(engine: DaskExecutionEngine, df: dd.DataFrame) -> None:
             df.output(m_o)
         dag.run(self.engine)
 
+    def test_bool_bytes_union(self):
+        # this is to verify a bug in enforce type is fixed
+        def tr(df: pd.DataFrame) -> pd.DataFrame:
+            return df.assign(data=b"asdf")
+
+        df = pd.DataFrame(dict(a=[True, False], b=[1, 2]))
+
+        r1 = fa.transform(df, tr, schema="*,data:bytes", as_fugue=True)
+        r2 = fa.transform(df, tr, schema="*,data:bytes", as_fugue=True)
+        r3 = fa.union(r1, r2, distinct=False)
+        r3.show()
+
     def test_coarse_partition(self):
         def verify_coarse_partition(df: pd.DataFrame) -> List[List[Any]]:
             ct = df.a.nunique()
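The `fa.set_global_engine` / `fa.clear_global_engine` calls added to the test fixture let `fa.transform`, `fa.union`, and the other `fugue.api` functions pick up the Dask engine implicitly instead of receiving it on every call. A small usage sketch (assuming `fugue[dask]` is installed; data is made up):

```python
import pandas as pd
import fugue.api as fa

fa.set_global_engine("dask")  # every fa.* call below now runs on Dask

df = pd.DataFrame({"a": [True, False], "b": [1, 2]})
res = fa.union(df, df, distinct=False, as_fugue=True)
res.show()

fa.clear_global_engine()  # restore the default engine
```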
