Skip to content

Commit

Permalink
Expose full_refresh for dbt assets (#1651)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 authored Jun 17, 2024
1 parent ebbe786 commit ac7b0d3
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions warehouse/oso_dagster/assets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from typing import Any, Mapping, Dict, List
from dagster import AssetExecutionContext, AssetKey, asset, AssetsDefinition
from dagster import AssetExecutionContext, AssetKey, asset, AssetsDefinition, Config

from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator
from google.cloud.bigquery.schema import SchemaField
Expand Down Expand Up @@ -45,6 +45,10 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
return final_key


class DBTConfig(Config):
full_refresh: bool = False


def dbt_assets_from_manifests_map(
project_dir: str,
manifests: Dict[str, str],
Expand All @@ -63,14 +67,17 @@ def dbt_assets_from_manifests_map(
manifest=manifest_path,
dagster_dbt_translator=translator,
)
def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
def _generated_dbt_assets(context: AssetExecutionContext, config: DBTConfig):
print(f"using profiles dir {dbt_profiles_dir}")
dbt = DbtCliResource(
project_dir=os.fspath(project_dir),
target=target,
profiles_dir=dbt_profiles_dir,
)
yield from dbt.cli(["build"], context=context).stream()
build_args = ["build"]
if config.full_refresh:
build_args += ["--full-refresh"]
yield from dbt.cli(build_args, context=context).stream()

assets.append(_generated_dbt_assets)

Expand Down

0 comments on commit ac7b0d3

Please sign in to comment.