
[FEATURE] Ray/Dask engines guess optimal default partitions #404

Closed
goodwanghan opened this issue Dec 28, 2022 · 1 comment · Fixed by #396
goodwanghan commented Dec 28, 2022

When converting local dataframes to a Ray Dataset or a Dask DataFrame, or when there is a group-map operation, Ray requires users to be explicit about the number of partitions and reducers. However, most users are not aware of this, causing their transform function to use a single CPU to process everything. We need to make this process smarter (but also safer). For example, we could find the current Ray cluster's total number of CPUs and use it to set the parallelism (if not specified). There are pros and cons, so we also need to be careful about the strategy.

@goodwanghan goodwanghan linked a pull request Dec 29, 2022 that will close this issue
@goodwanghan goodwanghan added this to the 0.8.0 milestone Dec 29, 2022
@goodwanghan goodwanghan changed the title [FEATURE] Ray engine should automatically find optimal number of partitions if not specified [FEATURE] Ray/Dask engines should automatically find optimal number of partitions if not specified Dec 29, 2022

goodwanghan commented Dec 29, 2022

The proposed solution:

  1. Create configs fugue.default.partitions (default -1), fugue.dask.default.partitions, fugue.ray.default.partitions, and fugue.ray.shuffle.partitions
  2. The default fallback logic: fugue.ray.shuffle.partitions -> fugue.ray.default.partitions -> fugue.default.partitions, and fugue.dask.default.partitions -> fugue.default.partitions
  3. When the default partitions setting is -1, it means Fugue should guess an optimal partition number.
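The fallback chain above could be sketched as follows. This is a hypothetical illustration of the proposed config resolution, not the actual Fugue implementation; the function name `resolve_default_partitions` and its signature are assumptions, only the config keys come from the issue.

```python
# Hypothetical sketch of the proposed config fallback chain.
# Config key names are from the issue; everything else is illustrative.
def resolve_default_partitions(conf: dict, engine: str, shuffle: bool = False) -> int:
    """Resolve the default partition count via the fallback chain.

    A value of -1 (or a missing key) means "fall through"; if every key
    falls through, -1 is returned, meaning: let Fugue guess.
    """
    keys = []
    if engine == "ray" and shuffle:
        keys.append("fugue.ray.shuffle.partitions")
    keys.append(f"fugue.{engine}.default.partitions")  # e.g. fugue.ray.default.partitions
    keys.append("fugue.default.partitions")
    for k in keys:
        v = conf.get(k, -1)
        if v != -1:
            return v
    return -1
```

For example, with `{"fugue.default.partitions": 3}`, both the Ray and Dask chains resolve to 3; with an empty config, both resolve to -1 and trigger the guessing logic.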

The guessing logic:

  1. If the partition number is specified by user code, use it
  2. Otherwise, find the current parallelism of the execution engine (an abstract function in the engine class) and multiply it by 2

Currently this logic only applies to Dask and Ray, not Spark.
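The guessing step can be sketched in a few lines. This is a minimal illustration of the rule described above; `get_current_parallelism` stands in for the abstract engine method and is an assumed name.

```python
from typing import Callable, Optional

# Hypothetical sketch of the guessing rule: a user-specified partition
# count wins; otherwise use the engine's current parallelism * 2.
def guess_partitions(
    user_partitions: Optional[int],
    get_current_parallelism: Callable[[], int],  # assumed stand-in for the engine method
) -> int:
    if user_partitions is not None and user_partitions > 0:
        return user_partitions  # user-specified value wins
    return get_current_parallelism() * 2
```

On a 4-CPU cluster, `guess_partitions(None, lambda: 4)` yields 8, matching the expected output in the example below.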

Example:

import fugue.api as fa
import pandas as pd

pdf = pd.DataFrame(dict(a=range(100)))

# schema: *,c:int
def tr(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(c=len(df))

# schema: *,d:int
def tr2(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(d=len(df))

with fa.engine_context("ray"):  # on a 4 cpu machine
    res = fa.transform(pdf, tr)
    print(fa.get_num_partitions(res))  # 4*2=8
    res = fa.transform(pdf, tr2, partition={"by": "a"})
    print(fa.get_num_partitions(res))  # 4*2=8
    fa.show(res)

with fa.engine_context("ray", {"fugue.default.partitions": 3}):
    res = fa.transform(pdf, tr)
    print(fa.get_num_partitions(res))  # 3
    res = fa.transform(pdf, tr2, partition={"by": "a"})
    print(fa.get_num_partitions(res))  # 3
    fa.show(res)

@goodwanghan goodwanghan changed the title [FEATURE] Ray/Dask engines should automatically find optimal number of partitions if not specified [FEATURE] Ray/Dask engines guess optimal default partitions Dec 29, 2022