diff --git a/warehouse/oso_dagster/assets/defillama.py b/warehouse/oso_dagster/assets/defillama.py new file mode 100644 index 000000000..c9eedd308 --- /dev/null +++ b/warehouse/oso_dagster/assets/defillama.py @@ -0,0 +1,38 @@ +from dlt.sources.rest_api.typing import RESTAPIConfig + +from ..factories.rest import create_rest_factory_asset + +DEFI_LLAMA_PROTOCOLS = [ + "aave-v1", + "aave-v2", + "aave-v3", + "optimism-bridge", + # ... +] + + +config: RESTAPIConfig = { + "client": { + "base_url": "https://api.llama.fi/", + }, + "resources": list( + map( + lambda protocol: { + "name": f"{protocol.replace('-', '_')}", + "endpoint": { + "path": f"protocol/{protocol}", + "data_selector": "$", + }, + }, + DEFI_LLAMA_PROTOCOLS, + ) + ), +} + +dlt_assets = create_rest_factory_asset( + config=config, +) + +defillama_tvl_assets = dlt_assets( + key_prefix=["defillama", "tvl"], +) diff --git a/warehouse/oso_dagster/assets/growthepie.py b/warehouse/oso_dagster/assets/growthepie.py new file mode 100644 index 000000000..c24e934cc --- /dev/null +++ b/warehouse/oso_dagster/assets/growthepie.py @@ -0,0 +1,26 @@ +from dlt.sources.rest_api.typing import RESTAPIConfig + +from ..factories.rest import create_rest_factory_asset + +config: RESTAPIConfig = { + "client": { + "base_url": "https://api.growthepie.xyz/v1", + }, + "resources": [ + { + "name": "fundamentals_full", + "endpoint": { + "path": "fundamentals_full.json", + "data_selector": "$", + }, + } + ], +} + +dlt_assets = create_rest_factory_asset( + config=config, +) + +growthepie_assets = dlt_assets( + key_prefix="growthepie", +) diff --git a/warehouse/oso_dagster/factories/dlt.py b/warehouse/oso_dagster/factories/dlt.py index 2f71e5baf..8b377b954 100644 --- a/warehouse/oso_dagster/factories/dlt.py +++ b/warehouse/oso_dagster/factories/dlt.py @@ -191,12 +191,24 @@ def _dlt_asset( context.log.debug("dlt pipeline setup with bigquery and jsonl") dlt_run_options["loader_file_format"] = "jsonl" + dlt_prefix_keys = ( + [key_prefix] + if isinstance(key_prefix, str) + else (key_prefix or []) + ) + dlt_source_name = ( + dlt_prefix_keys[-1] if dlt_prefix_keys else key_prefix_str + ) + dlt_key_prefix = dlt_prefix_keys[:-1] + results = dlt.run( context=context, dlt_source=dlt_source, dlt_pipeline=pipeline, dagster_dlt_translator=PrefixedDltTranslator( - source_name=key_prefix_str, tags=dict(tags) + prefix=dlt_key_prefix, + source_name=dlt_source_name, + tags=dict(tags), ), **dlt_run_options, ) diff --git a/warehouse/oso_dagster/factories/loader.py b/warehouse/oso_dagster/factories/loader.py index 155f7fa9e..8f11a2eeb 100644 --- a/warehouse/oso_dagster/factories/loader.py +++ b/warehouse/oso_dagster/factories/loader.py @@ -80,4 +80,10 @@ def load_assets_factories_from_modules( dag.add(obj) elif isinstance(obj, AssetFactoryResponse): all = all + obj + elif isinstance(obj, list): + for item in obj: + if isinstance(item, EarlyResourcesAssetFactory): + dag.add(item) + elif isinstance(item, AssetFactoryResponse): + all = all + item return all diff --git a/warehouse/oso_dagster/factories/rest.py b/warehouse/oso_dagster/factories/rest.py new file mode 100644 index 000000000..c575fa42c --- /dev/null +++ b/warehouse/oso_dagster/factories/rest.py @@ -0,0 +1,126 @@ +from typing import Callable, Optional, ParamSpec, Sequence, TypeVar, Union, cast + +import dlt +from dagster import AssetExecutionContext +from dlt.extract.resource import DltResource +from dlt.sources.rest_api import rest_api_resources +from dlt.sources.rest_api.typing import EndpointResource, RESTAPIConfig + +from . import dlt_factory + +P = ParamSpec("P") +R = TypeVar("R") + +Q = ParamSpec("Q") +T = TypeVar("T") + + +def _rest_source(_rest_source: Callable[Q, T], _asset: Callable[P, R]): + """ + The main factory for creating a REST API source asset. It is a wrapper + used to get full type information for both the REST API source and the + asset factory. + """ + + def _factory(*_args: Q.args, **rest_kwargs: Q.kwargs) -> Callable[P, R]: + """ + Forwards the arguments to the asset factory and returns a new factory, + maintaining full type information for the caller. + """ + + def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs): + """ + Creates a new dlt asset using the `dlt_factory` decorator, forwarding + the arguments to the REST API source. + + The caller should not provide a name for the asset, as it will be + automatically generated for each resource in the REST API configuration. + """ + + name = asset_kwargs.get("name", None) + if name is not None: + raise ValueError( + "Names will be automatically generated for `rest_factory`" + ) + + key_prefix = cast( + Optional[Union[str, Sequence[str]]], + asset_kwargs.get("key_prefix", None), + ) + if key_prefix is None: + raise ValueError("Key prefix is required for `rest_factory`") + + if not isinstance(key_prefix, str): + key_prefix = "/".join(key_prefix) + + config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None)) + if config is None: + raise ValueError("Config is required for `rest_factory`") + + config_resources = config.pop("resources", None) # type: ignore + if config_resources is None: + raise ValueError("Resources is required for `rest_factory`") + + def create_asset_for_resource(resource_ref, config_ref): + """ + The internal function that creates a new asset for a given resource to use + the correct reference. If not, the asset will be created with the wrong + reference, the last resource in the configuration: + + https://pylint.readthedocs.io/en/latest/user_guide/messages/warning/cell-var-from-loop.html + """ + + resource_ref = cast( + Union[str, EndpointResource, DltResource], resource_ref + ) + + resource_name = None + + if isinstance(resource_ref, str): + resource_name = resource_ref + elif isinstance(resource_ref, dict): + resource_name = resource_ref.get("name", None) + elif isinstance(resource_ref, DltResource): + resource_name = resource_ref.name + + if not isinstance(resource_name, str): + raise ValueError("Failed to extract resource name from reference") + + op_tags: dict = cast(dict, asset_kwargs.pop("op_tags", {})) + op_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}" + + @dlt_factory(name=resource_name, op_tags=op_tags, **asset_kwargs) + def _dlt_ref_asset(context: AssetExecutionContext): + """ + The dlt asset function that creates the REST API source asset. + """ + + context.log.info( + f"Rest factory materializing asset: {key_prefix}/{resource_name}" + ) + + rest_api_config = cast(RESTAPIConfig, config_ref) + + rest_api_config["resources"] = [resource_ref] + + for resource in rest_api_resources(rest_api_config, **rest_kwargs): + yield dlt.resource( + resource, + name=resource_name, + max_table_nesting=0, + write_disposition="merge", + ) + + return _dlt_ref_asset + + return [ + create_asset_for_resource(resource_ref, {**config}) + for resource_ref in config_resources + ] + + return cast(Callable[P, R], _wrapper) + + return _factory + + +create_rest_factory_asset = _rest_source(rest_api_resources, dlt_factory)