fix select bug, add col ops (#7)
Han Wang authored May 9, 2020
1 parent d75f6b0 commit ec02cb3
Showing 13 changed files with 167 additions and 22 deletions.
9 changes: 8 additions & 1 deletion fugue/builtins/__init__.py
@@ -1,4 +1,11 @@
# flake8: noqa
from fugue.builtins.outputters import Show, AssertEqual
from fugue.builtins.creators import CreateData
from fugue.builtins.processors import RunJoin, RunTransformer, RunSQLSelect
from fugue.builtins.processors import (
RunJoin,
RunTransformer,
RunSQLSelect,
Rename,
DropColumns,
SelectColumns,
)
26 changes: 26 additions & 0 deletions fugue/builtins/processors.py
@@ -6,10 +6,12 @@
IterableDataFrame,
to_local_bounded_df,
)
from fugue.exceptions import FugueWorkflowError
from fugue.execution import SQLEngine
from fugue.processor import Processor
from fugue.transformer import Transformer, to_transformer
from triad.collections import ParamDict
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_instance, to_type
from triad.utils.iter import EmptyAwareIterable

@@ -59,6 +61,30 @@ def process(self, dfs: DataFrames) -> DataFrame:
return engine.select(dfs, statement)


class Rename(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
columns = self.params.get_or_throw("columns", dict)
return dfs[0].rename(columns)


class DropColumns(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
if_exists = self.params.get("if_exists", False)
columns = self.params.get_or_throw("columns", list)
if if_exists:
columns = set(columns).intersection(dfs[0].schema.keys())
return dfs[0].drop(list(columns))


class SelectColumns(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
columns = self.params.get_or_throw("columns", list)
return dfs[0][columns]


class _TransformerRunner(object):
def __init__(
self, df: DataFrame, transformer: Transformer, ignore_errors: List[type]
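The three new processors each require exactly one input DataFrame and read their column arguments from params. A minimal standalone sketch (not part of the commit) of the DropColumns if_exists rule, using plain Python stand-ins for the schema and params:

# Standalone sketch: when if_exists is True, only columns that actually exist
# in the schema are dropped; unknown names are silently ignored.
schema_keys = ["x", "y"]                      # stand-in for dfs[0].schema.keys()
requested = ["y", "yy"]                       # the "columns" parameter
if_exists = True
to_drop = set(requested).intersection(schema_keys) if if_exists else requested
print(sorted(to_drop))                        # ['y']; dfs[0].drop(['y']) keeps only "x"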
20 changes: 16 additions & 4 deletions fugue/dag/tasks.py
@@ -1,6 +1,6 @@
import copy
from abc import ABC, abstractmethod
from typing import Any, Dict, no_type_check
from typing import Any, Dict, no_type_check, Optional, List

from adagio.instances import TaskContext
from adagio.specs import InputSpec, OutputSpec, TaskSpec
@@ -25,14 +25,22 @@ def __init__(
params: Any = None,
deterministic: bool = True,
lazy: bool = False,
input_names: Optional[List[str]] = None,
):
assert_or_throw(
output_n <= 1, # TODO: for now we don't support multi output
NotImplementedError("Fugue doesn't support multiple output tasks"),
)
inputs = [
InputSpec("_" + str(i), DataFrame, nullable=False) for i in range(input_n)
]
if input_names is None:
inputs = [
InputSpec("_" + str(i), DataFrame, nullable=False)
for i in range(input_n)
]
else:
inputs = [
InputSpec(input_names[i], DataFrame, nullable=False)
for i in range(input_n)
]
outputs = [
OutputSpec("_" + str(i), DataFrame, nullable=False) for i in range(output_n)
]
@@ -143,6 +151,7 @@ def __init__(
pre_partition: Any = None,
deterministic: bool = True,
lazy: bool = False,
input_names: Optional[List[str]] = None,
):
self._processor = to_processor(processor, schema)
self._processor._params = ParamDict(params)
@@ -155,6 +164,7 @@ def __init__(
output_n=1,
deterministic=deterministic,
lazy=lazy,
input_names=input_names,
)

@no_type_check
@@ -176,6 +186,7 @@ def __init__(
pre_partition: Any = None,
deterministic: bool = True,
lazy: bool = False,
input_names: Optional[List[str]] = None,
):
assert_or_throw(input_n > 0, FugueWorkflowError("must have at least one input"))
self._outputter = to_outputter(outputter)
@@ -188,6 +199,7 @@ def __init__(
input_n=input_n,
deterministic=deterministic,
lazy=lazy,
input_names=input_names,
)

@no_type_check
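The new input_names parameter only changes how the InputSpec names are generated: the positional names ("_0", "_1", ...) remain the default, while named inputs reuse the keys supplied by the workflow. A small sketch of the naming rule; the helper name below is illustrative, not from the codebase:

from typing import List, Optional

def _input_spec_names(input_n: int, input_names: Optional[List[str]] = None) -> List[str]:
    # Mirrors the branch above: default positional names vs. caller-provided names.
    if input_names is None:
        return ["_" + str(i) for i in range(input_n)]
    return [input_names[i] for i in range(input_n)]

assert _input_spec_names(2) == ["_0", "_1"]
assert _input_spec_names(2, ["a", "b"]) == ["a", "b"]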
46 changes: 37 additions & 9 deletions fugue/dag/workflow.py
@@ -1,8 +1,17 @@
from typing import Any, Dict, Iterable, List, Optional, TypeVar

from adagio.specs import WorkflowSpec
from fugue.builtins import AssertEqual, CreateData, RunJoin, RunTransformer, Show
from fugue.builtins.processors import RunSQLSelect
from fugue.builtins import (
AssertEqual,
CreateData,
DropColumns,
Rename,
RunJoin,
RunSQLSelect,
RunTransformer,
SelectColumns,
Show,
)
from fugue.collections.partition import PartitionSpec
from fugue.dag.tasks import Create, FugueTask, Output, Process
from fugue.dataframe import DataFrame
@@ -107,8 +116,30 @@ def partition(self: TDF, *args, **kwargs) -> TDF:
def to_self_type(self: TDF, df: "WorkflowDataFrame") -> TDF:
return df # type: ignore

def drop( # type: ignore
self: TDF, columns: Dict[str, str], if_exists: bool = False
) -> TDF:
df = self.workflow.process(
self, using=DropColumns, params=dict(columns=columns, if_exists=if_exists)
)
return self.to_self_type(df)

def rename(self: TDF, *args: Any, **kwargs: Any) -> TDF:
m: Dict[str, str] = {}
for a in args:
m.update(a)
m.update(kwargs)
df = self.workflow.process(self, using=Rename, params=dict(columns=m))
return self.to_self_type(df)

def __getitem__(self: TDF, columns: List[Any]) -> TDF:
df = self.workflow.process(
self, using=SelectColumns, params=dict(columns=columns)
)
return self.to_self_type(df)

@property
def schema(self) -> Schema:
def schema(self) -> Schema: # pragma: no cover
raise NotImplementedError(f"WorkflowDataFrame does not support this method")

@property
@@ -142,9 +173,6 @@ def as_array_iterable(
) -> Iterable[Any]: # pragma: no cover
raise NotImplementedError(f"WorkflowDataFrame does not support this method")

def drop(self, cols: List[str]) -> "DataFrame": # pragma: no cover
raise NotImplementedError(f"WorkflowDataFrame does not support this method")


class FugueWorkflow(object):
def __init__(self, execution_engine: ExecutionEngine):
@@ -179,6 +207,7 @@ def process(
schema=schema,
params=params,
pre_partition=pre_partition,
input_names=None if not dfs.has_key else list(dfs.keys()),
)
if dfs.has_key:
return self.add(task, **dfs)
@@ -195,6 +224,7 @@ def output(
outputter=using,
params=params,
pre_partition=pre_partition,
input_names=None if not dfs.has_key else list(dfs.keys()),
)
if dfs.has_key:
self.add(task, **dfs)
@@ -247,9 +277,7 @@ def select(self, *statements: Any, sql_engine: Any = None) -> WorkflowDataFrame:
if not sql.upper().startswith("SELECT"):
sql = "SELECT " + sql
return self.process(
self._to_dfs(dfs),
using=RunSQLSelect,
params=dict(statement=sql, sql_engine=sql_engine),
dfs, using=RunSQLSelect, params=dict(statement=sql, sql_engine=sql_engine)
)

def assert_eq(self, *dfs: Any, **params: Any) -> None:
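Together, these methods give WorkflowDataFrame lightweight column operations backed by the new Rename, DropColumns, and SelectColumns processors. A usage sketch mirroring the tests added later in this commit; the dag argument is assumed to be a FugueWorkflow-backed builder and is not constructed here:

def col_ops_example(dag):
    a = dag.df([[1, 10], [2, 20]], "x:long,y:long")
    a.rename({"x": "xx"})                 # dict form, dispatched to the Rename processor
    a.rename(x="xx")                      # keyword form, merged into the same mapping
    a[["x"]]                              # column selection via __getitem__ / SelectColumns
    a.drop(["y", "yy"], if_exists=True)   # missing "yy" is ignored because if_exists=True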
7 changes: 5 additions & 2 deletions fugue/dataframe/array_dataframe.py
@@ -1,5 +1,4 @@
from typing import Any, Iterable, List, Optional

from typing import Any, Dict, Iterable, List, Optional
from fugue.dataframe.dataframe import (
DataFrame,
LocalBoundedDataFrame,
@@ -54,6 +53,10 @@ def drop(self, cols: List[str]) -> DataFrame:
raise InvalidOperationError("Can't remove all columns of a dataframe")
return ArrayDataFrame(self, schema)

def rename(self, columns: Dict[str, str]) -> "DataFrame":
schema = self.schema.rename(columns)
return ArrayDataFrame(self.native, schema)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
8 changes: 8 additions & 0 deletions fugue/dataframe/dataframe.py
@@ -109,6 +109,14 @@ def as_array_iterable(
def drop(self, cols: List[str]) -> "DataFrame": # pragma: no cover
raise NotImplementedError

@abstractmethod
def rename(self, columns: Dict[str, str]) -> "DataFrame": # pragma: no cover
raise NotImplementedError

def __getitem__(self, keys: List[Any]) -> "DataFrame":
cols = list((self.schema - keys).keys())
return self.drop(cols)

def show(
self,
n: int = 10,
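The default __getitem__ implements selection as "drop the complement": the requested keys are subtracted from the schema and the remaining columns are dropped, so backends only need to provide drop. A standalone sketch of that set arithmetic, with a plain list standing in for the schema object:

# Selection-by-complement, as in DataFrame.__getitem__ above (schema stand-in only):
all_cols = ["a", "b", "c"]                           # stand-in for self.schema.keys()
keys = ["a", "c"]
drop_cols = [c for c in all_cols if c not in keys]   # roughly self.schema - keys
print(drop_cols)                                     # ['b']; df[keys] behaves like df.drop(['b'])

One consequence, assuming drop preserves the schema order of the remaining columns, is that the selected columns keep their original schema order rather than the order of the keys list.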
6 changes: 5 additions & 1 deletion fugue/dataframe/iterable_dataframe.py
@@ -1,4 +1,4 @@
from typing import Any, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional

from fugue.dataframe.dataframe import (
DataFrame,
@@ -54,6 +54,10 @@ def drop(self, cols: List[str]) -> DataFrame:
raise InvalidOperationError("Can't remove all columns of a dataframe")
return IterableDataFrame(self, schema)

def rename(self, columns: Dict[str, str]) -> "DataFrame":
schema = self.schema.rename(columns)
return IterableDataFrame(self.native, schema)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
7 changes: 6 additions & 1 deletion fugue/dataframe/pandas_dataframes.py
@@ -1,4 +1,4 @@
from typing import Any, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple

import pandas as pd
import pyarrow as pa
@@ -71,6 +71,11 @@ def drop(self, cols: List[str]) -> DataFrame:
raise InvalidOperationError("Can't remove all columns of a dataframe")
return PandasDataFrame(self.native.drop(cols, axis=1), schema)

def rename(self, columns: Dict[str, str]) -> "DataFrame":
df = self.native.rename(columns=columns)
schema = self.schema.rename(columns)
return PandasDataFrame(df, schema)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
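The pandas-backed rename updates the underlying frame and the Fugue schema together so they stay in sync. A standalone pandas illustration of the frame half (the schema handling above is Fugue-specific and omitted here):

import pandas as pd

# pandas.DataFrame.rename returns a new frame with the column labels remapped;
# the data itself is unchanged.
pdf = pd.DataFrame([["a", 1]], columns=["a", "b"])
renamed = pdf.rename(columns={"a": "aa"})
print(list(renamed.columns))   # ['aa', 'b']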
20 changes: 20 additions & 0 deletions fugue_test/builtin_suite.py
@@ -86,6 +86,7 @@ def test_select(self):
with self.dag() as dag:
a = dag.df([[1, 10], [2, 20], [3, 30]], "x:long,y:long")
b = dag.df([[2, 20, 40], [3, 30, 90]], "x:long,y:long,z:long")
dag.select("* FROM", a).assert_eq(a)
dag.select("SELECT *,x*y AS z FROM", a, "WHERE x>=2").assert_eq(b)

c = ArrayDataFrame([[2, 20, 40], [3, 30, 90]], "x:long,y:long,zb:long")
@@ -125,6 +126,25 @@ def test_select(self):
# no input
dag.select("1 AS a").assert_eq(ArrayDataFrame([[1]], "a:long"))

# make sure transform -> select works
b = a.transform(mock_tf1)
a = a.transform(mock_tf1)
aa = dag.select("* FROM", a)
dag.select("* FROM", b).assert_eq(aa)

def test_col_ops(self):
with self.dag() as dag:
a = dag.df([[1, 10], [2, 20]], "x:long,y:long")
aa = dag.df([[1, 10], [2, 20]], "xx:long,y:long")
a.rename({"x": "xx"}).assert_eq(aa)
a[["x"]].assert_eq(ArrayDataFrame([[1], [2]], "x:long"))

a.drop(["y", "yy"], if_exists=True).assert_eq(
ArrayDataFrame([[1], [2]], "x:long")
)

a[["x"]].rename(x="xx").assert_eq(ArrayDataFrame([[1], [2]], "xx:long"))


class DagTester(FugueWorkflow):
def __init__(self, engine: ExecutionEngine):
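The new dag.select("* FROM", a) case exercises the select fix: statement fragments are assembled into one SQL string and a leading SELECT is added when missing, so partial statements work. A simplified sketch of that assembly step, following the workflow.py change above; the fragment-joining shown here is an assumption, with dataframes represented as plain table names:

parts = ["*", "FROM", "a", "WHERE x>=2"]    # fragments as they would be joined
sql = " ".join(parts)
if not sql.upper().startswith("SELECT"):
    sql = "SELECT " + sql
print(sql)                                  # SELECT * FROM a WHERE x>=2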
4 changes: 2 additions & 2 deletions setup.py
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

VERSION = "0.1.0"
VERSION = "0.1.1"

with open("README.md") as f:
LONG_DESCRIPTION = f.read()
@@ -17,7 +17,7 @@
author_email="[email protected]",
keywords="distributed spark sql",
url="http://github.com/goodwanghan/fugue",
install_requires=["triad>=0.2.0", "adagio>=0.1.2", "fs", "sqlalchemy"],
install_requires=["triad>=0.2.1", "adagio>=0.1.2", "fs", "sqlalchemy"],
extras_require={},
classifiers=[
# "3 - Alpha", "4 - Beta" or "5 - Production/Stable"
17 changes: 16 additions & 1 deletion tests/fugue/dataframe/test_array_dataframe.py
@@ -3,8 +3,9 @@

import numpy as np
import pandas as pd
from pytest import raises
from fugue.dataframe import ArrayDataFrame, PandasDataFrame
from fugue.dataframe.utils import _df_eq as df_eq
from pytest import raises
from triad.collections.schema import Schema, SchemaError
from triad.exceptions import InvalidOperationError

@@ -107,6 +108,20 @@ def test_drop():
raises(InvalidOperationError, lambda: df.drop(["x"])) # cols must exist
assert [[1]] == df.as_array(type_safe=True)

df = ArrayDataFrame([["a", 1, 2]], "a:str,b:int,c:int")
df_eq(df[["a", "c"]], [["a", 2]], "a:str,c:int")
assert isinstance(df[["a", "c"]], ArrayDataFrame)

with raises(SchemaError):
df[["a", "x"]]


def test_rename():
df = ArrayDataFrame([["a", 1]], "a:str,b:int")
df2 = df.rename(columns=dict(a="aa"))
df_eq(df2, [["a", 1]], "aa:str,b:int", throw=True)
df_eq(df, [["a", 1]], "a:str,b:int", throw=True)


def test_as_array():
df = ArrayDataFrame([], "a:str,b:int")
8 changes: 8 additions & 0 deletions tests/fugue/dataframe/test_iterable_dataframe.py
@@ -7,6 +7,7 @@
from fugue.dataframe import IterableDataFrame, PandasDataFrame
from triad.collections.schema import Schema, SchemaError
from triad.exceptions import InvalidOperationError
from fugue.dataframe.utils import _df_eq as df_eq


def test_init():
@@ -108,6 +109,13 @@ def test_drop():
assert [[1]] == df.as_array(type_safe=True)


def test_rename():
df = IterableDataFrame([["a", 1]], "a:str,b:int")
df2 = df.rename(columns=dict(a="aa"))
assert isinstance(df, IterableDataFrame)
df_eq(df2, [["a", 1]], "aa:str,b:int", throw=True)


def test_as_array():
df = IterableDataFrame([], "a:str,b:int")
assert [] == df.as_array()