Skip to content

Commit

Permalink
Remove ABC from Transformers (#29)
Browse files Browse the repository at this point in the history
* setup fugue sql

* SQL: add basic extensions and tests

* update

* update

* clean up sql files

* fix syntax, add save load

* add test for load

* FugueSQLWorkflow

* update version

* Add pandas udf support, add SQL persist broadcast

* update

* update

* update

* update

* update

* make transformer schema more flexible

* Remove ABC from Transformers
  • Loading branch information
Han Wang authored Jun 13, 2020
1 parent b709814 commit f62f918
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# flake8: noqa

__version__ = "0.2.7"
__version__ = "0.2.8"

from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.dataframe.array_dataframe import ArrayDataFrame
Expand Down
23 changes: 14 additions & 9 deletions fugue/extensions/transformer/transformer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from abc import ABC, abstractmethod
from typing import Any

from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame
from fugue.extensions.context import ExtensionContext


class Transformer(ExtensionContext, ABC):
class Transformer(ExtensionContext):
"""The interface to process one physical partition of dataframe on one machine.
A dataframe such as SparkDataFrame can be distributed. But this one is to tell
the system how to process each partition locally.
Expand All @@ -29,9 +28,13 @@ class Transformer(ExtensionContext, ABC):
interface? Do you know the interfaceless feature of Fugue? Commonly, if you don't
need to implement `on_init`, you can choose the
interfaceless approach which may decouple your code from Fugue.
Due to a similar issue with
`spark pickling ABC objects <https://github.com/cloudpipe/cloudpickle/issues/305>`_,
this class is not an ABC. If you encounter a similar issue, a possible solution is
`here <https://github.com/cloudpipe/cloudpickle/issues/305#issuecomment-529246171>`_
"""

@abstractmethod
def get_output_schema(self, df: DataFrame) -> Any: # pragma: no cover
"""Generate the output schema on the driver side.
Expand All @@ -46,7 +49,7 @@ def get_output_schema(self, df: DataFrame) -> Any: # pragma: no cover
:param df: the entire dataframe you are going to transform.
:return: Schema like object, should not be None or empty
"""
return None
raise NotImplementedError

def on_init(self, df: DataFrame) -> None: # pragma: no cover
"""Initialize physical partition that contains one or multiple logical partitions.
Expand All @@ -67,7 +70,6 @@ def on_init(self, df: DataFrame) -> None: # pragma: no cover
"""
pass

@abstractmethod
def transform(self, df: LocalDataFrame) -> LocalDataFrame: # pragma: no cover
"""Custom logic to transform from one local dataframe to another local dataframe.
Expand All @@ -85,7 +87,7 @@ def transform(self, df: LocalDataFrame) -> LocalDataFrame: # pragma: no cover
raise NotImplementedError


class CoTransformer(ExtensionContext, ABC):
class CoTransformer(ExtensionContext):
"""The interface to process one physical partition of cogrouped dataframes on one
machine. A dataframe such as SparkDataFrame can be distributed. But this one is to
tell the system how to process each partition locally.
Expand All @@ -101,9 +103,13 @@ class CoTransformer(ExtensionContext, ABC):
interface? Do you know the interfaceless feature of Fugue? Commonly, if you don't
need to implement `on_init`, you can choose the
interfaceless approach which may decouple your code from Fugue.
Due to a similar issue with
`spark pickling ABC objects <https://github.com/cloudpipe/cloudpickle/issues/305>`_,
this class is not an ABC. If you encounter a similar issue, a possible solution is
`here <https://github.com/cloudpipe/cloudpickle/issues/305#issuecomment-529246171>`_
"""

@abstractmethod
def get_output_schema(self, dfs: DataFrames) -> Any: # pragma: no cover
"""Generate the output schema on the driver side.
Expand All @@ -118,7 +124,7 @@ def get_output_schema(self, dfs: DataFrames) -> Any: # pragma: no cover
:param dfs: the collection of dataframes you are going to transform.
:return: Schema like object, should not be None or empty
"""
return None
raise NotImplementedError

def on_init(self, dfs: DataFrames) -> None: # pragma: no cover
"""Initialize physical partition that contains one or multiple logical partitions.
Expand All @@ -137,7 +143,6 @@ def on_init(self, dfs: DataFrames) -> None: # pragma: no cover
"""
pass

@abstractmethod
def transform(self, dfs: DataFrames) -> LocalDataFrame: # pragma: no cover
"""Custom logic to transform from one local dataframe to another local dataframe.
Expand Down

0 comments on commit f62f918

Please sign in to comment.