diff --git a/dlt/destinations/duckdb/configuration.py b/dlt/destinations/duckdb/configuration.py index a8b249ce49..bf66b0998c 100644 --- a/dlt/destinations/duckdb/configuration.py +++ b/dlt/destinations/duckdb/configuration.py @@ -1,8 +1,9 @@ import os import threading from pathvalidate import is_valid_filepath -from typing import Any, Final, Optional +from typing import Any, Final, Optional, Tuple +from dlt.common import logger from dlt.common.configuration import configspec from dlt.common.configuration.specs import ConnectionStringCredentials from dlt.common.configuration.specs.exceptions import InvalidConnectionString @@ -82,15 +83,18 @@ def on_resolved(self) -> None: if self.database == ":external:": return # try the pipeline context + is_default_path = False if self.database == ":pipeline:": self.database = self._path_in_pipeline(DEFAULT_DUCK_DB_NAME) # if pipeline context was not present or database was not set if not self.database: # create database locally - self.database = self._path_from_pipeline(DEFAULT_DUCK_DB_NAME) + self.database, is_default_path = self._path_from_pipeline(DEFAULT_DUCK_DB_NAME) # always make database an abs path self.database = os.path.abspath(self.database) - self._path_to_pipeline(self.database) + # do not save the default path into pipeline's local state + if not is_default_path: + self._path_to_pipeline(self.database) def _path_in_pipeline(self, rel_path: str) -> str: from dlt.common.configuration.container import Container @@ -111,7 +115,20 @@ def _path_to_pipeline(self, abspath: str) -> None: if context.is_active(): context.pipeline().set_local_state_val(LOCAL_STATE_KEY, abspath) - def _path_from_pipeline(self, default_path: str) -> str: + def _path_from_pipeline(self, default_path: str) -> Tuple[str, bool]: + """ + Returns path to DuckDB as stored in the active pipeline's local state and a boolean flag. + + If the pipeline state is not available, returns the default DuckDB path that includes the pipeline name and sets the flag to True. + If the pipeline context is not available, returns the provided default_path and sets the flag to True. + + Args: + default_path (str): The default DuckDB path to return if the pipeline context or state is not available. + + Returns: + Tuple[str, bool]: The path to the DuckDB as stored in the active pipeline's local state or the default path if not available, + and a boolean flag set to True when the default path is returned. + """ from dlt.common.configuration.container import Container from dlt.common.pipeline import PipelineContext @@ -119,11 +136,21 @@ def _path_from_pipeline(self, default_path: str) -> str: if context.is_active(): try: # use pipeline name as default - default_path = DUCK_DB_NAME % context.pipeline().pipeline_name - return context.pipeline().get_local_state_val(LOCAL_STATE_KEY) # type: ignore + pipeline = context.pipeline() + default_path = DUCK_DB_NAME % pipeline.pipeline_name + # get pipeline path from local state + pipeline_path = pipeline.get_local_state_val(LOCAL_STATE_KEY) + # make sure that path exists + if not os.path.exists(pipeline_path): + logger.warning(f"Duckdb attached to pipeline {pipeline.pipeline_name} in path {os.path.relpath(pipeline_path)} was deleted. Attaching to duckdb database '{default_path}' in current folder.") + else: + return pipeline_path, False except KeyError: + # no local state: default_path will be used pass - return default_path + + return default_path, True + def _delete_conn(self) -> None: # print("Closing conn because is owner") diff --git a/pyproject.toml b/pyproject.toml index b306482e6b..d9681f6e9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dlt" -version = "0.2.5" +version = "0.2.6a0" description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run." authors = ["dltHub Inc. "] maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] diff --git a/tests/load/duckdb/test_duckdb_client.py b/tests/load/duckdb/test_duckdb_client.py index 4acfd95e6d..401c5b6b22 100644 --- a/tests/load/duckdb/test_duckdb_client.py +++ b/tests/load/duckdb/test_duckdb_client.py @@ -5,7 +5,7 @@ from dlt.common.configuration.resolve import resolve_configuration from dlt.common.configuration.utils import get_resolved_traces -from dlt.destinations.duckdb.configuration import DuckDbClientConfiguration, DEFAULT_DUCK_DB_NAME +from dlt.destinations.duckdb.configuration import DUCK_DB_NAME, DuckDbClientConfiguration, DEFAULT_DUCK_DB_NAME from tests.load.pipeline.utils import drop_pipeline from tests.utils import patch_home_dir, autouse_test_storage, preserve_environ, TEST_STORAGE_ROOT @@ -23,7 +23,7 @@ def delete_default_duckdb_credentials() -> None: def test_duckdb_open_conn_default() -> None: delete_quack_db() try: - print(get_resolved_traces().clear()) + get_resolved_traces().clear() c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset")) print(str(c.credentials)) print(str(os.getcwd())) @@ -53,8 +53,10 @@ def test_duckdb_database_path() -> None: # still cwd db_path = os.path.abspath(os.path.join(".", "quack_pipeline.duckdb")) assert c.credentials.database.lower() == db_path.lower() - # but it is kept in the local state - assert p.get_local_state_val("duckdb_database").lower() == db_path.lower() + # we do not keep default duckdb path in the local state + with pytest.raises(KeyError): + p.get_local_state_val("duckdb_database") + # connect try: conn = c.credentials.borrow_conn(read_only=False) @@ -142,6 +144,44 @@ def test_keeps_initial_db_path() -> None: assert conn.credentials.database.lower() != os.path.abspath(db_path).lower() +def test_duckdb_database_delete() -> None: + db_path = "_storage/path_test_quack.duckdb" + p = dlt.pipeline(pipeline_name="quack_pipeline", credentials=db_path, destination="duckdb") + p.run([1, 2, 3], table_name="table", dataset_name="dataset") + # attach the pipeline + p = dlt.attach(pipeline_name="quack_pipeline") + assert p.first_run is False + # drop the database + os.remove(db_path) + p = dlt.attach(pipeline_name="quack_pipeline") + assert p.first_run is False + p.run([1, 2, 3], table_name="table", dataset_name="dataset") + # we reverted to a default path in cwd + with pytest.raises(KeyError): + p.get_local_state_val("duckdb_database") + + +def test_duck_database_path_delete() -> None: + # delete path + db_folder = "_storage/db_path" + os.makedirs(db_folder) + db_path = f"{db_folder}/path_test_quack.duckdb" + p = dlt.pipeline(pipeline_name="deep_quack_pipeline", credentials=db_path, destination="duckdb") + p.run([1, 2, 3], table_name="table", dataset_name="dataset") + # attach the pipeline + p = dlt.attach(pipeline_name="deep_quack_pipeline") + assert p.first_run is False + # drop the database + os.remove(db_path) + os.rmdir(db_folder) + p = dlt.attach(pipeline_name="deep_quack_pipeline") + assert p.first_run is False + p.run([1, 2, 3], table_name="table", dataset_name="dataset") + # we reverted to a default path in cwd + with pytest.raises(KeyError): + p.get_local_state_val("duckdb_database") + + def test_case_sensitive_database_name() -> None: # make case sensitive folder name cs_quack = os.path.join(TEST_STORAGE_ROOT, "QuAcK") @@ -170,6 +210,7 @@ def test_external_duckdb_database() -> None: conn.close() + def delete_quack_db() -> None: if os.path.isfile(DEFAULT_DUCK_DB_NAME): os.remove(DEFAULT_DUCK_DB_NAME)