Skip to content

Commit

Permalink
Merge pull request #291 from dlt-hub/rfix/creates-duckdb-pwd
Browse files Browse the repository at this point in the history
makes the duckdb database follow the current working directory
  • Loading branch information
rudolfix authored Apr 23, 2023
2 parents 1643c4d + 443a957 commit ade25a0
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
41 changes: 34 additions & 7 deletions dlt/destinations/duckdb/configuration.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import os
import threading
from pathvalidate import is_valid_filepath
from typing import Any, Final, Optional
from typing import Any, Final, Optional, Tuple

from dlt.common import logger
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.configuration.specs.exceptions import InvalidConnectionString
Expand Down Expand Up @@ -82,15 +83,18 @@ def on_resolved(self) -> None:
if self.database == ":external:":
return
# try the pipeline context
is_default_path = False
if self.database == ":pipeline:":
self.database = self._path_in_pipeline(DEFAULT_DUCK_DB_NAME)
# if pipeline context was not present or database was not set
if not self.database:
# create database locally
self.database = self._path_from_pipeline(DEFAULT_DUCK_DB_NAME)
self.database, is_default_path = self._path_from_pipeline(DEFAULT_DUCK_DB_NAME)
# always make database an abs path
self.database = os.path.abspath(self.database)
self._path_to_pipeline(self.database)
# do not save the default path into pipeline's local state
if not is_default_path:
self._path_to_pipeline(self.database)

def _path_in_pipeline(self, rel_path: str) -> str:
from dlt.common.configuration.container import Container
Expand All @@ -111,19 +115,42 @@ def _path_to_pipeline(self, abspath: str) -> None:
if context.is_active():
context.pipeline().set_local_state_val(LOCAL_STATE_KEY, abspath)

def _path_from_pipeline(self, default_path: str) -> Tuple[str, bool]:
    """
    Returns path to DuckDB as stored in the active pipeline's local state and a boolean flag.

    If the pipeline context is not active, returns the provided `default_path` and sets the flag to True.
    If the pipeline context is active but the local state holds no path (KeyError), or the stored path
    no longer exists on disk, returns a default path built from the pipeline name and sets the flag to True.

    Args:
        default_path (str): The DuckDB path to return when no pipeline context is active.

    Returns:
        Tuple[str, bool]: The path stored in the active pipeline's local state with the flag set to False,
            or a default path with the flag set to True.
    """
    from dlt.common.configuration.container import Container
    from dlt.common.pipeline import PipelineContext

    context = Container()[PipelineContext]
    if context.is_active():
        try:
            pipeline = context.pipeline()
            # use pipeline name as default
            default_path = DUCK_DB_NAME % pipeline.pipeline_name
            # get pipeline path from local state, raises KeyError when nothing was stored
            pipeline_path = pipeline.get_local_state_val(LOCAL_STATE_KEY)
            # make sure that path exists
            if os.path.exists(pipeline_path):
                return pipeline_path, False
            try:
                displayed_path = os.path.relpath(pipeline_path)
            except ValueError:
                # os.path.relpath raises ValueError on Windows when the path and the cwd are on
                # different drives; building a log message must not break configuration resolution
                displayed_path = pipeline_path
            logger.warning(f"Duckdb attached to pipeline {pipeline.pipeline_name} in path {displayed_path} was deleted. Attaching to duckdb database '{default_path}' in current folder.")
        except KeyError:
            # no local state: default_path will be used
            pass

    return default_path, True


def _delete_conn(self) -> None:
# print("Closing conn because is owner")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dlt"
version = "0.2.5"
version = "0.2.6a0"
description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
authors = ["dltHub Inc. <[email protected]>"]
maintainers = [ "Marcin Rudolf <[email protected]>", "Adrian Brudaru <[email protected]>", "Ty Dunn <[email protected]>"]
Expand Down
49 changes: 45 additions & 4 deletions tests/load/duckdb/test_duckdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.utils import get_resolved_traces

from dlt.destinations.duckdb.configuration import DuckDbClientConfiguration, DEFAULT_DUCK_DB_NAME
from dlt.destinations.duckdb.configuration import DUCK_DB_NAME, DuckDbClientConfiguration, DEFAULT_DUCK_DB_NAME

from tests.load.pipeline.utils import drop_pipeline
from tests.utils import patch_home_dir, autouse_test_storage, preserve_environ, TEST_STORAGE_ROOT
Expand All @@ -23,7 +23,7 @@ def delete_default_duckdb_credentials() -> None:
def test_duckdb_open_conn_default() -> None:
delete_quack_db()
try:
print(get_resolved_traces().clear())
get_resolved_traces().clear()
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset"))
print(str(c.credentials))
print(str(os.getcwd()))
Expand Down Expand Up @@ -53,8 +53,10 @@ def test_duckdb_database_path() -> None:
# still cwd
db_path = os.path.abspath(os.path.join(".", "quack_pipeline.duckdb"))
assert c.credentials.database.lower() == db_path.lower()
# but it is kept in the local state
assert p.get_local_state_val("duckdb_database").lower() == db_path.lower()
# we do not keep default duckdb path in the local state
with pytest.raises(KeyError):
p.get_local_state_val("duckdb_database")

# connect
try:
conn = c.credentials.borrow_conn(read_only=False)
Expand Down Expand Up @@ -142,6 +144,44 @@ def test_keeps_initial_db_path() -> None:
assert conn.credentials.database.lower() != os.path.abspath(db_path).lower()


def test_duckdb_database_delete() -> None:
    """Run a pipeline against an explicit database file, delete the file, then re-attach and run again.

    After the second run the pipeline's local state must not contain the "duckdb_database" key.
    """
    pipeline_name = "quack_pipeline"
    db_path = "_storage/path_test_quack.duckdb"

    pipeline = dlt.pipeline(pipeline_name=pipeline_name, credentials=db_path, destination="duckdb")
    pipeline.run([1, 2, 3], table_name="table", dataset_name="dataset")

    # attaching after a completed run must not look like a first run
    pipeline = dlt.attach(pipeline_name=pipeline_name)
    assert pipeline.first_run is False

    # remove the database file from under the pipeline
    os.remove(db_path)

    pipeline = dlt.attach(pipeline_name=pipeline_name)
    assert pipeline.first_run is False
    pipeline.run([1, 2, 3], table_name="table", dataset_name="dataset")

    # we reverted to a default path in cwd, which is not kept in the local state
    with pytest.raises(KeyError):
        pipeline.get_local_state_val("duckdb_database")


def test_duck_database_path_delete() -> None:
    """Same scenario as deleting the database file, but the containing folder is removed as well.

    After the second run the pipeline's local state must not contain the "duckdb_database" key.
    """
    pipeline_name = "deep_quack_pipeline"
    db_folder = "_storage/db_path"
    db_path = f"{db_folder}/path_test_quack.duckdb"

    # the database lives in its own folder so the whole path can be deleted later
    os.makedirs(db_folder)
    pipeline = dlt.pipeline(pipeline_name=pipeline_name, credentials=db_path, destination="duckdb")
    pipeline.run([1, 2, 3], table_name="table", dataset_name="dataset")

    # attaching after a completed run must not look like a first run
    pipeline = dlt.attach(pipeline_name=pipeline_name)
    assert pipeline.first_run is False

    # remove the database file and then its (now empty) folder
    os.remove(db_path)
    os.rmdir(db_folder)

    pipeline = dlt.attach(pipeline_name=pipeline_name)
    assert pipeline.first_run is False
    pipeline.run([1, 2, 3], table_name="table", dataset_name="dataset")

    # we reverted to a default path in cwd, which is not kept in the local state
    with pytest.raises(KeyError):
        pipeline.get_local_state_val("duckdb_database")


def test_case_sensitive_database_name() -> None:
# make case sensitive folder name
cs_quack = os.path.join(TEST_STORAGE_ROOT, "QuAcK")
Expand Down Expand Up @@ -170,6 +210,7 @@ def test_external_duckdb_database() -> None:
conn.close()



def delete_quack_db() -> None:
    """Remove the file at DEFAULT_DUCK_DB_NAME if it exists as a regular file."""
    if not os.path.isfile(DEFAULT_DUCK_DB_NAME):
        # nothing to clean up
        return
    os.remove(DEFAULT_DUCK_DB_NAME)

0 comments on commit ade25a0

Please sign in to comment.