[docs] dlt add examples for translator, sourceasset, partitions #22822

Merged · 11 commits · Jul 10, 2024
141 changes: 140 additions & 1 deletion docs/content/integrations/embedded-elt/dlt.mdx
---
## Advanced usage
### Overriding the translator to customize dlt assets
The <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" /> object can be used to customize how dlt properties map to Dagster concepts.
For example, to change how an asset's name is derived, you can override the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_asset_key" /> method; to change the keys of the upstream source assets, you can override the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_asset_keys" /> method.
```python file=/integrations/embedded_elt/dlt_dagster_translator.py
from collections.abc import Iterable

import dlt
from dagster_embedded_elt.dlt import (
    DagsterDltResource,
    DagsterDltTranslator,
    dlt_assets,
)

from dagster import AssetExecutionContext, AssetKey


@dlt.source
def example_dlt_source():
    def example_resource(): ...

    return example_resource


class CustomDagsterDltTranslator(DagsterDltTranslator):
    def get_asset_key(self, resource: DagsterDltResource) -> AssetKey:
        """Overrides asset key to be the dlt resource name."""
        return AssetKey(f"{resource.name}")

    def get_deps_asset_keys(self, resource: DagsterDltResource) -> Iterable[AssetKey]:
        """Overrides upstream asset key to be a single source asset."""
        return [AssetKey("common_upstream_dlt_dependency")]


@dlt_assets(
    name="example_dlt_assets",
    dlt_source=example_dlt_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
        progress="log",
    ),
    dlt_dagster_translator=CustomDagsterDltTranslator(),
)
def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
```
In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets.
### Assigning metadata to upstream source assets
A common question is how to define metadata on the source assets upstream of the dlt assets.
This can be accomplished by defining a <PyObject object="SourceAsset" /> with a key that matches the one returned by the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_asset_keys" /> method.
For example, say we have defined a set of dlt assets named `thinkific_assets`; we can iterate over those assets and derive a <PyObject object="SourceAsset" /> with attributes like `group_name`.
```python file=/integrations/embedded_elt/dlt_source_assets.py
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dagster import AssetExecutionContext, SourceAsset


@dlt.source
def example_dlt_source():
    def example_resource(): ...

    return example_resource


@dlt_assets(
    dlt_source=example_dlt_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
        progress="log",
    ),
)
def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)


thinkific_source_assets = [
    SourceAsset(key, group_name="thinkific")
    for key in example_dlt_assets.dependency_keys
]
```
### Using partitions in your dlt assets
While still an experimental feature, it is possible to use partitions within your dlt assets. Note, however, that this may result in concurrency-related issues, as state is managed by dlt. For this reason, we recommend setting concurrency limits for your partitioned dlt assets. See the [Limiting concurrency in data pipelines](/guides/limiting-concurrency-in-data-pipelines) guide for more details.
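One way to apply such a limit is through run-queue concurrency in your instance's `dagster.yaml`, keyed on a run tag that you attach to the partitioned dlt runs. A sketch (the tag key and value here are assumptions; any run tag you apply to those runs works):

```yaml
# dagster.yaml (instance configuration)
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "pipeline"
        value: "dlt"
        limit: 1
```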
That said, here is an example of using static named partitions from a dlt source.
```python file=/integrations/embedded_elt/dlt_partitions.py
from typing import Optional

import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dagster import AssetExecutionContext, StaticPartitionsDefinition

color_partitions = StaticPartitionsDefinition(["red", "green", "blue"])


@dlt.source
def example_dlt_source(color: Optional[str] = None):
    def load_colors():
        if color:
            # partition-specific processing
            ...
        else:
            # non-partitioned processing
            ...

    # return the inner function so the source yields it as a resource
    return load_colors


@dlt_assets(
    dlt_source=example_dlt_source(),
    name="example_dlt_assets",
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
    ),
    partitions_def=color_partitions,
)
def compute(context: AssetExecutionContext, dlt: DagsterDltResource):
    color = context.partition_key
    yield from dlt.run(context=context, dlt_source=example_dlt_source(color=color))
```
## What's next?
Want to see real-world examples of dlt in production? Check out how we use it internally at Dagster in the [Dagster Open Platform](https://github.com/dagster-io/dagster-open-platform) project.
---