From 159eaa46df48f373c08d50087b638c9a5d72a121 Mon Sep 17 00:00:00 2001 From: Javier Date: Sat, 21 Dec 2024 21:42:02 +0000 Subject: [PATCH] fix: yield correct dlt `resource` --- warehouse/oso_dagster/factories/rest.py | 34 +++++++++++++------------ 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/warehouse/oso_dagster/factories/rest.py b/warehouse/oso_dagster/factories/rest.py index dbf14ecf..c575fa42 100644 --- a/warehouse/oso_dagster/factories/rest.py +++ b/warehouse/oso_dagster/factories/rest.py @@ -1,4 +1,4 @@ -from typing import Callable, Optional, ParamSpec, TypeVar, Union, cast +from typing import Callable, Optional, ParamSpec, Sequence, TypeVar, Union, cast import dlt from dagster import AssetExecutionContext @@ -43,10 +43,16 @@ def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs): "Names will be automatically generated for `rest_factory`" ) - key_prefix = asset_kwargs.get("key_prefix", None) + key_prefix = cast( + Optional[Union[str, Sequence[str]]], + asset_kwargs.get("key_prefix", None), + ) if key_prefix is None: raise ValueError("Key prefix is required for `rest_factory`") + if not isinstance(key_prefix, str): + key_prefix = "/".join(key_prefix) + config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None)) if config is None: raise ValueError("Config is required for `rest_factory`") @@ -80,12 +86,10 @@ def create_asset_for_resource(resource_ref, config_ref): if not isinstance(resource_name, str): raise ValueError("Failed to extract resource name from reference") - asset_tags: dict = cast(dict, asset_kwargs.get("tags", {})) - asset_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}" - - asset_kwargs["tags"] = asset_tags + op_tags: dict = cast(dict, asset_kwargs.pop("op_tags", {})) + op_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}" - @dlt_factory(name=resource_name, **asset_kwargs) + @dlt_factory(name=resource_name, op_tags=op_tags, **asset_kwargs) def _dlt_ref_asset(context: AssetExecutionContext): """ The dlt asset function that creates the REST API source asset. @@ -99,15 +103,13 @@ def _dlt_ref_asset(context: AssetExecutionContext): rest_api_config["resources"] = [resource_ref] - @dlt.source(max_table_nesting=0) - def _internal_source(): - """ - The internal source function that creates the REST API source. - """ - - yield from rest_api_resources(rest_api_config, **rest_kwargs) - - return _internal_source() + for resource in rest_api_resources(rest_api_config, **rest_kwargs): + yield dlt.resource( + resource, + name=resource_name, + max_table_nesting=0, + write_disposition="merge", + ) return _dlt_ref_asset