Skip to content

Commit

Permalink
Merge pull request #460 from dlt-hub/rfix/adds-motherduck-destination
Browse files Browse the repository at this point in the history
adds motherduck destination
  • Loading branch information
rudolfix authored Jun 28, 2023
2 parents 7e902d4 + 59b3f54 commit e7fafaf
Show file tree
Hide file tree
Showing 29 changed files with 3,087 additions and 2,823 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ jobs:
# run: poetry install --no-interaction

- run: |
poetry run pytest tests/load tests/cli --ignore=tests/load/bigquery --ignore=tests/load/snowflake -k '(not bigquery and not snowflake)'
poetry run pytest tests/load tests/cli --ignore=tests/load/bigquery --ignore=tests/load/snowflake -k '(not bigquery and not snowflake and not motherduck)'
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load tests/cli --ignore=tests/load/bigquery --ignore=tests/load/snowflake -k "(not bigquery and not snowflake)"
poetry run pytest tests/load tests/cli --ignore=tests/load/bigquery --ignore=tests/load/snowflake -k "(not bigquery and not snowflake and not motherduck)"
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
Expand Down
4 changes: 1 addition & 3 deletions dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import abc

# import jsonlines
from dataclasses import dataclass
from typing import Any, Dict, Sequence, IO, Type, Optional, List, cast

Expand Down Expand Up @@ -177,7 +176,6 @@ def data_format(cls) -> TFileFormatSpec:
supports_compression=True,
requires_destination_capabilities=True,
)
return TFileFormatSpec("insert_values", "insert_values", False, False, requires_destination_capabilities=True)


@configspec
Expand Down Expand Up @@ -227,7 +225,7 @@ def write_data(self, rows: Sequence[Any]) -> None:
for key in self.complex_indices:
for row in rows:
if key in row:
row[key] = json.dumps(row[key]) if row[key] else row[key]
row[key] = json.dumps(row[key])

table = pyarrow.Table.from_pylist(rows, schema=self.schema)
# Write
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_job_id_from_file_path(file_path: str) -> str:
class BigQueryMergeJob(SqlMergeJob):

@classmethod
def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: str, key_clauses: Sequence[str]) -> List[str]:
def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: str, key_clauses: Sequence[str], for_delete: bool) -> List[str]:
# generate several clauses: BigQuery does not support OR nor unions
sql: List[str] = []
for clause in key_clauses:
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/bigquery/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BigQuerySqlClient(SqlClientBase[bigquery.Client], DBTransaction):
def __init__(self, dataset_name: str, credentials: GcpServiceAccountCredentialsWithoutDefaults) -> None:
self._client: bigquery.Client = None
self.credentials: GcpServiceAccountCredentialsWithoutDefaults = credentials
super().__init__(dataset_name)
super().__init__(credentials.project_id, dataset_name)

self._default_retry = bigquery.DEFAULT_RETRY.with_deadline(credentials.retry_deadline)
self._default_query = bigquery.QueryJobConfig(default_dataset=self.fully_qualified_dataset_name(escape=False))
Expand Down
38 changes: 22 additions & 16 deletions dlt/destinations/duckdb/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@


@configspec
class DuckDbCredentials(ConnectionStringCredentials):
drivername: Final[str] = "duckdb" # type: ignore
class DuckDbBaseCredentials(ConnectionStringCredentials):
password: Optional[TSecretValue] = None
username: Optional[str] = None
host: Optional[str] = None
port: Optional[int] = None
database: Optional[str] = None

read_only: bool = False # open database read/write

__config_gen_annotations__: ClassVar[List[str]] = []

def borrow_conn(self, read_only: bool) -> Any:
import duckdb

Expand All @@ -37,7 +33,7 @@ def borrow_conn(self, read_only: bool) -> Any:
# obtain a lock because duck releases the GIL and we have refcount concurrency
with self._conn_lock:
if not hasattr(self, "_conn"):
self._conn = duckdb.connect(database=self.database, read_only=read_only)
self._conn = duckdb.connect(database=self._conn_str(), read_only=read_only)
self._conn_owner = True
self._conn_borrows = 0

Expand Down Expand Up @@ -79,6 +75,26 @@ def parse_native_representation(self, native_value: Any) -> None:
else:
raise

def _conn_str(self) -> str:
return self.database

def _delete_conn(self) -> None:
# print("Closing conn because is owner")
self._conn.close()
delattr(self, "_conn")

def __del__(self) -> None:
if hasattr(self, "_conn") and self._conn_owner:
self._delete_conn()


@configspec
class DuckDbCredentials(DuckDbBaseCredentials):
drivername: Final[str] = "duckdb" # type: ignore
username: Optional[str] = None

__config_gen_annotations__: ClassVar[List[str]] = []

def on_resolved(self) -> None:
# do not set any paths for external database
if self.database == ":external:":
Expand Down Expand Up @@ -158,16 +174,6 @@ def _path_from_pipeline(self, default_path: str) -> Tuple[str, bool]:
return default_path, True


def _delete_conn(self) -> None:
# print("Closing conn because is owner")
self._conn.close()
delattr(self, "_conn")

def __del__(self) -> None:
if hasattr(self, "_conn") and self._conn_owner:
self._delete_conn()


@configspec(init=True)
class DuckDbClientConfiguration(DestinationClientDwhConfiguration):
destination_name: Final[str] = "duckdb" # type: ignore
Expand Down
15 changes: 3 additions & 12 deletions dlt/destinations/duckdb/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error

from dlt.destinations.duckdb import capabilities
from dlt.destinations.duckdb.configuration import DuckDbCredentials
from dlt.destinations.duckdb.configuration import DuckDbBaseCredentials


class DuckDBDBApiCursorImpl(DBApiCursorImpl):
Expand All @@ -34,8 +34,8 @@ class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
dbapi: ClassVar[DBApi] = duckdb
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def __init__(self, dataset_name: str, credentials: DuckDbCredentials) -> None:
super().__init__(dataset_name)
def __init__(self, dataset_name: str, credentials: DuckDbBaseCredentials) -> None:
super().__init__(None, dataset_name)
self._conn: duckdb.DuckDBPyConnection = None
self.credentials = credentials

Expand Down Expand Up @@ -91,15 +91,6 @@ def rollback_transaction(self) -> None:
def native_connection(self) -> duckdb.DuckDBPyConnection:
return self._conn

def has_dataset(self) -> bool:
query = """
SELECT 1
FROM INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name = %s;
"""
rows = self.execute_sql(query, self.fully_qualified_dataset_name(escape=False))
return len(rows) > 0

def create_dataset(self) -> None:
self.execute_sql("CREATE SCHEMA %s" % self.fully_qualified_dataset_name())

Expand Down
17 changes: 10 additions & 7 deletions dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,18 @@ def _null_to_bool(v: str) -> bool:
return True
raise ValueError(v)

schema_table: TTableSchemaColumns = {}
db_params = self.sql_client.make_qualified_table_name(table_name, escape=False).split(".", 3)
query = """
SELECT column_name, data_type, is_nullable, numeric_precision, numeric_scale
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_schema = %s AND table_name = %s
ORDER BY ordinal_position;
"""
rows = self.sql_client.execute_sql(query, self.sql_client.fully_qualified_dataset_name(escape=False), table_name)
SELECT column_name, data_type, is_nullable, numeric_precision, numeric_scale
FROM INFORMATION_SCHEMA.COLUMNS
WHERE """
if len(db_params) == 3:
query += "table_catalog = %s AND "
query += "table_schema = %s AND table_name = %s ORDER BY ordinal_position;"
rows = self.sql_client.execute_sql(query, *db_params)

# if no rows we assume that table does not exist
schema_table: TTableSchemaColumns = {}
if len(rows) == 0:
# TODO: additionally check if table exists
return False, schema_table
Expand Down
45 changes: 45 additions & 0 deletions dlt/destinations/motherduck/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Type

from dlt.common.schema.schema import Schema
from dlt.common.configuration import with_config, known_sections
from dlt.common.configuration.accessors import config
from dlt.common.data_writers.escape import escape_postgres_identifier, escape_duckdb_literal
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration

from dlt.destinations.motherduck.configuration import MotherDuckClientConfiguration


@with_config(spec=MotherDuckClientConfiguration, sections=(known_sections.DESTINATION, "motherduck",))
def _configure(config: MotherDuckClientConfiguration = config.value) -> MotherDuckClientConfiguration:
    """Resolve the MotherDuck client configuration through the dlt config system.

    The `with_config` decorator injects a fully resolved `MotherDuckClientConfiguration`
    from the `destination.motherduck` config sections; this function just returns it.
    """
    return config


def capabilities() -> DestinationCapabilitiesContext:
    """Return the destination capabilities advertised by the MotherDuck destination.

    Parquet is the preferred load format; identifier escaping follows the postgres
    convention while literals are escaped duckdb-style.
    """
    caps = DestinationCapabilitiesContext()
    settings = {
        "preferred_loader_file_format": "parquet",
        "supported_loader_file_formats": ["parquet", "insert_values", "sql"],
        "escape_identifier": escape_postgres_identifier,
        "escape_literal": escape_duckdb_literal,
        "max_identifier_length": 65536,
        "max_column_identifier_length": 65536,
        "naming_convention": "duck_case",
        "max_query_length": 512 * 1024,
        "is_max_query_length_in_bytes": True,
        "max_text_data_type_length": 1024 * 1024 * 1024,
        "is_max_text_data_type_length_in_bytes": True,
        "supports_ddl_transactions": False,
        "alter_add_multi_column": False,
    }
    for attribute, value in settings.items():
        setattr(caps, attribute, value)
    return caps


def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase:
    """Create a MotherDuck job client for *schema*.

    The concrete client class is imported lazily so that `capabilities()` and the
    config spec remain importable even when the duckdb dependencies are missing.
    """
    from dlt.destinations.motherduck.motherduck import MotherDuckClient

    resolved_config = _configure(initial_config)
    return MotherDuckClient(schema, resolved_config)  # type: ignore


def spec() -> Type[DestinationClientConfiguration]:
    """Return the configuration spec class used by the MotherDuck destination."""
    return MotherDuckClientConfiguration
45 changes: 45 additions & 0 deletions dlt/destinations/motherduck/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Any, ClassVar, Final, List

from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.common.typing import TSecretValue
from dlt.common.configuration.exceptions import ConfigurationValueError

from dlt.destinations.duckdb.configuration import DuckDbBaseCredentials

MOTHERDUCK_DRIVERNAME = "md"


@configspec
class MotherDuckCredentials(DuckDbBaseCredentials):
    """Credentials for the MotherDuck service (duckdb over the `md:` scheme).

    The access token may arrive either as `password` or as a `token` query
    parameter in the connection string; the latter is normalized into
    `password` during parsing/resolution.
    """
    # use the module constant instead of a duplicated "md" literal so the
    # drivername, _conn_str() and the on_resolved() check cannot drift apart
    drivername: Final[str] = MOTHERDUCK_DRIVERNAME  # type: ignore
    username: str = "motherduck"

    read_only: bool = False  # open database read/write

    __config_gen_annotations__: ClassVar[List[str]] = ["password", "database"]

    def _conn_str(self) -> str:
        """Build the connection string passed to duckdb.connect()."""
        return f"{MOTHERDUCK_DRIVERNAME}:{self.database}?token={self.password}"

    def _token_to_password(self) -> None:
        # could be motherduck connection: move a `token` query param into password
        if self.query and "token" in self.query:
            self.password = TSecretValue(self.query.pop("token"))

    def parse_native_representation(self, native_value: Any) -> None:
        """Parse the native (connection string) form, normalizing `token` into password."""
        super().parse_native_representation(native_value)
        self._token_to_password()

    def on_resolved(self) -> None:
        """Validate that a token/password is present once configuration is resolved."""
        self._token_to_password()
        if self.drivername == MOTHERDUCK_DRIVERNAME and not self.password:
            raise ConfigurationValueError("Motherduck schema 'md' was specified without corresponding token or password. The required format of connection string is: md:///<database_name>?token=<token>")


@configspec(init=True)
class MotherDuckClientConfiguration(DestinationClientDwhConfiguration):
    """Destination configuration for the MotherDuck destination."""
    destination_name: Final[str] = "motherduck"  # type: ignore
    credentials: MotherDuckCredentials

    create_indexes: bool = False  # should unique indexes be created, this slows loading down massively
24 changes: 24 additions & 0 deletions dlt/destinations/motherduck/motherduck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import ClassVar

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.schema import Schema


from dlt.destinations.duckdb.duck import DuckDbClient
from dlt.destinations.motherduck import capabilities
from dlt.destinations.motherduck.sql_client import MotherDuckSqlClient
from dlt.destinations.motherduck.configuration import MotherDuckClientConfiguration


class MotherDuckClient(DuckDbClient):
    """DuckDb job client variant that connects to the MotherDuck service."""

    capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

    def __init__(self, schema: Schema, config: MotherDuckClientConfiguration) -> None:
        super().__init__(schema, config)  # type: ignore
        # replace the duckdb sql client created by the base __init__ with the
        # motherduck-aware one, keeping the same dataset name derivation
        dataset = self.make_dataset_name(schema, config.dataset_name, config.default_schema_name)
        self.config: MotherDuckClientConfiguration = config  # type: ignore
        self.sql_client: MotherDuckSqlClient = MotherDuckSqlClient(dataset, config.credentials)
27 changes: 27 additions & 0 deletions dlt/destinations/motherduck/sql_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import duckdb

from contextlib import contextmanager
from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence
from dlt.common.destination import DestinationCapabilitiesContext

from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation
from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame
from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error

from dlt.destinations.duckdb.sql_client import DuckDbSqlClient, DuckDBDBApiCursorImpl
from dlt.destinations.motherduck import capabilities
from dlt.destinations.motherduck.configuration import MotherDuckCredentials


class MotherDuckSqlClient(DuckDbSqlClient):
    """DuckDb SQL client that qualifies dataset names with the MotherDuck database."""

    capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

    def __init__(self, dataset_name: str, credentials: MotherDuckCredentials) -> None:
        super().__init__(dataset_name, credentials)
        # remember the database so dataset names can be fully qualified
        self.database_name = credentials.database

    def fully_qualified_dataset_name(self, escape: bool = True) -> str:
        """Return `<database>.<dataset>`, escaping both identifiers when requested."""
        if escape:
            escape_id = self.capabilities.escape_identifier
            return f"{escape_id(self.database_name)}.{escape_id(self.dataset_name)}"
        return f"{self.database_name}.{self.dataset_name}"
11 changes: 1 addition & 10 deletions dlt/destinations/postgres/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Psycopg2SqlClient(SqlClientBase["psycopg2.connection"], DBTransaction):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def __init__(self, dataset_name: str, credentials: PostgresCredentials) -> None:
super().__init__(dataset_name)
super().__init__(credentials.database, dataset_name)
self._conn: psycopg2.connection = None
self.credentials = credentials

Expand Down Expand Up @@ -68,15 +68,6 @@ def rollback_transaction(self) -> None:
def native_connection(self) -> "psycopg2.connection":
return self._conn

def has_dataset(self) -> bool:
query = """
SELECT 1
FROM INFORMATION_SCHEMA.SCHEMATA
WHERE schema_name = %s;
"""
rows = self.execute_sql(query, self.fully_qualified_dataset_name(escape=False))
return len(rows) > 0

def create_dataset(self) -> None:
self.execute_sql("CREATE SCHEMA %s" % self.fully_qualified_dataset_name())

Expand Down
Loading

0 comments on commit e7fafaf

Please sign in to comment.