Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(destinations): implement destination_info property #1534

Open
wants to merge 9 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable


class Destination(ABC, Generic[TDestinationConfig, TDestinationClient]):
"""A destination factory that can be partially pre-configured
"""
A destination factory that can be partially pre-configured
with credentials and other config params.
"""

Expand Down Expand Up @@ -581,6 +582,22 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
"""Returns raw capabilities, before being adjusted with naming convention and config"""
...

@property
def destination_info(self) -> Dict[str, Any]:
"""Return the destination info as a dict.

Returns:
Dict[str, Any]: Destination info.
"""
conf = self.configuration(self.spec(), accept_partial=True)
return {
"destination_name": self.destination_name,
"destination_type": self.destination_type,
"environment": conf.get("environment"),
"fingerprint": conf.fingerprint(),
"repr": self.destination_description,
}

@property
def destination_name(self) -> str:
"""The destination name will either be explicitly set while creating the destination or will be taken from the type"""
Expand Down
14 changes: 14 additions & 0 deletions dlt/destinations/impl/destination/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema
from dlt.common.utils import digest128

TDestinationCallable = Callable[[Union[TDataItems, str], TTableSchema], None]
TDestinationCallableParams = ParamSpec("TDestinationCallableParams")
Expand All @@ -27,6 +28,19 @@ class CustomDestinationClientConfiguration(DestinationClientConfiguration):
skip_dlt_columns_and_tables: bool = True
max_table_nesting: Optional[int] = 0

def fingerprint(self) -> str:
"""Returns fingerprint of the custom destination.

Returns:
str: The custom destination fingerprint.
"""
name = (
self.destination_callable
if isinstance(self.destination_callable, str)
else self.destination_callable.__module__ + "." + self.destination_callable.__name__
)
return digest128(name)

def ensure_callable(self) -> None:
"""Makes sure that valid callable was provided"""
# TODO: this surely can be done with `on_resolved`
Expand Down
19 changes: 17 additions & 2 deletions tests/destinations/test_custom_destination.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Tuple, Dict, Union, cast
from typing import List, Tuple, Dict

import dlt
import pytest
Expand All @@ -18,6 +18,7 @@
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.configuration.inject import get_fun_spec
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.utils import digest128

from dlt.destinations.impl.destination.factory import _DESTINATIONS
from dlt.destinations.impl.destination.configuration import CustomDestinationClientConfiguration
Expand Down Expand Up @@ -58,10 +59,24 @@ def items_resource() -> TDataItems:

p = dlt.pipeline("sink_test", destination=test_sink, dev_mode=True)
p.run([items_resource()])

return calls


def test_custom_destination_info():
@dlt.destination
def test_sink(items: TDataItems, table: TTableSchema) -> None:
pass

p = dlt.pipeline("sink_test", destination=test_sink, dev_mode=True) # type: ignore
assert p.destination.destination_info == {
"destination_name": "test_sink",
"destination_type": "dlt.destinations.destination",
"environment": None,
"fingerprint": digest128(test_sink.__module__ + "." + "test_sink"),
"repr": "test_sink(dlt.destinations.destination)",
}


@pytest.mark.parametrize("loader_file_format", SUPPORTED_LOADER_FORMATS)
def test_all_datatypes(loader_file_format: TLoaderFileFormat) -> None:
data_types = deepcopy(TABLE_ROW_ALL_DATA_TYPES)
Expand Down
Loading