Skip to content

Commit

Permalink
feat: create new rest dlt factory (#2663)
Browse files Browse the repository at this point in the history
* add: defillama `tvl` asset

* fix: scan top level lists for asset factories

* feat: create new `rest` dlt factory

* fix: update name to `defillama`

* fix: use `protocols/{protocol}` endpoint

* add: `growthepie` fundamentals tvl source

* fix: support `prefixed` dlt assets

* fix: yield correct dlt `resource`
  • Loading branch information
Jabolol authored Dec 21, 2024
1 parent a8e6d45 commit dc09f83
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 1 deletion.
38 changes: 38 additions & 0 deletions warehouse/oso_dagster/assets/defillama.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from dlt.sources.rest_api.typing import RESTAPIConfig

from ..factories.rest import create_rest_factory_asset

DEFI_LLAMA_PROTOCOLS = [
"aave-v1",
"aave-v2",
"aave-v3",
"optimism-bridge",
# ...
]


config: RESTAPIConfig = {
"client": {
"base_url": "https://api.llama.fi/",
},
"resources": list(
map(
lambda protocol: {
"name": f"{protocol.replace('-', '_')}",
"endpoint": {
"path": f"protocol/{protocol}",
"data_selector": "$",
},
},
DEFI_LLAMA_PROTOCOLS,
)
),
}

dlt_assets = create_rest_factory_asset(
config=config,
)

defillama_tvl_assets = dlt_assets(
key_prefix=["defillama", "tvl"],
)
26 changes: 26 additions & 0 deletions warehouse/oso_dagster/assets/growthepie.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from dlt.sources.rest_api.typing import RESTAPIConfig

from ..factories.rest import create_rest_factory_asset

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.growthepie.xyz/v1",
},
"resources": [
{
"name": "fundamentals_full",
"endpoint": {
"path": "fundamentals_full.json",
"data_selector": "$",
},
}
],
}

dlt_assets = create_rest_factory_asset(
config=config,
)

growthepie_assets = dlt_assets(
key_prefix="growthepie",
)
14 changes: 13 additions & 1 deletion warehouse/oso_dagster/factories/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,24 @@ def _dlt_asset(
context.log.debug("dlt pipeline setup with bigquery and jsonl")
dlt_run_options["loader_file_format"] = "jsonl"

dlt_prefix_keys = (
[key_prefix]
if isinstance(key_prefix, str)
else (key_prefix or [])
)
dlt_source_name = (
dlt_prefix_keys[-1] if dlt_prefix_keys else key_prefix_str
)
dlt_key_prefix = dlt_prefix_keys[:-1]

results = dlt.run(
context=context,
dlt_source=dlt_source,
dlt_pipeline=pipeline,
dagster_dlt_translator=PrefixedDltTranslator(
source_name=key_prefix_str, tags=dict(tags)
prefix=dlt_key_prefix,
source_name=dlt_source_name,
tags=dict(tags),
),
**dlt_run_options,
)
Expand Down
6 changes: 6 additions & 0 deletions warehouse/oso_dagster/factories/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ def load_assets_factories_from_modules(
dag.add(obj)
elif isinstance(obj, AssetFactoryResponse):
all = all + obj
elif isinstance(obj, list):
for item in obj:
if isinstance(item, EarlyResourcesAssetFactory):
dag.add(item)
elif isinstance(item, AssetFactoryResponse):
all = all + item
return all
126 changes: 126 additions & 0 deletions warehouse/oso_dagster/factories/rest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Callable, Optional, ParamSpec, Sequence, TypeVar, Union, cast

import dlt
from dagster import AssetExecutionContext
from dlt.extract.resource import DltResource
from dlt.sources.rest_api import rest_api_resources
from dlt.sources.rest_api.typing import EndpointResource, RESTAPIConfig

from . import dlt_factory

P = ParamSpec("P")
R = TypeVar("R")

Q = ParamSpec("Q")
T = TypeVar("T")


def _rest_source(_rest_source: Callable[Q, T], _asset: Callable[P, R]):
"""
The main factory for creating a REST API source asset. It is a wrapper
used to get full type information for both the REST API source and the
asset factory.
"""

def _factory(*_args: Q.args, **rest_kwargs: Q.kwargs) -> Callable[P, R]:
"""
Forwards the arguments to the asset factory and returns a new factory,
maintaining full type information for the caller.
"""

def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs):
"""
Creates a new dlt asset using the `dlt_factory` decorator, forwarding
the arguments to the REST API source.
The caller should not provide a name for the asset, as it will be
automatically generated for each resource in the REST API configuration.
"""

name = asset_kwargs.get("name", None)
if name is not None:
raise ValueError(
"Names will be automatically generated for `rest_factory`"
)

key_prefix = cast(
Optional[Union[str, Sequence[str]]],
asset_kwargs.get("key_prefix", None),
)
if key_prefix is None:
raise ValueError("Key prefix is required for `rest_factory`")

if not isinstance(key_prefix, str):
key_prefix = "/".join(key_prefix)

config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None))
if config is None:
raise ValueError("Config is required for `rest_factory`")

config_resources = config.pop("resources", None) # type: ignore
if config_resources is None:
raise ValueError("Resources is required for `rest_factory`")

def create_asset_for_resource(resource_ref, config_ref):
"""
The internal function that creates a new asset for a given resource to use
the correct reference. If not, the asset will be created with the wrong
reference, the last resource in the configuration:
https://pylint.readthedocs.io/en/latest/user_guide/messages/warning/cell-var-from-loop.html
"""

resource_ref = cast(
Union[str, EndpointResource, DltResource], resource_ref
)

resource_name = None

if isinstance(resource_ref, str):
resource_name = resource_ref
elif isinstance(resource_ref, dict):
resource_name = resource_ref.get("name", None)
elif isinstance(resource_ref, DltResource):
resource_name = resource_ref.name

if not isinstance(resource_name, str):
raise ValueError("Failed to extract resource name from reference")

op_tags: dict = cast(dict, asset_kwargs.pop("op_tags", {}))
op_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}"

@dlt_factory(name=resource_name, op_tags=op_tags, **asset_kwargs)
def _dlt_ref_asset(context: AssetExecutionContext):
"""
The dlt asset function that creates the REST API source asset.
"""

context.log.info(
f"Rest factory materializing asset: {key_prefix}/{resource_name}"
)

rest_api_config = cast(RESTAPIConfig, config_ref)

rest_api_config["resources"] = [resource_ref]

for resource in rest_api_resources(rest_api_config, **rest_kwargs):
yield dlt.resource(
resource,
name=resource_name,
max_table_nesting=0,
write_disposition="merge",
)

return _dlt_ref_asset

return [
create_asset_for_resource(resource_ref, {**config})
for resource_ref in config_resources
]

return cast(Callable[P, R], _wrapper)

return _factory


create_rest_factory_asset = _rest_source(rest_api_resources, dlt_factory)

0 comments on commit dc09f83

Please sign in to comment.