From 4eefbc5f8d76d992bc37c7ebc0307514620e509a Mon Sep 17 00:00:00 2001
From: "Reuven V. Gonzales"
Date: Thu, 19 Dec 2024 19:46:56 +0000
Subject: [PATCH 01/11] fixes

---
 warehouse/metrics_tools/compute/client.py  |  2 ++
 warehouse/metrics_tools/compute/cluster.py | 19 +++++++++++++++----
 warehouse/metrics_tools/compute/worker.py  |  8 ++++----
 3 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/warehouse/metrics_tools/compute/client.py b/warehouse/metrics_tools/compute/client.py
index 3e3646c5..cb9dc68c 100644
--- a/warehouse/metrics_tools/compute/client.py
+++ b/warehouse/metrics_tools/compute/client.py
@@ -153,6 +153,7 @@ def calculate_metrics(
             locals (t.Dict[str, t.Any]): The local variables to use
             dependent_tables_map (t.Dict[str, str]): The dependent tables map
             job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
+            slots (int): The number of slots to use for the job
             execution_time (t.Optional[datetime]): The execution time for the job

         Returns:
@@ -260,6 +261,7 @@ def submit_job(
             ref (PeerMetricDependencyRef): The dependency reference
             locals (t.Dict[str, t.Any]): The local variables to use
             dependent_tables_map (t.Dict[str, str]): The dependent tables map
+            slots (int): The number of slots to use for the job
             job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.

         Returns:
diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py
index 61e93177..4a72d6f7 100644
--- a/warehouse/metrics_tools/compute/cluster.py
+++ b/warehouse/metrics_tools/compute/cluster.py
@@ -65,10 +65,7 @@ async def start_duckdb_cluster_async(
         resources_str = f'{",".join(resources_to_join)}'
         worker_command.extend(["--resources", resources_str])

-    options: t.Dict[str, t.Any] = {
-        "namespace": namespace,
-        "worker_command": worker_command,
-    }
+    options: t.Dict[str, t.Any] = {"namespace": namespace}
     options.update(kwargs)
     if cluster_spec:
         options["custom_cluster_spec"] = cluster_spec
@@ -379,7 +376,18 @@ def make_new_cluster(
     worker_memory_request: str,
     worker_memory_limit: str,
     worker_pool_type: str,
+    worker_resources: t.Dict[str, int],
+    worker_command: t.Optional[t.List[str]] = None,
 ):
+    worker_command = worker_command or ["dask", "worker"]
+    if worker_resources:
+        resources_to_join = []
+        for resource, value in worker_resources.items():
+            resources_to_join.append(f"{resource}={value}")
+        if resources_to_join:
+            resources_str = f'{",".join(resources_to_join)}'
+            worker_command.extend(["--resources", resources_str])
+
     spec = make_cluster_spec(
         name=f"{cluster_id}",
         resources={
             "limits": {"memory": scheduler_memory_limit},
         },
         image=image,
+        # The type for this says string but it accepts a list
+        worker_command=worker_command,  # type: ignore
     )
     spec["spec"]["scheduler"]["spec"]["tolerations"] = [
         {
@@ -464,4 +474,5 @@ def make_new_cluster_with_defaults(config: ClusterConfig):
         worker_memory_limit=config.worker_memory_limit,
         worker_memory_request=config.worker_memory_request,
         worker_pool_type=config.worker_pool_type,
+        worker_resources=config.worker_resources,
     )
diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py
index 763328fe..820535bc 100644
--- a/warehouse/metrics_tools/compute/worker.py
+++ b/warehouse/metrics_tools/compute/worker.py
@@ -9,7 +9,7 @@

 import duckdb
 import gcsfs
-import polars as pl
+import pandas as pd
 from dask.distributed import Worker, WorkerPlugin, get_worker
 from google.cloud import storage
 from metrics_tools.compute.types import ExportReference, ExportType
@@ -196,14 +196,14 @@ def handle_query(
             )
             self.get_for_cache(ref, actual)
         conn = self.connection
-        results: t.List[pl.DataFrame] = []
+        results: t.List[pd.DataFrame] = []
         for query in queries:
             self.logger.info(f"job[{job_id}][{task_id}]: Executing query {query}")
-            result = conn.execute(query).pl()
+            result = conn.execute(query).df()
             results.append(result)
         # Concatenate the results
         self.logger.info(f"job[{job_id}][{task_id}]: Concatenating results")
-        results_df = pl.concat(results)
+        results_df = pd.concat(results)

         # Export the results to a parquet file in memory
         self.logger.info(
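Context for the `--resources` change above: Dask treats `slots` as an abstract, consumable worker resource, so a task submitted with a matching `resources={"slots": N}` annotation is only placed on a worker that still has N slots free. A minimal sketch of that pairing, not part of the patch series; the scheduler address and `run_batch` function are hypothetical placeholders, and it assumes the workers were started with `dask worker <scheduler> --resources "slots=32"` as the patch configures:

# Sketch only (assumptions: a running scheduler at the address below, workers
# started with `--resources "slots=32"`). Not the codebase's real submission path,
# which goes through the metrics calculation service.
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address

def run_batch(batch_id: int) -> int:
    # stand-in for the duckdb query execution done by the real worker plugin
    return batch_id

# Each submission reserves 16 of a worker's 32 advertised slots, so at most two
# of these tasks run concurrently on that worker.
futures = [client.submit(run_batch, i, resources={"slots": 16}) for i in range(8)]
print(client.gather(futures))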
From 7759b82cf13cdb49236b4e1ac2848420da355999 Mon Sep 17 00:00:00 2001
From: "Reuven V. Gonzales"
Date: Thu, 19 Dec 2024 20:15:58 +0000
Subject: [PATCH 02/11] Slots not getting applied due to forward-only

---
 warehouse/metrics_tools/compute/client.py   | 26 +++++++++++----------
 warehouse/metrics_tools/compute/test_app.py |  1 +
 warehouse/metrics_tools/compute/worker.py   |  2 +-
 warehouse/metrics_tools/factory/factory.py  |  2 +-
 4 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/warehouse/metrics_tools/compute/client.py b/warehouse/metrics_tools/compute/client.py
index cb9dc68c..292c0da2 100644
--- a/warehouse/metrics_tools/compute/client.py
+++ b/warehouse/metrics_tools/compute/client.py
@@ -117,6 +117,7 @@ def __init__(

     def calculate_metrics(
         self,
+        *,
         query_str: str,
         start: datetime,
         end: datetime,
@@ -126,11 +127,11 @@ def calculate_metrics(
         ref: PeerMetricDependencyRef,
         locals: t.Dict[str, t.Any],
         dependent_tables_map: t.Dict[str, str],
+        slots: int,
         progress_handler: t.Optional[t.Callable[[JobStatusResponse], None]] = None,
         cluster_min_size: int = 6,
         cluster_max_size: int = 6,
         job_retries: int = 3,
-        slots: int = 1,
         execution_time: t.Optional[datetime] = None,
     ):
         """Calculate metrics for a given period and write the results to a gcs
@@ -166,17 +167,17 @@ def calculate_metrics(
         self.logger.info(f"cluster status: {status}")

         job_response = self.submit_job(
-            query_str,
-            start,
-            end,
-            dialect,
-            batch_size,
-            columns,
-            ref,
-            locals,
-            dependent_tables_map,
-            job_retries,
+            query_str=query_str,
+            start=start,
+            end=end,
+            dialect=dialect,
+            batch_size=batch_size,
+            columns=columns,
+            ref=ref,
+            locals=locals,
+            dependent_tables_map=dependent_tables_map,
             slots=slots,
+            job_retries=job_retries,
             execution_time=execution_time,
         )
         job_id = job_response.job_id
@@ -236,6 +237,7 @@ def wait_for_job(

     def submit_job(
         self,
+        *,
         query_str: str,
         start: datetime,
         end: datetime,
@@ -245,8 +247,8 @@ def submit_job(
         ref: PeerMetricDependencyRef,
         locals: t.Dict[str, t.Any],
         dependent_tables_map: t.Dict[str, str],
+        slots: int,
         job_retries: t.Optional[int] = None,
-        slots: int = 2,
         execution_time: t.Optional[datetime] = None,
     ):
         """Submit a job to the metrics calculation service
diff --git a/warehouse/metrics_tools/compute/test_app.py b/warehouse/metrics_tools/compute/test_app.py
index dd084296..d6781294 100644
--- a/warehouse/metrics_tools/compute/test_app.py
+++ b/warehouse/metrics_tools/compute/test_app.py
@@ -100,6 +100,7 @@ def test_app_with_all_debugging(app_client_with_all_debugging):
         dependent_tables_map={
             "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958"
         },
+        slots=2,
         batch_size=batch_size,
         cluster_max_size=cluster_size,
         cluster_min_size=cluster_size,
diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py
index 820535bc..29f5ddb3 100644
--- a/warehouse/metrics_tools/compute/worker.py
+++ b/warehouse/metrics_tools/compute/worker.py
@@ -210,7 +210,7 @@ def handle_query(
             f"job[{job_id}][{task_id}]: Uploading to gcs {result_path} with polars"
         )
         with self.fs.open(f"{self._gcs_bucket}/{result_path}", "wb") as f:
-            results_df.write_parquet(f)  # type: ignore
+            results_df.to_parquet(f)  # type: ignore

         return task_id

diff --git a/warehouse/metrics_tools/factory/factory.py b/warehouse/metrics_tools/factory/factory.py
index b379944f..faed0571 100644
--- a/warehouse/metrics_tools/factory/factory.py
+++ b/warehouse/metrics_tools/factory/factory.py
@@ -688,7 +688,7 @@ def generated_rolling_query(
         batch_size=env.ensure_int("SQLMESH_MCS_BATCH_SIZE", 10),
         columns=columns,
         ref=ref,
-        slots=ref.get("slots", 1),
+        slots=ref.get("slots", env.ensure_int("SQLMESH_DEFAULT_MCS_SLOTS", 2)),
        locals=sqlmesh_vars,
         dependent_tables_map=create_dependent_tables_map(
             context, rendered_query_str
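Worth noting about the client changes in this patch: the bare `*` added to `calculate_metrics` and `submit_job` makes every argument keyword-only, so `slots` can no longer be silently dropped or passed positionally in the wrong position. A small generic illustration of what the `*` enforces; this is not the real client signature:

# Generic illustration of keyword-only parameters; names are placeholders.
def submit(*, query: str, slots: int, retries: int = 3) -> None:
    print(f"submitting {query!r} with {slots} slots and {retries} retries")

submit(query="SELECT 1", slots=2)   # OK: every argument is named explicitly
# submit("SELECT 1", 2)             # TypeError: takes 0 positional arguments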
From 9f02fa002f853ce96050e33a440b7fdc098dcdf7 Mon Sep 17 00:00:00 2001
From: "Reuven V. Gonzales"
Date: Thu, 19 Dec 2024 21:48:53 +0000
Subject: [PATCH 03/11] re-enable all

---
 .../metrics_mesh/models/metrics_factories.py | 98 +++++++++++--------
 1 file changed, 56 insertions(+), 42 deletions(-)

diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py
index 37bb9aa7..82b0bb86 100644
--- a/warehouse/metrics_mesh/models/metrics_factories.py
+++ b/warehouse/metrics_mesh/models/metrics_factories.py
@@ -70,6 +70,7 @@
             windows=[30, 90, 180],
             unit="day",
             cron="@daily",  # This determines how often this is calculated
+            slots=32,
         ),
         entity_types=["artifact", "project", "collection"],
         is_intermediate=True,
@@ -103,25 +104,27 @@
             windows=[30, 90, 180],
             unit="day",
             cron="@daily",
+            slots=32,
+        ),
+    ),
+    "contributor_classifications": MetricQueryDef(
+        ref="contributor_activity_classification.sql",
+        vars={
+            "full_time_ratio": 10 / 30,
+            "activity_event_types": [
+                "COMMIT_CODE",
+                "ISSUE_OPENED",
+                "PULL_REQUEST_OPENED",
+                "PULL_REQUEST_MERGED",
+            ],
+        },
+        rolling=RollingConfig(
+            windows=[30, 90, 180],
+            unit="day",
+            cron="@daily",
+            slots=32,
         ),
     ),
-    # "contributor_classifications": MetricQueryDef(
-    #     ref="contributor_activity_classification.sql",
-    #     vars={
-    #         "full_time_ratio": 10 / 30,
-    #         "activity_event_types": [
-    #             "COMMIT_CODE",
-    #             "ISSUE_OPENED",
-    #             "PULL_REQUEST_OPENED",
-    #             "PULL_REQUEST_MERGED",
-    #         ],
-    #     },
-    #     rolling=RollingConfig(
-    #         windows=[30, 90, 180],
-    #         unit="day",
-    #         cron="@daily",
-    #     ),
-    # ),
     # Currently this query performs really poorly. We need to do some debugging on it
     # "user_retention_classifications": MetricQueryDef(
     #     ref="user_retention_classification.sql",
     #     ),
     #     entity_types=["artifact", "project", "collection"],
     # ),
-    "change_in_developer_activity": MetricQueryDef(
-        ref="change_in_developers.sql",
-        rolling=RollingConfig(
-            windows=[30, 90, 180],
-            unit="day",
-            cron="@daily",
-        ),
-    ),
+    "change_in_developer_activity": MetricQueryDef(
+        ref="change_in_developers.sql",
+        rolling=RollingConfig(
+            windows=[30, 90, 180],
+            unit="day",
+            cron="@daily",
+            slots=32,
+        ),
+    ),
     "commits_rolling": MetricQueryDef(
         ref="commits.sql",
         rolling=RollingConfig(
             windows=[10],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -158,6 +163,7 @@
             windows=[180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -167,6 +173,7 @@
             windows=[180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -176,6 +183,7 @@
             windows=[180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -185,6 +193,7 @@
             windows=[180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -194,6 +203,7 @@
             windows=[90, 180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -203,6 +213,7 @@
             windows=[90, 180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -244,29 +255,31 @@
         ),
         entity_types=["artifact", "project", "collection"],
     ),
-    # "contributors_lifecycle": MetricQueryDef(
-    #     ref="lifecycle.sql",
-    #     vars={
-    #         "activity_event_types": [
-    #             "COMMIT_CODE",
-    #             "ISSUE_OPENED",
-    #             "PULL_REQUEST_OPENED",
-    #             "PULL_REQUEST_MERGED",
-    #         ],
-    #     },
-    #     rolling=RollingConfig(
-    #         windows=[30, 90, 180],
-    #         unit="day",
-    #         cron="@monthly",
-    #     ),
-    #     entity_types=["artifact", "project", "collection"],
-    # ),
+    "contributors_lifecycle": MetricQueryDef(
+        ref="lifecycle.sql",
+        vars={
+            "activity_event_types": [
+                "COMMIT_CODE",
+                "ISSUE_OPENED",
+                "PULL_REQUEST_OPENED",
+                "PULL_REQUEST_MERGED",
+            ],
+        },
+        rolling=RollingConfig(
+            windows=[30, 90, 180],
+            unit="day",
+            cron="@monthly",
+            slots=32,
+        ),
+        entity_types=["artifact", "project", "collection"],
+    ),
     "funding_received": MetricQueryDef(
         ref="funding_received.sql",
         rolling=RollingConfig(
             windows=[180],
             unit="day",
             cron="@daily",
+            slots=8,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
@@ -276,6 +289,7 @@
             windows=[180],
             unit="day",
             cron="@daily",
+            slots=16,
         ),
         entity_types=["artifact", "project", "collection"],
     ),
Gonzales" Date: Thu, 19 Dec 2024 21:50:08 +0000 Subject: [PATCH 04/11] fix slots --- warehouse/metrics_mesh/models/metrics_factories.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py index 82b0bb86..eacbf7a3 100644 --- a/warehouse/metrics_mesh/models/metrics_factories.py +++ b/warehouse/metrics_mesh/models/metrics_factories.py @@ -233,6 +233,7 @@ windows=[30, 90, 180], unit="day", cron="@daily", + slots=32, ), ), "gas_fees": MetricQueryDef( @@ -241,7 +242,7 @@ windows=[30, 90, 180], unit="day", cron="@daily", - slots=8, + slots=16, ), entity_types=["artifact", "project", "collection"], ), From 508b27eecf1a0a6dd17fa18a509b63c6b587b703 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 20 Dec 2024 00:51:44 +0000 Subject: [PATCH 05/11] Fix reimport --- warehouse/metrics_tools/compute/result.py | 9 +++++---- warehouse/metrics_tools/compute/worker.py | 15 +++++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/warehouse/metrics_tools/compute/result.py b/warehouse/metrics_tools/compute/result.py index 67f5e37a..d9474d8a 100644 --- a/warehouse/metrics_tools/compute/result.py +++ b/warehouse/metrics_tools/compute/result.py @@ -200,10 +200,11 @@ def process_columns_for_import( # timestamp. We can cast it downstream. column_identifier = exp.to_identifier(column_name) processed_column_type = column_type - if column_type.this == exp.DataType.Type.DATE: - processed_column_type = exp.DataType( - this=exp.DataType.Type.TIMESTAMP, nested=False - ) + # This might not be necessary any more + # if column_type.this == exp.DataType.Type.DATE: + # processed_column_type = exp.DataType( + # this=exp.DataType.Type.TIMESTAMP, nested=False + # ) return ( column_identifier, diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py index 29f5ddb3..c675b013 100644 --- a/warehouse/metrics_tools/compute/worker.py +++ b/warehouse/metrics_tools/compute/worker.py @@ -9,7 +9,7 @@ import duckdb import gcsfs -import pandas as pd +import polars as pl from dask.distributed import Worker, WorkerPlugin, get_worker from google.cloud import storage from metrics_tools.compute.types import ExportReference, ExportType @@ -187,7 +187,9 @@ def handle_query( ) -> t.Any: """Execute a duckdb load on a worker. - This executes the query with duckdb and writes the results to a gcs path. + This executes the query with duckdb and writes the results to a gcs + path. We need to use polars or pyarrow here because the pandas parquet + writer doesn't write the correct datatypes for trino. 
""" for ref, actual in dependencies.items(): @@ -196,21 +198,22 @@ def handle_query( ) self.get_for_cache(ref, actual) conn = self.connection - results: t.List[pd.DataFrame] = [] + results: t.List[pl.DataFrame] = [] for query in queries: self.logger.info(f"job[{job_id}][{task_id}]: Executing query {query}") - result = conn.execute(query).df() + result = conn.execute(query).pl() results.append(result) # Concatenate the results self.logger.info(f"job[{job_id}][{task_id}]: Concatenating results") - results_df = pd.concat(results) + combined_results = pl.concat(results) # Export the results to a parquet file in memory self.logger.info( f"job[{job_id}][{task_id}]: Uploading to gcs {result_path} with polars" ) with self.fs.open(f"{self._gcs_bucket}/{result_path}", "wb") as f: - results_df.to_parquet(f) # type: ignore + combined_results.write_parquet(f) # type: ignore + self.logger.info(f"job[{job_id}][{task_id}]: Upload completed") return task_id From 3c915e53df8008d8ce4c5fae7481a6b61e947f62 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 20 Dec 2024 00:55:25 +0000 Subject: [PATCH 06/11] comments --- warehouse/metrics_tools/compute/result.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/warehouse/metrics_tools/compute/result.py b/warehouse/metrics_tools/compute/result.py index d9474d8a..b5fb8ac6 100644 --- a/warehouse/metrics_tools/compute/result.py +++ b/warehouse/metrics_tools/compute/result.py @@ -200,11 +200,8 @@ def process_columns_for_import( # timestamp. We can cast it downstream. column_identifier = exp.to_identifier(column_name) processed_column_type = column_type - # This might not be necessary any more - # if column_type.this == exp.DataType.Type.DATE: - # processed_column_type = exp.DataType( - # this=exp.DataType.Type.TIMESTAMP, nested=False - # ) + # Assuming that we use polars or pyarrow at the time of parquet write. + # It shouldn't be necessary to cast types. return ( column_identifier, From 2ce8167ed3bfdc97cd64d0e148b9659b6c6a95d3 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 20 Dec 2024 00:58:43 +0000 Subject: [PATCH 07/11] use monthly --- warehouse/metrics_mesh/models/metrics_factories.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py index eacbf7a3..6067ae5f 100644 --- a/warehouse/metrics_mesh/models/metrics_factories.py +++ b/warehouse/metrics_mesh/models/metrics_factories.py @@ -103,7 +103,7 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@daily", + cron="@monthly", slots=32, ), ), @@ -121,7 +121,7 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@daily", + cron="@monthly", slots=32, ), ), @@ -143,7 +143,7 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@daily", + cron="@monthly", slots=32, ), ), From 60124d0fc43932101489d5ccd9c5e3baf20fa6cd Mon Sep 17 00:00:00 2001 From: "Reuven V. 
Gonzales" Date: Fri, 20 Dec 2024 01:38:11 +0000 Subject: [PATCH 08/11] materialize --- .../metrics_mesh/models/timeseries_metrics_by_project_v0.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql index 4a0b3d65..b38645d8 100644 --- a/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql +++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql @@ -1,6 +1,6 @@ MODEL ( name metrics.timeseries_metrics_by_project_v0, - kind VIEW + kind FULL ); WITH all_timeseries_metrics_by_project AS ( From 75fa91253f42eb5091ec72c844297f0599a7a62f Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 20 Dec 2024 01:39:36 +0000 Subject: [PATCH 09/11] materialize more --- .../metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql | 2 +- .../metrics_mesh/models/timeseries_metrics_by_collection_v0.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql index 1b9a89f4..28579835 100644 --- a/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql +++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql @@ -1,6 +1,6 @@ MODEL ( name metrics.timeseries_metrics_by_artifact_v0, - kind VIEW + kind FULL ); WITH all_timeseries_metrics_by_artifact AS ( diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql index cac96af9..a4c5ae0e 100644 --- a/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql +++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql @@ -1,6 +1,6 @@ MODEL ( name metrics.timeseries_metrics_by_collection_v0, - kind VIEW + kind FULL ); WITH all_timeseries_metrics_by_collection AS ( From a7489f154ba0d988536a3ece575f977feebeedc0 Mon Sep 17 00:00:00 2001 From: "Reuven V. 
Gonzales" Date: Fri, 20 Dec 2024 01:49:16 +0000 Subject: [PATCH 10/11] attempt to reduce stages --- .../models/metric_names_from_artifact.sql | 8 ++++++++ .../models/metric_names_from_collection.sql | 8 ++++++++ .../models/metric_names_from_project.sql | 8 ++++++++ warehouse/metrics_mesh/models/metrics_v0.sql | 15 ++++++--------- 4 files changed, 30 insertions(+), 9 deletions(-) create mode 100644 warehouse/metrics_mesh/models/metric_names_from_artifact.sql create mode 100644 warehouse/metrics_mesh/models/metric_names_from_collection.sql create mode 100644 warehouse/metrics_mesh/models/metric_names_from_project.sql diff --git a/warehouse/metrics_mesh/models/metric_names_from_artifact.sql b/warehouse/metrics_mesh/models/metric_names_from_artifact.sql new file mode 100644 index 00000000..55efa30f --- /dev/null +++ b/warehouse/metrics_mesh/models/metric_names_from_artifact.sql @@ -0,0 +1,8 @@ +MODEL ( + name metrics.metric_names_from_artifact, + kind FULL +); + +SELECT DISTINCT + metric +FROM metrics.timeseries_metrics_to_artifact \ No newline at end of file diff --git a/warehouse/metrics_mesh/models/metric_names_from_collection.sql b/warehouse/metrics_mesh/models/metric_names_from_collection.sql new file mode 100644 index 00000000..fe477492 --- /dev/null +++ b/warehouse/metrics_mesh/models/metric_names_from_collection.sql @@ -0,0 +1,8 @@ +MODEL ( + name metrics.metric_names_from_collection, + kind FULL +); + +SELECT DISTINCT + metric +FROM metrics.timeseries_metrics_to_collection \ No newline at end of file diff --git a/warehouse/metrics_mesh/models/metric_names_from_project.sql b/warehouse/metrics_mesh/models/metric_names_from_project.sql new file mode 100644 index 00000000..c2e30d94 --- /dev/null +++ b/warehouse/metrics_mesh/models/metric_names_from_project.sql @@ -0,0 +1,8 @@ +MODEL ( + name metrics.metric_names_from_project, + kind FULL +); + +SELECT DISTINCT + metric +FROM metrics.timeseries_metrics_to_project \ No newline at end of file diff --git a/warehouse/metrics_mesh/models/metrics_v0.sql b/warehouse/metrics_mesh/models/metrics_v0.sql index e864fe86..8c9d020c 100644 --- a/warehouse/metrics_mesh/models/metrics_v0.sql +++ b/warehouse/metrics_mesh/models/metrics_v0.sql @@ -4,17 +4,14 @@ MODEL ( ); WITH unioned_metric_names AS ( - SELECT DISTINCT - metric - FROM metrics.timeseries_metrics_to_artifact + SELECT * + FROM metrics.metric_names_from_artifact UNION ALL - SELECT DISTINCT - metric - FROM metrics.timeseries_metrics_to_project + SELECT * + FROM metrics.metric_names_from_project UNION ALL - SELECT DISTINCT - metric - FROM metrics.timeseries_metrics_to_collection + SELECT * + FROM metrics.metric_names_from_collection ), all_timeseries_metric_names AS ( SELECT DISTINCT metric From db1109080d9963b75120f407a0a3337c77ee55c3 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 20 Dec 2024 02:16:31 +0000 Subject: [PATCH 11/11] fix --- warehouse/metrics_tools/compute/manual_testing_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/warehouse/metrics_tools/compute/manual_testing_utils.py b/warehouse/metrics_tools/compute/manual_testing_utils.py index cf6fe108..6475472f 100644 --- a/warehouse/metrics_tools/compute/manual_testing_utils.py +++ b/warehouse/metrics_tools/compute/manual_testing_utils.py @@ -112,6 +112,7 @@ def run_local_test( dependent_tables_map={ "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958" }, + slots=2, batch_size=batch_size, cluster_max_size=cluster_size, cluster_min_size=cluster_size,