Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create new rest dlt factory #2663

Merged
merged 10 commits into from
Dec 21, 2024
38 changes: 38 additions & 0 deletions warehouse/oso_dagster/assets/defillama.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from dlt.sources.rest_api.typing import RESTAPIConfig

from ..factories.rest import create_rest_factory_asset

DEFI_LLAMA_PROTOCOLS = [
"aave-v1",
"aave-v2",
"aave-v3",
"optimism-bridge",
# ...
]


config: RESTAPIConfig = {
"client": {
"base_url": "https://api.llama.fi/",
},
"resources": list(
map(
lambda protocol: {
"name": f"{protocol.replace('-', '_')}",
"endpoint": {
"path": f"protocol/{protocol}",
"data_selector": "$",
},
},
DEFI_LLAMA_PROTOCOLS,
)
),
}

dlt_assets = create_rest_factory_asset(
config=config,
)

defillama_tvl_assets = dlt_assets(
key_prefix=["defillama", "tvl"],
)
26 changes: 26 additions & 0 deletions warehouse/oso_dagster/assets/growthepie.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from dlt.sources.rest_api.typing import RESTAPIConfig

from ..factories.rest import create_rest_factory_asset

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.growthepie.xyz/v1",
},
"resources": [
{
"name": "fundamentals_full",
"endpoint": {
"path": "fundamentals_full.json",
"data_selector": "$",
},
}
],
}

dlt_assets = create_rest_factory_asset(
config=config,
)

growthepie_assets = dlt_assets(
key_prefix="growthepie",
)
14 changes: 13 additions & 1 deletion warehouse/oso_dagster/factories/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,24 @@ def _dlt_asset(
context.log.debug("dlt pipeline setup with bigquery and jsonl")
dlt_run_options["loader_file_format"] = "jsonl"

dlt_prefix_keys = (
[key_prefix]
if isinstance(key_prefix, str)
else (key_prefix or [])
)
dlt_source_name = (
dlt_prefix_keys[-1] if dlt_prefix_keys else key_prefix_str
)
dlt_key_prefix = dlt_prefix_keys[:-1]

results = dlt.run(
context=context,
dlt_source=dlt_source,
dlt_pipeline=pipeline,
dagster_dlt_translator=PrefixedDltTranslator(
source_name=key_prefix_str, tags=dict(tags)
prefix=dlt_key_prefix,
source_name=dlt_source_name,
tags=dict(tags),
),
**dlt_run_options,
)
Expand Down
6 changes: 6 additions & 0 deletions warehouse/oso_dagster/factories/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ def load_assets_factories_from_modules(
dag.add(obj)
elif isinstance(obj, AssetFactoryResponse):
all = all + obj
elif isinstance(obj, list):
Jabolol marked this conversation as resolved.
Show resolved Hide resolved
for item in obj:
if isinstance(item, EarlyResourcesAssetFactory):
dag.add(item)
elif isinstance(item, AssetFactoryResponse):
all = all + item
return all
126 changes: 126 additions & 0 deletions warehouse/oso_dagster/factories/rest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Callable, Optional, ParamSpec, Sequence, TypeVar, Union, cast

import dlt
from dagster import AssetExecutionContext
from dlt.extract.resource import DltResource
from dlt.sources.rest_api import rest_api_resources
from dlt.sources.rest_api.typing import EndpointResource, RESTAPIConfig

from . import dlt_factory

P = ParamSpec("P")
R = TypeVar("R")

Q = ParamSpec("Q")
T = TypeVar("T")


def _rest_source(_rest_source: Callable[Q, T], _asset: Callable[P, R]):
"""
The main factory for creating a REST API source asset. It is a wrapper
used to get full type information for both the REST API source and the
asset factory.
"""

def _factory(*_args: Q.args, **rest_kwargs: Q.kwargs) -> Callable[P, R]:
"""
Forwards the arguments to the asset factory and returns a new factory,
maintaining full type information for the caller.
"""

def _wrapper(*_args: P.args, **asset_kwargs: P.kwargs):
"""
Creates a new dlt asset using the `dlt_factory` decorator, forwarding
the arguments to the REST API source.

The caller should not provide a name for the asset, as it will be
automatically generated for each resource in the REST API configuration.
"""

name = asset_kwargs.get("name", None)
if name is not None:
raise ValueError(
"Names will be automatically generated for `rest_factory`"
)

key_prefix = cast(
Optional[Union[str, Sequence[str]]],
asset_kwargs.get("key_prefix", None),
)
if key_prefix is None:
raise ValueError("Key prefix is required for `rest_factory`")

if not isinstance(key_prefix, str):
key_prefix = "/".join(key_prefix)

config = cast(Optional[RESTAPIConfig], rest_kwargs.pop("config", None))
if config is None:
raise ValueError("Config is required for `rest_factory`")

config_resources = config.pop("resources", None) # type: ignore
if config_resources is None:
raise ValueError("Resources is required for `rest_factory`")

def create_asset_for_resource(resource_ref, config_ref):
"""
The internal function that creates a new asset for a given resource to use
the correct reference. If not, the asset will be created with the wrong
reference, the last resource in the configuration:

https://pylint.readthedocs.io/en/latest/user_guide/messages/warning/cell-var-from-loop.html
"""

resource_ref = cast(
Union[str, EndpointResource, DltResource], resource_ref
)

resource_name = None

if isinstance(resource_ref, str):
resource_name = resource_ref
elif isinstance(resource_ref, dict):
resource_name = resource_ref.get("name", None)
elif isinstance(resource_ref, DltResource):
resource_name = resource_ref.name

if not isinstance(resource_name, str):
raise ValueError("Failed to extract resource name from reference")

op_tags: dict = cast(dict, asset_kwargs.pop("op_tags", {}))
op_tags["dagster/concurrency_key"] = f"rest_factory_{key_prefix}"

@dlt_factory(name=resource_name, op_tags=op_tags, **asset_kwargs)
def _dlt_ref_asset(context: AssetExecutionContext):
"""
The dlt asset function that creates the REST API source asset.
"""

context.log.info(
f"Rest factory materializing asset: {key_prefix}/{resource_name}"
)

rest_api_config = cast(RESTAPIConfig, config_ref)

rest_api_config["resources"] = [resource_ref]

for resource in rest_api_resources(rest_api_config, **rest_kwargs):
yield dlt.resource(
resource,
name=resource_name,
max_table_nesting=0,
write_disposition="merge",
)

return _dlt_ref_asset

return [
create_asset_for_resource(resource_ref, {**config})
for resource_ref in config_resources
]

return cast(Callable[P, R], _wrapper)

return _factory


create_rest_factory_asset = _rest_source(rest_api_resources, dlt_factory)
Loading