Skip to content

Commit

Permalink
Fix duckdb compatibility issues (#472)
Browse files Browse the repository at this point in the history
* Fix duckdb compatibility issues

* update

* update
  • Loading branch information
goodwanghan authored May 24, 2023
1 parent 5598175 commit 1c3bef7
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 68 deletions.
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.8.4

- [471](https://github.com/fugue-project/fugue/issues/471) Fix compatibility issues for duckdb 0.8.0+
- [466](https://github.com/fugue-project/fugue/issues/466) Fix Ray 2.4.0 compatibility issue
- [464](https://github.com/fugue-project/fugue/issues/464) Support for spark/databricks connect
- [459](https://github.com/fugue-project/fugue/issues/459) DEPRECATION: Avro support
- [455](https://github.com/fugue-project/fugue/issues/455) Make Fugue pandas 2 compatible

## 0.8.3
Expand Down
9 changes: 7 additions & 2 deletions fugue_duckdb/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
"TIME": pa.time32("ms"),
}

_PA_TYPES_TO_DUCK: Dict[pa.DataType, str] = {v: k for k, v in _DUCK_TYPES_TO_PA.items()}
# Reverse lookup from pyarrow type to DuckDB type name.  The large
# string/binary variants are registered explicitly so they also map back to
# VARCHAR/BLOB (newer duckdb versions may surface large arrow types).
_PA_TYPES_TO_DUCK: Dict[pa.DataType, str] = {
    v: k for k, v in _DUCK_TYPES_TO_PA.items()
}
_PA_TYPES_TO_DUCK[pa.large_string()] = "VARCHAR"
_PA_TYPES_TO_DUCK[pa.large_binary()] = "BLOB"


def encode_column_name(name: str) -> str:
Expand Down Expand Up @@ -94,8 +98,9 @@ def to_duck_type(tp: pa.DataType) -> str:
raise ValueError(f"can't convert {tp} to DuckDB data type")


def to_pa_type(duck_type: str) -> pa.DataType:
def to_pa_type(duck_type_raw: Any) -> pa.DataType:
try:
duck_type = str(duck_type_raw) # for duckdb >= 0.8.0
if duck_type.endswith("[]"):
return pa.list_(to_pa_type(duck_type[:-2]))
p = duck_type.find("(")
Expand Down
2 changes: 1 addition & 1 deletion fugue_duckdb/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def to_df(self, df: Any, schema: Any = None) -> DuckDataFrame:
res = DuckDataFrame(self.connection.from_df(ddf.as_pandas()))
else:
res = DuckDataFrame(
duckdb.arrow(ddf.as_arrow(), connection=self.connection)
duckdb.from_arrow(ddf.as_arrow(), connection=self.connection)
)
if ddf.has_metadata: # pragma: no cover
res.reset_metadata(ddf.metadata)
Expand Down
27 changes: 17 additions & 10 deletions fugue_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import pyarrow as pa
from duckdb import DuckDBPyRelation
from triad import Schema
from triad.utils.pyarrow import LARGE_TYPES_REPLACEMENT, replace_types_in_table

from fugue import ArrayDataFrame, ArrowDataFrame, DataFrame, LocalBoundedDataFrame
from fugue.exceptions import FugueDataFrameOperationError, FugueDatasetEmptyError
from fugue.plugins import (
as_arrow,
as_fugue_dataset,
as_local_bounded,
get_column_names,
get_num_partitions,
get_schema,
is_df,
)

Expand All @@ -26,15 +29,7 @@ class DuckDataFrame(LocalBoundedDataFrame):

def __init__(self, rel: DuckDBPyRelation):
    """Wrap a DuckDB relation as a local bounded Fugue dataframe.

    The schema is computed lazily on first access via ``_duck_get_schema``.
    """
    self._rel = rel
    # defer schema extraction until it is actually requested
    super().__init__(schema=lambda: _duck_get_schema(self._rel))

@property
def alias(self) -> str:
Expand Down Expand Up @@ -98,7 +93,7 @@ def alter_columns(self, columns: Any) -> DataFrame:
return DuckDataFrame(self._rel.project(", ".join(fields)))

def as_arrow(self, type_safe: bool = False) -> pa.Table:
    """Return the relation's rows as a pyarrow Table.

    ``type_safe`` is accepted for interface compatibility and not read here.
    """
    rel = self._rel
    return _duck_as_arrow(rel)

def as_pandas(self) -> pd.DataFrame:
if any(pa.types.is_nested(f.type) for f in self.schema.fields):
Expand Down Expand Up @@ -169,6 +164,18 @@ def _duck_as_local(df: DuckDBPyRelation) -> DuckDBPyRelation:
return df


@as_arrow.candidate(lambda df: isinstance(df, DuckDBPyRelation))
def _duck_as_arrow(df: DuckDBPyRelation) -> pa.Table:
    """Materialize a DuckDB relation as a pyarrow Table.

    Large string/binary types (including nested occurrences, via
    ``recursive=True``) are replaced with their regular counterparts.
    """
    return replace_types_in_table(df.arrow(), LARGE_TYPES_REPLACEMENT, recursive=True)


@get_schema.candidate(lambda df: isinstance(df, DuckDBPyRelation))
def _duck_get_schema(df: DuckDBPyRelation) -> Schema:
    """Derive a triad Schema from a DuckDB relation's columns and types."""
    fields = []
    for name, duck_type in zip(df.columns, df.types):
        fields.append(pa.field(name, to_pa_type(duck_type)))
    return Schema(fields)


@get_column_names.candidate(lambda df: isinstance(df, DuckDBPyRelation))
def _get_duckdb_columns(df: DuckDBPyRelation) -> List[Any]:
    """Return the relation's column names as a plain list."""
    return [*df.columns]
Expand Down
14 changes: 8 additions & 6 deletions fugue_duckdb/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
encode_schema_names,
encode_value_to_expr,
)
from .dataframe import DuckDataFrame
from .dataframe import DuckDataFrame, _duck_as_arrow

_FUGUE_DUCKDB_PRAGMA_CONFIG_PREFIX = "fugue.duckdb.pragma."
_FUGUE_DUCKDB_EXTENSIONS = "fugue.duckdb.extensions"
Expand Down Expand Up @@ -108,8 +108,8 @@ def _other_select(self, dfs: DataFrames, statement: str) -> DataFrame:
conn = duckdb.connect()
try:
for k, v in dfs.items():
duckdb.arrow(v.as_arrow(), connection=conn).create_view(k)
return ArrowDataFrame(conn.execute(statement).arrow())
duckdb.from_arrow(v.as_arrow(), connection=conn).create_view(k)
return ArrowDataFrame(_duck_as_arrow(conn.execute(statement)))
finally:
conn.close()

Expand Down Expand Up @@ -229,7 +229,7 @@ def persist(
# TODO: we should create DuckDB table, but it has bugs, so can't use by 0.3.1
if isinstance(df, DuckDataFrame):
# materialize
res: DataFrame = ArrowDataFrame(df.native.arrow())
res: DataFrame = ArrowDataFrame(df.as_arrow())
else:
res = self.to_df(df)
res.reset_metadata(df.metadata)
Expand Down Expand Up @@ -540,12 +540,14 @@ def _gen_duck() -> DuckDataFrame:
if isinstance(df, DuckDataFrame):
return df
rdf = DuckDataFrame(
duckdb.arrow(df.as_arrow(), connection=engine.connection)
duckdb.from_arrow(df.as_arrow(), connection=engine.connection)
)
rdf.reset_metadata(df.metadata if df.has_metadata else None)
return rdf
tdf = ArrowDataFrame(df, schema)
return DuckDataFrame(duckdb.arrow(tdf.native, connection=engine.connection))
return DuckDataFrame(
duckdb.from_arrow(tdf.native, connection=engine.connection)
)

res = _gen_duck()
if create_view:
Expand Down
39 changes: 0 additions & 39 deletions fugue_polars/_utils.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,8 @@
import polars as pl
import pyarrow as pa
from triad import Schema
from triad.utils.pyarrow import get_alter_func

from fugue.dataframe.arrow_dataframe import _build_empty_arrow


def pl_as_arrow(df: pl.DataFrame) -> pa.Table:
    """Convert a polars DataFrame to arrow, downgrading large types."""
    raw = df.to_arrow()
    target = convert_schema(raw.schema)
    # get_alter_func builds a converter from the raw schema to the target one
    return get_alter_func(raw.schema, target, safe=False)(raw)


def to_schema(df: pl.DataFrame) -> Schema:
    """Extract the triad Schema of a polars DataFrame.

    Only an empty frame with the same schema is converted to arrow, so no
    row data is materialized.
    """
    empty = pl.DataFrame(schema=df.schema)
    return Schema(convert_schema(empty.to_arrow().schema))


def build_empty_pl(schema: Schema) -> pl.DataFrame:
    """Create an empty polars DataFrame carrying the given schema."""
    empty_arrow = _build_empty_arrow(schema)
    return pl.from_arrow(empty_arrow)


def convert_schema(schema: pa.Schema) -> pa.Schema:
    """Return *schema* with large types converted in every field."""
    return pa.schema([convert_field(f) for f in schema])


def convert_field(field: pa.Field) -> pa.Field:
    """Convert one field's type; reuse the original field when unchanged."""
    new_type = convert_type(field.type)
    return field if new_type == field.type else pa.field(field.name, new_type)


def convert_type(tp: pa.DataType) -> pa.DataType:
    """Recursively replace large string/binary types with their plain forms."""
    types = pa.types
    if types.is_struct(tp):
        return pa.struct([convert_field(inner) for inner in tp])
    if types.is_list(tp) or types.is_large_list(tp):
        return pa.list_(convert_type(tp.value_type))
    if types.is_map(tp):  # pragma: no cover
        return pa.map_(convert_type(tp.key_type), convert_type(tp.value_type))
    if types.is_large_string(tp):
        return pa.string()
    return pa.binary() if types.is_large_binary(tp) else tp
27 changes: 21 additions & 6 deletions fugue_polars/polars_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
from triad.collections.schema import Schema
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.pyarrow import (
LARGE_TYPES_REPLACEMENT,
replace_types_in_schema,
replace_types_in_table,
)

from fugue import ArrowDataFrame
from fugue.api import (
as_arrow,
drop_columns,
get_column_names,
get_schema,
Expand All @@ -28,7 +34,7 @@
)
from fugue.exceptions import FugueDataFrameOperationError

from ._utils import build_empty_pl, pl_as_arrow, to_schema
from ._utils import build_empty_pl


class PolarsDataFrame(LocalBoundedDataFrame):
Expand All @@ -55,7 +61,7 @@ def __init__(
InvalidOperationError("can't reset schema for pl.DataFrame"),
)
self._native = df
super().__init__(to_schema(df))
super().__init__(_get_pl_schema(df))

@property
def native(self) -> pl.DataFrame:
Expand Down Expand Up @@ -107,7 +113,7 @@ def alter_columns(self, columns: Any) -> DataFrame:
return PolarsDataFrame(pl.from_arrow(adf.native))

def as_arrow(self, type_safe: bool = False) -> pa.Table:
    """Return the frame as a pyarrow Table with large types normalized.

    ``type_safe`` is accepted for interface compatibility and not read here.
    """
    frame = self.native
    return _pl_as_arrow(frame)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
Expand All @@ -121,15 +127,15 @@ def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
if not self.empty:
yield from ArrowDataFrame(pl_as_arrow(self.native)).as_array_iterable(
yield from ArrowDataFrame(_pl_as_arrow(self.native)).as_array_iterable(
columns=columns
)

def as_dict_iterable(
    self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
    """Yield rows as dicts, optionally restricted to *columns*."""
    if self.empty:
        return
    adf = ArrowDataFrame(_pl_as_arrow(self.native))
    yield from adf.as_dict_iterable(columns=columns)

Expand All @@ -144,6 +150,13 @@ def _pl_as_local_bounded(df: pl.DataFrame) -> pl.DataFrame:
return df


@as_arrow.candidate(lambda df: isinstance(df, pl.DataFrame))
def _pl_as_arrow(df: pl.DataFrame) -> pa.Table:
    """Convert a polars DataFrame to arrow with large types normalized."""
    return replace_types_in_table(df.to_arrow(), LARGE_TYPES_REPLACEMENT)


@is_df.candidate(lambda df: isinstance(df, pl.DataFrame))
def _pl_is_df(df: pl.DataFrame) -> bool:
    # Registers pl.DataFrame as a recognized dataframe type; the decorator
    # guard guarantees df is a pl.DataFrame here, so the answer is always True.
    return True
Expand Down Expand Up @@ -181,7 +194,9 @@ def _get_pl_columns(df: pl.DataFrame) -> List[Any]:

@get_schema.candidate(lambda df: isinstance(df, pl.DataFrame))
def _get_pl_schema(df: pl.DataFrame) -> Schema:
    """Extract the triad Schema of a polars DataFrame.

    Only an empty frame carrying the same schema is converted to arrow, so
    no row data is materialized; large string/binary types are then replaced
    with their plain counterparts.
    """
    # converting an empty frame keeps this O(1) in the number of rows,
    # instead of materializing the whole frame via df.to_arrow()
    arrow_schema = pl.DataFrame(schema=df.schema).to_arrow().schema
    return Schema(replace_types_in_schema(arrow_schema, LARGE_TYPES_REPLACEMENT))


@rename.candidate(lambda df, *args, **kwargs: isinstance(df, pl.DataFrame))
Expand Down
2 changes: 2 additions & 0 deletions fugue_spark/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def get_dfs() -> Iterable[LocalDataFrame]:
)
if not cursor_set:
cursor.set(lambda: pdf.peek_array(), 0, 0)
cursor_set = True
yield pdf

input_df = IterablePandasDataFrame(get_dfs(), input_schema)
Expand Down Expand Up @@ -280,6 +281,7 @@ def get_dfs() -> Iterable[LocalDataFrame]:
pdf = ArrowDataFrame(func(adf))
if not cursor_set:
cursor.set(lambda: pdf.peek_array(), 0, 0)
cursor_set = True
yield pdf

input_df = IterableArrowDataFrame(get_dfs(), input_schema)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_version() -> str:
keywords="distributed spark dask sql dsl domain specific language",
url="http://github.com/fugue-project/fugue",
install_requires=[
"triad>=0.8.6",
"triad>=0.8.8",
"adagio>=0.2.4",
"qpd>=0.4.1",
"fugue-sql-antlr>=0.1.6",
Expand Down
4 changes: 2 additions & 2 deletions tests/fugue_duckdb/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def setUpClass(cls):

def df(self, data: Any = None, schema: Any = None) -> DuckDataFrame:
    """Build a DuckDataFrame from arbitrary test data."""
    adf = ArrowDataFrame(data, schema)
    return DuckDataFrame(duckdb.from_arrow(adf.native, self._con))

def test_as_array_special_values(self):
for func in [
Expand Down Expand Up @@ -74,7 +74,7 @@ def setUpClass(cls):

def df(self, data: Any = None, schema: Any = None) -> DuckDataFrame:
    # NOTE(review): this actually returns the underlying DuckDBPyRelation
    # (.native), not a DuckDataFrame — annotation kept as-is for callers
    adf = ArrowDataFrame(data, schema)
    return DuckDataFrame(duckdb.from_arrow(adf.native, self._con)).native

def to_native_df(self, pdf: pd.DataFrame) -> Any:
    # Wrap a pandas DataFrame as a DuckDB relation for the native-df tests.
    return duckdb.from_df(pdf)
Expand Down
2 changes: 1 addition & 1 deletion tests/fugue_duckdb/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_type_conversion():
con = duckdb.connect()

def assert_(tp):
    """Round-trip *tp* through DuckDB's type system, asserting preservation."""
    table = pa.Table.from_pydict(dict(a=pa.nulls(2, tp)))
    duck_type = duckdb.from_arrow(table, con).types[0]
    assert to_pa_type(duck_type) == tp
    # also verify the direct string round trip
    assert to_pa_type(to_duck_type(tp)) == tp
Expand Down

0 comments on commit 1c3bef7

Please sign in to comment.