Materialize all assets daily (#1819)
* wip

* Setup all assets to materialize daily

* Enable automatic retries

* Remove unnecessary file

* rename

* remove change

* Fix tests

* ci fix

* More fixes
ravenac95 authored Jul 19, 2024
1 parent a931184 commit 14f9286
Showing 9 changed files with 186 additions and 132 deletions.
5 changes: 4 additions & 1 deletion ops/k8s-apps/production/dagster/custom-helm-values.yaml
@@ -25,4 +25,7 @@ spec:
host: admin-dagster.opensource.observer
readOnlyDagsterWebserver:
host: dagster.opensource.observer

dagsterDaemon:
runRetries:
enabled: true
maxRetries: 3
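
Note: dagsterDaemon.runRetries turns on Dagster's daemon-driven automatic run
retries for the whole deployment, with up to 3 attempts per failed run. As a
sketch of standard Dagster behavior (not code from this commit), a single job
can override that budget with the dagster/max_retries run tag:

from dagster import define_asset_job

# Hypothetical job: the dagster/max_retries tag raises the retry budget for
# runs of this job only, overriding the daemon-level maxRetries above.
flaky_job = define_asset_job(
    "flaky_job",
    tags={"dagster/max_retries": "5"},
)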
28 changes: 15 additions & 13 deletions warehouse/oso_dagster/assets/ossd.py
@@ -1,4 +1,4 @@
from typing import cast, Optional
from typing import cast, Optional, Dict

from dagster import (
multi_asset,
@@ -13,14 +13,19 @@
from ossdirectory.fetch import OSSDirectory
import polars as pl
import arrow
import dlt

from oso_dagster.dlt_sources.github_repos import (
oss_directory_github_repositories_resource,
)
from oso_dagster.factories import dlt_factory
from oso_dagster.utils import secret_ref_arg

common_tags: Dict[str, str] = {
"opensource.observer/environment": "production",
"opensource.observer/group": "ossd",
"opensource.observer/type": "source",
}


class OSSDirectoryConfig(Config):
# By default, the oss-directory asset doesn't write anything if there aren't
@@ -58,9 +63,10 @@ def oss_directory_to_dataframe(output: str, data: Optional[OSSDirectory] = None)

@multi_asset(
outs={
"projects": AssetOut(is_required=False, key_prefix="ossd"),
"collections": AssetOut(is_required=False, key_prefix="ossd"),
"projects": AssetOut(is_required=False, key_prefix="ossd", tags=common_tags),
"collections": AssetOut(is_required=False, key_prefix="ossd", tags=common_tags),
},
compute_kind="dataframe",
can_subset=True,
)
def projects_and_collections(
@@ -119,18 +125,14 @@ def projects_and_collections(
)


@dlt.source
def oss_directory_github_repositories_from_df(gh_token: str, projects_df: pl.DataFrame):
return oss_directory_github_repositories_resource(
projects_df,
gh_token,
)


project_key = projects_and_collections.keys_by_output_name["projects"]


@dlt_factory(key_prefix="ossd", ins={"projects_df": AssetIn(project_key)})
@dlt_factory(
key_prefix="ossd",
ins={"projects_df": AssetIn(project_key)},
tags=common_tags,
)
def repositories(
projects_df: pl.DataFrame,
gh_token: str = secret_ref_arg(group_name="ossd", key="github_token"),
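
Here keys_by_output_name exposes the AssetKey of the multi_asset's "projects"
output, and AssetIn feeds that output into the dlt-factory asset as its
projects_df argument. The same wiring works for any downstream asset; a
minimal sketch (assumed, with project_count purely illustrative):

from dagster import AssetIn, asset

# Illustrative downstream asset: AssetIn binds the upstream AssetKey to the
# projects_df parameter, so this asset depends on the "projects" output.
@asset(ins={"projects_df": AssetIn(project_key)})
def project_count(projects_df) -> int:
    return len(projects_df)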
9 changes: 8 additions & 1 deletion warehouse/oso_dagster/constants.py
@@ -7,6 +7,9 @@
load_dbt_manifests,
BQTargetConfigTemplate,
)
from dotenv import load_dotenv

load_dotenv()

main_dbt_project_dir = Path(__file__).joinpath("..", "..", "..").resolve()

@@ -33,7 +36,9 @@ def get_project_id():
except Exception:
raise Exception("GOOGLE_PROJECT_ID must be set if you're not in GCP")

staging_bucket = ensure(os.getenv("DAGSTER_STAGING_BUCKET_URL"), "Missing DAGSTER_STAGING_BUCKET_URL")
staging_bucket = ensure(
os.getenv("DAGSTER_STAGING_BUCKET_URL"), "Missing DAGSTER_STAGING_BUCKET_URL"
)
profile_name = os.getenv("DAGSTER_DBT_PROFILE_NAME", "opensource_observer")
gcp_secrets_prefix = os.getenv("DAGSTER_GCP_SECRETS_PREFIX", "")
use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "true").lower() in [
@@ -71,3 +76,5 @@ def get_project_id():
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")

env = os.getenv("DAGSTER_ENV", "dev")
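
Two behavioral additions ride along in constants.py: load_dotenv() lets local
runs pull configuration from a .env file, and env (defaulting to "dev") is the
switch the new freshness-policy helper checks. A sketch of the pattern, with
example variable values only:

import os

from dotenv import load_dotenv

# Sketch: python-dotenv copies entries from a local .env file (for example,
# DAGSTER_ENV=dev) into the process environment before os.getenv is called.
load_dotenv()
env = os.getenv("DAGSTER_ENV", "dev")
print(f"running in {env} mode")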
228 changes: 119 additions & 109 deletions warehouse/oso_dagster/factories/dlt.py
@@ -50,116 +50,126 @@ def pydantic_to_dlt_nullable_columns(b: Type[BaseModel]):
return table_schema_columns


def dlt_factory[
def _dlt_factory[
R: Union[AssetMaterialization, MaterializeResult],
C: DltAssetConfig,
](
*,
config_type: Type[C] = DltAssetConfig,
dataset_name: str = "",
name: str = "",
key_prefix: Optional[AssetKeyPrefixParam] = None,
deps: Optional[AssetDeps] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
tags: Optional[Mapping[str, str]] = None,
):
"""Generates a dlt based asset from a given dlt source. This also
automatically configures the pipeline for this source to have the main
datawarehouse as the destination.
The builtin dagster_embedded_elt doesn't properly handle things like
dependencies so this factory mixes some of that library and some bespoke OSO
related dagster configuration.
"""
tags = tags or {}

key_prefix_str = ""
if key_prefix:
if isinstance(key_prefix, str):
key_prefix_str = key_prefix
else:
key_prefix_str = "_".join(key_prefix)
dataset_name = dataset_name or key_prefix_str

def _decorator(f: Callable[..., Iterator[DltResource]]):
asset_name = name or f.__name__

@early_resources_asset_factory(caller_depth=2)
def _factory(
dlt_staging_destination: Destination,
dlt_warehouse_destination: Destination,
secrets: SecretResolver,
):
resolved_secrets = resolve_secrets_for_func(secrets, f)
source = dltlib.source(f)

asset_ins = dict(ins or {})

            # Exclude ins and resolved secrets
extra_resources = (
set(f.__annotations__.keys())
- set(resolved_secrets.keys())
- set(asset_ins.keys())
)

@asset(
name=asset_name,
key_prefix=key_prefix,
required_resource_keys=extra_resources.union({"dlt"}),
deps=deps,
ins=asset_ins,
tags=tags,
)
def _dlt_asset(
context: AssetExecutionContext, config: config_type, **extra_source_args
) -> Iterable[R]:
pipeline = dltlib.pipeline(
f"{key_prefix_str}_{name}",
destination=dlt_warehouse_destination,
staging=dlt_staging_destination,
dataset_name=dataset_name,
C: DltAssetConfig, **P,
](c: Callable[P, Any], config_type: Type[C] = DltAssetConfig):
def dlt_factory(
config_type: Type[C] = config_type,
dataset_name: str = "",
name: str = "",
key_prefix: Optional[AssetKeyPrefixParam] = None,
deps: Optional[AssetDeps] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
tags: Optional[Mapping[str, str]] = None,
*args: P.args,
**kwargs: P.kwargs,
):
"""Generates a dlt based asset from a given dlt source. This also
automatically configures the pipeline for this source to have the main
datawarehouse as the destination.
The builtin dagster_embedded_elt doesn't properly handle things like
dependencies so this factory mixes some of that library and some bespoke OSO
related dagster configuration.
"""
tags = tags or {}

key_prefix_str = ""
if key_prefix:
if isinstance(key_prefix, str):
key_prefix_str = key_prefix
else:
key_prefix_str = "_".join(key_prefix)
dataset_name = dataset_name or key_prefix_str

def _decorator(f: Callable[..., Iterator[DltResource]]):
asset_name = name or f.__name__

@early_resources_asset_factory(caller_depth=2)
def _factory(
dlt_staging_destination: Destination,
dlt_warehouse_destination: Destination,
secrets: SecretResolver,
):
resolved_secrets = resolve_secrets_for_func(secrets, f)
source = dltlib.source(f)

asset_ins = dict(ins or {})

                # Exclude ins and resolved secrets
extra_resources = (
set(f.__annotations__.keys())
- set(resolved_secrets.keys())
- set(asset_ins.keys())
)

dlt = cast(DagsterDltResource, getattr(context.resources, "dlt"))

source_args: Dict[str, Any] = extra_source_args
source_args.update(resolved_secrets)

if "context" in source.__annotations__:
source_args["context"] = context
if "dlt" in source.__annotations__:
source_args["dlt"] = dlt
if "config" in source.__annotations__:
source_args["config"] = config

for resource in extra_resources:
source_args[resource] = getattr(context.resources, resource)

context.log.debug(
f"creating the dlt source and passing the following args: {source_args.keys()}"
@asset(
name=asset_name,
key_prefix=key_prefix,
required_resource_keys=extra_resources.union({"dlt"}),
deps=deps,
ins=asset_ins,
tags=tags,
**kwargs,
)
context.log.debug(f"new source dataset_name: {dataset_name}")
dlt_source = source(**source_args)

if config.limit:
dlt_source = dlt_source.add_limit(config.limit)
if len(config.with_resources) > 0:
dlt_source.with_resources(*config.with_resources)

results = dlt.run(
context=context,
dlt_source=dlt_source,
dlt_pipeline=pipeline,
dagster_dlt_translator=PrefixedDltTranslator(
source_name=key_prefix_str, tags=dict(tags)
),
loader_file_format="jsonl",
)
for result in results:
yield cast(R, result)

return AssetFactoryResponse([_dlt_asset])

return _factory

return _decorator
def _dlt_asset(
context: AssetExecutionContext,
config: config_type,
**extra_source_args,
) -> Iterable[R]:
pipeline = dltlib.pipeline(
f"{key_prefix_str}_{name}",
destination=dlt_warehouse_destination,
staging=dlt_staging_destination,
dataset_name=dataset_name,
)

dlt = cast(DagsterDltResource, getattr(context.resources, "dlt"))

source_args: Dict[str, Any] = extra_source_args
source_args.update(resolved_secrets)

if "context" in source.__annotations__:
source_args["context"] = context
if "dlt" in source.__annotations__:
source_args["dlt"] = dlt
if "config" in source.__annotations__:
source_args["config"] = config

for resource in extra_resources:
source_args[resource] = getattr(context.resources, resource)

context.log.debug(
f"creating the dlt source and passing the following args: {source_args.keys()}"
)
context.log.debug(f"new source dataset_name: {dataset_name}")
dlt_source = source(**source_args)

if config.limit:
dlt_source = dlt_source.add_limit(config.limit)
if len(config.with_resources) > 0:
dlt_source.with_resources(*config.with_resources)

results = dlt.run(
context=context,
dlt_source=dlt_source,
dlt_pipeline=pipeline,
dagster_dlt_translator=PrefixedDltTranslator(
source_name=key_prefix_str, tags=dict(tags)
),
loader_file_format="jsonl",
)
for result in results:
yield cast(R, result)

return AssetFactoryResponse([_dlt_asset])

return _factory

return _decorator

return dlt_factory


dlt_factory = _dlt_factory(asset, DltAssetConfig)
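
After this refactor, _dlt_factory is a factory-of-factories: it binds the
underlying Dagster decorator (asset) and the default config type, and any
extra keyword arguments now flow through **kwargs to that decorator. A minimal
usage sketch (assumed, loosely mirroring the repositories asset in ossd.py;
the source data is illustrative):

import dlt

from oso_dagster.factories import dlt_factory

# Sketch: the decorated generator yields dlt resources; dlt_factory wraps it
# in a Dagster asset and points the pipeline at the main data warehouse.
@dlt_factory(key_prefix="example", tags={"opensource.observer/group": "example"})
def example_rows():
    yield dlt.resource([{"id": 1}, {"id": 2}], name="rows")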
1 change: 1 addition & 0 deletions warehouse/oso_dagster/factories/goldsky/assets.py
@@ -952,6 +952,7 @@ def materialize_asset(
if key_prefix:
group_name = key_prefix if isinstance(key_prefix, str) else "__".join(list(key_prefix))
tags["opensource.observer/group"] = group_name
tags["dagster/concurrency_key"] = group_name

@asset(name=asset_config.name, key_prefix=asset_config.key_prefix, deps=deps, compute_kind="goldsky", tags=add_tags(tags, {
"opensource.observer/type": "source",
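
The added dagster/concurrency_key tag puts every asset under a goldsky key
prefix into one concurrency group. The tag only throttles anything once a
matching limit exists on the Dagster instance (for example, via the dagster
instance concurrency CLI); a sketch of the tagging side, with illustrative
names:

from dagster import asset

# Sketch (assumption): assets sharing a dagster/concurrency_key value are
# throttled together once the instance defines a limit for that key.
@asset(tags={"dagster/concurrency_key": "goldsky"})
def example_goldsky_asset() -> None:
    ...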
16 changes: 10 additions & 6 deletions warehouse/oso_dagster/schedules.py
@@ -2,11 +2,15 @@
To add a daily schedule that materializes your dbt assets, uncomment the following lines.
"""

from dagster import define_asset_job, ScheduleDefinition, AssetSelection

materialize_all_assets = define_asset_job(
"materialize_all_assets_job", AssetSelection.all()
)

schedules = [
# build_schedule_from_dbt_selection(
# [opensource_observer_dbt_assets],
# job_name="materialize_dbt_models",
# cron_schedule="0 0 * * *",
# dbt_select="fqn:*",
# ),
ScheduleDefinition(
job=materialize_all_assets,
cron_schedule="@daily",
)
]
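
The new ScheduleDefinition replaces the commented-out dbt-only example:
define_asset_job with AssetSelection.all() targets every asset in the code
location, and the @daily cron alias runs the job at midnight (UTC by default).
A sketch of how the schedule list gets registered (assumed wiring;
example_asset is a placeholder):

from dagster import Definitions, asset

from oso_dagster.schedules import schedules


@asset
def example_asset() -> None:
    """Placeholder asset; the real code location registers every OSO asset."""


# Assumed wiring: schedules is handed to the code location's Definitions
# alongside the assets the daily job will materialize.
defs = Definitions(assets=[example_asset], schedules=schedules)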
1 change: 1 addition & 0 deletions warehouse/oso_dagster/utils/__init__.py
@@ -12,3 +12,4 @@
from .types import *
from .alerts import *
from .http import *
from .scheduling import *
16 changes: 16 additions & 0 deletions warehouse/oso_dagster/utils/scheduling.py
@@ -0,0 +1,16 @@
from dagster import FreshnessPolicy
from .types import params_from


@params_from(FreshnessPolicy)
def production_freshness_policy(*args, **kwargs) -> FreshnessPolicy | None:
"""Creates a freshness policy that is automatically disabled in a
non-production mode."""

    # Lazily import this because importing constants in the utils forces the
    # need for certain environment variables.
from .. import constants

if constants.env == "production":
return FreshnessPolicy(*args, **kwargs)
return None
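
production_freshness_policy mirrors FreshnessPolicy's signature through
params_from but returns None unless DAGSTER_ENV is "production", so dev
deployments skip freshness enforcement entirely. A usage sketch (assumed; the
24-hour lag value is illustrative):

from dagster import asset

from oso_dagster.utils import production_freshness_policy


# Illustrative asset: in production this enforces a 24-hour freshness window;
# in dev the policy resolves to None and nothing is enforced.
@asset(freshness_policy=production_freshness_policy(maximum_lag_minutes=60 * 24))
def example_daily_asset() -> None:
    ...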