Improve dask engine (#10)
* fix select bug, add col ops

* add modin

* add modin

* fix tests

* fix tests

* add dask

* fix tests

* update tests

* fix lint

* update dask engine config
Han Wang authored May 11, 2020
1 parent f364c36 commit 86e2f58
Showing 4 changed files with 44 additions and 14 deletions.
3 changes: 3 additions & 0 deletions fugue_dask/constants.py
@@ -0,0 +1,3 @@
+from triad.collections.dict import ParamDict
+
+DEFAULT_CONFIG = ParamDict({"fugue.dask.dataframe.default.partitions": 16})
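
For reference, ParamDict.get_or_throw is how this value is consumed elsewhere in this commit: it returns the value checked against the requested type and raises if the key is missing. A minimal sketch of the lookup:

from triad.collections.dict import ParamDict

DEFAULT_CONFIG = ParamDict({"fugue.dask.dataframe.default.partitions": 16})

# Mirrors the lookup DaskDataFrame.__init__ performs below; a missing or
# mistyped value raises instead of failing silently.
n = DEFAULT_CONFIG.get_or_throw("fugue.dask.dataframe.default.partitions", int)
assert n == 16
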
11 changes: 10 additions & 1 deletion fugue_dask/dataframe.py
@@ -8,6 +8,7 @@
 from triad.exceptions import InvalidOperationError
 from triad.utils.assertion import assert_arg_not_none, assert_or_throw
 from fugue_dask.utils import DASK_UTILS
+from fugue_dask.constants import DEFAULT_CONFIG


 class DaskDataFrame(DataFrame):
@@ -16,9 +17,13 @@ def __init__( # noqa: C901
         df: Any = None,
         schema: Any = None,
         metadata: Any = None,
-        num_partitions: int = 2,
+        num_partitions: int = 0,
         type_safe=True,
     ):
+        if num_partitions <= 0:
+            num_partitions = DEFAULT_CONFIG.get_or_throw(
+                "fugue.dask.dataframe.default.partitions", int
+            )
         if df is None:
             schema = _input_schema(schema).assert_not_empty()
             df = []
@@ -74,6 +79,10 @@ def num_partitions(self) -> int:
     def peek_array(self) -> Any:
         return self.as_pandas().iloc[0].values.tolist()

+    def persist(self, **kwargs: Any) -> "DaskDataFrame":
+        self._native = self.native.persist(**kwargs)
+        return self
+
     def count(self, persist: bool = False) -> int:
         return self.as_pandas().shape[0]

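A hypothetical usage sketch of the two changes above: num_partitions=0 now means "use the configured default" instead of the old hard-coded 2, and persist() returns self so it chains. The array-plus-schema constructor form mirrors to_df in the next file; the schema string syntax is assumed from triad's Schema.

from fugue_dask.dataframe import DaskDataFrame

# No num_partitions given -> falls back to
# fugue.dask.dataframe.default.partitions (16) from DEFAULT_CONFIG.
df = DaskDataFrame([[0, "a"], [1, "b"]], "x:long,y:str")  # schema syntax assumed

# persist() caches the underlying dask dataframe in place and returns self,
# which is what repartition() relies on via df.persist().count().
print(df.persist().count())  # 2
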
35 changes: 25 additions & 10 deletions fugue_dask/execution_engine.py
@@ -17,15 +17,18 @@
     ExecutionEngine,
     SQLEngine,
 )
-from fugue_dask.dataframe import DaskDataFrame
+from fugue_dask.dataframe import DEFAULT_CONFIG, DaskDataFrame
 from fugue_dask.utils import DASK_UTILS
 from triad.collections import Schema
 from triad.utils.assertion import assert_or_throw
+from triad.collections.dict import ParamDict


 class DaskExecutionEngine(ExecutionEngine):
     def __init__(self, conf: Any = None):
-        super().__init__(conf)
+        p = ParamDict(DEFAULT_CONFIG)
+        p.update(ParamDict(conf))
+        super().__init__(p)
         self._fs = OSFS("/")
         self._log = logging.getLogger()
         self._default_sql_engine = SqliteEngine(self)
@@ -49,6 +52,9 @@ def stop(self) -> None: # pragma: no cover
         return

     def to_df(self, df: Any, schema: Any = None, metadata: Any = None) -> DaskDataFrame:
+        default_partitions = self.conf.get_or_throw(
+            "fugue.dask.dataframe.default.partitions", int
+        )
         if isinstance(df, DataFrame):
             assert_or_throw(
                 schema is None and metadata is None,
@@ -57,9 +63,16 @@ def to_df(self, df: Any, schema: Any = None, metadata: Any = None) -> DaskDataFr
             if isinstance(df, DaskDataFrame):
                 return df
             if isinstance(df, PandasDataFrame):
-                return DaskDataFrame(df.native, df.schema, df.metadata)
-            return DaskDataFrame(df.as_array(type_safe=True), df.schema, df.metadata)
-        return DaskDataFrame(df, schema, metadata)
+                return DaskDataFrame(
+                    df.native, df.schema, df.metadata, num_partitions=default_partitions
+                )
+            return DaskDataFrame(
+                df.as_array(type_safe=True),
+                df.schema,
+                df.metadata,
+                num_partitions=default_partitions,
+            )
+        return DaskDataFrame(df, schema, metadata, num_partitions=default_partitions)

     def repartition(
         self, df: DataFrame, partition_spec: PartitionSpec
@@ -71,8 +84,8 @@ def repartition(
             return df
         p = partition_spec.get_num_partitions(
             **{
-                KEYWORD_ROWCOUNT: lambda: df.count(persist=True),
-                KEYWORD_CORECOUNT: lambda: 2,
+                KEYWORD_ROWCOUNT: lambda: df.persist().count(),  # type: ignore
+                KEYWORD_CORECOUNT: lambda: 2,  # TODO: remove this hard code
             }
         )
         if p > 0:
@@ -114,16 +127,18 @@ def _map(pdf: Any) -> pd.DataFrame:
             result = pdf.native.map_partitions(_map, meta=output_schema.pandas_dtype)
         else:
             result = DASK_UTILS.safe_groupby_apply(
-                df.native, partition_spec.partition_by, _map
+                df.native,
+                partition_spec.partition_by,
+                _map,
+                meta=output_schema.pandas_dtype,
             )
         return DaskDataFrame(result, output_schema)

     def broadcast(self, df: DataFrame) -> DataFrame:
         return self.to_df(df)

     def persist(self, df: DataFrame, level: Any = None) -> DataFrame:
-        pdf = self.to_df(df).native.persist()
-        return DaskDataFrame(pdf, df.schema, type_safe=False)
+        return self.to_df(df).persist()

     def join(
         self,
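
The net effect of the __init__ change is that user configuration is layered over DEFAULT_CONFIG, so the self.conf.get_or_throw lookup in to_df always succeeds. A small sketch of the merge, assuming ExecutionEngine exposes the merged ParamDict as .conf (which the to_df code above implies):

from fugue_dask.execution_engine import DaskExecutionEngine

KEY = "fugue.dask.dataframe.default.partitions"

# No conf passed: DEFAULT_CONFIG's 16 applies.
assert DaskExecutionEngine().conf.get_or_throw(KEY, int) == 16

# User conf overrides the default; unrelated defaults are preserved.
engine = DaskExecutionEngine(conf={KEY: 8})
assert engine.conf.get_or_throw(KEY, int) == 8
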
9 changes: 6 additions & 3 deletions setup.py
@@ -1,6 +1,6 @@
 from setuptools import setup, find_packages

-VERSION = "0.1.2"
+VERSION = "0.1.5"

 with open("README.md") as f:
     LONG_DESCRIPTION = f.read()
@@ -17,8 +17,11 @@
     author_email="[email protected]",
     keywords="distributed spark sql",
     url="http://github.com/goodwanghan/fugue",
-    install_requires=["triad>=0.2.7", "adagio>=0.1.2", "fs", "sqlalchemy"],
-    extras_require={"modin": ["modin[ray]"], "dask": ["dask[dataframe]"]},
+    install_requires=["triad>=0.2.8", "adagio>=0.1.2", "fs", "sqlalchemy"],
+    extras_require={
+        "modin": ["modin[ray]"],
+        "dask": ["dask[dataframe]", "cloudpickle>1.4.0"],
+    },
     classifiers=[
         # "3 - Alpha", "4 - Beta" or "5 - Production/Stable"
         "Development Status :: 3 - Alpha",
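
Assuming the package is published under the name fugue, the reshaped extras mean the dask dependencies (including the cloudpickle pin) install via pip install fugue[dask], while modin[ray] stays behind the separate modin extra.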
