From fe4ceeed00f8370ba8b2ffb8c70e53b26eb95892 Mon Sep 17 00:00:00 2001
From: rudolfix
Date: Sun, 27 Aug 2023 21:21:08 +0200
Subject: [PATCH] allows setting table name via property on DltResource (#593)

* allows setting table name in resource via property
* incremental load snippets cleanup and simplification
* improves exceptions on wrong datatypes in run
* bumps to version 0.3.12

---
 dlt/common/destination/reference.py           |  2 +-
 dlt/extract/exceptions.py                     | 14 ++++++
 dlt/extract/extract.py                        |  2 +-
 dlt/extract/pipe.py                           |  8 +--
 dlt/extract/schema.py                         | 12 +++--
 dlt/pipeline/pipeline.py                      |  5 +-
 .../dlt-ecosystem/destinations/snowflake.md   |  4 +-
 .../docs/general-usage/incremental-loading.md | 37 ++++++++++----
 docs/website/docs/general-usage/resource.md   |  6 +++
 .../docs/walkthroughs/adjust-a-schema.md      |  6 +++
 pyproject.toml                                |  2 +-
 tests/extract/test_sources.py                 |  9 ++++
 tests/pipeline/test_pipeline.py               | 50 ++++++++++++++++++-
 13 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index 2408cf5882..83d86e0cf5 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -54,7 +54,7 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
     def normalize_dataset_name(self, schema: Schema) -> str:
         """Builds full db dataset (schema) name out of configured dataset name and schema name: {dataset_name}_{schema.name}. The resulting name is normalized.
 
-        If default schema name equals schema.name, the schema suffix is skipped.
+        If default schema name is None or equals schema.name, the schema suffix is skipped.
         """
         if not schema.name:
             raise ValueError("schema_name is None or empty")
diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py
index 15750fa9c7..79329f2107 100644
--- a/dlt/extract/exceptions.py
+++ b/dlt/extract/exceptions.py
@@ -62,6 +62,20 @@ def __init__(self, pipe_name: str, gen: Any, msg: str, kind: str) -> None:
         super().__init__(pipe_name, f"extraction of resource {pipe_name} in {kind} {self.func_name} caused an exception: {msg}")
 
 
+class PipeGenInvalid(PipeException):
+    def __init__(self, pipe_name: str, gen: Any) -> None:
+        msg = "A pipe generator element must be an Iterator (i.e. a list or a generator function). The generator element is typically created from the `data` argument to the pipeline.run or extract method."
+        msg += " dlt will evaluate functions that were passed as the data argument. If you passed a function, the data it returned is not iterable."
+        type_name = str(type(gen))
+        msg += f" Generator type is {type_name}."
+        if "DltSource" in type_name:
+            msg += " Did you pass a @dlt.source decorated function without calling it?"
+        if "DltResource" in type_name:
+            msg += " Did you pass a function that returns dlt.resource without calling it?"
+
+        super().__init__(pipe_name, msg)
+
+
 class ResourceNameMissing(DltResourceException):
     def __init__(self) -> None:
         super().__init__(None, """Resource name is missing. If you create a resource directly from data ie. from a list you must pass the name explicitly in `name` argument.
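To see the new failure mode in action, here is a minimal sketch of the mistake that `PipeGenInvalid` now reports and of the fix. The pipeline name and sample data are made up, and the `dummy` destination mirrors the new tests further down in this patch:

```python
import dlt
from dlt.pipeline.exceptions import PipelineStepFailed


@dlt.source
def my_source():
    return dlt.resource([1, 2, 3], name="numbers")


pipeline = dlt.pipeline(pipeline_name="pipe_gen_example", destination="dummy")

try:
    # mistake: the @dlt.source decorated function is passed without calling it
    pipeline.run(my_source)
except PipelineStepFailed as step_ex:
    print(step_ex)  # the underlying cause is the new PipeGenInvalid exception

# fix: call the source so the pipe head evaluates to an iterator
pipeline.extract(my_source())
```

The decorated function is only evaluated inside `run`/`extract`, which is why the message points back at the `data` argument.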
diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py
index d060c080d1..0c2f871115 100644
--- a/dlt/extract/extract.py
+++ b/dlt/extract/extract.py
@@ -142,7 +142,7 @@ def _write_static_table(resource: DltResource, table_name: str) -> None:
             _write_dynamic_table(resource, pipe_item.item)
         else:
             # write item belonging to table with static name
-            table_name = resource.table_name
+            table_name = resource.table_name  # type: ignore
             _write_static_table(resource, table_name)
             _write_item(table_name, resource.name, pipe_item.item)
diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py
index 62bca76e17..e8a165e6aa 100644
--- a/dlt/extract/pipe.py
+++ b/dlt/extract/pipe.py
@@ -18,7 +18,7 @@
 from dlt.common.typing import AnyFun, AnyType, TDataItems
 from dlt.common.utils import get_callable_name
 
-from dlt.extract.exceptions import CreatePipeException, DltSourceException, ExtractorException, InvalidResourceDataTypeFunctionNotAGenerator, InvalidStepFunctionArguments, InvalidTransformerGeneratorFunction, ParametrizedResourceUnbound, PipeException, PipeItemProcessingError, PipeNotBoundToData, ResourceExtractionError
+from dlt.extract.exceptions import CreatePipeException, DltSourceException, ExtractorException, InvalidResourceDataTypeFunctionNotAGenerator, InvalidStepFunctionArguments, InvalidTransformerGeneratorFunction, ParametrizedResourceUnbound, PipeException, PipeGenInvalid, PipeItemProcessingError, PipeNotBoundToData, ResourceExtractionError
 from dlt.extract.typing import DataItemWithMeta, ItemTransform, SupportsPipe, TPipedDataItems
 
 if TYPE_CHECKING:
@@ -454,7 +454,8 @@ def from_pipe(cls, pipe: Pipe, *, max_parallel_items: int = 20, workers: int = 5
             pipe = pipe._clone()
         # head must be iterator
         pipe.evaluate_gen()
-        assert isinstance(pipe.gen, Iterator)
+        if not isinstance(pipe.gen, Iterator):
+            raise PipeGenInvalid(pipe.name, pipe.gen)
         # create extractor
         extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode)
         # add as first source
@@ -495,7 +496,8 @@ def _fork_pipeline(pipe: Pipe) -> None:
             else:
                 # head of independent pipe must be iterator
                 pipe.evaluate_gen()
-                assert isinstance(pipe.gen, Iterator)
+                if not isinstance(pipe.gen, Iterator):
+                    raise PipeGenInvalid(pipe.name, pipe.gen)
                 # add every head as source only once
                 if not any(i.pipe == pipe for i in extract._sources):
                     extract._sources.append(SourcePipeItem(pipe.gen, 0, pipe, None))
diff --git a/dlt/extract/schema.py b/dlt/extract/schema.py
index 41846b1a7d..1bbfd61832 100644
--- a/dlt/extract/schema.py
+++ b/dlt/extract/schema.py
@@ -34,11 +34,15 @@ def __init__(self, name: str, table_schema_template: TTableSchemaTemplate = None
         self.set_template(table_schema_template)
 
     @property
-    def table_name(self) -> str:
-        """Get table name to which resource loads data. Raises in case of table names derived from data."""
+    def table_name(self) -> TTableHintTemplate[str]:
+        """Get table name to which resource loads data. May return a callable."""
         if self._table_name_hint_fun:
-            raise DataItemRequiredForDynamicTableHints(self._name)
-        return self._table_schema_template["name"] if self._table_schema_template else self._name  # type: ignore
+            return self._table_name_hint_fun
+        return self._table_schema_template["name"] if self._table_schema_template else self._name
+
+    @table_name.setter
+    def table_name(self, value: TTableHintTemplate[str]) -> None:
+        self.apply_hints(table_name=value)
 
     @property
     def write_disposition(self) -> TWriteDisposition:
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index b72feb4888..96a9488ddb 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -779,10 +779,7 @@ def apply_hint_args(resource: DltResource) -> None:
                 columns_dict = {c["name"]:c for c in columns}
             # apply hints only if any of the hints is present, table_name must be always present
             if table_name or parent_table_name or write_disposition or columns or primary_key:
-                resource_table_name: str = None
-                with contextlib.suppress(DataItemRequiredForDynamicTableHints):
-                    resource_table_name = resource.table_name
-                resource.apply_hints(table_name or resource_table_name or resource.name, parent_table_name, write_disposition, columns_dict, primary_key)
+                resource.apply_hints(table_name or resource.table_name or resource.name, parent_table_name, write_disposition, columns_dict, primary_key)
 
         def choose_schema() -> Schema:
             """Except of explicitly passed schema, use a clone that will get discarded if extraction fails"""
diff --git a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md
index b6cf17b576..595852ea56 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md
@@ -75,7 +75,7 @@ The **password authentication** is not any different from other databases like P
 You can also pass credentials as a database connection string. For example:
 ```toml
 # keep it at the top of your toml file! before any section starts
-destination.postgres.snowflake="snowflake://loader:@kgiotue-wn98412/dlt_data?warehouse=COMPUTE_WH&role=DLT_LOADER_ROLE"
+destination.snowflake.credentials="snowflake://loader:@kgiotue-wn98412/dlt_data?warehouse=COMPUTE_WH&role=DLT_LOADER_ROLE"
 ```
 
 In **key pair authentication** you replace password with a private key exported in PEM format. The key may be encrypted. In that case you must provide a passphrase.
@@ -95,7 +95,7 @@ private_key_passphrase="passphrase"
 We allow to pass private key and passphrase in connection string. Please url encode the private key and passphrase.
 ```toml
 # keep it at the top of your toml file! before any section starts
-destination.postgres.snowflake="snowflake://loader:@kgiotue-wn98412/dlt_data?private_key=&private_key_passphrase="
+destination.snowflake.credentials="snowflake://loader:@kgiotue-wn98412/dlt_data?private_key=&private_key_passphrase="
 ```
 
 ## Write disposition
diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 2730b291a9..96c138a69a 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -168,25 +168,32 @@ def repo_issues(
     repository,
     created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
 ):
-    # get issues from created from last "created_at" value
-    for page in _get_issues_page(access_token, repository, since=created_at.last_value):
+    # get issues since "created_at" stored in state on previous run (or initial_value on first run)
+    for page in _get_issues_page(access_token, repository, since=created_at.start_value):
         yield page
+        # last_value is updated after every page
+        print(created_at.last_value)
 ```
 
 Here we add `created_at` argument that will receive incremental state, initialized to
 `1970-01-01T00:00:00Z`. It is configured to track `created_at` field in issues returned by
 `_get_issues_page` and then yielded. It will store the newest `created_at` value in `dlt`
-[state](state.md) and make it available in `created_at.last_value` on next pipeline
+[state](state.md) and make it available in `created_at.start_value` on next pipeline
 run. This value is used to request only issues newer (or equal) via GitHub API.
 
-On the first run of this resource, all the issues (we use "1970-01-01T00:00:00Z" as initial to get
-all of them) will be loaded and the `created_at.last_value` will get the `created_at` of most recent
-issue. On the second run we'll pass this value to `_get_issues_page` to get only the newer issues.
+In essence, the `dlt.sources.incremental` instance above provides:
+* **created_at.initial_value** which is always equal to "1970-01-01T00:00:00Z" passed in the constructor
+* **created_at.start_value** the maximum `created_at` value from the previous run, or the **initial_value** on the first run
+* **created_at.last_value** a "real time" `created_at` value updated with each yielded item or page; before the first yield it equals **start_value**
+* **created_at.end_value** (not used here) [marking the end of the backfill range](#using-dltsourcesincremental-for-backfill)
+
+When paginating, you typically need only **start_value**, which does not change during the execution of the resource;
+most paginators will return a **next page** link which you should use.
 
 Behind the scenes, `dlt` will deduplicate the results ie. in case the last issue is returned again
 (`created_at` filter is inclusive) and skip already loaded ones.
 
 In the example below we incrementally load the GitHub events, where API does not let us filter for the newest events - it
-always returns all of them. Nevertheless, `dlt` will load only the incremental part, skipping all the
+always returns all of them. Nevertheless, `dlt` will load only the new items, filtering out all the
 duplicates and past issues.
 
 ```python
@@ -215,7 +222,13 @@ The `start_out_of_range` boolean flag is set when the first such element is yiel
 since we know that github returns results ordered from newest to oldest, we know that all subsequent
 items will be filtered out anyway and there's no need to fetch more data.
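The paragraph above suggests a natural optimization. A minimal sketch of it, assuming the `start_out_of_range` flag documented in this section and using a made-up `_get_event_pages()` stub in place of a real API client:

```python
import dlt


def _get_event_pages():
    # stand-in for a paginated API that returns newest events first
    yield [{"id": 3, "created_at": "2023-08-27T10:00:00Z"}]
    yield [{"id": 2, "created_at": "2023-08-26T10:00:00Z"}]
    yield [{"id": 1, "created_at": "2023-08-25T10:00:00Z"}]


@dlt.resource(primary_key="id")
def github_events(
    created_at=dlt.sources.incremental("created_at", initial_value="2023-08-26T00:00:00Z")
):
    for page in _get_event_pages():
        yield page
        # items older than start_value are filtered out by the incremental anyway;
        # once the first such item shows up, every following page is older still,
        # so stop requesting more data
        if created_at.start_out_of_range:
            return
```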
 
-`dlt.sources.incremental` allows to define custom `last_value` function. This lets you define
+### max, min or custom `last_value_func`
+
+`dlt.sources.incremental` lets you choose the function that orders (compares) values coming from the items against the current `last_value`.
+* The default function is the built-in `max`, which returns the bigger of the two values.
+* Another built-in, `min`, returns the smaller value.
+
+You can pass your own custom function as well. This lets you define
 `last_value` on complex types i.e. dictionaries and store indexes of last values, not just simple
 types. The `last_value` argument is a [JSON Path](https://github.com/json-path/JsonPath#operators) and
 lets you select nested and complex data (including the whole data item when `$` is used).
@@ -244,6 +257,8 @@ def get_events(last_created_at = dlt.sources.incremental("$", last_value_func=by
     yield json.load(f)
 ```
 
+### Deduplication primary_key
+
 `dlt.sources.incremental` let's you optionally set a `primary_key` that is used exclusively to
 deduplicate and which does not become a table hint. The same setting lets you disable the
 deduplication altogether when empty tuple is passed. Below we pass `primary_key` directly to
@@ -304,7 +319,7 @@ def repo_issues(
     created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z", end_value="2022-07-01T00:00:00Z")
 ):
     # get issues from created from last "created_at" value
-    for page in _get_issues_page(access_token, repository, since=created_at.last_value, until=created_at.end_value):
+    for page in _get_issues_page(access_token, repository, since=created_at.start_value, until=created_at.end_value):
         yield page
 ```
 Above we use `initial_value` and `end_value` arguments of the `incremental` to define the range of issues that we want to retrieve
@@ -345,7 +360,7 @@ def tickets(
     ),
 ):
     for page in zendesk_client.get_pages(
-        "/api/v2/incremental/tickets", "tickets", start_time=updated_at.last_value
+        "/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value
     ):
         yield page
 ```
@@ -464,7 +479,7 @@ def tickets(
     ),
 ):
     for page in zendesk_client.get_pages(
-        "/api/v2/incremental/tickets", "tickets", start_time=updated_at.last_value
+        "/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value
     ):
         yield page
 ```
diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md
index 656811d5ea..462b93b220 100644
--- a/docs/website/docs/general-usage/resource.md
+++ b/docs/website/docs/general-usage/resource.md
@@ -242,6 +242,12 @@ tables.users.apply_hints(
 pipeline.run(tables)
 ```
 
+To change the name of the table to which the resource will load data, do the following:
+```python
+tables = sql_database()
+tables.users.table_name = "other_users"
+```
+
 ## Load resources
 
 You can pass individual resources or list of resources to the `dlt.pipeline` object. The resources
diff --git a/docs/website/docs/walkthroughs/adjust-a-schema.md b/docs/website/docs/walkthroughs/adjust-a-schema.md
index dd860c72d9..d62dc215d9 100644
--- a/docs/website/docs/walkthroughs/adjust-a-schema.md
+++ b/docs/website/docs/walkthroughs/adjust-a-schema.md
@@ -100,6 +100,12 @@ players_games:
 Run the pipeline script again and make sure that the change is visible in export schema. Then,
 [launch the Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md) to see the changed data.
 
+:::note
+Do not rename the tables or columns in the yaml file. `dlt` infers those from the data, so the schema will just be recreated.
+You can [adjust the schema](../general-usage/resource.md#adjust-schema) in Python before the resource is loaded.
+:::
+
+
 ### Load data as json instead of generating child table or columns from flattened dicts
 
 In the export schema, you can see that white and black players properties got flattened into:
diff --git a/pyproject.toml b/pyproject.toml
index d3e81fe3d1..eaf70d59f2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "dlt"
-version = "0.3.12"
+version = "0.3.13"
 description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
 authors = ["dltHub Inc. "]
 maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "]
diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py
index 36e7737415..a20d87d3e1 100644
--- a/tests/extract/test_sources.py
+++ b/tests/extract/test_sources.py
@@ -349,6 +349,15 @@ def yield_twice(item):
     assert list(dlt.resource(["A", "b", "C"], name="data") | tx_stage) == ['A', 'A', 'B', 'B', 'C', 'C']
 
 
+def test_set_table_name() -> None:
+    r = dlt.resource(["A", "b", "C"], name="data")
+    assert r.table_name == "data"
+    r.table_name = "letters"
+    assert r.table_name == "letters"
+    r.table_name = lambda letter: letter
+    assert callable(r.table_name)
+
+
 def test_select_resources() -> None:
 
     @dlt.source
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index afaffafa2f..1b10ef50e9 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -1,3 +1,4 @@
+import itertools
 import logging
 import os
 import random
@@ -13,9 +14,8 @@
 from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, UnknownDestinationModule
 from dlt.common.pipeline import PipelineContext
 from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector
-from dlt.common.schema.exceptions import InvalidDatasetName
 from dlt.common.utils import uniq_id
-from dlt.extract.exceptions import SourceExhausted
+from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
 from dlt.extract.extract import ExtractorStorage
 from dlt.extract.source import DltResource, DltSource
 from dlt.load.exceptions import LoadClientJobFailed
@@ -193,6 +193,7 @@ def test_create_pipeline_all_destinations(destination_config: DestinationTestCon
     p = dlt.pipeline(pipeline_name=destination_config.destination + "_pipeline", destination=destination_config.destination, staging=destination_config.staging)
     # are capabilities injected
     caps = p._container[DestinationCapabilitiesContext]
+    print(caps.naming_convention)
     # are right naming conventions created
     assert p._default_naming.max_length == min(caps.max_column_identifier_length, caps.max_identifier_length)
     p.extract([1, "2", 3], table_name="data")
@@ -964,3 +965,48 @@ def test_emojis_resource_names() -> None:
     assert_load_info(info)
     table = info.load_packages[0].schema_update["_wide_peacock"]
     assert table["resource"] == "🦚WidePeacock"
+
+
+def test_invalid_data_edge_cases() -> None:
+    # pass not evaluated source function
+    @dlt.source
+    def my_source():
+        return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)
+
+    pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run(my_source)
+    assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
+    assert "dlt.source" in str(pip_ex.value)
+
+    def res_return():
+        return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)
+
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run(res_return)
+    assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
+    assert "dlt.resource" in str(pip_ex.value)
+
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run({"a": "b"}, table_name="data")
+    assert isinstance(pip_ex.value.__context__, InvalidResourceDataTypeBasic)
+
+    # check same cases but that yield
+    @dlt.source
+    def my_source_yield():
+        yield dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)
+
+    pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run(my_source_yield)
+    assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
+    assert "dlt.source" in str(pip_ex.value)
+
+    def res_return_yield():
+        return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)
+
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run(res_return_yield)
+    assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
+    assert "dlt.resource" in str(pip_ex.value)
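Beyond the new tests, a small usage sketch of the headline change, the settable `table_name` property. The resource data, table names and the duckdb destination are illustrative only:

```python
import dlt

events = dlt.resource(
    [
        {"event": "purchase", "value": 42},
        {"event": "refund", "value": -42},
    ],
    name="events",
)

# static rename: load to "all_events" instead of the resource name
events.table_name = "all_events"
assert events.table_name == "all_events"

# dynamic routing: derive the table name from each data item
events.table_name = lambda item: f"events_{item['event']}"
assert callable(events.table_name)

pipeline = dlt.pipeline(pipeline_name="table_name_demo", destination="duckdb")
# expected to create events_purchase and events_refund tables
pipeline.run(events)
```

Setting the property goes through `apply_hints(table_name=...)`, so it behaves like the existing hint mechanism, including callables evaluated per data item.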
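The new `last_value_func` section above shows a custom function but no plain `min`. A hedged sketch of what that might look like (resource name and data invented for illustration):

```python
import dlt


@dlt.resource(primary_key="id")
def archive_scan(
    created_at=dlt.sources.incremental(
        "created_at", initial_value="2023-09-01T00:00:00Z", last_value_func=min
    )
):
    # a feed that is walked from newest to oldest; with `min` the stored
    # last_value tracks the oldest `created_at` seen so far
    yield [
        {"id": 10, "created_at": "2023-08-31T12:00:00Z"},
        {"id": 9, "created_at": "2023-08-30T08:00:00Z"},
    ]
```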