Skip to content

Commit

Permalink
Interactive workflow (#19)
Browse files Browse the repository at this point in the history
* Make building workflow independent from engine

* update version

* update version

* update workflow language

* update

* update

* add join syntax sugar

* update

* update version
  • Loading branch information
Han Wang authored May 31, 2020
1 parent 9f730cb commit 08a5637
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 45 deletions.
2 changes: 1 addition & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.1"
__version__ = "0.2.2"
5 changes: 3 additions & 2 deletions fugue/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,20 @@ def __getitem__(self, cols: List[Any]) -> "DataFrame":

def show(
self,
n: int = 10,
rows: int = 10,
show_count: bool = False,
title: Optional[str] = None,
best_width: int = 100,
) -> None:
n = rows
arr: List[List[str]] = self.head(n)
count = -1
if len(arr) < n:
count = len(arr)
elif show_count:
count = self.count()
with DataFrame.SHOW_LOCK:
if title is not None:
if title is not None and title != "":
print(title)
print(type(self).__name__)
tb = _PrettyTable(self.schema, arr, best_width)
Expand Down
2 changes: 1 addition & 1 deletion fugue/extensions/builtins/outputters.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def process(self, dfs: DataFrames) -> None:
for df in dfs.values():
df.show(
self.params.get("rows", 10),
self.params.get("count", False),
self.params.get("show_count", False),
title=self.params.get("title", ""),
)

Expand Down
6 changes: 5 additions & 1 deletion fugue/workflow/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from adagio.instances import TaskContext
from adagio.specs import InputSpec, OutputSpec, TaskSpec
from fugue.collections.partition import PartitionSpec
from fugue.workflow.workflow_context import FugueWorkflowContext
from fugue.dataframe import DataFrame, DataFrames
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.exceptions import FugueWorkflowError
from fugue.execution import ExecutionEngine
from fugue.extensions.creator.convert import to_creator
from fugue.extensions.outputter.convert import to_outputter
from fugue.extensions.processor.convert import to_processor
from fugue.workflow.workflow_context import FugueWorkflowContext
from triad.collections.dict import ParamDict
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
Expand Down Expand Up @@ -202,6 +203,7 @@ def __init__(
super().__init__(
params=params,
input_n=input_n,
output_n=1,
deterministic=deterministic,
lazy=lazy,
input_names=input_names,
Expand All @@ -214,3 +216,5 @@ def execute(self, ctx: TaskContext) -> None:
self._outputter.process(DataFrames(ctx.inputs))
else:
self._outputter.process(DataFrames(ctx.inputs.values()))
# TODO: output dummy to force cache to work, should we fix adagio?
ctx.outputs["_0"] = ArrayDataFrame([], "_0:int")
86 changes: 60 additions & 26 deletions fugue/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from adagio.specs import WorkflowSpec
from fugue.collections.partition import PartitionSpec
from fugue.workflow.tasks import Create, FugueTask, Output, Process
from fugue.workflow.workflow_context import FugueWorkflowContext
from fugue.workflow.workflow_context import (
FugueInteractiveWorkflowContext,
FugueWorkflowContext,
)
from fugue.dataframe import DataFrame
from fugue.dataframe.dataframes import DataFrames
from fugue.extensions.builtins import (
Expand Down Expand Up @@ -84,7 +87,7 @@ def show(
best_width: int = 100,
) -> None:
# TODO: best_width is not used
self.workflow.show(self, rows=rows, count=show_count, title=title)
self.workflow.show(self, rows=rows, show_count=show_count, title=title)

def assert_eq(self, *dfs: Any, **params: Any) -> None:
self.workflow.assert_eq(self, *dfs, **params)
Expand All @@ -109,53 +112,41 @@ def transform(
)
return self.to_self_type(df)

def join(
self: TDF, *dfs: Any, how: str, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
def join(self: TDF, *dfs: Any, how: str, on: Optional[Iterable[str]] = None) -> TDF:
df = self.workflow.join(self, *dfs, how=how, on=on)
return self.to_self_type(df)

def inner_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
def inner_join(self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None) -> TDF:
return self.join(*dfs, how="inner", on=on)

def semi_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
def semi_join(self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None) -> TDF:
return self.join(*dfs, how="semi", on=on)

def left_semi_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
def left_semi_join(self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None) -> TDF:
return self.join(*dfs, how="left_semi", on=on)

def anti_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
def anti_join(self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None) -> TDF:
return self.join(*dfs, how="anti", on=on)

def left_anti_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
def left_anti_join(self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None) -> TDF:
return self.join(*dfs, how="left_anti", on=on)

def left_outer_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
) -> TDF:
return self.join(*dfs, how="left_outer", on=on)

def right_outer_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
) -> TDF:
return self.join(*dfs, how="right_outer", on=on)

def full_outer_join(
self: TDF, *dfs: Any, on: Optional[Iterable[str]] = None
) -> TDF: # pragma: no cover
) -> TDF:
return self.join(*dfs, how="full_outer", on=on)

def cross_join(self: TDF, *dfs: Any) -> TDF: # pragma: no cover
def cross_join(self: TDF, *dfs: Any) -> TDF:
return self.join(*dfs, how="cross")

def persist(self: TDF, level: Any = None) -> TDF:
Expand Down Expand Up @@ -381,10 +372,12 @@ def show(
self,
*dfs: Any,
rows: int = 10,
count: bool = False,
show_count: bool = False,
title: Optional[str] = None,
) -> None:
self.output(*dfs, using=Show, params=dict(rows=rows, count=count, title=title))
self.output(
*dfs, using=Show, params=dict(rows=rows, show_count=show_count, title=title)
)

def join(
self, *dfs: Any, how: str, on: Optional[Iterable[str]] = None
Expand Down Expand Up @@ -471,6 +464,47 @@ def _to_ctx(self, *args: Any, **kwargs) -> FugueWorkflowContext:
return FugueWorkflowContext(*args, **kwargs)


class FugueInteractiveWorkflow(FugueWorkflow):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__()
self._workflow_ctx = self._to_ctx(*args, **kwargs)

def run(self, *args: Any, **kwargs: Any) -> None:
assert_or_throw(
len(args) == 0 and len(kwargs) == 0,
FugueWorkflowError(
"can't reset workflow context in FugueInteractiveWorkflow"
),
)
with self._lock:
self._computed = False
self._workflow_ctx.run(self._spec, {})
self._computed = True

def get_result(self, df: WorkflowDataFrame) -> DataFrame:
return self._workflow_ctx.get_result(id(df._task))

def __enter__(self):
raise FugueWorkflowError(
"with statement is invalid for FugueInteractiveWorkflow"
)

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
raise FugueWorkflowError( # pragma: no cover
"with statement is invalid for FugueInteractiveWorkflow"
)

def add(self, task: FugueTask, *args: Any, **kwargs: Any) -> WorkflowDataFrame:
df = super().add(task, *args, **kwargs)
self.run()
return df

def _to_ctx(self, *args: Any, **kwargs) -> FugueInteractiveWorkflowContext:
if len(args) == 1 and isinstance(args[0], FugueInteractiveWorkflowContext):
return args[0]
return FugueInteractiveWorkflowContext(*args, **kwargs)


class _Dependencies(object):
def __init__(
self,
Expand Down
74 changes: 70 additions & 4 deletions fugue/workflow/workflow_context.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from threading import RLock
from typing import Any, Dict
from typing import Any, Dict, Tuple

from adagio.instances import (
NoOpCache,
SequentialExecutionEngine,
WorkflowContext,
WorkflowHooks,
WorkflowResultCache,
)
from fugue.dataframe import DataFrame
from fugue.execution.execution_engine import ExecutionEngine
from fugue.execution.native_execution_engine import NativeExecutionEngine
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_instance


Expand All @@ -25,16 +28,16 @@ def __init__(
ee: ExecutionEngine = NativeExecutionEngine()
else:
ee = to_instance(execution_engine, ExecutionEngine)
self._fugue_engine = ee
self._lock = RLock()
self._results: Dict[Any, DataFrame] = {}
super().__init__(
cache=cache,
engine=workflow_engine,
hooks=hooks,
logger=ee.log,
config=ee.conf,
)
self._fugue_engine = ee
self._lock = RLock()
self._results: Dict[Any, DataFrame] = {}

@property
def execution_engine(self) -> ExecutionEngine:
Expand All @@ -47,3 +50,66 @@ def set_result(self, key: Any, df: DataFrame) -> None:
def get_result(self, key: Any) -> DataFrame:
with self._lock:
return self._results[key]


class FugueInteractiveWorkflowContext(FugueWorkflowContext):
def __init__(
self,
execution_engine: Any = None,
cache: Any = NoOpCache,
workflow_engine: Any = SequentialExecutionEngine,
hooks: Any = WorkflowHooks,
):
super().__init__(
cache=cache,
workflow_engine=workflow_engine,
hooks=hooks,
execution_engine=execution_engine,
)
self._cache = _FugueInteractiveCache(self, self._cache) # type: ignore


class _FugueInteractiveCache(WorkflowResultCache):
"""Fugue cache for interactive operations.
"""

def __init__(self, wf_ctx: "WorkflowContext", cache: FugueWorkflowContext):
super().__init__(wf_ctx)
self._lock = RLock()
self._data: Dict[str, Any] = {}
self._cache = cache

def set(self, key: str, value: Any) -> None:
"""Set `key` with `value`
:param key: uuid string
:param value: any value
"""
with self._lock:
self._data[key] = value
self._cache.set(key, value)

def skip(self, key: str) -> None: # pragma: no cover
"""Skip `key`
:param key: uuid string
"""
raise InvalidOperationError("skip is not valid in FugueInteractiveCache")

def get(self, key: str) -> Tuple[bool, bool, Any]:
"""Try to get value for `key`
:param key: uuid string
:return: <hasvalue>, <skipped>, <value>
"""
with self._lock:
if key in self._data:
return True, False, self._data[key]
has_value, skipped, value = self._cache.get(key)
assert_or_throw(
not skipped,
InvalidOperationError("skip is not valid in FugueInteractiveCache"),
)
if has_value:
self._data[key] = value
return has_value, skipped, value
12 changes: 8 additions & 4 deletions fugue_test/builtin_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pandas as pd
import pytest
from fugue.workflow.workflow import FugueWorkflow
from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame, PandasDataFrame
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.utils import _df_eq as df_eq
Expand All @@ -18,6 +17,7 @@
cotransformer,
transformer,
)
from fugue.workflow.workflow import FugueInteractiveWorkflow, FugueWorkflow
from triad.collections.fs import FileSystem


Expand All @@ -41,14 +41,18 @@ def make_engine(self) -> ExecutionEngine: # pragma: no cover
def dag(self) -> FugueWorkflow:
return FugueWorkflow(self.engine)

def test_workflows(self):
a = FugueWorkflow().df([[0]], "a:int")
df_eq(a.compute(self.engine), [[0]], "a:int")

a = FugueInteractiveWorkflow(self.engine).df([[0]], "a:int").persist()
df_eq(a.result, [[0]], "a:int")

def test_create_show(self):
with self.dag() as dag:
dag.df([[0]], "a:int").persist().partition(num=2).show()
dag.df(dag.df([[0]], "a:int")).persist().broadcast().show()

a = FugueWorkflow().df([[0]], "a:int")
df_eq(a.compute(self.engine), [[0]], "a:int")

def test_create_process_output(self):
with self.dag() as dag:
a = dag.create(mock_creator, params=dict(p=2))
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
author_email="[email protected]",
keywords="distributed spark dask sql dsl domain specific language",
url="http://github.com/fugue-project/fugue",
install_requires=["triad>=0.3.5", "adagio>=0.1.5", "sqlalchemy"],
install_requires=["triad>=0.3.5", "adagio>=0.1.6", "sqlalchemy"],
extras_require={
"spark": ["pyspark"],
"dask": ["dask[dataframe]", "cloudpickle>=1.4.0"],
Expand Down
2 changes: 1 addition & 1 deletion tests/fugue/dataframe/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_show():

s = " ".join(["x"] * 200)
df = ArrayDataFrame([[s, 1], ["b", "s"]], "a:str,b:str", metadata=dict(a=1, b=2))
df.show(n=1, show_count=True, title="abc")
df.show(rows=1, show_count=True, title="abc")


def test_lazy_schema():
Expand Down
Loading

0 comments on commit 08a5637

Please sign in to comment.