Skip to content

Commit

Permalink
Send an alert when goldsky data is missing (#1884)
Browse files Browse the repository at this point in the history
* Send alerts when Goldsky data is missing

* Add errors

* Catch the "NoNewData" error elsewhere
  • Loading branch information
ravenac95 authored Jul 30, 2024
1 parent 1eb93b0 commit bf6b86c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
1 change: 1 addition & 0 deletions warehouse/oso_dagster/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def load_definitions():
"dlt_staging_destination": dlt_staging_destination,
"dlt_warehouse_destination": dlt_warehouse_destination,
"project_id": project_id,
"alert_manager": alert_manager,
}
for target in constants.main_dbt_manifests:
resources[f"{target}_dbt"] = DbtCliResource(
Expand Down
42 changes: 26 additions & 16 deletions warehouse/oso_dagster/factories/goldsky/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
TableRecord,
MetadataValue,
TableColumn,
TableSchema
TableSchema,
ResourceParam
)

from dagster_gcp import BigQueryResource, GCSResource
Expand All @@ -48,7 +49,8 @@
from .. import AssetFactoryResponse
from .config import GoldskyConfig, GoldskyConfigInterface, SchemaDict
from ..common import AssetDeps, AssetList
from ...utils import batch_delete_blobs, add_tags
from ...utils import batch_delete_blobs, add_tags, AlertManager
from .errors import NoNewData

GenericExecutionContext = AssetExecutionContext | OpExecutionContext

Expand Down Expand Up @@ -489,8 +491,7 @@ async def materialize(
workers = await self.load_worker_tables(context, checkpoint_range)

if len(workers) == 0:
context.log.warn("Nothing to materialize. This is likely an error on the goldsky connection to gcs.")
return
raise NoNewData("Nothing to materialize. This might not be expected but intentionally an error is thrown.")

# Dedupe and partition the current worker table into a deduped and partitioned table
await self.dedupe_worker_tables(context, workers)
Expand Down Expand Up @@ -614,7 +615,7 @@ async def load_worker_tables(
continue
workers.append(worker)
if len(workers) == 0:
context.log.debug('Queue empty and no workers found')
raise NoNewData('Nothing to materialize. Queue empty and no in progress workers found')

return workers

Expand Down Expand Up @@ -996,9 +997,14 @@ def generated_asset(
bigquery: BigQueryResource,
gcs: GCSResource,
cbt: CBTResource,
alert_manager: ResourceParam[AlertManager]
) -> None:
context.log.info(f"Run ID: {context.run_id} AssetKey: {context.asset_key}")
materialize_asset(context, bigquery, gcs, cbt)
try:
materialize_asset(context, bigquery, gcs, cbt)
except NoNewData:
alert_manager.alert(f"Goldsky Asset {context.asset_key.to_user_string()} has no new data.")


related_ops_prefix = "_".join(generated_asset.key.path)

Expand All @@ -1025,6 +1031,7 @@ def goldsky_backfill_op(
gcs: GCSResource,
cbt: CBTResource,
config: dict,
alert_manager: ResourceParam[AlertManager],
) -> None:
start_checkpoint = None
end_checkpoint = None
Expand All @@ -1038,16 +1045,19 @@ def goldsky_backfill_op(
end_checkpoint=end_checkpoint,
)
context.log.info("Starting a backfill")
materialize_asset(
context,
bigquery,
gcs,
cbt,
checkpoint_range=GoldskyCheckpointRange(
start=op_input.start_checkpoint, end=op_input.end_checkpoint
),
pointer_table_suffix=op_input.backfill_label,
)
try:
materialize_asset(
context,
bigquery,
gcs,
cbt,
checkpoint_range=GoldskyCheckpointRange(
start=op_input.start_checkpoint, end=op_input.end_checkpoint
),
pointer_table_suffix=op_input.backfill_label,
)
except NoNewData:
alert_manager.alert(f"Goldsky Asset {context.asset_key.to_user_string()} has no new data.")

@op(name=f"{related_ops_prefix}_files_stats_op", tags=add_tags(tags,{
"opensource.observer/op-type": "debug"
Expand Down
2 changes: 2 additions & 0 deletions warehouse/oso_dagster/factories/goldsky/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class NoNewData(Exception):
    """Signal that a Goldsky materialization found nothing to process.

    Raised by the Goldsky asset code when no worker tables were loaded for
    the requested checkpoint range. Callers can catch this exception to
    report the missing data (for example by sending an alert) rather than
    treating the run as an unexpected failure.
    """

0 comments on commit bf6b86c

Please sign in to comment.