Skip to content

Commit

Permalink
Coarse Partitioning, add as_fugue_engine_df (#450)
Browse files Browse the repository at this point in the history
* Coarse Partitioning

* Add coarse partition to ray

* Add coarse partitioning to dask

* fix tests

* Fix checkpoint table name

* Unify as_fugue_engine_df

* update

* update

* update

* Add coarse partitioning to pandas
  • Loading branch information
goodwanghan authored Mar 29, 2023
1 parent 1ffa28e commit cb6733e
Show file tree
Hide file tree
Showing 29 changed files with 508 additions and 211 deletions.
5 changes: 5 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release Notes

## 0.8.3

- [449](https://github.com/fugue-project/fugue/issues/449) Add coarse partitioning concept
- [452](https://github.com/fugue-project/fugue/issues/452) Add as_fugue_engine_df

## 0.8.2

- [430](https://github.com/fugue-project/fugue/issues/430) Support Polars DataFrames
Expand Down
1 change: 1 addition & 0 deletions docs/top_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ IO
.. autofunction:: fugue.api.as_fugue_dataset

.. autofunction:: fugue.api.as_fugue_df
.. autofunction:: fugue.api.as_fugue_engine_df
.. autofunction:: fugue.api.load
.. autofunction:: fugue.api.save

Expand Down
1 change: 1 addition & 0 deletions fugue/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from .execution.api import (
aggregate,
anti_join,
as_fugue_engine_df,
assign,
broadcast,
clear_global_engine,
Expand Down
18 changes: 12 additions & 6 deletions fugue/collections/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class PartitionSpec(object):
Partition consists of these specs:
* **algo**: can be one of ``hash`` (default), ``rand`` and ``even``
* **algo**: can be one of ``hash`` (default), ``rand``, ``even`` or ``coarse``
* **num** or **num_partitions**: number of physical partitions, it can be an
expression or integer numbers, e.g ``(ROWCOUNT+4) / 3``
* **by** or **partition_by**: keys to partition on
Expand Down Expand Up @@ -208,7 +208,9 @@ def get_num_partitions(self, **expr_map_funcs: Any) -> int:

@property
def algo(self) -> str:
"""Get algo of the spec, one of ``hash`` (default), ``rand`` and ``even``"""
"""Get algo of the spec, one of ``hash`` (default),
``rand``, ``even`` or ``coarse``
"""
return self._algo if self._algo != "" else "hash"

@property
Expand Down Expand Up @@ -258,11 +260,14 @@ def __uuid__(self) -> str:
"""Get deterministic unique id of this object"""
return to_uuid(self.jsondict)

def get_sorts(self, schema: Schema) -> IndexedOrderedDict[str, bool]:
def get_sorts(
self, schema: Schema, with_partition_keys: bool = True
) -> IndexedOrderedDict[str, bool]:
"""Get keys for sorting in a partition, it's the combination of partition
keys plus the presort keys
:param schema: the dataframe schema this partition spec operates on
:param with_partition_keys: whether to include partition keys
:return: an ordered dictionary of key, order pairs
.. admonition:: Examples
Expand All @@ -272,9 +277,10 @@ def get_sorts(self, schema: Schema) -> IndexedOrderedDict[str, bool]:
>>> assert p.get_sorts(schema) == {"a":True, "b":True, "c": False}
"""
d: IndexedOrderedDict[str, bool] = IndexedOrderedDict()
for p in self.partition_by:
aot(p in schema, lambda: KeyError(f"{p} not in {schema}"))
d[p] = True
if with_partition_keys:
for p in self.partition_by:
aot(p in schema, lambda: KeyError(f"{p} not in {schema}"))
d[p] = True
for p, v in self.presort.items():
aot(p in schema, lambda: KeyError(f"{p} not in {schema}"))
d[p] = v
Expand Down
2 changes: 1 addition & 1 deletion fugue/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from triad import ParamDict

KEYWORD_ROWCOUNT = "ROWCOUNT"
KEYWORD_CORECOUNT = "CORECOUNT"
KEYWORD_PARALLELISM = "CONCURRENCY"

FUGUE_ENTRYPOINT = "fugue.plugins"

Expand Down
46 changes: 34 additions & 12 deletions fugue/execution/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ExecutionEngine,
)
from .factory import make_execution_engine, try_get_context_execution_engine
from .._utils.registry import fugue_plugin


@contextmanager
Expand Down Expand Up @@ -120,6 +121,27 @@ def get_current_parallelism() -> int:
return make_execution_engine().get_current_parallelism()


@fugue_plugin
def as_fugue_engine_df(
    engine: ExecutionEngine, df: AnyDataFrame, schema: Any = None
) -> DataFrame:
    """Wrap ``df`` as a Fugue DataFrame, then convert it with ``engine.to_df``.

    This function is used internally by Fugue; calling it directly is not
    recommended.

    :param engine: the ExecutionEngine to use, must not be None
    :param df: a dataframe like object
    :param schema: the schema of the dataframe, defaults to None
    :return: the engine dependent DataFrame
    """
    # Forward ``schema`` to ``as_fugue_df`` only when the caller supplied one.
    wrapped = as_fugue_df(df) if schema is None else as_fugue_df(df, schema=schema)
    return engine.to_df(wrapped)


def run_engine_function(
func: Callable[[ExecutionEngine], Any],
engine: AnyExecutionEngine = None,
Expand Down Expand Up @@ -549,11 +571,11 @@ def join(
"""

def _join(e: ExecutionEngine):
edf1 = e.to_df(df1)
edf2 = e.to_df(df2)
edf1 = as_fugue_engine_df(e, df1)
edf2 = as_fugue_engine_df(e, df2)
res = e.join(edf1, edf2, how=how, on=on)
for odf in dfs:
res = e.join(res, e.to_df(odf), how=how, on=on)
res = e.join(res, as_fugue_engine_df(e, odf), how=how, on=on)
return res

return run_engine_function(
Expand Down Expand Up @@ -837,11 +859,11 @@ def union(
"""

def _union(e: ExecutionEngine):
edf1 = e.to_df(df1)
edf2 = e.to_df(df2)
edf1 = as_fugue_engine_df(e, df1)
edf2 = as_fugue_engine_df(e, df2)
res = e.union(edf1, edf2, distinct=distinct)
for odf in dfs:
res = e.union(res, e.to_df(odf), distinct=distinct)
res = e.union(res, as_fugue_engine_df(e, odf), distinct=distinct)
return res

return run_engine_function(
Expand Down Expand Up @@ -885,11 +907,11 @@ def subtract(
"""

def _subtract(e: ExecutionEngine):
edf1 = e.to_df(df1)
edf2 = e.to_df(df2)
edf1 = as_fugue_engine_df(e, df1)
edf2 = as_fugue_engine_df(e, df2)
res = e.subtract(edf1, edf2, distinct=distinct)
for odf in dfs:
res = e.subtract(res, e.to_df(odf), distinct=distinct)
res = e.subtract(res, as_fugue_engine_df(e, odf), distinct=distinct)
return res

return run_engine_function(
Expand Down Expand Up @@ -933,11 +955,11 @@ def intersect(
"""

def _intersect(e: ExecutionEngine):
edf1 = e.to_df(df1)
edf2 = e.to_df(df2)
edf1 = as_fugue_engine_df(e, df1)
edf2 = as_fugue_engine_df(e, df2)
res = e.intersect(edf1, edf2, distinct=distinct)
for odf in dfs:
res = e.intersect(res, e.to_df(odf), distinct=distinct)
res = e.intersect(res, as_fugue_engine_df(e, odf), distinct=distinct)
return res

return run_engine_function(
Expand Down
50 changes: 31 additions & 19 deletions fugue/execution/native_execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,36 @@ def map_dataframe(
on_init: Optional[Callable[[int, DataFrame], Any]] = None,
map_func_format_hint: Optional[str] = None,
) -> DataFrame:
if partition_spec.num_partitions != "0":
self.log.warning(
"%s doesn't respect num_partitions %s",
self,
partition_spec.num_partitions,
)
# if partition_spec.num_partitions != "0":
# self.log.warning(
# "%s doesn't respect num_partitions %s",
# self,
# partition_spec.num_partitions,
# )
is_coarse = partition_spec.algo == "coarse"
presort = partition_spec.get_sorts(df.schema, with_partition_keys=is_coarse)
presort_keys = list(presort.keys())
presort_asc = list(presort.values())
output_schema = Schema(output_schema)
cursor = partition_spec.get_cursor(df.schema, 0)
if on_init is not None:
on_init(0, df)
if len(partition_spec.partition_by) == 0: # no partition
df = df.as_local()
cursor.set(lambda: df.peek_array(), 0, 0)
output_df = map_func(cursor, df)
if (
len(partition_spec.partition_by) == 0 or partition_spec.algo == "coarse"
): # no partition
if len(partition_spec.presort) > 0:
pdf = (
df.as_pandas()
.sort_values(presort_keys, ascending=presort_asc)
.reset_index(drop=True)
)
input_df = PandasDataFrame(pdf, df.schema, pandas_df_wrapper=True)
cursor.set(lambda: input_df.peek_array(), cursor.partition_no + 1, 0)
output_df = map_func(cursor, input_df)
else:
df = df.as_local()
cursor.set(lambda: df.peek_array(), 0, 0)
output_df = map_func(cursor, df)
if (
isinstance(output_df, PandasDataFrame)
and output_df.schema != output_schema
Expand All @@ -107,13 +124,9 @@ def map_dataframe(
f"mismatches given {output_schema}",
)
return self.to_df(output_df) # type: ignore
presort = partition_spec.presort
presort_keys = list(presort.keys())
presort_asc = list(presort.values())
output_schema = Schema(output_schema)

def _map(pdf: pd.DataFrame) -> pd.DataFrame:
if len(presort_keys) > 0:
if len(partition_spec.presort) > 0:
pdf = pdf.sort_values(presort_keys, ascending=presort_asc).reset_index(
drop=True
)
Expand Down Expand Up @@ -177,7 +190,7 @@ def to_df(self, df: AnyDataFrame, schema: Any = None) -> LocalBoundedDataFrame:
def repartition(
self, df: DataFrame, partition_spec: PartitionSpec
) -> DataFrame: # pragma: no cover
self.log.warning("%s doesn't respect repartition", self)
# self.log.warning("%s doesn't respect repartition", self)
return df

def broadcast(self, df: DataFrame) -> DataFrame:
Expand Down Expand Up @@ -384,6 +397,5 @@ class _NativeExecutionEngineParam(ExecutionEngineParam):


def _to_native_execution_engine_df(df: AnyDataFrame, schema: Any = None) -> DataFrame:
if schema is None:
return as_fugue_df(df).as_local_bounded()
return as_fugue_df(df, schema=schema).as_local_bounded()
fdf = as_fugue_df(df) if schema is None else as_fugue_df(df, schema=schema)
return fdf.as_local_bounded()
6 changes: 4 additions & 2 deletions fugue/extensions/_builtins/creators.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import Any, Callable, Optional

from triad import Schema, assert_or_throw, to_uuid

from fugue.collections.yielded import Yielded
from fugue.dataframe import DataFrame
from fugue.exceptions import FugueWorkflowCompileError
from fugue.execution.api import as_fugue_engine_df
from fugue.extensions.creator import Creator
from triad import Schema, assert_or_throw, to_uuid


class Load(Creator):
Expand Down Expand Up @@ -39,7 +41,7 @@ def __init__(
def create(self) -> DataFrame:
if isinstance(self._df, Yielded):
return self.execution_engine.load_yielded(self._df)
return self.execution_engine.to_df(self._df, schema=self._schema)
return as_fugue_engine_df(self.execution_engine, self._df, schema=self._schema)

def _df_uid(self):
if self._data_determiner is not None:
Expand Down
1 change: 1 addition & 0 deletions fugue/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
is_empty,
is_local,
)
from fugue.execution.api import as_fugue_engine_df
from fugue.execution.factory import (
infer_execution_engine,
parse_execution_engine,
Expand Down
2 changes: 1 addition & 1 deletion fugue/workflow/_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def get_temp_file(self, obj_id: str, permanent: bool) -> str:

def get_table_name(self, obj_id: str, permanent: bool) -> str:
path = self._path if permanent else self._temp_path
return to_uuid(path, obj_id)[:5]
return "temp_" + to_uuid(path, obj_id)[:5]

def temp_file_exists(self, path: str) -> bool:
try:
Expand Down
Loading

0 comments on commit cb6733e

Please sign in to comment.