Make LocalExecutionEngine respect partition numbers (#496)
* Make LocalExecutionEngine partition-number aware

* add tests

* update release notes

* add template
goodwanghan authored Jul 24, 2023
1 parent 0198c46 commit ff6128f
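For context, a minimal sketch of what this change enables, mirroring the tests added in this commit (the helper name and sample data are illustrative; with a pandas input and no explicit engine, fa.transform runs on the local pandas-backed engine):

import pandas as pd
import fugue.api as fa

def with_partition_size(df: pd.DataFrame) -> pd.DataFrame:
    # each call receives one physical partition; record its row count
    return df.assign(b=len(df))

df = pd.DataFrame(dict(a=[1, 2, 3]))
# the local engine now honors the requested partition count: the 3 rows are
# split into 2 slices, so the transformer runs twice (slice sizes 2 and 1)
res = fa.transform(df, with_partition_size, schema="*,b:long", partition=2)
print(res.values.tolist())  # [[1, 2], [2, 2], [3, 1]]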
Showing 4 changed files with 65 additions and 7 deletions.
8 changes: 8 additions & 0 deletions .github/ISSUE_TEMPLATE/compatibility.md
@@ -0,0 +1,8 @@
---
name: Compatibility
about: Compatibility with dependent package updates
title: "[COMPATIBILITY]"
labels: ''
assignees: ''

---
10 changes: 10 additions & 0 deletions RELEASE.md
@@ -1,5 +1,15 @@
# Release Notes

## 0.8.6

- [497](https://github.com/fugue-project/fugue/issues/497) Make LocalExecutionEngine respect partition numbers
- [493](https://github.com/fugue-project/fugue/issues/493) Spark Pandas UDF partitioning improvement
- [492](https://github.com/fugue-project/fugue/issues/492) Made AnyDataFrame recognized by Creator, Processor and Outputter
- [490](https://github.com/fugue-project/fugue/issues/490) Fixed pa.Table as transformer output bug
- [489](https://github.com/fugue-project/fugue/issues/489) Added version cap to Ibis
- [485](https://github.com/fugue-project/fugue/issues/485) Made Fugue compatible with Ray 2.5.0
- [486](https://github.com/fugue-project/fugue/issues/486) Added py.typed to Fugue

## 0.8.5

- [481](https://github.com/fugue-project/fugue/pull/481) Moved Fugue SQL dependencies into functions as soft dependencies
39 changes: 32 additions & 7 deletions fugue/execution/native_execution_engine.py
@@ -1,7 +1,7 @@
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Type, Union

import numpy as np
import pandas as pd
from triad import Schema
from triad.collections.dict import IndexedOrderedDict
@@ -17,6 +17,7 @@
parse_presort_exp,
)
from fugue.collections.sql import StructuredRawSQL
from fugue.constants import KEYWORD_PARALLELISM, KEYWORD_ROWCOUNT
from fugue.dataframe import (
AnyDataFrame,
DataFrame,
@@ -108,13 +109,37 @@ def map_dataframe(
                 .sort_values(presort_keys, ascending=presort_asc)
                 .reset_index(drop=True)
             )
-            input_df = PandasDataFrame(pdf, df.schema, pandas_df_wrapper=True)
-            cursor.set(lambda: input_df.peek_array(), cursor.partition_no + 1, 0)
-            output_df = map_func(cursor, input_df)
+            input_df: LocalDataFrame = PandasDataFrame(
+                pdf, df.schema, pandas_df_wrapper=True
+            )
         else:
-            df = df.as_local()
-            cursor.set(lambda: df.peek_array(), 0, 0)
-            output_df = map_func(cursor, df)
+            input_df = df.as_local()
+        if (
+            len(partition_spec.partition_by) == 0
+            and partition_spec.num_partitions != "0"
+        ):
+            partitions = partition_spec.get_num_partitions(
+                **{
+                    KEYWORD_ROWCOUNT: lambda: df.count(),  # type: ignore
+                    KEYWORD_PARALLELISM: lambda: 1,
+                }
+            )
+            dfs: List[DataFrame] = []
+            for p, subdf in enumerate(
+                np.array_split(input_df.as_pandas(), partitions)
+            ):
+                if len(subdf) > 0:
+                    tdf = PandasDataFrame(subdf, df.schema, pandas_df_wrapper=True)
+                    cursor.set(lambda: tdf.peek_array(), p, 0)
+                    dfs.append(map_func(cursor, tdf).as_pandas())
+            output_df: LocalDataFrame = PandasDataFrame(
+                pd.concat(dfs, ignore_index=True),
+                schema=output_schema,
+                pandas_df_wrapper=True,
+            )
+        else:
+            cursor.set(lambda: input_df.peek_array(), 0, 0)
+            output_df = map_func(cursor, input_df)
         if (
             isinstance(output_df, PandasDataFrame)
             and output_df.schema != output_schema
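Stripped of the cursor bookkeeping and Fugue's DataFrame wrappers, the new branch boils down to: resolve a partition count, slice the local pandas DataFrame with np.array_split, run the map function once per non-empty slice, and concatenate the results. A simplified standalone sketch of that pattern (the function and variable names are hypothetical, not part of the codebase):

from typing import Callable, List

import numpy as np
import pandas as pd

def map_in_partitions(
    pdf: pd.DataFrame,
    fn: Callable[[pd.DataFrame], pd.DataFrame],
    partitions: int,
) -> pd.DataFrame:
    # np.array_split tolerates uneven sizes; it just yields smaller
    # (possibly empty) tail slices, which are skipped as in the engine code
    outs: List[pd.DataFrame] = []
    for sub in np.array_split(pdf, partitions):
        if len(sub) > 0:
            outs.append(fn(sub))
    return pd.concat(outs, ignore_index=True)

res = map_in_partitions(
    pd.DataFrame(dict(a=[1, 2, 3])), lambda d: d.assign(b=len(d)), 2
)
print(res.values.tolist())  # [[1, 2], [2, 2], [3, 1]]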
15 changes: 15 additions & 0 deletions tests/fugue/execution/test_naive_execution_engine.py
@@ -83,6 +83,21 @@ def tr(df: pd.DataFrame) -> Iterable[pd.DataFrame]:
        assert len(res) == 0
        assert res.dtypes[0] == "int64"

    def test_multiple_partitions(self):
        def assert_one_item(df: pd.DataFrame) -> pd.DataFrame:
            assert 1 == len(df)
            return df

        df = pd.DataFrame(dict(a=[1, 2, 3]))
        res = fa.transform(df, assert_one_item, schema="*", partition="per_row")
        assert res.values.tolist() == [[1], [2], [3]]

        def num_part(df: pd.DataFrame) -> pd.DataFrame:
            return df.assign(b=len(df))

        res = fa.transform(df, num_part, schema="*,b:long", partition=2)
        assert res.values.tolist() == [[1, 2], [2, 2], [3, 1]]


def test_get_file_threshold():
    assert -1 == _get_file_threshold(None)
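The per_row case asserts that each call sees exactly one row, and the expected values for num_part follow from how numpy splits an uneven row count: three rows into two partitions yields slices of sizes 2 and 1. A quick standalone check using only numpy and pandas:

import numpy as np
import pandas as pd

parts = np.array_split(pd.DataFrame(dict(a=[1, 2, 3])), 2)
print([len(p) for p in parts])  # [2, 1]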
