Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Fix issue #285 : save hive partitioned dataset using NativeExecutionEngine and DaskExecutionEngine #306

Merged
merged 18 commits into from
Apr 3, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ repos:
hooks:
- id: pylint
- repo: https://github.com/ambv/black
rev: 20.8b1
rev: 22.3.0
hooks:
- id: black
types: [python]
Expand Down
34 changes: 28 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[![PyPI license](https://img.shields.io/pypi/l/fugue.svg)](https://pypi.python.org/pypi/fugue/)
[![codecov](https://codecov.io/gh/fugue-project/fugue/branch/master/graph/badge.svg?token=ZO9YD5N3IA)](https://codecov.io/gh/fugue-project/fugue)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/4fa5f2f53e6f48aaa1218a89f4808b91)](https://www.codacy.com/gh/fugue-project/fugue/dashboard?utm_source=github.com&utm_medium=referral&utm_content=fugue-project/fugue&utm_campaign=Badge_Grade)
[![Downloads](https://pepy.tech/badge/fugue)](https://pepy.tech/project/fugue)

| Documentation | Tutorials | Chat with us on slack! |
| --- | --- | --- |
Expand All @@ -19,6 +20,8 @@
* Big data practitioners finding **testing code** to be costly and slow
* Data teams with big data projects that **struggle maintaining code**

For a more comprehensive overview of Fugue, read [this](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de) article.

## Select Features

* **Cross-framework code**: Write code once in native Python, SQL, or pandas then execute it on Dask or Spark with no rewrites. Logic and execution are decoupled through Fugue, enabling users to leverage the Spark and Dask engines without learning the specific framework syntax.
Expand Down Expand Up @@ -46,13 +49,13 @@ Now, the `map_letter_to_food()` function is brought to the Spark execution engin

```python
from fugue import transform
from fugue_spark import SparkExecutionEngine
import fugue_spark

df = transform(input_df,
map_letter_to_food,
schema="*",
params=dict(mapping=map_dict),
engine=SparkExecutionEngine
engine="spark"
)
df.show()
```
Expand Down Expand Up @@ -185,22 +188,41 @@ docker run -p 8888:8888 fugueproject/tutorials:latest

For the API docs, [click here](https://fugue.readthedocs.org)

## Ecosystem

By being an abstraction layer, Fugue can be used with a lot of other open-source projects seamlessly.

Fugue can use the following projects as backends:

* [Spark](https://github.com/apache/spark)
* [Dask](https://github.com/dask/dask)
* [Duckdb](https://github.com/duckdb/duckdb) - in-process SQL OLAP database management
* [Ibis](https://github.com/ibis-project/ibis/) - pandas-like interface for SQL engines
* [blazing-sql](https://github.com/BlazingDB/blazingsql) - GPU accelerated SQL engine based on cuDF
* [dask-sql](https://github.com/dask-contrib/dask-sql) - SQL interface for Dask

Fugue is available as a backend or can integrate with the following projects:

* [PyCaret](https://github.com/pycaret/pycaret) - low code machine learning
* [Pandera](https://github.com/pandera-dev/pandera) - data validation


## Further Resources

View some of our latest conferences presentations and content. For a more complete list, check the [Resources](https://fugue-tutorials.readthedocs.io/en/latest/tutorials/resources.html) page in the tutorials.

### Blogs

* [Fugue: Reducing Spark Developer Friction (James Le)](https://jameskle.com/writes/fugue)
* [Introducing Fugue - Reducing PySpark Developer Friction](https://towardsdatascience.com/introducing-fugue-reducing-pyspark-developer-friction-a702230455de)
* [Introducing FugueSQL — SQL for Pandas, Spark, and Dask DataFrames (Towards Data Science by Khuyen Tran)](https://towardsdatascience.com/introducing-fuguesql-sql-for-pandas-spark-and-dask-dataframes-63d461a16b27)
* [Interoperable Python and SQL in Jupyter Notebooks (Towards Data Science)](https://towardsdatascience.com/interoperable-python-and-sql-in-jupyter-notebooks-86245e711352)
* [Using Pandera on Spark for Data Validation through Fugue (Towards Data Science)](https://towardsdatascience.com/using-pandera-on-spark-for-data-validation-through-fugue-72956f274793)

### Conferences

* [Large Scale Data Validation with Spark and Dask (PyCon US 2021)](https://www.youtube.com/watch?v=2AdvBgjO_3Q)
* [Dask SQL Query Engines (Dask Summit 2021)](https://www.youtube.com/watch?v=bQDN41Bc3bw)
* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon 2021)](https://www.youtube.com/watch?v=fDIRMiwc0aA)
* [Large Scale Data Validation with Spark and Dask (PyCon US)](https://www.youtube.com/watch?v=2AdvBgjO_3Q)
* [FugueSQL - The Enhanced SQL Interface for Pandas, Spark, and Dask DataFrames (PyData Global)](https://www.youtube.com/watch?v=OBpnGYjNBBI)
* [Scaling Machine Learning Workflows to Big Data with Fugue (KubeCon)](https://www.youtube.com/watch?v=fDIRMiwc0aA)

## Community and Contributing

Expand Down
2 changes: 2 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- Enable DaskExecutionEngine to transform dataframes with [nested](https://github.com/fugue-project/fugue/issues/299) columns
- A [smarter](https://github.com/fugue-project/fugue/issues/304) way to determine default npartitions in Dask
- Support [even partitioning](https://github.com/fugue-project/fugue/issues/303) on Dask
- Add handling of [nested ArrayType](https://github.com/fugue-project/fugue/issues/308) on Spark
- Change to [plugin approach](https://github.com/fugue-project/fugue/issues/310) to avoid explicit import

## 0.6.5

Expand Down
2 changes: 2 additions & 0 deletions fugue/_utils/interfaceless.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def _parse_param( # noqa: C901
param: Optional[inspect.Parameter],
none_as_other: bool = True,
) -> "_FuncParam":
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

if annotation == type(None): # noqa: E721
return _NoneParam(param)
if annotation == inspect.Parameter.empty:
Expand Down
17 changes: 17 additions & 0 deletions fugue/_utils/register.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
try:
from importlib.metadata import entry_points # type:ignore
except ImportError: # pragma: no cover
from importlib_metadata import entry_points # type:ignore


def register_plugins():
for plugin in entry_points().get("fugue.plugins", []):
try:
register_func = plugin.load()
assert callable(register_func), f"{plugin.name} is not a callable"
register_func()
except ImportError: # pragma: no cover
pass


register_plugins()
31 changes: 18 additions & 13 deletions fugue/execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
SqliteEngine,
)

register_execution_engine(
"native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_execution_engine(
"pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore")
register_sql_engine(
"qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)
register_sql_engine(
"qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)

def register():
register_execution_engine(
"native", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_execution_engine(
"pandas", lambda conf: NativeExecutionEngine(conf), on_dup="ignore"
)
register_sql_engine("sqlite", lambda engine: SqliteEngine(engine), on_dup="ignore")
register_sql_engine(
"qpdpandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)
register_sql_engine(
"qpd_pandas", lambda engine: QPDPandasEngine(engine), on_dup="ignore"
)


register()
4 changes: 4 additions & 0 deletions fugue/execution/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ def make_execution_engine(
# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))
"""
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

return _EXECUTION_ENGINE_FACTORY.make(engine, conf, **kwargs)


Expand Down Expand Up @@ -404,4 +406,6 @@ def make_sql_engine(
# SqliteEngine(engine)
make_sql_engine(SqliteEngine)
"""
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

return _EXECUTION_ENGINE_FACTORY.make_sql_engine(engine, execution_engine, **kwargs)
6 changes: 2 additions & 4 deletions fugue/execution/native_execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,8 @@ def save_df(
force_single: bool = False,
**kwargs: Any,
) -> None:
if not partition_spec.empty:
self.log.warning( # pragma: no cover
"partition_spec is not respected in %s.save_df", self
)
if not force_single and not partition_spec.empty:
kwargs["partition_cols"] = partition_spec.partition_by
self.fs.makedirs(os.path.dirname(path), recreate=True)
df = self.to_df(df)
save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)
Expand Down
2 changes: 2 additions & 0 deletions fugue/extensions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def register(self, name: str, extension: Any, on_dup="overwrite") -> None:
raise ValueError(on_dup)

def get(self, obj: Any) -> Any:
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

if isinstance(obj, str) and obj in self._dict:
return self._dict[obj]
return obj
Expand Down
2 changes: 2 additions & 0 deletions fugue/workflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ def is_acceptable_raw_df(df: Any) -> bool:
:param df: input raw dataframe
:return: whether this dataframe is convertible
"""
import fugue._utils.register # pylint: disable=W0611 # noqa: F401

return any(isinstance(df, t) for t in _VALID_RAW_DF_TYPES)
5 changes: 2 additions & 3 deletions fugue_dask/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,12 @@ def save_df(
format_hint=format_hint,
mode=mode,
partition_spec=partition_spec,
force_single=force_single,
**kwargs,
)
else:
if not partition_spec.empty:
self.log.warning( # pragma: no cover
"partition_spec is not respected in %s.save_df", self
)
kwargs["partition_on"] = partition_spec.partition_by
self.fs.makedirs(os.path.dirname(path), recreate=True)
df = self.to_df(df)
save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)
2 changes: 1 addition & 1 deletion fugue_duckdb/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def save_df(
NotImplementedError(f"{mode} is not supported"),
)
p = FileParser(uri, format_hint).assert_no_glob()
if p.file_format not in self._format_save:
if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
self._fs.makedirs(os.path.dirname(uri), recreate=True)
ldf = ArrowDataFrame(df.native.arrow())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use ArrowDataFrame(df.as_arrow()) instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

return save_df(
Expand Down
2 changes: 2 additions & 0 deletions fugue_duckdb/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ def save_df(
force_single: bool = False,
**kwargs: Any,
) -> None:
if not partition_spec.empty and not force_single:
kwargs["partition_cols"] = partition_spec.partition_by
dio = DuckDBIO(self.fs, self.connection)
dio.save_df(self.to_df(df), path, format_hint, mode, **kwargs)

Expand Down
10 changes: 7 additions & 3 deletions fugue_ibis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# flake8: noqa
from fugue_ibis.execution import pandas_backend
from fugue_ibis.execution.ibis_engine import IbisEngine
from fugue_ibis.extensions import run_ibis, as_ibis, as_fugue
from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from fugue_ibis.execution.pandas_backend import _to_pandas_ibis_engine
from fugue_ibis.extensions import as_fugue, as_ibis, run_ibis


def register():
register_ibis_engine(1, _to_pandas_ibis_engine)
5 changes: 1 addition & 4 deletions fugue_ibis/execution/pandas_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from triad.utils.assertion import assert_or_throw
import pandas as pd

from fugue_ibis.execution.ibis_engine import IbisEngine, register_ibis_engine
from fugue_ibis.execution.ibis_engine import IbisEngine
from fugue_ibis._utils import to_schema, to_ibis_schema
from ibis.backends.pandas import Backend

Expand Down Expand Up @@ -53,6 +53,3 @@ def table(self, name: str, schema: Any = None):
if schema is None and name in self._schemas
else schema,
)


register_ibis_engine(1, _to_pandas_ibis_engine)
35 changes: 1 addition & 34 deletions fugue_notebook/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

import fugue_sql
import pandas as pd
from fugue import (
ExecutionEngine,
NativeExecutionEngine,
make_execution_engine,
register_execution_engine,
)
from fugue import ExecutionEngine, make_execution_engine
from fugue.dataframe import YieldedDataFrame
from fugue.extensions._builtins.outputters import Show
from fugue_sql.exceptions import FugueSQLSyntaxError
Expand Down Expand Up @@ -38,33 +33,6 @@ def get_pretty_print(self) -> Callable:
"""Fugue dataframe pretty print handler"""
return _default_pretty_print

def register_execution_engines(self):
"""Register execution engines with names. This will also try to register
spark and dask engines if the dependent packages are available and they
are not registered"""
register_execution_engine(
"native",
lambda conf, **kwargs: NativeExecutionEngine(conf=conf),
on_dup="ignore",
)

try:
import pyspark # noqa: F401
import fugue_spark # noqa: F401
except ImportError:
pass

try:
import dask.dataframe # noqa: F401
import fugue_dask # noqa: F401
except ImportError:
pass

try:
import fugue_duckdb # noqa: F401
except ImportError:
pass


@magics_class
class _FugueSQLMagics(Magics):
Expand Down Expand Up @@ -151,5 +119,4 @@ def _setup_fugue_notebook(
fsql_ignore_case=fsql_ignore_case,
)
ipython.register_magics(magics)
s.register_execution_engines()
Show.set_hook(s.get_pretty_print())
2 changes: 2 additions & 0 deletions fugue_spark/_utils/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ def _to_arrow_type(dt: pt.DataType) -> pa.DataType:
for field in dt
]
return pa.struct(fields)
if isinstance(dt, pt.ArrayType):
return pa.list_(_to_arrow_type(dt.elementType))
return to_arrow_type(dt)


Expand Down
18 changes: 17 additions & 1 deletion fugue_test/builtin_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ def test_out_cotransform(self): # noqa: C901
def incr():
fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True)
fs.writetext(str(uuid4()) + ".txt", "")
return fs.glob("*.txt").count().files
return fs.glob("*.tx" "t").count().files

def t1(
df: Iterable[Dict[str, Any]], df2: pd.DataFrame
Expand Down Expand Up @@ -1175,6 +1175,7 @@ def init_tmpdir(self, tmpdir):
def test_io(self):
path = os.path.join(self.tmpdir, "a")
path2 = os.path.join(self.tmpdir, "b.test.csv")
path3 = os.path.join(self.tmpdir, "c.partition")
with self.dag() as dag:
b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
b.partition(num=3).save(path, fmt="parquet", single=True)
Expand All @@ -1185,6 +1186,21 @@ def test_io(self):
a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int"))
a = dag.load(path2, header=True, columns="c:int,a:long")
a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long"))
with self.dag() as dag:
b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
b.partition(by="c").save(path3, fmt="parquet", single=False)
assert FileSystem().isdir(path3)
assert FileSystem().isdir(os.path.join(path3, "c=6"))
assert FileSystem().isdir(os.path.join(path3, "c=2"))
# TODO: in test below, once issue #288 is fixed, use dag.load
# instead of pd.read_parquet
pd.testing.assert_frame_equal(
pd.read_parquet(path3).sort_values("a").reset_index(drop=True),
pd.DataFrame({"c": pd.Categorical([6, 2]), "a": [1, 7]}).reset_index(
drop=True
),
check_like=True,
)

def test_save_and_use(self):
path = os.path.join(self.tmpdir, "a")
Expand Down
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.6.5"
__version__ = "0.6.6"
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ qpd[dask]

# test requirements
pre-commit
black
black>=22.3.0
mypy
flake8
autopep8
Expand Down
Loading