Skip to content

Commit

Permalink
Fix dlt factory (#1962)
Browse files Browse the repository at this point in the history
* Fix dlt factory to handle duckdb

* remove empty handling - for now
  • Loading branch information
ravenac95 authored Aug 16, 2024
1 parent da39548 commit 38398ce
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion warehouse/oso_dagster/factories/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def _dlt_asset(
dataset_name=dataset_name,
)
if constants.enable_bigquery:
context.log.debug("dlt pipeline setup to use staging")
pipeline = dltlib.pipeline(
f"{key_prefix_str}_{name}",
destination=dlt_warehouse_destination,
Expand Down Expand Up @@ -165,14 +166,19 @@ def _dlt_asset(
if len(config.with_resources) > 0:
dlt_source.with_resources(*config.with_resources)

dlt_run_options: Dict[str, Any] = {}
if constants.enable_bigquery:
context.log.debug("dlt pipeline setup with bigquery and jsonl")
dlt_run_options["loader_file_format"] = "jsonl"

results = dlt.run(
context=context,
dlt_source=dlt_source,
dlt_pipeline=pipeline,
dagster_dlt_translator=PrefixedDltTranslator(
source_name=key_prefix_str, tags=dict(tags)
),
loader_file_format="jsonl",
**dlt_run_options,
)
for result in results:
yield cast(R, result)
Expand Down

0 comments on commit 38398ce

Please sign in to comment.