Skip to content

Commit

Permalink
Move soft dependencies into functions (#481)
Browse files Browse the repository at this point in the history
* Move soft dependencies to functions

* update

* update

* update

* update

* update

* update
  • Loading branch information
goodwanghan authored Jun 7, 2023
1 parent b92be71 commit e600dad
Show file tree
Hide file tree
Showing 25 changed files with 194 additions and 31 deletions.
27 changes: 22 additions & 5 deletions .github/workflows/test_all.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: All Tests & Lint
name: Full Tests

on:
push:
Expand All @@ -20,11 +20,12 @@ concurrency:
cancel-in-progress: true

jobs:
build:
all:
name: Tests & Lint
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8, 3.9]
python-version: [3.8]

steps:
- uses: actions/checkout@v2
Expand All @@ -35,12 +36,28 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Lint
if: matrix.python-version == 3.8
run: make lint
- name: Test
run: make test
- name: "Upload coverage to Codecov"
if: matrix.python-version == 3.8
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: false

no_spark:
name: Tests
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.9, "3.10"]

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: make devenv
- name: Test
run: make testnospark
3 changes: 2 additions & 1 deletion .github/workflows/test_core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ concurrency:
cancel-in-progress: true

jobs:
build:
core-tests:
name: Tests
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down
49 changes: 49 additions & 0 deletions .github/workflows/test_no_sql.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Tests Excluding SQL Dependencies

on:
push:
branches: [ master ]
paths-ignore:
- 'docs/**'
- '**.md'
pull_request:
branches: [ master ]
paths-ignore:
- 'docs/**'
- '**.md'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
tests-no-sql:
name: Tests
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8, "3.10"]

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Fix setuptools_scm
run: pip install "setuptools_scm<7"
- name: Save time
if: matrix.python-version == 3.7
run: pip install "pandas<1.3.0"
- name: Install dependencies
run: make devenv
- name: Install pandas 2
if: matrix.python-version == '3.10'
run: pip install "pandas>=2"
- name: Remove SQL dependencies
run: pip uninstall -y qpd fugue-sql-antlr sqlglot
- name: Test
run: make testnosql
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ lab:
test:
python3 -b -m pytest --reruns 2 --only-rerun 'Overflow in cast' --only-rerun 'Table or view not found' tests/

testnospark:
python3 -b -m pytest --ignore=tests/fugue_spark tests/

testcore:
python3 -b -m pytest tests/fugue

Expand All @@ -98,6 +101,9 @@ testdask:
testray:
python3 -b -m pytest tests/fugue_ray

testnosql:
python3 -b -m pytest --reruns 2 --only-rerun 'Table or view not found' tests/fugue tests/fugue_spark tests/fugue_dask tests/fugue_ray

testduck:
python3 -b -m pytest --reruns 2 --only-rerun 'Overflow in cast' tests/fugue_duckdb

Expand Down
15 changes: 14 additions & 1 deletion fugue/_utils/misc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Type, TypeVar
from typing import Any, Type, TypeVar

from triad.utils.assertion import assert_or_throw

Expand All @@ -13,3 +13,16 @@ def get_attribute(obj: object, attr_name: str, data_type: Type[T]) -> T:
lambda: TypeError(f"{obj.__dict__[attr_name]} is not type {data_type}"),
)
return obj.__dict__[attr_name]


def import_or_throw(package_name: str, message: str) -> Any:
try:
return __import__(package_name)
except Exception as e: # pragma: no cover
raise ImportError(str(e) + ". " + message)


def import_fsql_dependency(package_name: str) -> Any:
return import_or_throw(
package_name, "Please try to install the package by `pip install fugue[sql]`."
)
4 changes: 3 additions & 1 deletion fugue/collections/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from triad import to_uuid

from fugue._utils.registry import fugue_plugin
import sqlglot
from fugue._utils.misc import import_fsql_dependency

_TEMP_TABLE_EXPR_PREFIX = "<tmpdf:"
_TEMP_TABLE_EXPR_SUFFIX = ">"
Expand Down Expand Up @@ -38,6 +38,8 @@ def transpile_sql(
and to_dialect is not None
and from_dialect != to_dialect
):
sqlglot = import_fsql_dependency("sqlglot")

return " ".join(sqlglot.transpile(raw, read=from_dialect, write=to_dialect))
else:
return raw
Expand Down
8 changes: 5 additions & 3 deletions fugue/execution/native_execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from typing import Any, Callable, Dict, List, Optional, Type, Union

import pandas as pd
from qpd_pandas import run_sql_on_pandas
from qpd_pandas.engine import PandasUtils
from triad import Schema
from triad.collections.dict import IndexedOrderedDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw
from triad.utils.pandas_like import PandasUtils

from fugue._utils.io import load_df, save_df
from fugue._utils.misc import import_fsql_dependency
from fugue.collections.partition import (
PartitionCursor,
PartitionSpec,
Expand Down Expand Up @@ -55,10 +55,12 @@ def is_distributed(self) -> bool:
return False

def select(self, dfs: DataFrames, statement: StructuredRawSQL) -> DataFrame:
qpd_pandas = import_fsql_dependency("qpd_pandas")

_dfs, _sql = self.encode(dfs, statement)
_dd = {k: self.to_df(v).as_pandas() for k, v in _dfs.items()} # type: ignore

df = run_sql_on_pandas(_sql, _dd, ignore_case=True)
df = qpd_pandas.run_sql_on_pandas(_sql, _dd, ignore_case=True)
return self.to_df(df)


Expand Down
5 changes: 3 additions & 2 deletions fugue/sql/_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import re
from typing import Any, Dict, Optional

import jinja2
from jinja2 import Template
from triad import assert_or_throw

from ..collections.yielded import Yielded
Expand All @@ -18,6 +16,9 @@ def fill_sql_template(sql: str, params: Dict[str, Any]):
:param sql: jinja compatible template
:param params: params to be inserted into template
"""
import jinja2
from jinja2 import Template

try:
if "self" in params:
params = {k: v for k, v in params.items() if k != "self"}
Expand Down
9 changes: 6 additions & 3 deletions fugue/sql/workflow.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
from builtins import isinstance
from typing import Any, Dict, Tuple

from fugue_sql_antlr import FugueSQLParser
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars

from fugue._utils.misc import import_fsql_dependency

from ..collections.yielded import Yielded
from ..constants import FUGUE_CONF_SQL_DIALECT, FUGUE_CONF_SQL_IGNORE_CASE
from ..dataframe.api import is_df
from ..dataframe.dataframe import DataFrame
from ..workflow.workflow import FugueWorkflow, WorkflowDataFrame, WorkflowDataFrames
from ._utils import LazyWorkflowDataFrame, fill_sql_template
from ._visitors import FugueSQLHooks, _Extensions


class FugueSQLWorkflow(FugueWorkflow):
Expand All @@ -37,6 +36,10 @@ def __call__(self, code: str, *args: Any, **kwargs: Any) -> None:
def _sql(
self, code: str, *args: Any, **kwargs: Any
) -> Dict[str, Tuple[WorkflowDataFrame, WorkflowDataFrames, LazyWorkflowDataFrame]]:
FugueSQLParser = import_fsql_dependency("fugue_sql_antlr").FugueSQLParser

from ._visitors import FugueSQLHooks, _Extensions

# TODO: move dict construction to triad
params: Dict[str, Any] = {}
for a in args:
Expand Down
7 changes: 5 additions & 2 deletions fugue_dask/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas
import pyarrow as pa
from dask.distributed import Client, get_client
from qpd_dask.engine import DaskUtils as DaskUtilsBase
from triad.utils.pandas_like import PandasLikeUtils
from triad.utils.pyarrow import to_pandas_dtype, to_single_pandas_dtype

import fugue.api as fa
Expand All @@ -23,7 +23,10 @@ def get_default_partitions() -> int:
return n if n > 0 else fa.get_current_parallelism() * 2


class DaskUtils(DaskUtilsBase):
class DaskUtils(PandasLikeUtils[pd.DataFrame, pd.Series]):
def concat_dfs(self, *dfs: pd.DataFrame) -> pd.DataFrame:
return pd.concat(list(dfs))

def get_or_create_client(self, client: Optional[Client] = None):
if client is not None:
return client
Expand Down
6 changes: 4 additions & 2 deletions fugue_dask/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import dask.dataframe as dd
import pandas as pd
from distributed import Client
from qpd_dask import run_sql_on_dask
from triad.collections import Schema
from triad.collections.dict import IndexedOrderedDict, ParamDict
from triad.collections.fs import FileSystem
Expand All @@ -14,6 +13,7 @@
from triad.utils.threading import RunOnce

from fugue import StructuredRawSQL
from fugue._utils.misc import import_fsql_dependency
from fugue.collections.partition import (
PartitionCursor,
PartitionSpec,
Expand Down Expand Up @@ -53,9 +53,11 @@ def is_distributed(self) -> bool:
return True

def select(self, dfs: DataFrames, statement: StructuredRawSQL) -> DataFrame:
qpd_dask = import_fsql_dependency("qpd_dask")

_dfs, _sql = self.encode(dfs, statement)
dask_dfs = {k: self.to_df(v).native for k, v in _dfs.items()} # type: ignore
df = run_sql_on_dask(_sql, dask_dfs, ignore_case=True)
df = qpd_dask.run_sql_on_dask(_sql, dask_dfs, ignore_case=True)
return DaskDataFrame(df)


Expand Down
13 changes: 13 additions & 0 deletions fugue_test/builtin_suite.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# pylint: disable-all
try:
import qpd_pandas # noqa: F401

HAS_QPD = True
except ImportError: # pragma: no cover
HAS_QPD = False

import datetime
import os
Expand Down Expand Up @@ -802,6 +808,7 @@ def test_join(self):
).assert_eq(d)
dag.run(self.engine)

@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_df_select(self):
with FugueWorkflow() as dag:
# wildcard
Expand Down Expand Up @@ -853,13 +860,15 @@ def test_df_select(self):
dag.select("select * from", a).assert_eq(b)
dag.run(self.engine, {"fugue.sql.compile.ignore_case": True})

@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_df_filter(self):
with FugueWorkflow() as dag:
a = dag.df([[1, 10], [2, 20], [3, 30]], "x:int,y:int")
b = dag.df([[2, 20]], "x:int,y:int")
a.filter((col("y") > 15) & (col("y") < 25)).assert_eq(b)
dag.run(self.engine)

@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_df_assign(self):
with FugueWorkflow() as dag:
a = dag.df([[1, 10], [2, 20], [3, 30]], "x:int,y:int")
Expand All @@ -873,6 +882,7 @@ def test_df_assign(self):
a.assign(lit("x").alias("y"), z=(col("y") + 1).cast(float)).assert_eq(b)
dag.run(self.engine)

@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_aggregate(self):
with FugueWorkflow() as dag:
a = dag.df([[1, 10], [1, 200], [3, 30]], "x:int,y:int")
Expand All @@ -886,6 +896,7 @@ def test_aggregate(self):
).assert_eq(c)
dag.run(self.engine)

@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_select(self):
class MockEngine(QPDPandasEngine):
def __init__(self, execution_engine, p: int = 0):
Expand Down Expand Up @@ -1630,6 +1641,7 @@ def t5(df: pd.DataFrame, c: Callable) -> List[List[Any]]:

assert 4 == cb3.n

@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_sql_api(self):
def tr(df: pd.DataFrame, n=1) -> pd.DataFrame:
return df + n
Expand Down Expand Up @@ -1676,6 +1688,7 @@ def tr(df: pd.DataFrame, n=1) -> pd.DataFrame:
assert fa.is_local(sdf4)

@pytest.mark.skipif(os.name == "nt", reason="Skip Windows")
@pytest.mark.skipif(not HAS_QPD, reason="qpd not working")
def test_any_column_name(self):

f_parquet = os.path.join(str(self.tmpdir), "a.parquet")
Expand Down
Loading

0 comments on commit e600dad

Please sign in to comment.