Skip to content

Commit

Permalink
Enable engine context (#394)
Browse files Browse the repository at this point in the history
* Enable engine context

* refactor infer_execution_engine

* fix

* fix ray breaking change

* improve make_execution_engine

* update docs
  • Loading branch information
goodwanghan authored Dec 17, 2022
1 parent eb5b7cc commit d0da068
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 55 deletions.
35 changes: 35 additions & 0 deletions docs/api_sql/fugue_sql.rst
Original file line number Diff line number Diff line change
@@ -1,2 +1,37 @@
fugue\_sql
===========

.. |SchemaLikeObject| replace:: :ref:`Schema like object <tutorial:tutorials/advanced/x-like:schema>`
.. |ParamsLikeObject| replace:: :ref:`Parameters like object <tutorial:tutorials/advanced/x-like:parameters>`
.. |DataFrameLikeObject| replace:: :ref:`DataFrame like object <tutorial:tutorials/advanced/x-like:dataframe>`
.. |DataFramesLikeObject| replace:: :ref:`DataFrames like object <tutorial:tutorials/advanced/x-like:dataframes>`
.. |PartitionLikeObject| replace:: :ref:`Partition like object <tutorial:tutorials/advanced/x-like:partition>`
.. |RPCHandlerLikeObject| replace:: :ref:`RPChandler like object <tutorial:tutorials/advanced/x-like:rpc>`

.. |ExecutionEngine| replace:: :class:`~fugue.execution.execution_engine.ExecutionEngine`
.. |NativeExecutionEngine| replace:: :class:`~fugue.execution.native_execution_engine.NativeExecutionEngine`
.. |FugueWorkflow| replace:: :class:`~fugue.workflow.workflow.FugueWorkflow`

.. |ReadJoin| replace:: Read Join tutorials on :ref:`workflow <tutorial:tutorials/advanced/dag:join>` and :ref:`engine <tutorial:tutorials/advanced/execution_engine:join>` for details
.. |FugueConfig| replace:: :doc:`the Fugue Configuration Tutorial <tutorial:tutorials/advanced/useful_config>`
.. |PartitionTutorial| replace:: :doc:`the Partition Tutorial <tutorial:tutorials/advanced/partition>`
.. |FugueSQLTutorial| replace:: :doc:`the Fugue SQL Tutorial <tutorial:tutorials/fugue_sql/index>`
.. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial <tutorial:tutorials/advanced/schema_dataframes:dataframe>`
.. |ExecutionEngineTutorial| replace:: :doc:`the ExecutionEngine Tutorial <tutorial:tutorials/advanced/execution_engine>`
.. |ZipComap| replace:: :ref:`Zip & Comap <tutorial:tutorials/advanced/execution_engine:zip & comap>`
.. |LoadSave| replace:: :ref:`Load & Save <tutorial:tutorials/advanced/execution_engine:load & save>`
.. |AutoPersist| replace:: :ref:`Auto Persist <tutorial:tutorials/advanced/useful_config:auto persist>`
.. |TransformerTutorial| replace:: :doc:`the Transformer Tutorial <tutorial:tutorials/extensions/transformer>`
.. |CoTransformer| replace:: :ref:`CoTransformer <tutorial:tutorials/advanced/dag:cotransformer>`
.. |CoTransformerTutorial| replace:: :doc:`the CoTransformer Tutorial <tutorial:tutorials/extensions/cotransformer>`
.. |FugueDataTypes| replace:: :doc:`Fugue Data Types <tutorial:tutorials/appendix/generate_types>`


fugue\_sql.exceptions
---------------------

.. automodule:: fugue_sql.exceptions
:members:
:undoc-members:
:show-inheritance:

1 change: 1 addition & 0 deletions fugue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine
from fugue.execution.factory import (
infer_execution_engine,
is_pandas_or,
make_execution_engine,
make_sql_engine,
parse_execution_engine,
Expand Down
4 changes: 4 additions & 0 deletions fugue/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ class FugueBug(FugueError):
"""Fugue internal bug"""


class FugueInvalidOperation(FugueError):
"""Invalid operation on the Fugue framework"""


class FuguePluginsRegistrationError(FugueError):
"""Fugue plugins registration error"""

Expand Down
27 changes: 26 additions & 1 deletion fugue/execution/execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Union
from uuid import uuid4

from triad import ParamDict, Schema, assert_or_throw
Expand All @@ -24,6 +26,10 @@
from fugue.dataframe.utils import deserialize_df, serialize_df
from fugue.exceptions import FugueBug

_FUGUE_EXECUTION_ENGINE_CONTEXT = ContextVar(
"_FUGUE_EXECUTION_ENGINE_CONTEXT", default=None
)

_DEFAULT_JOIN_KEYS: List[str] = []


Expand Down Expand Up @@ -150,6 +156,25 @@ def __init__(self, conf: Any):
self._sql_engine: Optional[SQLEngine] = None
self._map_engine: Optional[MapEngine] = None

@contextmanager
def as_context(self) -> Iterator["ExecutionEngine"]:
"""Set this execution engine as the context engine. This function
is thread safe and async safe.
.. admonition:: Examples
.. code-block:: python
with engine.as_context():
transform(df, func) # will use engine in this transformation
"""
token = _FUGUE_EXECUTION_ENGINE_CONTEXT.set(self) # type: ignore
try:
yield self
finally:
_FUGUE_EXECUTION_ENGINE_CONTEXT.reset(token)

def stop(self) -> None:
"""Stop this execution engine, do not override
You should customize :meth:`~.stop_engine` if necessary.
Expand Down
77 changes: 68 additions & 9 deletions fugue/execution/factory.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from typing import Any, Callable, Optional, Type, Union
from typing import Any, Callable, List, Optional, Type, Union

from fugue._utils.registry import fugue_plugin
from fugue.exceptions import FuguePluginsRegistrationError
from fugue.execution.execution_engine import ExecutionEngine, SQLEngine
from fugue.execution.native_execution_engine import NativeExecutionEngine
import pandas as pd
from triad import ParamDict, assert_or_throw
from triad.utils.convert import to_instance

from .._utils.registry import fugue_plugin
from ..exceptions import FuguePluginsRegistrationError
from .execution_engine import (
_FUGUE_EXECUTION_ENGINE_CONTEXT,
ExecutionEngine,
SQLEngine,
)
from .native_execution_engine import NativeExecutionEngine


def register_execution_engine(
name_or_type: Union[str, Type], func: Callable, on_dup="overwrite"
Expand Down Expand Up @@ -215,7 +221,10 @@ def register_default_sql_engine(func: Callable, on_dup="overwrite") -> None:


def make_execution_engine(
engine: Any = None, conf: Any = None, **kwargs: Any
engine: Any = None,
conf: Any = None,
infer_by: Optional[List[Any]] = None,
**kwargs: Any,
) -> ExecutionEngine:
"""Create :class:`~fugue.execution.execution_engine.ExecutionEngine`
with specified ``engine``
Expand All @@ -228,11 +237,29 @@ def make_execution_engine(
engine and the second value represents the sql engine (you can use ``None``
for either of them to use the default one), defaults to None
:param conf: |ParamsLikeObject|, defaults to None
:param infer_by: List of objects that can be used to infer the execution
engine using :func:`~.infer_execution_engine`
:param kwargs: additional parameters to initialize the execution engine
:return: the :class:`~fugue.execution.execution_engine.ExecutionEngine`
instance
.. note::
This function finds/constructs the engine in the following order:
* If ``engine`` is None, it first try to see if there is any defined
context engine to use (=> engine)
* If ``engine`` is still empty, then if ``infer_by``
is given, it will try to infer the execution engine (=> engine)
* If ``engine`` is still empty, then it will construct the default
engine defined by :func:`~.register_default_execution_engine` (=> engine)
* Now, ``engine`` must not be empty, if it is an object other than
:class:`~fugue.execution.execution_engine.ExecutionEngine`, we will use
:func:`~.parse_execution_engine` to construct (=> engine)
* Now, ``engine`` must have been an ExecutionEngine object. We update its
SQL engine if specified, then update its config using ``conf`` and ``kwargs``
.. admonition:: Examples
.. code-block:: python
Expand Down Expand Up @@ -260,7 +287,20 @@ def make_execution_engine(
# SparkExecutionEngine + S2
make_execution_engine((SparkExecutionEngine, "s"))
# assume object e2_df can infer E2 engine
make_execution_engine(infer_by=[e2_df]) # an E2 engine
# context
with E2(conf).as_context() as ec:
make_execution_engine() # ec
make_execution_engine() # the default execution engine
"""
if engine is None:
engine = _FUGUE_EXECUTION_ENGINE_CONTEXT.get()
if engine is None and infer_by is not None:
engine = infer_execution_engine(infer_by)

if isinstance(engine, tuple):
execution_engine = make_execution_engine(engine[0], conf=conf, **kwargs)
sql_engine = make_sql_engine(engine[1], execution_engine)
Expand Down Expand Up @@ -336,12 +376,31 @@ def parse_execution_engine(
) from e


def is_pandas_or(objs: List[Any], obj_type: Any) -> bool:
"""Check whether the input contains at least one ``obj_type`` object and the
rest are Pandas DataFrames. This function is a utility function for extending
:func:`~.infer_execution_engine`
:param objs: the list of objects to check
:return: whether all objs are of type ``obj_type`` or pandas DataFrame and at
least one is of type ``obj_type``
"""
tc = 0
for obj in objs:
if not isinstance(obj, pd.DataFrame):
if isinstance(obj, obj_type):
tc += 1
else:
return False
return tc > 0


@fugue_plugin
def infer_execution_engine(obj: Any) -> Any:
"""Infer the correspondent ExecutionEngine based on the input object. This is
def infer_execution_engine(obj: List[Any]) -> Any:
"""Infer the correspondent ExecutionEngine based on the input objects. This is
used in interfaceless functions.
:param obj: the object
:param objs: the objects
:return: if the inference succeeded, it returns an object that can be used by
:func:`~.parse_execution_engine` in the ``engine`` field to construct an
ExecutionEngine. Otherwise, it returns None.
Expand Down
13 changes: 4 additions & 9 deletions fugue/interfaceless.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from fugue.constants import FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT
from fugue.dataframe import DataFrame
from fugue.exceptions import FugueInterfacelessError, FugueWorkflowCompileError
from fugue.execution import infer_execution_engine
from fugue.execution import make_execution_engine
from fugue.workflow import FugueWorkflow


Expand Down Expand Up @@ -170,10 +170,7 @@ def _no_op_processor(df: DataFrame) -> DataFrame:
else:
tdf.save(save_path, fmt="parquet")

if engine is None:
engine = infer_execution_engine(df)

dag.run(engine, conf=engine_conf)
dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
if checkpoint:
result = dag.yields["result"].result # type:ignore
else:
Expand Down Expand Up @@ -226,6 +223,7 @@ def out_transform(
:param engine_conf: |ParamsLikeObject|, defaults to None
.. note::
This function can only take parquet file paths in `df`. Csv and other file
formats are disallowed.
Expand All @@ -248,7 +246,4 @@ def out_transform(
ignore_errors=ignore_errors or [],
)

if engine is None:
engine = infer_execution_engine(df)

dag.run(engine, conf=engine_conf)
dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
19 changes: 12 additions & 7 deletions fugue_dask/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,30 @@

import dask.dataframe as dd
from dask.distributed import Client
from fugue import DataFrame, infer_execution_engine, register_execution_engine
from triad import run_at_def

from fugue import (
DataFrame,
infer_execution_engine,
is_pandas_or,
register_execution_engine,
)
from fugue._utils.interfaceless import (
DataFrameParam,
ExecutionEngineParam,
SimpleAnnotationConverter,
register_annotation_converter,
)
from fugue.workflow import register_raw_df_type
from triad import run_at_def

from fugue_dask.execution_engine import DaskExecutionEngine
from fugue_dask.dataframe import DaskDataFrame
from fugue_dask._utils import DASK_UTILS
from fugue_dask.dataframe import DaskDataFrame
from fugue_dask.execution_engine import DaskExecutionEngine


@infer_execution_engine.candidate(
lambda obj: isinstance(obj, (dd.DataFrame, DaskDataFrame))
lambda objs: is_pandas_or(objs, (dd.DataFrame, DaskDataFrame))
)
def _infer_dask_client(obj: Any) -> Any:
def _infer_dask_client(objs: Any) -> Any:
return DASK_UTILS.get_or_create_client()


Expand Down
2 changes: 1 addition & 1 deletion fugue_duckdb/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def to_df(self, df: Any, schema: Any = None) -> DuckDataFrame:
if isinstance(df, (dd.DataFrame, DaskDataFrame)):
ddf = self._to_dask_df(df, schema)
if all(not pa.types.is_nested(f.type) for f in ddf.schema.fields):
return DuckDataFrame(self.connection.df(ddf.as_pandas()))
return DuckDataFrame(self.connection.from_df(ddf.as_pandas()))
else:
return DuckDataFrame(
duckdb.arrow(ddf.as_arrow(), connection=self.connection)
Expand Down
2 changes: 1 addition & 1 deletion fugue_duckdb/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ def _to_duck_df(self, df: Any, schema: Any = None) -> DuckDataFrame:
if isinstance(df, PandasDataFrame) and all(
not pa.types.is_nested(f.type) for f in df.schema.fields
):
rdf = DuckDataFrame(self.connection.df(df.as_pandas()))
rdf = DuckDataFrame(self.connection.from_df(df.as_pandas()))
else:
rdf = DuckDataFrame(
duckdb.arrow(df.as_arrow(), connection=self.connection)
Expand Down
9 changes: 5 additions & 4 deletions fugue_duckdb/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
from typing import Any, Optional

from duckdb import DuckDBPyConnection, DuckDBPyRelation
from triad import run_at_def

from fugue import (
DataFrame,
ExecutionEngine,
infer_execution_engine,
is_pandas_or,
register_execution_engine,
register_sql_engine,
)
Expand All @@ -16,16 +19,14 @@
register_annotation_converter,
)
from fugue.workflow import register_raw_df_type
from triad import run_at_def

from fugue_duckdb.dataframe import DuckDataFrame
from fugue_duckdb.execution_engine import DuckDBEngine, DuckExecutionEngine


@infer_execution_engine.candidate(
lambda obj: isinstance(obj, (DuckDBPyRelation, DuckDataFrame))
lambda objs: is_pandas_or(objs, (DuckDBPyRelation, DuckDataFrame))
)
def _infer_duckdb_client(obj: Any) -> Any:
def _infer_duckdb_client(objs: Any) -> Any:
return "duckdb"


Expand Down
6 changes: 4 additions & 2 deletions fugue_ray/_utils/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@


def get_dataset_format(df: rd.Dataset) -> Optional[str]:
try:
return df._dataset_format()
try: # pragma: no cover
if hasattr(df, "_dataset_format"): # ray<2.2
return df._dataset_format()
return df.dataset_format() # ray>=2.2
except Exception:
return None

Expand Down
Loading

0 comments on commit d0da068

Please sign in to comment.