From cd3889f49af76904a38a4dcd136269b3ca32896c Mon Sep 17 00:00:00 2001 From: Raymond Cheng Date: Thu, 22 Aug 2024 14:26:44 -0700 Subject: [PATCH] feat: add event table mart (#1976) * named `timeseries_events_by_artifact_v0`, this is just a subset of `int_events` * Stop copying int_events to Clickhouse, copy the mart instead * I also added an `order_by` for faster querying. * This required plumbing the order_by from the model metadata through to the Clickhouse Dagster asset * Also copies metric_name to display_name in metrics_v0 for now --- .../models/intermediate/events/int_events.sql | 2 +- .../events/int_events_to_project.sql | 16 ++++++++-------- .../events/int_events_with_artifact_id.sql | 7 ------- .../models/intermediate/metrics/int_metrics.sql | 6 +++--- .../events/timeseries_events_by_artifact_v0.sql | 16 ++++++++++++++++ .../oso_dagster/assets/clickhouse_dbt_marts.py | 1 + warehouse/oso_dagster/factories/bq2clickhouse.py | 5 ++++- 7 files changed, 33 insertions(+), 20 deletions(-) delete mode 100644 warehouse/dbt/models/intermediate/events/int_events_with_artifact_id.sql create mode 100644 warehouse/dbt/models/marts/events/timeseries_events_by_artifact_v0.sql diff --git a/warehouse/dbt/models/intermediate/events/int_events.sql b/warehouse/dbt/models/intermediate/events/int_events.sql index 538d249f3..aca3effe6 100644 --- a/warehouse/dbt/models/intermediate/events/int_events.sql +++ b/warehouse/dbt/models/intermediate/events/int_events.sql @@ -30,7 +30,7 @@ "granularity": "day", }, meta={ - 'sync_to_db': True + 'sync_to_db': False } ) }} diff --git a/warehouse/dbt/models/intermediate/events/int_events_to_project.sql b/warehouse/dbt/models/intermediate/events/int_events_to_project.sql index 49cdb0193..e52dc5c11 100644 --- a/warehouse/dbt/models/intermediate/events/int_events_to_project.sql +++ b/warehouse/dbt/models/intermediate/events/int_events_to_project.sql @@ -4,14 +4,14 @@ select int_artifacts_by_project.project_id, - int_events_with_artifact_id.from_artifact_id, - int_events_with_artifact_id.to_artifact_id, - int_events_with_artifact_id.time, - int_events_with_artifact_id.event_source, - int_events_with_artifact_id.event_type, - int_events_with_artifact_id.amount -from {{ ref('int_events_with_artifact_id') }} + int_events.from_artifact_id, + int_events.to_artifact_id, + int_events.time, + int_events.event_source, + int_events.event_type, + int_events.amount +from {{ ref('int_events') }} inner join {{ ref('int_artifacts_by_project') }} on - int_events_with_artifact_id.to_artifact_id + int_events.to_artifact_id = int_artifacts_by_project.artifact_id diff --git a/warehouse/dbt/models/intermediate/events/int_events_with_artifact_id.sql b/warehouse/dbt/models/intermediate/events/int_events_with_artifact_id.sql deleted file mode 100644 index 615757482..000000000 --- a/warehouse/dbt/models/intermediate/events/int_events_with_artifact_id.sql +++ /dev/null @@ -1,7 +0,0 @@ -{# - config( - materialized='ephemeral', - ) -#} -select * -from {{ ref('int_events') }} diff --git a/warehouse/dbt/models/intermediate/metrics/int_metrics.sql b/warehouse/dbt/models/intermediate/metrics/int_metrics.sql index d47944da7..59f4edc99 100644 --- a/warehouse/dbt/models/intermediate/metrics/int_metrics.sql +++ b/warehouse/dbt/models/intermediate/metrics/int_metrics.sql @@ -9,7 +9,7 @@ select distinct "OSO" as metric_source, "oso" as metric_namespace, metrics.metric as metric_name, - "TODO" as display_name, + metrics.metric as display_name, "TODO" as description, null as raw_definition, "TODO" as definition_ref, @@ -21,7 +21,7 @@ select distinct "OSO" as metric_source, "oso" as metric_namespace, metric as metric_name, - "TODO" as display_name, + metric as display_name, "TODO" as description, null as raw_definition, "TODO" as definition_ref, @@ -33,7 +33,7 @@ select distinct "OSO" as metric_source, "oso" as metric_namespace, CONCAT(metric, "_", LOWER(event_source)) as metric_name, - "TODO" as display_name, + CONCAT(metric, "_", LOWER(event_source)) as display_name, "TODO" as description, null as raw_definition, "TODO" as definition_ref, diff --git a/warehouse/dbt/models/marts/events/timeseries_events_by_artifact_v0.sql b/warehouse/dbt/models/marts/events/timeseries_events_by_artifact_v0.sql new file mode 100644 index 000000000..e65a2d670 --- /dev/null +++ b/warehouse/dbt/models/marts/events/timeseries_events_by_artifact_v0.sql @@ -0,0 +1,16 @@ +{{ + config(meta = { + 'sync_to_db': True, + 'order_by': [ 'event_source', 'event_type', 'to_artifact_id', 'time' ] + }) +}} + +select + time, + to_artifact_id, + from_artifact_id, + event_type, + event_source_id, + event_source, + amount +from {{ ref('int_events') }} diff --git a/warehouse/oso_dagster/assets/clickhouse_dbt_marts.py b/warehouse/oso_dagster/assets/clickhouse_dbt_marts.py index 6bef9e0c0..c5e21feb6 100644 --- a/warehouse/oso_dagster/assets/clickhouse_dbt_marts.py +++ b/warehouse/oso_dagster/assets/clickhouse_dbt_marts.py @@ -84,6 +84,7 @@ def clickhouse_assets_from_manifests_map( staging_bucket=staging_bucket, destination_table_name=table_name, index=n.get("meta").get("index"), + order_by=n.get("meta").get("order_by"), copy_mode=SourceMode.Overwrite, ), ) diff --git a/warehouse/oso_dagster/factories/bq2clickhouse.py b/warehouse/oso_dagster/factories/bq2clickhouse.py index 3afb0e789..c113c762b 100644 --- a/warehouse/oso_dagster/factories/bq2clickhouse.py +++ b/warehouse/oso_dagster/factories/bq2clickhouse.py @@ -38,6 +38,8 @@ class Bq2ClickhouseAssetConfig: destination_table_name: str # index_name => list of column names to index index: Optional[Dict[str, List[str]]] + # order_by => list of column names to order by + order_by: Optional[List[str]] # Incremental or overwrite copy_mode: SourceMode # Dagster remaining args @@ -167,6 +169,7 @@ def bq2clickhouse_asset( # Create the Clickhouse tables and import the data destination_table_name = asset_config.destination_table_name index = asset_config.index + order_by = asset_config.order_by source_url = gcs_to_http_url(gcs_glob) with clickhouse.get_client() as ch_client: # Create a temporary table that we will use to write @@ -179,7 +182,7 @@ def bq2clickhouse_asset( # Also ensure that the expected destination exists. Even if we # will delete this keeps the `OVERWRITE` mode logic simple create_table( - ch_client, destination_table_name, columns, index, if_not_exists=True + ch_client, destination_table_name, columns, index, order_by, if_not_exists=True ) context.log.info(f"Ensured destination table {destination_table_name}") create_table(ch_client, temp_dest, columns, index, if_not_exists=False)