
Commit

Make PartitionCursor take functions, fix Ray perf issue, fix spark get_current_parallelism (#435)

* Make PartitionCursor take function

* fix spark

* update

* fix

* fix ray perf issue, fix spark get_current_parallelism

* update

* update
goodwanghan authored Mar 9, 2023
1 parent 8f93d88 commit 19f1479
Showing 15 changed files with 169 additions and 53 deletions.
12 changes: 9 additions & 3 deletions fugue/collections/partition.py
@@ -348,7 +348,7 @@ def set(self, item: Any, partition_no: int, slice_no: int) -> None:
"""reset the cursor to a row (which should be the first row of a
new logical partition)
:param item: an item of the dataset
:param item: an item of the dataset, or a function generating the item
:param partition_no: logical partition number
:param slice_no: slice number inside the logical partition (to be deprecated)
"""
@@ -359,6 +359,8 @@ def set(self, item: Any, partition_no: int, slice_no: int) -> None:
@property
def item(self) -> Any:
"""Get current item"""
if callable(self._item):
self._item = self._item()
return self._item

@property
@@ -417,11 +419,15 @@ def set(self, row: Any, partition_no: int, slice_no: int) -> None:
"""reset the cursor to a row (which should be the first row of a
new logical partition)
:param row: list-like row data
:param row: list-like row data or a function generating a list-like row
:param partition_no: logical partition number
:param slice_no: slice number inside the logical partition (to be deprecated)
"""
super().set(list(row), partition_no=partition_no, slice_no=slice_no)
super().set(
list(row) if not callable(row) else lambda: list(row()),
partition_no=partition_no,
slice_no=slice_no,
)

@property
def row(self) -> List[Any]:
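With this change the cursor accepts either a concrete item/row or a zero-argument function producing it, and the function is only evaluated the first time item (or row) is read. A minimal standalone sketch of that pattern, which the engine changes below rely on (not the actual fugue classes; peek_array stands in for any expensive first-row computation):

from typing import Any, Callable, List, Union

class _LazyCursorSketch:
    """Sketch of the lazy-item behavior added to PartitionCursor above."""

    def set(self, item: Union[Any, Callable[[], Any]], partition_no: int) -> None:
        # store either a value or a zero-argument function producing it
        self._item = item
        self._partition_no = partition_no

    @property
    def item(self) -> Any:
        # materialize only on first access, then cache the result
        if callable(self._item):
            self._item = self._item()
        return self._item

def peek_array() -> List[Any]:
    print("computing first row")  # expensive in the real engines
    return [1, 2]

cursor = _LazyCursorSketch()
cursor.set(lambda: peek_array(), 0)  # nothing is computed yet
print(cursor.item)                   # prints "computing first row" then [1, 2]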
38 changes: 37 additions & 1 deletion fugue/dataframe/function_wrapper.py
@@ -299,7 +299,6 @@ def format_hint(self) -> Optional[str]:

@annotated_param(
Iterable[pd.DataFrame],
"q",
matcher=lambda x: x == Iterable[pd.DataFrame] or x == Iterator[pd.DataFrame],
)
class _IterablePandasParam(LocalDataFrameParam):
@@ -345,6 +344,43 @@ def format_hint(self) -> Optional[str]:
return "pyarrow"


@annotated_param(
Iterable[pa.Table],
matcher=lambda x: x == Iterable[pa.Table] or x == Iterator[pa.Table],
)
class _IterableArrowParam(LocalDataFrameParam):
@no_type_check
def to_input_data(self, df: DataFrame, ctx: Any) -> Iterable[pa.Table]:
if not isinstance(df, LocalDataFrameIterableDataFrame):
yield df.as_arrow()
else:
for sub in df.native:
yield sub.as_arrow()

@no_type_check
def to_output_df(
self, output: Iterable[pa.Table], schema: Any, ctx: Any
) -> DataFrame:
def dfs():
_schema: Optional[Schema] = None if schema is None else Schema(schema)
for df in output:
adf = ArrowDataFrame(df)
if _schema is not None and not ( # pylint: disable-all
adf.schema == schema
):
adf = adf[_schema.names].alter_columns(_schema)
yield adf

return LocalDataFrameIterableDataFrame(dfs())

@no_type_check
def count(self, df: Iterable[pa.Table]) -> int:
return sum(_.shape[0] for _ in df)

def format_hint(self) -> Optional[str]:
return "pyarrow"


@annotated_param(DataFrames, "c")
class _DataFramesParam(AnnotatedParam):
pass
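The new _IterableArrowParam means transformer functions can now be annotated with Iterable[pa.Table] / Iterator[pa.Table] and receive the data as a stream of Arrow tables. A hedged usage sketch — add_constant and the sample data are made up for illustration; the schema comment follows Fugue's existing hint convention, as also exercised in the builtin_suite test further down:

from typing import Iterable, Iterator

import pandas as pd
import pyarrow as pa
from fugue import transform

# schema: *,c:int
def add_constant(tables: Iterable[pa.Table]) -> Iterator[pa.Table]:
    # consume and emit one Arrow table (batch) at a time
    for tb in tables:
        yield tb.append_column("c", pa.array([2] * tb.num_rows, pa.int32()))

# runs on the native engine here; the same function works on Spark/Dask/Ray
result = transform(pd.DataFrame({"a": [1, 3], "b": [2, 4]}), add_constant)
print(result)  # columns a, b plus the new constant column c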
4 changes: 2 additions & 2 deletions fugue/execution/native_execution_engine.py
@@ -118,7 +118,7 @@ def map_dataframe(
on_init(0, df)
if len(partition_spec.partition_by) == 0: # no partition
df = to_local_df(df)
cursor.set(df.peek_array(), 0, 0)
cursor.set(lambda: df.peek_array(), 0, 0)
output_df = map_func(cursor, df)
if (
isinstance(output_df, PandasDataFrame)
@@ -142,7 +142,7 @@ def _map(pdf: pd.DataFrame) -> pd.DataFrame:
drop=True
)
input_df = PandasDataFrame(pdf, df.schema, pandas_df_wrapper=True)
cursor.set(input_df.peek_array(), cursor.partition_no + 1, 0)
cursor.set(lambda: input_df.peek_array(), cursor.partition_no + 1, 0)
output_df = map_func(cursor, input_df)
return output_df.as_pandas()

4 changes: 2 additions & 2 deletions fugue_dask/execution_engine.py
@@ -79,6 +79,7 @@ def map_dataframe(
presort_asc = list(presort.values())
output_schema = Schema(output_schema)
input_schema = df.schema
cursor = partition_spec.get_cursor(input_schema, 0)
on_init_once: Any = (
None
if on_init is None
@@ -97,8 +98,7 @@ def _map(pdf: Any) -> dd.DataFrame:
)
if on_init_once is not None:
on_init_once(0, input_df)
cursor = partition_spec.get_cursor(input_schema, 0)
cursor.set(input_df.peek_array(), 0, 0)
cursor.set(lambda: input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
return output_df.as_pandas()

2 changes: 2 additions & 0 deletions fugue_ray/_constants.py
@@ -3,8 +3,10 @@
FUGUE_RAY_CONF_SHUFFLE_PARTITIONS = "fugue.ray.shuffle.partitions"
FUGUE_RAY_DEFAULT_PARTITIONS = "fugue.ray.default.partitions"
FUGUE_RAY_DEFAULT_BATCH_SIZE = "fugue.ray.default.batch_size"
FUGUE_RAY_ZERO_COPY = "fugue.ray.zero_copy"

FUGUE_RAY_DEFAULT_CONF: Dict[str, Any] = {
FUGUE_RAY_CONF_SHUFFLE_PARTITIONS: -1,
FUGUE_RAY_DEFAULT_PARTITIONS: 0,
FUGUE_RAY_ZERO_COPY: True,
}
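fugue.ray.zero_copy is a new engine configuration that defaults to True and is consumed by the map_batches call in fugue_ray/execution_engine.py below. A hypothetical way a user could override it through engine_conf (the transformer and data here are placeholders, not part of this commit):

import pandas as pd
from fugue import transform

# schema: *
def noop(df: pd.DataFrame) -> pd.DataFrame:
    return df

result = transform(
    pd.DataFrame({"a": [1, 2]}),
    noop,
    engine="ray",
    engine_conf={"fugue.ray.zero_copy": False},  # opt out of zero-copy batches
)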
4 changes: 2 additions & 2 deletions fugue_ray/_utils/dataframe.py
@@ -10,8 +10,8 @@


def get_dataset_format(df: rd.Dataset) -> Optional[str]:
schema = df.schema(fetch_if_missing=True)
if schema is None:
df.fully_executed()
if df.count() == 0:
return None
if hasattr(df, "_dataset_format"): # pragma: no cover
return df._dataset_format() # ray<2.2
2 changes: 1 addition & 1 deletion fugue_ray/dataframe.py
@@ -158,7 +158,7 @@ def peek_array(self) -> List[Any]:
def persist(self, **kwargs: Any) -> "RayDataFrame":
# TODO: it mutates the dataframe, is this a good behavior?
if not self.native.is_fully_executed(): # pragma: no cover
self._native = self.native.fully_executed()
self.native.fully_executed()
return self

def count(self) -> int:
24 changes: 13 additions & 11 deletions fugue_ray/execution_engine.py
@@ -20,7 +20,7 @@
from fugue_duckdb.dataframe import DuckDataFrame
from fugue_duckdb.execution_engine import DuckExecutionEngine

from ._constants import FUGUE_RAY_DEFAULT_BATCH_SIZE
from ._constants import FUGUE_RAY_DEFAULT_BATCH_SIZE, FUGUE_RAY_ZERO_COPY
from ._utils.cluster import get_default_partitions, get_default_shuffle_partitions
from ._utils.dataframe import add_partition_key
from ._utils.io import RayIO
@@ -78,6 +78,7 @@ def _group_map(
]
output_schema = Schema(output_schema)
input_schema = df.schema
cursor = partition_spec.get_cursor(input_schema, 0)
on_init_once: Any = (
None
if on_init is None
@@ -101,8 +102,7 @@ def _udf(adf: pa.Table) -> pa.Table: # pragma: no cover
input_df = ArrowDataFrame(adf)
if on_init_once is not None:
on_init_once(0, input_df)
cursor = partition_spec.get_cursor(input_schema, 0)
cursor.set(input_df.peek_array(), 0, 0)
cursor.set(lambda: input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
return output_df.as_arrow()

@@ -143,6 +143,7 @@ def _map(
) -> DataFrame:
output_schema = Schema(output_schema)
input_schema = df.schema
cursor = partition_spec.get_cursor(input_schema, 0)
on_init_once: Any = (
None
if on_init is None
@@ -157,8 +158,7 @@ def _udf(adf: pa.Table) -> pa.Table: # pragma: no cover
input_df = ArrowDataFrame(adf)
if on_init_once is not None:
on_init_once(0, input_df)
cursor = partition_spec.get_cursor(input_schema, 0)
cursor.set(input_df.peek_array(), 0, 0)
cursor.set(lambda: input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
return output_df.as_arrow()

@@ -176,15 +176,17 @@ def _udf(adf: pa.Table) -> pa.Table: # pragma: no cover
rdf = self.execution_engine.repartition( # type: ignore
rdf, PartitionSpec(num=n)
)
batch_size = (
self.conf.get_or_throw(FUGUE_RAY_DEFAULT_BATCH_SIZE, object)
if FUGUE_RAY_DEFAULT_BATCH_SIZE in self.execution_engine.conf
else "default"
)
mb_args: Dict[str, Any] = {}
if FUGUE_RAY_DEFAULT_BATCH_SIZE in self.conf:
mb_args["batch_size"] = self.conf.get_or_throw(
FUGUE_RAY_DEFAULT_BATCH_SIZE, int
)
if ray.__version__ >= "2.3":
mb_args["zero_copy_batch"] = self.conf.get(FUGUE_RAY_ZERO_COPY, True)
sdf = rdf.native.map_batches(
_udf,
batch_format="pyarrow",
batch_size=batch_size,
**mb_args,
**self.execution_engine._get_remote_args(), # type: ignore
)
return RayDataFrame(sdf, schema=output_schema, internal_schema=True)
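For reference, a standalone sketch of the Ray Data call pattern the code above builds up: batch_size is only passed when explicitly configured, and zero_copy_batch (Ray >= 2.3) hands Arrow batches to the UDF without copying. The dataset and UDF below are placeholders:

import ray
import ray.data
import pyarrow as pa

def _udf(tb: pa.Table) -> pa.Table:  # placeholder transformation
    return tb

ds = ray.data.from_items([{"a": 1}, {"a": 2}])
kwargs = {}
# only set batch_size when the user configured one; otherwise keep Ray's default
# (string version comparison mirrors the code above; note "2.10" < "2.3" lexicographically)
if ray.__version__ >= "2.3":
    kwargs["zero_copy_batch"] = True  # pass Arrow batches to the UDF without copying
out = ds.map_batches(_udf, batch_format="pyarrow", **kwargs)
print(out.take_all())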
25 changes: 16 additions & 9 deletions fugue_spark/execution_engine.py
@@ -187,7 +187,7 @@ def _udf_pandas(pdf: Any) -> pd.DataFrame: # pragma: no cover
)
if on_init_once is not None:
on_init_once(0, input_df)
cursor.set(input_df.peek_array(), 0, 0)
cursor.set(lambda: input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
return output_df.as_pandas()

Expand Down Expand Up @@ -221,20 +221,23 @@ def _udf_pandas(
dfs: Iterable[pd.DataFrame],
) -> Iterable[pd.DataFrame]: # pragma: no cover
def get_dfs() -> Iterable[LocalDataFrame]:
cursor_set = False
for df in dfs:
if df.shape[0] > 0:
yield PandasDataFrame(
pdf = PandasDataFrame(
df.reset_index(drop=True),
input_schema,
pandas_df_wrapper=True,
)
if not cursor_set:
cursor.set(lambda: pdf.peek_array(), 0, 0)
yield pdf

input_df = LocalDataFrameIterableDataFrame(get_dfs(), input_schema)
if input_df.empty:
return PandasDataFrame([], output_schema).as_pandas()
if on_init_once is not None:
on_init_once(0, input_df)
cursor.set(input_df.peek_array(), 0, 0)
output_df = map_func(cursor, input_df)
if isinstance(output_df, LocalDataFrameIterableDataFrame):
for res in output_df.native:
@@ -315,9 +318,13 @@ def get_current_parallelism(self) -> int:
e_cores = int(spark.conf.get("spark.executor.cores", "1"))
tc = int(spark.conf.get("spark.task.cpus", "1"))
sc = spark._jsc.sc()
nodes = len(list(sc.statusTracker().getExecutorInfos()))
workers = 1 if nodes <= 1 else nodes - 1
return max(workers * (e_cores // tc), 1)
try:
return spark.sparkContext.defaultParallelism // tc
except Exception: # pragma: no cover
# for pyspark < 3.1.1
nodes = len(list(sc.statusTracker().getExecutorInfos()))
workers = 1 if nodes <= 1 else nodes - 1
return max(workers * (e_cores // tc), 1)

def to_df(self, df: Any, schema: Any = None) -> SparkDataFrame: # noqa: C901
"""Convert a data structure to :class:`~fugue_spark.dataframe.SparkDataFrame`
Expand Down Expand Up @@ -742,12 +749,12 @@ def __init__(
self.partition_spec = partition_spec
self.map_func = map_func
self.on_init = on_init
self.cursor = self.partition_spec.get_cursor(self.schema, 0)

def run(self, no: int, rows: Iterable[ps.Row]) -> Iterable[Any]:
df = IterableDataFrame(to_type_safe_input(rows, self.schema), self.schema)
if df.empty: # pragma: no cover
return
cursor = self.partition_spec.get_cursor(self.schema, no)
if self.on_init is not None:
self.on_init(no, df)
if self.partition_spec.empty:
@@ -758,8 +765,8 @@ def run(self, no: int, rows: Iterable[ps.Row]) -> Iterable[Any]:
partitioner = self.partition_spec.get_partitioner(self.schema)
partitions = partitioner.partition(df.native)
for pn, sn, sub in partitions:
cursor.set(sub.peek(), pn, sn)
self.cursor.set(sub.peek(), pn, sn)
sub_df = IterableDataFrame(sub, self.schema)
res = self.map_func(cursor, sub_df)
res = self.map_func(self.cursor, sub_df)
for r in res.as_array_iterable(type_safe=True):
yield r
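The rewritten get_current_parallelism prefers sparkContext.defaultParallelism divided by spark.task.cpus and only falls back to counting executor infos on older PySpark where that fails. A minimal sketch of what the new computation amounts to, assuming a hypothetical local session:

from pyspark.sql import SparkSession

# hypothetical local session: local[8] gives defaultParallelism == 8
spark = SparkSession.builder.master("local[8]").getOrCreate()
task_cpus = int(spark.conf.get("spark.task.cpus", "1"))  # typically 1
# e.g. 8 // 1 == 8 tasks can run concurrently
print(spark.sparkContext.defaultParallelism // task_cpus)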
34 changes: 29 additions & 5 deletions fugue_test/builtin_suite.py
@@ -3,12 +3,13 @@
import datetime
import os
import pickle
from typing import Any, Callable, Dict, Iterable, List, Optional
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional
from unittest import TestCase
from uuid import uuid4

import numpy as np
import pandas as pd
import pyarrow as pa
import pytest
from pytest import raises
from triad import SerializableRLock
@@ -425,20 +426,38 @@ def test(self):
b.assert_eq(a)
dag.run(self.engine)

def test_transform_iterable_pd(self):
def test_transform_iterable_dfs(self):
# this test is important for using mapInPandas in spark

# schema: *,c:int
def mt(dfs: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
def mt_pandas(dfs: Iterable[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for df in dfs:
yield df.assign(c=2)

with FugueWorkflow() as dag:
a = dag.df([[1, 2], [3, 4]], "a:int,b:int")
b = a.transform(mt)
b = a.transform(mt_pandas)
dag.df([[1, 2, 2], [3, 4, 2]], "a:int,b:int,c:int").assert_eq(b)
dag.run(self.engine)

# schema: *
def mt_arrow(dfs: Iterable[pa.Table]) -> Iterator[pa.Table]:
for df in dfs:
yield df

# schema: a:long
def mt_arrow_2(dfs: Iterable[pa.Table]) -> Iterator[pa.Table]:
for df in dfs:
yield df.drop(["b"])

with FugueWorkflow() as dag:
a = dag.df([[1, 2], [3, 4]], "a:int,b:int")
b = a.transform(mt_arrow)
dag.df([[1, 2], [3, 4]], "a:int,b:int").assert_eq(b)
b = a.transform(mt_arrow_2)
dag.df([[1], [3]], "a:long").assert_eq(b)
dag.run(self.engine)

def test_transform_binary(self):
with FugueWorkflow() as dag:
a = dag.df([[1, pickle.dumps([0, "a"])]], "a:int,b:bytes")
@@ -626,6 +645,10 @@ def t9(df: pd.DataFrame) -> Iterable[pd.DataFrame]:
incr()
yield df

def t10(df: pd.DataFrame) -> Iterable[pa.Table]:
incr()
yield pa.Table.from_pandas(df)

with FugueWorkflow() as dag:
a = dag.df([[1, 2], [3, 4]], "a:double,b:int")
a.out_transform(t1) # +2
Expand All @@ -637,14 +660,15 @@ def t9(df: pd.DataFrame) -> Iterable[pd.DataFrame]:
a.partition_by("b").out_transform(T7) # +1
a.out_transform(t8, ignore_errors=[NotImplementedError]) # +1
a.out_transform(t9) # +1
a.out_transform(t10) # +1
raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t2))
raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t3))
raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t4))
raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(t5))
raises(FugueWorkflowCompileValidationError, lambda: a.out_transform(T7))
dag.run(self.engine)

assert 12 <= incr()
assert 13 <= incr()

def test_out_cotransform(self): # noqa: C901
tmpdir = str(self.tmpdir)
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
@@ -1 +1 @@
__version__ = "0.8.1"
__version__ = "0.8.2"
2 changes: 1 addition & 1 deletion tests/fugue/collections/test_function_wrapper.py
@@ -98,7 +98,7 @@ def _parse_function(f, params_re, return_re):
raises(TypeError, lambda: _parse_function(f15, "^e?(c|[dl]+)x*$", "n"))
_parse_function(f14, "^0?e?(c|[dl]+)x*$", "n")
_parse_function(f16, "^0e?(c|[dl]+)x*$", "n")
_parse_function(f33, "^$", "q")
_parse_function(f33, "^$", "l")
raises(TypeError, lambda: _parse_function(f34, "^[sq]$", "q"))
_parse_function(f36, "^FFfff+$", "F")

