Merge pull request #1875 from dlt-hub/devel
master merge for 1.1.0 release
rudolfix authored Sep 26, 2024
2 parents 3bb7d25 + 6b7e59d commit d2b6d05
Showing 151 changed files with 3,564 additions and 2,645 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test_doc_snippets.yml
@@ -67,6 +67,11 @@ jobs:
       with:
         python-version: "3.10.x"
 
+      - name: Setup node 20
+        uses: actions/setup-node@v4
+        with:
+          node-version: 20
+
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
@@ -81,6 +86,9 @@ jobs:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
 
+      - name: run docs preprocessor
+        run: make preprocess-docs
+
       - name: Install dependencies
         # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
         run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres -E lancedb --with docs,sentry-sdk --without airflow
6 changes: 4 additions & 2 deletions Makefile
@@ -44,7 +44,7 @@ has-poetry:
 	poetry --version
 
 dev: has-poetry
-	poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk
+	poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk,airflow
 
 lint:
 	./tools/check-package.sh
@@ -107,4 +107,6 @@ test-build-images: build-library
 	docker build -f deploy/dlt/Dockerfile.airflow --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
 	# docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
 
-
+preprocess-docs:
+	# run docs preprocessing to run a few checks and ensure examples can be parsed
+	cd docs/website && npm i && npm run preprocess-docs
12 changes: 9 additions & 3 deletions dlt/common/configuration/specs/connection_string_credentials.py
@@ -10,14 +10,20 @@
 @configspec
 class ConnectionStringCredentials(CredentialsConfiguration):
     drivername: str = dataclasses.field(default=None, init=False, repr=False, compare=False)
-    database: str = None
+    database: Optional[str] = None
     password: Optional[TSecretValue] = None
-    username: str = None
+    username: Optional[str] = None
     host: Optional[str] = None
     port: Optional[int] = None
     query: Optional[Dict[str, Any]] = None
 
-    __config_gen_annotations__: ClassVar[List[str]] = ["port", "password", "host"]
+    __config_gen_annotations__: ClassVar[List[str]] = [
+        "database",
+        "port",
+        "username",
+        "password",
+        "host",
+    ]
 
     def __init__(self, connection_string: Union[str, Dict[str, Any]] = None) -> None:
         """Initializes the credentials from SQLAlchemy like connection string or from dict holding connection string elements"""
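With `database` and `username` now Optional, partial connection strings resolve without errors. A minimal sketch of the spec in use, assuming the import path shown in this diff and that the constructor parses the URL eagerly:

    # Hedged sketch: parsing a SQLAlchemy-style URL into the spec's fields.
    from dlt.common.configuration.specs.connection_string_credentials import (
        ConnectionStringCredentials,
    )

    creds = ConnectionStringCredentials("postgresql://loader:secret@localhost:5432/dlt_data")
    print(creds.username, creds.host, creds.port, creds.database)

    # database and username being Optional means a driver-only URL should
    # also parse, leaving those fields as None instead of failing
    partial = ConnectionStringCredentials("sqlite://")
    print(partial.database)  # None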
3 changes: 2 additions & 1 deletion dlt/common/destination/capabilities.py
@@ -33,6 +33,7 @@
     TTableSchema,
     TLoaderMergeStrategy,
     TTableFormat,
+    TLoaderReplaceStrategy,
 )
 from dlt.common.wei import EVM_DECIMAL_PRECISION
 
@@ -169,7 +170,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
 
     supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None
     merge_strategies_selector: MergeStrategySelector = None
-    # TODO: also add `supported_replace_strategies` capability
+    supported_replace_strategies: Sequence[TLoaderReplaceStrategy] = None
 
     max_parallel_load_jobs: Optional[int] = None
     """The destination can set the maximum amount of parallel load jobs being executed"""
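The long-standing TODO is resolved: destinations now advertise replace strategies the same way they advertise merge strategies. A hedged sketch of reading the new field; calling `capabilities()` on a destination factory follows dlt's usual pattern, but treat the exact call site as an assumption:

    from dlt.destinations import duckdb

    caps = duckdb().capabilities()
    # supported_replace_strategies defaults to None for destinations that
    # have not declared it, hence the `or []`
    if "staging-optimized" in (caps.supported_replace_strategies or []):
        strategy = "staging-optimized"
    else:
        strategy = "truncate-and-insert"  # declared by every destination in this PR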
13 changes: 7 additions & 6 deletions dlt/common/destination/reference.py
@@ -27,11 +27,14 @@
 from dlt.common.configuration.specs.base_configuration import extract_inner_hint
 from dlt.common.destination.typing import PreparedTableSchema
 from dlt.common.destination.utils import verify_schema_capabilities, verify_supported_data_types
-from dlt.common.exceptions import TerminalValueError
+from dlt.common.exceptions import TerminalException
 from dlt.common.metrics import LoadJobMetrics
 from dlt.common.normalizers.naming import NamingConvention
 from dlt.common.schema import Schema, TSchemaTables
-from dlt.common.schema.typing import C_DLT_LOAD_ID, _TTableSchemaBase, TWriteDisposition
+from dlt.common.schema.typing import (
+    C_DLT_LOAD_ID,
+    TLoaderReplaceStrategy,
+)
 from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table
 from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved
 from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
@@ -41,14 +44,12 @@
     UnknownDestinationModule,
     DestinationSchemaTampered,
     DestinationTransientException,
-    DestinationTerminalException,
 )
 from dlt.common.schema.exceptions import UnknownTableException
 from dlt.common.storages import FileStorage
 from dlt.common.storages.load_storage import ParsedLoadJobFileName
 from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc
 
-TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
 TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
 TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
 TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")
@@ -353,7 +354,7 @@ def __init__(self, file_path: str) -> None:
         # ensure file name
         super().__init__(file_path)
         self._state: TLoadJobState = "ready"
-        self._exception: Exception = None
+        self._exception: BaseException = None
 
         # variables needed by most jobs, set by the loader in set_run_vars
         self._schema: Schema = None
@@ -392,7 +393,7 @@ def run_managed(
             self._job_client.prepare_load_job_execution(self)
             self.run()
             self._state = "completed"
-        except (DestinationTerminalException, TerminalValueError) as e:
+        except (TerminalException, AssertionError) as e:
             self._state = "failed"
             self._exception = e
             logger.exception(f"Terminal exception in job {self.job_id()} in file {self._file_path}")
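`run_managed` now fails a job on any `TerminalException` (plus `AssertionError`), not just the two previously listed types, so terminal errors raised outside destination code are no longer retried. A simplified sketch of the dispatch, with `job` as a hypothetical stand-in for a load job:

    from dlt.common.exceptions import TerminalException

    def run_to_state(job) -> str:  # `job` is any object exposing .run()
        try:
            job.run()
            return "completed"
        except (TerminalException, AssertionError):
            return "failed"  # permanent: surface the exception, do not retry
        except Exception:
            return "retry"   # transient: the loader schedules another attempt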
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
@@ -218,6 +218,7 @@ class NormalizerInfo(TypedDict, total=True):
 
 TWriteDisposition = Literal["skip", "append", "replace", "merge"]
 TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"]
+TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
 
 
 WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
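The `Literal` moves here from `destination/reference.py` so that `capabilities.py` can import it from the schema layer instead of the destination reference module (which itself imports capabilities). An illustrative helper, not a dlt API, showing the shared type in use:

    from typing import Sequence
    from dlt.common.schema.typing import TLoaderReplaceStrategy

    def pick_replace_strategy(
        preferred: TLoaderReplaceStrategy,
        supported: Sequence[TLoaderReplaceStrategy],
    ) -> TLoaderReplaceStrategy:
        # fall back to the strategy every destination in this PR declares
        return preferred if preferred in supported else "truncate-and-insert"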
2 changes: 2 additions & 0 deletions dlt/common/storages/configuration.py
@@ -145,6 +145,8 @@ class FilesystemConfiguration(BaseConfiguration):
     kwargs: Optional[DictStrAny] = None
     client_kwargs: Optional[DictStrAny] = None
     deltalake_storage_options: Optional[DictStrAny] = None
+    max_state_files: int = 100
+    """Maximum number of pipeline state files to keep; 0 or negative value disables cleanup."""
 
     @property
     def protocol(self) -> str:
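A hedged example of setting the new knob. The environment-variable section path below follows dlt's standard configuration layout for the filesystem destination; it is an assumption, not something shown in this diff:

    import os

    # keep only the 25 newest state files per pipeline (assumed config path)
    os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "25"

    # 0 (or any value < 1) disables the cleanup, per the docstring above
    # os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "0"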
1 change: 1 addition & 0 deletions dlt/destinations/impl/athena/factory.py
@@ -138,6 +138,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.timestamp_precision = 3
         caps.supports_truncate_command = False
         caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
+        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
         caps.merge_strategies_selector = athena_merge_strategies_selector
         return caps
 
5 changes: 5 additions & 0 deletions dlt/destinations/impl/bigquery/factory.py
@@ -118,6 +118,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_clone_table = True
         caps.schema_supports_numeric_precision = False  # no precision information in BigQuery
         caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
+        caps.supported_replace_strategies = [
+            "truncate-and-insert",
+            "insert-from-staging",
+            "staging-optimized",
+        ]
 
         return caps
 
1 change: 1 addition & 0 deletions dlt/destinations/impl/clickhouse/factory.py
@@ -139,6 +139,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_truncate_command = True
 
         caps.supported_merge_strategies = ["delete-insert", "scd2"]
+        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
 
         return caps
 
15 changes: 13 additions & 2 deletions dlt/destinations/impl/databricks/databricks.py
@@ -2,6 +2,9 @@
 from urllib.parse import urlparse, urlunparse
 
 from dlt import config
+from dlt.common.configuration.specs.azure_credentials import (
+    AzureServicePrincipalCredentialsWithoutDefaults,
+)
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.reference import (
     HasFollowupJobs,
@@ -95,15 +98,23 @@ def run(self) -> None:
                 ))
             """
         elif bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
-            assert isinstance(staging_credentials, AzureCredentialsWithoutDefaults)
+            assert isinstance(
+                staging_credentials, AzureCredentialsWithoutDefaults
+            ), "AzureCredentialsWithoutDefaults required to pass explicit credential"
             # Explicit azure credentials are needed to load from bucket without a named stage
             credentials_clause = f"""WITH(CREDENTIAL(AZURE_SAS_TOKEN='{staging_credentials.azure_storage_sas_token}'))"""
             bucket_path = self.ensure_databricks_abfss_url(
                 bucket_path, staging_credentials.azure_storage_account_name
             )
 
         if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
-            assert isinstance(staging_credentials, AzureCredentialsWithoutDefaults)
+            assert isinstance(
+                staging_credentials,
+                (
+                    AzureCredentialsWithoutDefaults,
+                    AzureServicePrincipalCredentialsWithoutDefaults,
+                ),
+            )
             bucket_path = self.ensure_databricks_abfss_url(
                 bucket_path, staging_credentials.azure_storage_account_name
             )
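Both branches funnel through `ensure_databricks_abfss_url`, because Databricks needs a fully qualified `abfss://` URL that embeds the storage account host. A sketch of the intended rewrite, illustrative only; the real helper in dlt may differ:

    from urllib.parse import urlparse

    def to_abfss(url: str, account_name: str) -> str:
        # "az://staging/load_1" -> "abfss://staging@acct.dfs.core.windows.net/load_1"
        p = urlparse(url)
        if p.scheme in ("az", "abfs", "abfss") and "@" not in p.netloc:
            return f"abfss://{p.netloc}@{account_name}.dfs.core.windows.net{p.path}"
        return url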
5 changes: 5 additions & 0 deletions dlt/destinations/impl/databricks/factory.py
@@ -134,6 +134,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_multiple_statements = False
         caps.supports_clone_table = True
         caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
+        caps.supported_replace_strategies = [
+            "truncate-and-insert",
+            "insert-from-staging",
+            "staging-optimized",
+        ]
         return caps
 
     @property
1 change: 1 addition & 0 deletions dlt/destinations/impl/dremio/factory.py
@@ -109,6 +109,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_multiple_statements = False
         caps.timestamp_precision = 3
         caps.supported_merge_strategies = ["delete-insert", "scd2"]
+        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
         return caps
 
     @property
5 changes: 0 additions & 5 deletions dlt/destinations/impl/duckdb/configuration.py
@@ -25,11 +25,6 @@
 
 @configspec(init=False)
 class DuckDbBaseCredentials(ConnectionStringCredentials):
-    password: Optional[TSecretValue] = None
-    host: Optional[str] = None
-    port: Optional[int] = None
-    database: Optional[str] = None
-
     read_only: bool = False  # open database read/write
 
     def borrow_conn(self, read_only: bool) -> Any:
1 change: 1 addition & 0 deletions dlt/destinations/impl/duckdb/factory.py
@@ -148,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.alter_add_multi_column = False
         caps.supports_truncate_command = False
         caps.supported_merge_strategies = ["delete-insert", "scd2"]
+        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
 
         return caps
 
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/factory.py
@@ -51,6 +51,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
             "reference",
         ]
         caps.has_case_sensitive_identifiers = True
+        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
         return caps
 
     @property
46 changes: 41 additions & 5 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -3,7 +3,7 @@
 import base64
 
 from types import TracebackType
-from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast
+from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast, Any
 from fsspec import AbstractFileSystem
 from contextlib import contextmanager
 
@@ -164,9 +164,12 @@ def _storage_options(self) -> Dict[str, str]:
         return _deltalake_storage_options(self._job_client.config)
 
     def _delta_table(self) -> Optional["DeltaTable"]:  # type: ignore[name-defined] # noqa: F821
-        from dlt.common.libs.deltalake import try_get_deltatable
+        from dlt.common.libs.deltalake import DeltaTable
 
-        return try_get_deltatable(self.make_remote_url(), storage_options=self._storage_options)
+        if DeltaTable.is_deltatable(self.make_remote_url(), storage_options=self._storage_options):
+            return DeltaTable(self.make_remote_url(), storage_options=self._storage_options)
+        else:
+            return None
 
     @property
     def _partition_columns(self) -> List[str]:
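The `try_get_deltatable` helper is replaced with an explicit existence probe. `DeltaTable.is_deltatable` comes from the `deltalake` Python package; the standalone sketch below assumes that package and hypothetical bucket/options values:

    from deltalake import DeltaTable

    url = "s3://my-bucket/tables/orders"   # hypothetical table location
    opts = {"AWS_REGION": "eu-central-1"}  # hypothetical storage options

    # is_deltatable returns False for missing or non-delta locations instead
    # of raising, which is what the explicit branch above relies on
    table = (
        DeltaTable(url, storage_options=opts)
        if DeltaTable.is_deltatable(url, storage_options=opts)
        else None
    )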
@@ -476,7 +479,9 @@ def _to_path_safe_string(self, s: str) -> str:
         """for base64 strings"""
         return base64.b64decode(s).hex() if s else None
 
-    def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
+    def _list_dlt_table_files(
+        self, table_name: str, pipeline_name: str = None
+    ) -> Iterator[Tuple[str, List[str]]]:
         dirname = self.get_table_dir(table_name)
         if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)):
             raise DestinationUndefinedEntity({"dir": dirname})
@@ -485,7 +490,9 @@ def _list_dlt_table_files(
             fileparts = filename.split(FILENAME_SEPARATOR)
             if len(fileparts) != 3:
                 continue
-            yield filepath, fileparts
+            # Filters only if pipeline_name provided
+            if pipeline_name is None or fileparts[0] == pipeline_name:
+                yield filepath, fileparts
 
     def _store_load(self, load_id: str) -> None:
         # write entry to load "table"
@@ -520,6 +527,31 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
             f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
         )
 
+    def _cleanup_pipeline_states(self, pipeline_name: str) -> None:
+        state_table_files = list(
+            self._list_dlt_table_files(self.schema.state_table_name, pipeline_name)
+        )
+
+        if len(state_table_files) > self.config.max_state_files:
+            # collect a list of state files
+            state_file_info: List[Dict[str, Any]] = [
+                {
+                    "load_id": float(fileparts[1]),  # convert load_id to float for comparison
+                    "filepath": filepath,
+                }
+                for filepath, fileparts in state_table_files
+            ]
+
+            # sort state file info by load_id in descending order
+            state_file_info.sort(key=lambda x: x["load_id"], reverse=True)
+
+            # keep only the most recent max_state_files files
+            files_to_delete = state_file_info[self.config.max_state_files :]
+
+            # delete the old files
+            for file_info in files_to_delete:
+                self._delete_file(file_info["filepath"])
+
     def _store_current_state(self, load_id: str) -> None:
         # don't save the state this way when used as staging
         if self.config.as_staging_destination:
@@ -539,6 +571,10 @@ def _store_current_state(self, load_id: str) -> None:
         # write
         self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))
 
+        # perform state cleanup only if max_state_files is set to a positive value
+        if self.config.max_state_files >= 1:
+            self._cleanup_pipeline_states(pipeline_name)
+
     def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
         # search newest state
         selected_path = None
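The retention rule in `_cleanup_pipeline_states` is easiest to see in isolation: state file names carry the pipeline name and a numeric `load_id`, files are ordered newest-first by `load_id`, the first `max_state_files` survive and the rest are deleted. A self-contained sketch with hypothetical file names:

    # (filepath, fileparts) pairs as produced by _list_dlt_table_files;
    # the names below are hypothetical
    files = [
        ("state/p1__1727340000.1__aa.jsonl", ["p1", "1727340000.1", "aa"]),
        ("state/p1__1727340200.3__cc.jsonl", ["p1", "1727340200.3", "cc"]),
        ("state/p1__1727340100.2__bb.jsonl", ["p1", "1727340100.2", "bb"]),
    ]
    max_state_files = 2

    newest_first = sorted(files, key=lambda f: float(f[1][1]), reverse=True)
    to_delete = [path for path, _ in newest_first[max_state_files:]]
    print(to_delete)  # ['state/p1__1727340000.1__aa.jsonl']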
1 change: 1 addition & 0 deletions dlt/destinations/impl/lancedb/factory.py
@@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
 
         caps.decimal_precision = (38, 18)
         caps.timestamp_precision = 6
+        caps.supported_replace_strategies = ["truncate-and-insert"]
 
         return caps
 
1 change: 1 addition & 0 deletions dlt/destinations/impl/motherduck/factory.py
@@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_truncate_command = False
         caps.supported_merge_strategies = ["delete-insert", "scd2"]
         caps.max_parallel_load_jobs = 8
+        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
 
         return caps

2 changes: 2 additions & 0 deletions dlt/destinations/impl/mssql/configuration.py
@@ -14,6 +14,8 @@
 @configspec(init=False)
 class MsSqlCredentials(ConnectionStringCredentials):
     drivername: Final[str] = dataclasses.field(default="mssql", init=False, repr=False, compare=False)  # type: ignore
+    database: str = None
+    username: str = None
     password: TSecretValue = None
     host: str = None
     port: int = 1433
5 changes: 5 additions & 0 deletions dlt/destinations/impl/mssql/factory.py
@@ -109,6 +109,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.max_rows_per_insert = 1000
         caps.timestamp_precision = 7
         caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
+        caps.supported_replace_strategies = [
+            "truncate-and-insert",
+            "insert-from-staging",
+            "staging-optimized",
+        ]
 
         return caps

2 changes: 2 additions & 0 deletions dlt/destinations/impl/postgres/configuration.py
@@ -14,6 +14,8 @@
 @configspec(init=False)
 class PostgresCredentials(ConnectionStringCredentials):
     drivername: Final[str] = dataclasses.field(default="postgresql", init=False, repr=False, compare=False)  # type: ignore
+    database: str = None
+    username: str = None
     password: TSecretValue = None
     host: str = None
     port: int = 5432
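MsSQL and Postgres add back `database: str = None` and `username: str = None` on purpose: with the base `ConnectionStringCredentials` now Optional, the non-Optional re-declaration marks these fields as required for these destinations (in dlt's configspec semantics, a `None` default on a non-Optional field means "must be provided", not "nullable"). A rough analogue of the pattern using plain dataclasses; the resolver check is a sketch, not dlt's actual code:

    import dataclasses
    from typing import Optional

    @dataclasses.dataclass
    class BaseCreds:
        database: Optional[str] = None  # generic spec: may stay None

    @dataclasses.dataclass
    class PostgresLikeCreds(BaseCreds):
        database: str = None  # type: ignore[assignment]  # required again

    def resolve(creds: PostgresLikeCreds) -> PostgresLikeCreds:
        # sketch of the required-field check a configspec resolver performs
        if creds.database is None:
            raise ValueError("database is required for this destination")
        return creds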
(diff truncated: the remaining 129 of 151 changed files are not shown)
