Skip to content

Commit

Permalink
FugueSQLWorkflow (#26)
Browse files Browse the repository at this point in the history
* setup fugue sql

* SQL: add basic extensions and tests

* update

* update

* clean up sql files

* fix syntax, add save load

* add test for load

* FugueSQLWorkflow

* update version
  • Loading branch information
Han Wang authored Jun 10, 2020
1 parent 4c61142 commit a55dc33
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 332 deletions.
2 changes: 1 addition & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.2"
__version__ = "0.2.5"
3 changes: 3 additions & 0 deletions fugue/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# flake8: noqa

from fugue.workflow.workflow import FugueWorkflow, WorkflowDataFrame
19 changes: 12 additions & 7 deletions fugue/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@

from adagio.specs import WorkflowSpec
from fugue.collections.partition import PartitionSpec
from fugue.workflow.tasks import Create, FugueTask, Output, Process
from fugue.workflow.workflow_context import (
FugueInteractiveWorkflowContext,
FugueWorkflowContext,
)
from fugue.dataframe import DataFrame
from fugue.dataframe.dataframes import DataFrames
from fugue.exceptions import FugueWorkflowError
from fugue.extensions.builtins import (
AssertEqual,
CreateData,
Expand All @@ -24,10 +20,15 @@
Show,
Zip,
)
from fugue.extensions.transformer.convert import to_transformer
from fugue.workflow.tasks import Create, FugueTask, Output, Process
from fugue.workflow.workflow_context import (
FugueInteractiveWorkflowContext,
FugueWorkflowContext,
)
from triad.collections import Schema
from triad.collections.dict import ParamDict
from triad.utils.assertion import assert_or_throw
from fugue.exceptions import FugueWorkflowError
from fugue.extensions.transformer.convert import to_transformer

_DEFAULT_IGNORE_ERRORS: List[Any] = []

Expand Down Expand Up @@ -286,6 +287,10 @@ def __init__(self, *args: Any, **kwargs: Any):
self._workflow_ctx = self._to_ctx(*args, **kwargs)
self._computed = False

@property
def conf(self) -> ParamDict:
return self._workflow_ctx.conf

def spec_uuid(self) -> str:
return self._spec.__uuid__()

Expand Down
6 changes: 6 additions & 0 deletions fugue_sql/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Dict, Any

FUGUE_SQL_DEFAULT_CONF: Dict[str, Any] = {
"fugue.sql.compile.ignore_case": False,
"fugue.sql.compile.simple_assign": True,
}
8 changes: 8 additions & 0 deletions fugue_sql/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from jinjasql import JinjaSql
from typing import Dict, Any


def fill_sql_template(sql: str, params: Dict[str, Any]):
sql = sql.replace("%", "%%")
query, bind = JinjaSql(param_style="pyformat").prepare_query(sql, params)
return query % bind
35 changes: 31 additions & 4 deletions fugue_sql/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
from triad.utils.pyarrow import to_pa_datatype


class FugueSQLHooks(object):
def on_select_source_not_found(
self, workflow: FugueWorkflow, name: str
) -> Union[WorkflowDataFrame, str]:
return name


class _VisitorBase(FugueSQLVisitor):
def __init__(self, sql: FugueSQL):
self._sql = sql
Expand Down Expand Up @@ -199,6 +206,7 @@ class _Extensions(_VisitorBase):
def __init__(
self,
sql: FugueSQL,
hooks: FugueSQLHooks,
workflow: FugueWorkflow,
variables: Optional[Dict[str, WorkflowDataFrame]] = None,
last: Optional[WorkflowDataFrame] = None,
Expand All @@ -209,11 +217,16 @@ def __init__(
if variables is not None:
self._variables.update(variables)
self._last: Optional[WorkflowDataFrame] = last
self._hooks = hooks

@property
def workflow(self) -> FugueWorkflow:
return self._workflow

@property
def hooks(self) -> FugueSQLHooks:
return self._hooks

@property
def variables(self) -> Dict[str, WorkflowDataFrame]:
return self._variables
Expand All @@ -233,7 +246,11 @@ def visitFugueDataFrameNested(
self, ctx: fp.FugueDataFrameNestedContext
) -> WorkflowDataFrame:
sub = _Extensions(
self.sql, workflow=self.workflow, variables=self.variables, last=self._last
self.sql,
self.hooks,
workflow=self.workflow,
variables=self.variables,
last=self._last,
)
sub.visit(ctx.task)
return sub.last
Expand Down Expand Up @@ -380,13 +397,19 @@ def visitQuery(self, ctx: fp.QueryContext) -> Iterable[Any]:
def visitTableName(self, ctx: fp.TableNameContext) -> Iterable[Any]:
table_name = self.ctxToStr(ctx.multipartIdentifier(), delimit="")
if table_name not in self.variables:
yield table_name
table: Any = self.hooks.on_select_source_not_found(
self.workflow, table_name
)
else:
table = self.variables[table_name]
if isinstance(table, str):
yield table
for x in self._get_query_elements(ctx.sample()):
yield x
for x in self._get_query_elements(ctx.tableAlias()):
yield x
else:
yield self.variables[table_name]
yield table
for x in self._get_query_elements(ctx.sample()):
yield x
if ctx.tableAlias().strictIdentifier() is not None:
Expand All @@ -400,7 +423,11 @@ def visitAliasedFugueNested(
self, ctx: fp.AliasedFugueNestedContext
) -> Iterable[Any]:
sub = _Extensions(
self.sql, workflow=self.workflow, variables=self.variables, last=self._last
self.sql,
self.hooks,
workflow=self.workflow,
variables=self.variables,
last=self._last,
)
sub.visit(ctx.fugueNestableTaskNoSelect())
yield sub.last
Expand Down
55 changes: 55 additions & 0 deletions fugue_sql/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import inspect
from typing import Any, Dict

from fugue.workflow import FugueWorkflow, WorkflowDataFrame
from fugue_sql.constants import FUGUE_SQL_DEFAULT_CONF
from fugue_sql.parse import FugueSQL
from fugue_sql.utils import fill_sql_template
from fugue_sql.visitors import FugueSQLHooks, _Extensions
from triad.collections.dict import ParamDict
from triad.utils.assertion import assert_or_throw


class FugueSQLWorkflow(FugueWorkflow):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self._sql_vars: Dict[str, WorkflowDataFrame] = {}
self._sql_conf = ParamDict({**FUGUE_SQL_DEFAULT_CONF, **super().conf})

@property
def conf(self) -> ParamDict:
return self._sql_conf

def __getitem__(self, key: str) -> WorkflowDataFrame:
return self._sql_vars[key]

def __call__(self, code: str, *args: Any, **kwargs: Any):
cf = inspect.currentframe()
local_vars: Dict[str, Any] = {}
if cf is not None and cf.f_back is not None:
local_vars = cf.f_back.f_locals
variables = self.sql(code, self._sql_vars, local_vars, *args, **kwargs)
if cf is not None:
for k, v in variables.items():
if isinstance(v, WorkflowDataFrame):
self._sql_vars[k] = v

def sql(self, code: str, *args: Any, **kwargs: Any) -> Dict[str, WorkflowDataFrame]:
params: Dict[str, Any] = {}
for a in args:
assert_or_throw(isinstance(a, Dict), f"args can only have dict: {a}")
params.update(a)
params.update(kwargs)
code = fill_sql_template(code, params)
sql = FugueSQL(
code,
"fugueLanguage",
ignore_case=self.conf.get_or_throw("fugue.sql.compile.ignore_case", bool),
simple_assign=self.conf.get_or_throw(
"fugue.sql.compile.simple_assign", bool
),
)
dfs = {k: v for k, v in params.items() if isinstance(v, WorkflowDataFrame)}
v = _Extensions(sql, FugueSQLHooks(), self, dfs)
v.visit(sql.tree)
return v.variables
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
url="http://github.com/fugue-project/fugue",
install_requires=["triad>=0.3.5", "adagio>=0.1.6", "sqlalchemy"],
extras_require={
"sql": ["antlr4-python3-runtime"],
"sql": ["antlr4-python3-runtime", "jinjasql"],
"spark": ["pyspark"],
"dask": ["dask[dataframe]", "cloudpickle>=1.4.0"],
},
Expand Down
11 changes: 11 additions & 0 deletions tests/fugue_sql/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from fugue_sql.utils import fill_sql_template


def test_fill_sql_template():
data = {"a": 1, "b": "x"}
assert "a=select " == fill_sql_template("a=select ", data)
assert "1x1" == fill_sql_template("{{a}}{{b}}{{a}}", data)
assert "" == fill_sql_template("", data)
assert "%s" == fill_sql_template("%s", data)
assert "%%s" == fill_sql_template("%%s", data)
assert "1%%sx1" == fill_sql_template("{{a}}%%s{{b}}{{a}}", data)
Loading

0 comments on commit a55dc33

Please sign in to comment.