allows setting table name via property on DltResource (#593)
* allows setting table name in resource via property

* incremental load snippets cleanup and simplification

* improves exceptions on wrong datatypes in run

* bumps to version 0.3.13
rudolfix authored Aug 27, 2023
1 parent 784c1cf commit fe4ceee
Showing 13 changed files with 128 additions and 29 deletions.
2 changes: 1 addition & 1 deletion dlt/common/destination/reference.py
@@ -54,7 +54,7 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
def normalize_dataset_name(self, schema: Schema) -> str:
"""Builds full db dataset (schema) name out of configured dataset name and schema name: {dataset_name}_{schema.name}. The resulting name is normalized.
If default schema name equals schema.name, the schema suffix is skipped.
If default schema name is None or equals schema.name, the schema suffix is skipped.
"""
if not schema.name:
raise ValueError("schema_name is None or empty")
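
For context, a worked illustration of the rule stated in the updated docstring — the concrete names below are assumptions, not taken from the commit:

```python
# configured dataset_name = "github", default schema name = "github_source"
#
# schema.name == "github_source"  ->  "github"          (equals default, suffix skipped)
# schema.name == "issues"         ->  "github_issues"   ({dataset_name}_{schema.name})
# default schema name is None     ->  "github"          (new case: suffix also skipped)
```
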
14 changes: 14 additions & 0 deletions dlt/extract/exceptions.py
@@ -62,6 +62,20 @@ def __init__(self, pipe_name: str, gen: Any, msg: str, kind: str) -> None:
super().__init__(pipe_name, f"extraction of resource {pipe_name} in {kind} {self.func_name} caused an exception: {msg}")


class PipeGenInvalid(PipeException):
def __init__(self, pipe_name: str, gen: Any) -> None:
msg = "A pipe generator element must be an Iterator (ie. list or generator function). Generator element is typically created from a `data` argument to pipeline.run or extract method."
msg += "dlt will evaluate functions that were passed as data argument. If you passed a function the returned data type is not iterable. "
type_name = str(type(gen))
msg += f" Generator type is {type_name}."
if "DltSource" in type_name:
msg += " Did you pass a @dlt.source decorated function without calling it?"
if "DltResource" in type_name:
msg += " Did you pass a function that returns dlt.resource without calling it?"

super().__init__(pipe_name, msg)


class ResourceNameMissing(DltResourceException):
def __init__(self) -> None:
super().__init__(None, """Resource name is missing. If you create a resource directly from data ie. from a list you must pass the name explicitly in `name` argument.
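
For context, a minimal sketch of the misuse the new `PipeGenInvalid` exception reports, mirroring `test_invalid_data_edge_cases` further below; the `dummy` destination and the wrapping in `PipelineStepFailed` follow that test, and the exception import paths are assumptions:

```python
import dlt
import pytest
from dlt.extract.exceptions import PipeGenInvalid
from dlt.pipeline.exceptions import PipelineStepFailed

@dlt.source
def my_source():
    return dlt.resource([1, 2, 3], name="numbers")

pipeline = dlt.pipeline(pipeline_name="demo", destination="dummy")

# Passing the @dlt.source decorated function itself (without calling it) leaves
# a pipe head that is not an Iterator, so extraction raises PipeGenInvalid,
# surfaced to the caller wrapped in PipelineStepFailed:
with pytest.raises(PipelineStepFailed) as pip_ex:
    pipeline.run(my_source)  # note: not my_source()
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
```
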
2 changes: 1 addition & 1 deletion dlt/extract/extract.py
@@ -142,7 +142,7 @@ def _write_static_table(resource: DltResource, table_name: str) -> None:
_write_dynamic_table(resource, pipe_item.item)
else:
# write item belonging to table with static name
table_name = resource.table_name
table_name = resource.table_name # type: ignore
_write_static_table(resource, table_name)
_write_item(table_name, resource.name, pipe_item.item)

8 changes: 5 additions & 3 deletions dlt/extract/pipe.py
@@ -18,7 +18,7 @@
from dlt.common.typing import AnyFun, AnyType, TDataItems
from dlt.common.utils import get_callable_name

from dlt.extract.exceptions import CreatePipeException, DltSourceException, ExtractorException, InvalidResourceDataTypeFunctionNotAGenerator, InvalidStepFunctionArguments, InvalidTransformerGeneratorFunction, ParametrizedResourceUnbound, PipeException, PipeItemProcessingError, PipeNotBoundToData, ResourceExtractionError
from dlt.extract.exceptions import CreatePipeException, DltSourceException, ExtractorException, InvalidResourceDataTypeFunctionNotAGenerator, InvalidStepFunctionArguments, InvalidTransformerGeneratorFunction, ParametrizedResourceUnbound, PipeException, PipeGenInvalid, PipeItemProcessingError, PipeNotBoundToData, ResourceExtractionError
from dlt.extract.typing import DataItemWithMeta, ItemTransform, SupportsPipe, TPipedDataItems

if TYPE_CHECKING:
@@ -454,7 +454,8 @@ def from_pipe(cls, pipe: Pipe, *, max_parallel_items: int = 20, workers: int = 5
pipe = pipe._clone()
# head must be iterator
pipe.evaluate_gen()
assert isinstance(pipe.gen, Iterator)
if not isinstance(pipe.gen, Iterator):
raise PipeGenInvalid(pipe.name, pipe.gen)
# create extractor
extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode)
# add as first source
@@ -495,7 +496,8 @@ def _fork_pipeline(pipe: Pipe) -> None:
else:
# head of independent pipe must be iterator
pipe.evaluate_gen()
assert isinstance(pipe.gen, Iterator)
if not isinstance(pipe.gen, Iterator):
raise PipeGenInvalid(pipe.name, pipe.gen)
# add every head as source only once
if not any(i.pipe == pipe for i in extract._sources):
extract._sources.append(SourcePipeItem(pipe.gen, 0, pipe, None))
12 changes: 8 additions & 4 deletions dlt/extract/schema.py
@@ -34,11 +34,15 @@ def __init__(self, name: str, table_schema_template: TTableSchemaTemplate = None
self.set_template(table_schema_template)

@property
def table_name(self) -> str:
"""Get table name to which resource loads data. Raises in case of table names derived from data."""
def table_name(self) -> TTableHintTemplate[str]:
"""Get table name to which resource loads data. May return a callable."""
if self._table_name_hint_fun:
raise DataItemRequiredForDynamicTableHints(self._name)
return self._table_schema_template["name"] if self._table_schema_template else self._name # type: ignore
return self._table_name_hint_fun
return self._table_schema_template["name"] if self._table_schema_template else self._name

@table_name.setter
def table_name(self, value: TTableHintTemplate[str]) -> None:
self.apply_hints(table_name=value)

@property
def write_disposition(self) -> TWriteDisposition:
5 changes: 1 addition & 4 deletions dlt/pipeline/pipeline.py
@@ -779,10 +779,7 @@ def apply_hint_args(resource: DltResource) -> None:
columns_dict = {c["name"]:c for c in columns}
# apply hints only if any of the hints is present, table_name must be always present
if table_name or parent_table_name or write_disposition or columns or primary_key:
resource_table_name: str = None
with contextlib.suppress(DataItemRequiredForDynamicTableHints):
resource_table_name = resource.table_name
resource.apply_hints(table_name or resource_table_name or resource.name, parent_table_name, write_disposition, columns_dict, primary_key)
resource.apply_hints(table_name or resource.table_name or resource.name, parent_table_name, write_disposition, columns_dict, primary_key)

def choose_schema() -> Schema:
"""Except of explicitly passed schema, use a clone that will get discarded if extraction fails"""
4 changes: 2 additions & 2 deletions docs/website/docs/dlt-ecosystem/destinations/snowflake.md
@@ -75,7 +75,7 @@ The **password authentication** is not any different from other databases like P
You can also pass credentials as a database connection string. For example:
```toml
# keep it at the top of your toml file! before any section starts
destination.postgres.snowflake="snowflake://loader:<password>@kgiotue-wn98412/dlt_data?warehouse=COMPUTE_WH&role=DLT_LOADER_ROLE"
destination.snowflake.credentials="snowflake://loader:<password>@kgiotue-wn98412/dlt_data?warehouse=COMPUTE_WH&role=DLT_LOADER_ROLE"
```

In **key pair authentication**, you replace the password with a private key exported in PEM format. The key may be encrypted; in that case, you must provide a passphrase.
@@ -95,7 +95,7 @@ private_key_passphrase="passphrase"
You can also pass the private key and passphrase in the connection string. Make sure to URL-encode both the private key and the passphrase.
```toml
# keep it at the top of your toml file! before any section starts
destination.postgres.snowflake="snowflake://loader:<password>@kgiotue-wn98412/dlt_data?private_key=<url encoded pem>&private_key_passphrase=<url encoded passphrase>"
destination.snowflake.credentials="snowflake://loader:<password>@kgiotue-wn98412/dlt_data?private_key=<url encoded pem>&private_key_passphrase=<url encoded passphrase>"
```

## Write disposition
37 changes: 26 additions & 11 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -168,25 +168,32 @@ def repo_issues(
repository,
created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
# get issues from created from last "created_at" value
for page in _get_issues_page(access_token, repository, since=created_at.last_value):
# get issues since "created_at" stored in state on previous run (or initial_value on first run)
for page in _get_issues_page(access_token, repository, since=created_at.start_value):
yield page
# last_value is updated after every page
print(created_at.last_value)
```

Here we add a `created_at` argument that will receive the incremental state, initialized to
`1970-01-01T00:00:00Z`. It is configured to track the `created_at` field in the issues returned by
`_get_issues_page` and then yielded. It will store the newest `created_at` value in the `dlt`
[state](state.md) and make it available in `created_at.last_value` on next pipeline
[state](state.md) and make it available in `created_at.start_value` on next pipeline
run. This value is used to request only issues newer than (or equal to) it via the GitHub API.

On the first run of this resource, all the issues (we use "1970-01-01T00:00:00Z" as initial to get
all of them) will be loaded and the `created_at.last_value` will get the `created_at` of most recent
issue. On the second run we'll pass this value to `_get_issues_page` to get only the newer issues.
In essence, the `dlt.sources.incremental` instance above exposes:
* **created_at.initial_value**, which is always equal to the "1970-01-01T00:00:00Z" passed in the constructor
* **created_at.start_value**, the maximum `created_at` value from the previous run, or the **initial_value** on the first run
* **created_at.last_value**, a "real-time" `created_at` value updated with each yielded item or page; before the first yield it equals **start_value**
* **created_at.end_value** (not used here), [marking the end of a backfill range](#using-dltsourcesincremental-for-backfill)

When paginating you typically need **start_value**, which does not change during the execution of the resource;
note, however, that most paginators return a **next page** link which you should follow instead.
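
To make the distinction concrete, a short annotated variant of the pagination loop above (using the same placeholder `_get_issues_page` helper as the examples on this page):

```python
@dlt.resource(primary_key="id")
def repo_issues(
    access_token,
    repository,
    created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
    # start_value is fixed for the whole run - safe to use when requesting pages
    for page in _get_issues_page(access_token, repository, since=created_at.start_value):
        yield page
        # last_value keeps advancing as items are yielded, so it is not a stable
        # page cursor; use start_value or the API's own "next page" link instead
```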

Behind the scenes, `dlt` deduplicates the results, i.e. if the last issue is returned again
(the `created_at` filter is inclusive), the already loaded records are skipped. In the example below we
incrementally load GitHub events, where the API does not let us filter for the newest events only - it
always returns all of them. Nevertheless, `dlt` will load only the incremental part, skipping all the
always returns all of them. Nevertheless, `dlt` will load only the new items, filtering out all the
duplicates and past issues.

@@ -215,7 +222,13 @@ The `start_out_of_range` boolean flag is set when the first such element is yiel
since we know that GitHub returns results ordered from newest to oldest, all subsequent
items will be filtered out anyway and there's no need to fetch more data.
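
For illustration, a hedged sketch of stopping early once the flag is set — the descending `sort` parameter on the placeholder `_get_issues_page` helper is an assumption:

```python
@dlt.resource(primary_key="id")
def repo_issues(
    access_token,
    repository,
    created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
    # request issues newest-first so older-than-start_value items mark the end
    for page in _get_issues_page(access_token, repository, sort="desc"):
        yield page
        if created_at.start_out_of_range:
            # an item older than start_value was seen - all remaining pages
            # would be filtered out anyway, so stop fetching
            break
```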

`dlt.sources.incremental` allows to define custom `last_value` function. This lets you define
### max, min or custom `last_value_func`

`dlt.sources.incremental` lets you choose the function that orders (compares) values coming from the items against the current `last_value`.
* The default function is the built-in `max`, which returns the bigger of the two values
* Another option is the built-in `min`, which returns the smaller value (sketched below)
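
As a hedged sketch of the `min` option — the `backfill_pages` helper and the cursor field below are made up:

```python
@dlt.resource
def pages(
    created_at = dlt.sources.incremental(
        "created_at", initial_value="2023-08-27T00:00:00Z", last_value_func=min
    )
):
    # with `min`, last_value tracks the smallest "created_at" seen so far,
    # which is useful when walking a feed backwards in time
    yield from backfill_pages(before=created_at.start_value)  # hypothetical helper
```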

You can also pass your own custom function. This lets you define
`last_value` on complex types, e.g. dictionaries, and store indexes of last values rather than just simple
types. The cursor path (the first positional argument) is a [JSON Path](https://github.com/json-path/JsonPath#operators)
and lets you select nested and complex data (including the whole data item when `$` is used).
@@ -244,6 +257,8 @@ def get_events(last_created_at = dlt.sources.incremental("$", last_value_func=by
yield json.load(f)
```

### Deduplication with `primary_key`

`dlt.sources.incremental` lets you optionally set a `primary_key` that is used exclusively for
deduplication and does not become a table hint. The same setting lets you disable
deduplication altogether by passing an empty tuple. Below we pass `primary_key` directly to
@@ -304,7 +319,7 @@ def repo_issues(
created_at = dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z", end_value="2022-07-01T00:00:00Z")
):
# get issues from created from last "created_at" value
for page in _get_issues_page(access_token, repository, since=created_at.last_value, until=created_at.end_value):
for page in _get_issues_page(access_token, repository, since=created_at.start_value, until=created_at.end_value):
yield page
```
Above we use the `initial_value` and `end_value` arguments of `incremental` to define the range of issues that we want to retrieve
@@ -345,7 +360,7 @@ def tickets(
),
):
for page in zendesk_client.get_pages(
"/api/v2/incremental/tickets", "tickets", start_time=updated_at.last_value
"/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value
):
yield page
```
@@ -464,7 +479,7 @@ def tickets(
),
):
for page in zendesk_client.get_pages(
"/api/v2/incremental/tickets", "tickets", start_time=updated_at.last_value
"/api/v2/incremental/tickets", "tickets", start_time=updated_at.start_value
):
yield page

6 changes: 6 additions & 0 deletions docs/website/docs/general-usage/resource.md
@@ -242,6 +242,12 @@ tables.users.apply_hints(
pipeline.run(tables)
```

To simply change the name of the table to which a resource will load data, do the following:
```python
tables = sql_database()
tables.users.table_name = "other_users"
```
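
Since the hint also accepts a callable (see the `table_name` property change in `dlt/extract/schema.py` above), here is a sketch of a per-item table name, continuing the snippet above — the `event_type` field is a made-up example:

```python
tables = sql_database()
# route each item to a table named after its (hypothetical) "event_type" field
tables.users.table_name = lambda item: item["event_type"]
```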

## Load resources

You can pass individual resources or list of resources to the `dlt.pipeline` object. The resources
6 changes: 6 additions & 0 deletions docs/website/docs/walkthroughs/adjust-a-schema.md
@@ -100,6 +100,12 @@ players_games:
Run the pipeline script again and make sure that the change is visible in the export schema. Then,
[launch the Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md) to see the changed data.
:::note
Do not rename tables or columns in the yaml file. `dlt` infers those from the data, so the schema will simply be recreated with the original names.
You can [adjust the schema](../general-usage/resource.md#adjust-schema) in Python before the resource is loaded.
:::


### Load data as json instead of generating child table or columns from flattened dicts

In the export schema, you can see that white and black players properties got flattened into:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dlt"
version = "0.3.12"
version = "0.3.13"
description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
authors = ["dltHub Inc. <[email protected]>"]
maintainers = [ "Marcin Rudolf <[email protected]>", "Adrian Brudaru <[email protected]>", "Ty Dunn <[email protected]>"]
9 changes: 9 additions & 0 deletions tests/extract/test_sources.py
@@ -349,6 +349,15 @@ def yield_twice(item):
assert list(dlt.resource(["A", "b", "C"], name="data") | tx_stage) == ['A', 'A', 'B', 'B', 'C', 'C']


def test_set_table_name() -> None:
r = dlt.resource(["A", "b", "C"], name="data")
assert r.table_name == "data"
r.table_name = "letters"
assert r.table_name == "letters"
r.table_name = lambda letter: letter
assert callable(r.table_name)


def test_select_resources() -> None:

@dlt.source
50 changes: 48 additions & 2 deletions tests/pipeline/test_pipeline.py
@@ -1,3 +1,4 @@
import itertools
import logging
import os
import random
@@ -13,9 +14,8 @@
from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, UnknownDestinationModule
from dlt.common.pipeline import PipelineContext
from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.utils import uniq_id
from dlt.extract.exceptions import SourceExhausted
from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
from dlt.extract.extract import ExtractorStorage
from dlt.extract.source import DltResource, DltSource
from dlt.load.exceptions import LoadClientJobFailed
@@ -193,6 +193,7 @@ def test_create_pipeline_all_destinations(destination_config: DestinationTestCon
p = dlt.pipeline(pipeline_name=destination_config.destination + "_pipeline", destination=destination_config.destination, staging=destination_config.staging)
# are capabilities injected
caps = p._container[DestinationCapabilitiesContext]
print(caps.naming_convention)
# are right naming conventions created
assert p._default_naming.max_length == min(caps.max_column_identifier_length, caps.max_identifier_length)
p.extract([1, "2", 3], table_name="data")
@@ -964,3 +965,48 @@ def test_emojis_resource_names() -> None:
assert_load_info(info)
table = info.load_packages[0].schema_update["_wide_peacock"]
assert table["resource"] == "🦚WidePeacock"


def test_invalid_data_edge_cases() -> None:
# pass not evaluated source function
@dlt.source
def my_source():
return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)

pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(my_source)
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
assert "dlt.source" in str(pip_ex.value)

def res_return():
return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)

with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(res_return)
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
assert "dlt.resource" in str(pip_ex.value)

with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run({"a": "b"}, table_name="data")
assert isinstance(pip_ex.value.__context__, InvalidResourceDataTypeBasic)

# check same cases but that yield
@dlt.source
def my_source_yield():
yield dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)

pipeline = dlt.pipeline(pipeline_name="invalid", destination="dummy")
with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(my_source_yield)
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
assert "dlt.source" in str(pip_ex.value)

def res_return_yield():
return dlt.resource(itertools.count(start=1), name="infinity").add_limit(5)

with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(res_return_yield)
assert isinstance(pip_ex.value.__context__, PipeGenInvalid)
assert "dlt.resource" in str(pip_ex.value)
