Skip to content

Commit

Permalink
fix: yield correct dlt resource
Browse files Browse the repository at this point in the history
  • Loading branch information
Jabolol committed Dec 21, 2024
1 parent ef004bd commit 159eaa4
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions warehouse/oso_dagster/factories/rest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Optional, ParamSpec, TypeVar, Union, cast
from typing import Callable, Optional, ParamSpec, Sequence, TypeVar, Union, cast

import dlt
from dagster import AssetExecutionContext
Expand Down Expand Up @@ -43,10 +43,16 @@ def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs):
"Names will be automatically generated for `rest_factory`"
)

key_prefix = asset_kwargs.get("key_prefix", None)
key_prefix = cast(
Optional[Union[str, Sequence[str]]],
asset_kwargs.get("key_prefix", None),
)
if key_prefix is None:
raise ValueError("Key prefix is required for `rest_factory`")

if not isinstance(key_prefix, str):
key_prefix = "/".join(key_prefix)

config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None))
if config is None:
raise ValueError("Config is required for `rest_factory`")
Expand Down Expand Up @@ -80,12 +86,10 @@ def create_asset_for_resource(resource_ref, config_ref):
if not isinstance(resource_name, str):
raise ValueError("Failed to extract resource name from reference")

asset_tags: dict = cast(dict, asset_kwargs.get("tags", {}))
asset_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}"

asset_kwargs["tags"] = asset_tags
op_tags: dict = cast(dict, asset_kwargs.pop("op_tags", {}))
op_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}"

@dlt_factory(name=resource_name, **asset_kwargs)
@dlt_factory(name=resource_name, op_tags=op_tags, **asset_kwargs)
def _dlt_ref_asset(context: AssetExecutionContext):
"""
The dlt asset function that creates the REST API source asset.
Expand All @@ -99,15 +103,13 @@ def _dlt_ref_asset(context: AssetExecutionContext):

rest_api_config["resources"] = [resource_ref]

@dlt.source(max_table_nesting=0)
def _internal_source():
"""
The internal source function that creates the REST API source.
"""

yield from rest_api_resources(rest_api_config, **rest_kwargs)

return _internal_source()
for resource in rest_api_resources(rest_api_config, **rest_kwargs):
yield dlt.resource(
resource,
name=resource_name,
max_table_nesting=0,
write_disposition="merge",
)

return _dlt_ref_asset

Expand Down

0 comments on commit 159eaa4

Please sign in to comment.