From 7843c8c10526e8a9c8c0f0236b76fab69fa50e2a Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 18 Dec 2024 16:05:28 +0000 Subject: [PATCH 1/8] add: defillama `tvl` asset --- warehouse/oso_dagster/assets/defillama.py | 375 ++++++++++++++++++++++ 1 file changed, 375 insertions(+) create mode 100644 warehouse/oso_dagster/assets/defillama.py diff --git a/warehouse/oso_dagster/assets/defillama.py b/warehouse/oso_dagster/assets/defillama.py new file mode 100644 index 000000000..1b73bb367 --- /dev/null +++ b/warehouse/oso_dagster/assets/defillama.py @@ -0,0 +1,375 @@ +from dlt.sources.rest_api.typing import RESTAPIConfig + +from ..factories.rest import create_rest_factory_asset + +# Chains supported by DeFi Llama (https://defillama.com/chains) +DEFI_LLAMA_CHAINS = [ + "ethereum", + "solana", + "tron", + "bitcoin", + "bsc", + "base", + "arbitrum", + "hyperliquid", + "sui", + "avalanche", + "aptos", + "polygon", + "core", + "optimism", + "pulsechain", + "cronos", + "blast", + "bitlayer", + "cardano", + "mantle", + "linea", + "thorchain", + "taiko", + "dydx", + "bsquared", + "gnosis", + "zircuit", + "near", + "starknet", + "ton", + "rootstock", + "scroll", + "sei", + "ailayer", + "kaia", + "bob", + "eos", + "kava", + "hedera", + "merlin", + "algorand", + "ronin", + "mode", + "multiversx", + "osmosis", + "celo", + "fantom", + "mixin", + "stacks", + "zksync era", + "filecoin", + "iota evm", + "wemix3.0", + "xrpl", + "bouncebit", + "hydration", + "fuel ignition", + "fraxtal", + "neutron", + "tezos", + "metis", + "icp", + "telos", + "injective", + "stellar", + "manta", + "flow", + "cronos zkevm", + "radix", + "conflux", + "bifrost network", + "flare", + "kujira", + "apechain", + "aurora", + "mayachain", + "astar", + "xdc", + "moonbeam", + "k2", + "map protocol", + "iotex", + "reya network", + "rollux", + "zklink nova", + "opbnb", + "icon", + "chainflip", + "secret", + "defichain", + "waves", + "bitcoincash", + "bifrost", + "neo", + "immutable zkevm", + "chiliz", + "zetachain", + "alephium", + "fsc", + "polygon zkevm", + "eos evm", + "ontology", + "canto", + "doge", + "ergo", + "x layer", + "terra2", + "onus", + "eclipse", + "venom", + "polynomial", + "dogechain", + "proton", + "morph", + "litecoin", + "dexalot", + "moonriver", + "wanchain", + "beam", + "dymension", + "oraichain", + "nuls", + "godwokenv1", + "vite", + "dfs network", + "dfk", + "carbon", + "vision", + "shibarium", + "oasys", + "acala", + "renec", + "harmony", + "interlay", + "horizen eon", + "lisk", + "gravity", + "ux", + "functionx", + "hydra", + "bevm", + "smartbch", + "aelf", + "oasis sapphire", + "re.al", + "thundercore", + "meter", + "archway", + "zilliqa", + "airdao", + "boba", + "oktchain", + "defiverse", + "theta", + "world chain", + "songbird", + "wax", + "kadena", + "velas", + "etherlink", + "kcc", + "xai", + "persistence one", + "elastos", + "astar zkevm", + "cosmoshub", + "xpla", + "arbitrum nova", + "terra classic", + "milkomeda c1", + "fuse", + "everscale", + "haqq", + "idex", + "genesys", + "rangers", + "degen", + "equilibrium", + "bitrock", + "oasis emerald", + "neon", + "elysium", + "rss3", + "vechain", + "taraxa", + "heco", + "ink", + "satoshivm", + "skale europa", + "bahamut", + "sanko", + "shimmerevm", + "lachain network", + "obyte", + "energi", + "csc", + "evmos", + "ultron", + "starcoin", + "saakuru", + "naka", + "ethereumclassic", + "viction", + "bittorrent", + "nos", + "chihuahua", + "bostrom", + "karura", + "juno", + "kroma", + "boba_bnb", + "hela", + "enuls", + "migaloo", + "unit0", + "crab", + "mint", + "tombchain", + "nolus", + "ancient8", + "lightlink", + "massa", + "kintsugi", + "dash", + "energyweb", + "defichain evm", + "zora", + "nibiru", + "step", + "shape", + "inevm", + "kardia", + "ethereumpow", + "sora", + "loop", + "zkfair", + "meer", + "matchain", + "libre", + "alv", + "godwoken", + "stargaze", + "endurance", + "q protocol", + "corn", + "rari", + "newton", + "nahmii", + "electroneum", + "shiden", + "aura network", + "bitkub chain", + "polkadex", + "comdex", + "sxnetwork", + "findora", + "neo x mainnet", + "crescent", + "grove", + "areon network", + "jbc", + "cyber", + "concordium", + "ethf", + "syscoin", + "callisto", + "rei", + "planq", + "ham", + "multivac", + "cube", + "mvc", + "hpb", + "shido", + "lachain", + "manta atlantic", + "vinuchain", + "darwinia", + "gochain", + "sifchain", + "ontologyevm", + "bitgert", + "reichain", + "tenet", + "zeniq", + "bitnet", + "aeternity", + "milkomeda a1", + "joltify", + "palm", + "asset chain", + "redbelly", + "pego", + "sonic", + "exsat", + "goerli", + "dexit", + "omax", + "celestia", + "bandchain", + "sommelier", + "stride", + "polkadot", + "kusama", + "fusion", + "boba_avax", + "stafi", + "oxfun", + "empire", + "hoo", + "echelon", + "quicksilver", + "clv", + "pokt", + "dsc", + "zksync lite", + "nova network", + "cmp", + "genshiro", + "lamden", + "polis", + "zyx", + "ubiq", + "heiko", + "parallel", + "coti", + "kekchain", + "muuchain", + "tlchain", + "bitindi", + "lung", + "bone", + "lukso", +] + + +config: RESTAPIConfig = { + "client": { + "base_url": "https://api.llama.fi/", + }, + "resource_defaults": { + "endpoint": { + "params": { + "excludeTotalDataChart": False, + "excludeTotalDataChartBreakdown": True, + "fullChart": True, + }, + }, + }, + "resources": list( + map( + lambda chain: { + "name": chain.replace(" ", "_"), + "endpoint": { + "path": f"overview/dexs/{chain.replace(' ', '%20')}", + }, + }, + DEFI_LLAMA_CHAINS, + ) + ), +} + +dlt_assets = create_rest_factory_asset( + config=config, +) + +tvl_assets = dlt_assets( + key_prefix="tvl", +) From 03609d9b5ca2cf1a4e3e487176887fcca16c486b Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 18 Dec 2024 16:06:14 +0000 Subject: [PATCH 2/8] fix: scan top level lists for asset factories --- warehouse/oso_dagster/factories/loader.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/warehouse/oso_dagster/factories/loader.py b/warehouse/oso_dagster/factories/loader.py index 155f7fa9e..8f11a2eeb 100644 --- a/warehouse/oso_dagster/factories/loader.py +++ b/warehouse/oso_dagster/factories/loader.py @@ -80,4 +80,10 @@ def load_assets_factories_from_modules( dag.add(obj) elif isinstance(obj, AssetFactoryResponse): all = all + obj + elif isinstance(obj, list): + for item in obj: + if isinstance(item, EarlyResourcesAssetFactory): + dag.add(item) + elif isinstance(item, AssetFactoryResponse): + all = all + item return all From 253d3943315da17846ef01e7dbc433c13fd894d6 Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 18 Dec 2024 16:07:47 +0000 Subject: [PATCH 3/8] feat: create new `rest` dlt factory --- warehouse/oso_dagster/factories/rest.py | 124 ++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 warehouse/oso_dagster/factories/rest.py diff --git a/warehouse/oso_dagster/factories/rest.py b/warehouse/oso_dagster/factories/rest.py new file mode 100644 index 000000000..dbf14ecfa --- /dev/null +++ b/warehouse/oso_dagster/factories/rest.py @@ -0,0 +1,124 @@ +from typing import Callable, Optional, ParamSpec, TypeVar, Union, cast + +import dlt +from dagster import AssetExecutionContext +from dlt.extract.resource import DltResource +from dlt.sources.rest_api import rest_api_resources +from dlt.sources.rest_api.typing import EndpointResource, RESTAPIConfig + +from . import dlt_factory + +P = ParamSpec("P") +R = TypeVar("R") + +Q = ParamSpec("Q") +T = TypeVar("T") + + +def _rest_source(_rest_source: Callable[Q, T], _asset: Callable[P, R]): + """ + The main factory for creating a REST API source asset. It is a wrapper + used to get full type information for both the REST API source and the + asset factory. + """ + + def _factory(*_args: Q.args, **rest_kwargs: Q.kwargs) -> Callable[P, R]: + """ + Forwards the arguments to the asset factory and returns a new factory, + maintaining full type information for the caller. + """ + + def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs): + """ + Creates a new dlt asset using the `dlt_factory` decorator, forwarding + the arguments to the REST API source. + + The caller should not provide a name for the asset, as it will be + automatically generated for each resource in the REST API configuration. + """ + + name = asset_kwargs.get("name", None) + if name is not None: + raise ValueError( + "Names will be automatically generated for `rest_factory`" + ) + + key_prefix = asset_kwargs.get("key_prefix", None) + if key_prefix is None: + raise ValueError("Key prefix is required for `rest_factory`") + + config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None)) + if config is None: + raise ValueError("Config is required for `rest_factory`") + + config_resources = config.pop("resources", None) # type: ignore + if config_resources is None: + raise ValueError("Resources is required for `rest_factory`") + + def create_asset_for_resource(resource_ref, config_ref): + """ + The internal function that creates a new asset for a given resource to use + the correct reference. If not, the asset will be created with the wrong + reference, the last resource in the configuration: + + https://pylint.readthedocs.io/en/latest/user_guide/messages/warning/cell-var-from-loop.html + """ + + resource_ref = cast( + Union[str, EndpointResource, DltResource], resource_ref + ) + + resource_name = None + + if isinstance(resource_ref, str): + resource_name = resource_ref + elif isinstance(resource_ref, dict): + resource_name = resource_ref.get("name", None) + elif isinstance(resource_ref, DltResource): + resource_name = resource_ref.name + + if not isinstance(resource_name, str): + raise ValueError("Failed to extract resource name from reference") + + asset_tags: dict = cast(dict, asset_kwargs.get("tags", {})) + asset_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}" + + asset_kwargs["tags"] = asset_tags + + @dlt_factory(name=resource_name, **asset_kwargs) + def _dlt_ref_asset(context: AssetExecutionContext): + """ + The dlt asset function that creates the REST API source asset. + """ + + context.log.info( + f"Rest factory materializing asset: {key_prefix}/{resource_name}" + ) + + rest_api_config = cast(RESTAPIConfig, config_ref) + + rest_api_config["resources"] = [resource_ref] + + @dlt.source(max_table_nesting=0) + def _internal_source(): + """ + The internal source function that creates the REST API source. + """ + + yield from rest_api_resources(rest_api_config, **rest_kwargs) + + return _internal_source() + + return _dlt_ref_asset + + return [ + create_asset_for_resource(resource_ref, {**config}) + for resource_ref in config_resources + ] + + return cast(Callable[P, R], _wrapper) + + return _factory + + +create_rest_factory_asset = _rest_source(rest_api_resources, dlt_factory) From 4162f530d9cfb65236da79512221addad0c190df Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 18 Dec 2024 17:58:33 +0000 Subject: [PATCH 4/8] fix: update name to `defillama` --- warehouse/oso_dagster/assets/defillama.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warehouse/oso_dagster/assets/defillama.py b/warehouse/oso_dagster/assets/defillama.py index 1b73bb367..e5c9616ef 100644 --- a/warehouse/oso_dagster/assets/defillama.py +++ b/warehouse/oso_dagster/assets/defillama.py @@ -370,6 +370,6 @@ config=config, ) -tvl_assets = dlt_assets( - key_prefix="tvl", +defillama_assets = dlt_assets( + key_prefix="defillama", ) From c2899b8ba844d78958b57d5dea065b31d86ebe8c Mon Sep 17 00:00:00 2001 From: Javier Date: Sat, 21 Dec 2024 21:28:02 +0000 Subject: [PATCH 5/8] fix: use `protocols/{protocol}` endpoint --- warehouse/oso_dagster/assets/defillama.py | 363 +--------------------- 1 file changed, 13 insertions(+), 350 deletions(-) diff --git a/warehouse/oso_dagster/assets/defillama.py b/warehouse/oso_dagster/assets/defillama.py index e5c9616ef..c9eedd308 100644 --- a/warehouse/oso_dagster/assets/defillama.py +++ b/warehouse/oso_dagster/assets/defillama.py @@ -2,341 +2,12 @@ from ..factories.rest import create_rest_factory_asset -# Chains supported by DeFi Llama (https://defillama.com/chains) -DEFI_LLAMA_CHAINS = [ - "ethereum", - "solana", - "tron", - "bitcoin", - "bsc", - "base", - "arbitrum", - "hyperliquid", - "sui", - "avalanche", - "aptos", - "polygon", - "core", - "optimism", - "pulsechain", - "cronos", - "blast", - "bitlayer", - "cardano", - "mantle", - "linea", - "thorchain", - "taiko", - "dydx", - "bsquared", - "gnosis", - "zircuit", - "near", - "starknet", - "ton", - "rootstock", - "scroll", - "sei", - "ailayer", - "kaia", - "bob", - "eos", - "kava", - "hedera", - "merlin", - "algorand", - "ronin", - "mode", - "multiversx", - "osmosis", - "celo", - "fantom", - "mixin", - "stacks", - "zksync era", - "filecoin", - "iota evm", - "wemix3.0", - "xrpl", - "bouncebit", - "hydration", - "fuel ignition", - "fraxtal", - "neutron", - "tezos", - "metis", - "icp", - "telos", - "injective", - "stellar", - "manta", - "flow", - "cronos zkevm", - "radix", - "conflux", - "bifrost network", - "flare", - "kujira", - "apechain", - "aurora", - "mayachain", - "astar", - "xdc", - "moonbeam", - "k2", - "map protocol", - "iotex", - "reya network", - "rollux", - "zklink nova", - "opbnb", - "icon", - "chainflip", - "secret", - "defichain", - "waves", - "bitcoincash", - "bifrost", - "neo", - "immutable zkevm", - "chiliz", - "zetachain", - "alephium", - "fsc", - "polygon zkevm", - "eos evm", - "ontology", - "canto", - "doge", - "ergo", - "x layer", - "terra2", - "onus", - "eclipse", - "venom", - "polynomial", - "dogechain", - "proton", - "morph", - "litecoin", - "dexalot", - "moonriver", - "wanchain", - "beam", - "dymension", - "oraichain", - "nuls", - "godwokenv1", - "vite", - "dfs network", - "dfk", - "carbon", - "vision", - "shibarium", - "oasys", - "acala", - "renec", - "harmony", - "interlay", - "horizen eon", - "lisk", - "gravity", - "ux", - "functionx", - "hydra", - "bevm", - "smartbch", - "aelf", - "oasis sapphire", - "re.al", - "thundercore", - "meter", - "archway", - "zilliqa", - "airdao", - "boba", - "oktchain", - "defiverse", - "theta", - "world chain", - "songbird", - "wax", - "kadena", - "velas", - "etherlink", - "kcc", - "xai", - "persistence one", - "elastos", - "astar zkevm", - "cosmoshub", - "xpla", - "arbitrum nova", - "terra classic", - "milkomeda c1", - "fuse", - "everscale", - "haqq", - "idex", - "genesys", - "rangers", - "degen", - "equilibrium", - "bitrock", - "oasis emerald", - "neon", - "elysium", - "rss3", - "vechain", - "taraxa", - "heco", - "ink", - "satoshivm", - "skale europa", - "bahamut", - "sanko", - "shimmerevm", - "lachain network", - "obyte", - "energi", - "csc", - "evmos", - "ultron", - "starcoin", - "saakuru", - "naka", - "ethereumclassic", - "viction", - "bittorrent", - "nos", - "chihuahua", - "bostrom", - "karura", - "juno", - "kroma", - "boba_bnb", - "hela", - "enuls", - "migaloo", - "unit0", - "crab", - "mint", - "tombchain", - "nolus", - "ancient8", - "lightlink", - "massa", - "kintsugi", - "dash", - "energyweb", - "defichain evm", - "zora", - "nibiru", - "step", - "shape", - "inevm", - "kardia", - "ethereumpow", - "sora", - "loop", - "zkfair", - "meer", - "matchain", - "libre", - "alv", - "godwoken", - "stargaze", - "endurance", - "q protocol", - "corn", - "rari", - "newton", - "nahmii", - "electroneum", - "shiden", - "aura network", - "bitkub chain", - "polkadex", - "comdex", - "sxnetwork", - "findora", - "neo x mainnet", - "crescent", - "grove", - "areon network", - "jbc", - "cyber", - "concordium", - "ethf", - "syscoin", - "callisto", - "rei", - "planq", - "ham", - "multivac", - "cube", - "mvc", - "hpb", - "shido", - "lachain", - "manta atlantic", - "vinuchain", - "darwinia", - "gochain", - "sifchain", - "ontologyevm", - "bitgert", - "reichain", - "tenet", - "zeniq", - "bitnet", - "aeternity", - "milkomeda a1", - "joltify", - "palm", - "asset chain", - "redbelly", - "pego", - "sonic", - "exsat", - "goerli", - "dexit", - "omax", - "celestia", - "bandchain", - "sommelier", - "stride", - "polkadot", - "kusama", - "fusion", - "boba_avax", - "stafi", - "oxfun", - "empire", - "hoo", - "echelon", - "quicksilver", - "clv", - "pokt", - "dsc", - "zksync lite", - "nova network", - "cmp", - "genshiro", - "lamden", - "polis", - "zyx", - "ubiq", - "heiko", - "parallel", - "coti", - "kekchain", - "muuchain", - "tlchain", - "bitindi", - "lung", - "bone", - "lukso", +DEFI_LLAMA_PROTOCOLS = [ + "aave-v1", + "aave-v2", + "aave-v3", + "optimism-bridge", + # ... ] @@ -344,24 +15,16 @@ "client": { "base_url": "https://api.llama.fi/", }, - "resource_defaults": { - "endpoint": { - "params": { - "excludeTotalDataChart": False, - "excludeTotalDataChartBreakdown": True, - "fullChart": True, - }, - }, - }, "resources": list( map( - lambda chain: { - "name": chain.replace(" ", "_"), + lambda protocol: { + "name": f"{protocol.replace('-', '_')}", "endpoint": { - "path": f"overview/dexs/{chain.replace(' ', '%20')}", + "path": f"protocol/{protocol}", + "data_selector": "$", }, }, - DEFI_LLAMA_CHAINS, + DEFI_LLAMA_PROTOCOLS, ) ), } @@ -370,6 +33,6 @@ config=config, ) -defillama_assets = dlt_assets( - key_prefix="defillama", +defillama_tvl_assets = dlt_assets( + key_prefix=["defillama", "tvl"], ) From 0de53fd11119ca9d8874b9ad6f9269f3512c040a Mon Sep 17 00:00:00 2001 From: Javier Date: Sat, 21 Dec 2024 21:39:15 +0000 Subject: [PATCH 6/8] add: `growthepie` fundamentals tvl source --- warehouse/oso_dagster/assets/growthepie.py | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 warehouse/oso_dagster/assets/growthepie.py diff --git a/warehouse/oso_dagster/assets/growthepie.py b/warehouse/oso_dagster/assets/growthepie.py new file mode 100644 index 000000000..c24e934cc --- /dev/null +++ b/warehouse/oso_dagster/assets/growthepie.py @@ -0,0 +1,26 @@ +from dlt.sources.rest_api.typing import RESTAPIConfig + +from ..factories.rest import create_rest_factory_asset + +config: RESTAPIConfig = { + "client": { + "base_url": "https://api.growthepie.xyz/v1", + }, + "resources": [ + { + "name": "fundamentals_full", + "endpoint": { + "path": "fundamentals_full.json", + "data_selector": "$", + }, + } + ], +} + +dlt_assets = create_rest_factory_asset( + config=config, +) + +growthepie_assets = dlt_assets( + key_prefix="growthepie", +) From ef004bd5c93f81ec7c1511b20e4144bc88ce978d Mon Sep 17 00:00:00 2001 From: Javier Date: Sat, 21 Dec 2024 21:41:05 +0000 Subject: [PATCH 7/8] fix: support `prefixed` dlt assets --- warehouse/oso_dagster/factories/dlt.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/warehouse/oso_dagster/factories/dlt.py b/warehouse/oso_dagster/factories/dlt.py index 2f71e5baf..8b377b954 100644 --- a/warehouse/oso_dagster/factories/dlt.py +++ b/warehouse/oso_dagster/factories/dlt.py @@ -191,12 +191,24 @@ def _dlt_asset( context.log.debug("dlt pipeline setup with bigquery and jsonl") dlt_run_options["loader_file_format"] = "jsonl" + dlt_prefix_keys = ( + [key_prefix] + if isinstance(key_prefix, str) + else (key_prefix or []) + ) + dlt_source_name = ( + dlt_prefix_keys[-1] if dlt_prefix_keys else key_prefix_str + ) + dlt_key_prefix = dlt_prefix_keys[:-1] + results = dlt.run( context=context, dlt_source=dlt_source, dlt_pipeline=pipeline, dagster_dlt_translator=PrefixedDltTranslator( - source_name=key_prefix_str, tags=dict(tags) + prefix=dlt_key_prefix, + source_name=dlt_source_name, + tags=dict(tags), ), **dlt_run_options, ) From 159eaa46df48f373c08d50087b638c9a5d72a121 Mon Sep 17 00:00:00 2001 From: Javier Date: Sat, 21 Dec 2024 21:42:02 +0000 Subject: [PATCH 8/8] fix: yield correct dlt `resource` --- warehouse/oso_dagster/factories/rest.py | 34 +++++++++++++------------ 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/warehouse/oso_dagster/factories/rest.py b/warehouse/oso_dagster/factories/rest.py index dbf14ecfa..c575fa42c 100644 --- a/warehouse/oso_dagster/factories/rest.py +++ b/warehouse/oso_dagster/factories/rest.py @@ -1,4 +1,4 @@ -from typing import Callable, Optional, ParamSpec, TypeVar, Union, cast +from typing import Callable, Optional, ParamSpec, Sequence, TypeVar, Union, cast import dlt from dagster import AssetExecutionContext @@ -43,10 +43,16 @@ def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs): "Names will be automatically generated for `rest_factory`" ) - key_prefix = asset_kwargs.get("key_prefix", None) + key_prefix = cast( + Optional[Union[str, Sequence[str]]], + asset_kwargs.get("key_prefix", None), + ) if key_prefix is None: raise ValueError("Key prefix is required for `rest_factory`") + if not isinstance(key_prefix, str): + key_prefix = "/".join(key_prefix) + config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None)) if config is None: raise ValueError("Config is required for `rest_factory`") @@ -80,12 +86,10 @@ def create_asset_for_resource(resource_ref, config_ref): if not isinstance(resource_name, str): raise ValueError("Failed to extract resource name from reference") - asset_tags: dict = cast(dict, asset_kwargs.get("tags", {})) - asset_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}" - - asset_kwargs["tags"] = asset_tags + op_tags: dict = cast(dict, asset_kwargs.pop("op_tags", {})) + op_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}" - @dlt_factory(name=resource_name, **asset_kwargs) + @dlt_factory(name=resource_name, op_tags=op_tags, **asset_kwargs) def _dlt_ref_asset(context: AssetExecutionContext): """ The dlt asset function that creates the REST API source asset. @@ -99,15 +103,13 @@ def _dlt_ref_asset(context: AssetExecutionContext): rest_api_config["resources"] = [resource_ref] - @dlt.source(max_table_nesting=0) - def _internal_source(): - """ - The internal source function that creates the REST API source. - """ - - yield from rest_api_resources(rest_api_config, **rest_kwargs) - - return _internal_source() + for resource in rest_api_resources(rest_api_config, **rest_kwargs): + yield dlt.resource( + resource, + name=resource_name, + max_table_nesting=0, + write_disposition="merge", + ) return _dlt_ref_asset