From f0ae36e61cfd0cc1d7fb4b9ea1aa37fb115dd8ce Mon Sep 17 00:00:00 2001
From: Reuven Gonzales
Date: Thu, 19 Dec 2024 18:23:12 -0800
Subject: [PATCH] fix kubecluster slots (#2676)

* fixes
* Slots not getting applied due to forward-only
* re-enable all
* fix slots
* Fix reimport
* comments
* use monthly
* materialize
* materialize more
* attempt to reduce stages
* fix
---
 .../models/metric_names_from_artifact.sql     |   8 ++
 .../models/metric_names_from_collection.sql   |   8 ++
 .../models/metric_names_from_project.sql      |   8 ++
 .../metrics_mesh/models/metrics_factories.py  | 103 ++++++++++--------
 warehouse/metrics_mesh/models/metrics_v0.sql  |  15 +--
 .../timeseries_metrics_by_artifact_v0.sql     |   2 +-
 .../timeseries_metrics_by_collection_v0.sql   |   2 +-
 .../timeseries_metrics_by_project_v0.sql      |   2 +-
 warehouse/metrics_tools/compute/client.py     |  28 +++--
 warehouse/metrics_tools/compute/cluster.py    |  19 +++-
 .../compute/manual_testing_utils.py           |   1 +
 warehouse/metrics_tools/compute/result.py     |   6 +-
 warehouse/metrics_tools/compute/test_app.py   |   1 +
 warehouse/metrics_tools/compute/worker.py     |   9 +-
 warehouse/metrics_tools/factory/factory.py    |   2 +-
 15 files changed, 134 insertions(+), 80 deletions(-)
 create mode 100644 warehouse/metrics_mesh/models/metric_names_from_artifact.sql
 create mode 100644 warehouse/metrics_mesh/models/metric_names_from_collection.sql
 create mode 100644 warehouse/metrics_mesh/models/metric_names_from_project.sql

diff --git a/warehouse/metrics_mesh/models/metric_names_from_artifact.sql b/warehouse/metrics_mesh/models/metric_names_from_artifact.sql
new file mode 100644
index 00000000..55efa30f
--- /dev/null
+++ b/warehouse/metrics_mesh/models/metric_names_from_artifact.sql
@@ -0,0 +1,8 @@
+MODEL (
+  name metrics.metric_names_from_artifact,
+  kind FULL
+);
+
+SELECT DISTINCT
+  metric
+FROM metrics.timeseries_metrics_to_artifact
\ No newline at end of file
diff --git a/warehouse/metrics_mesh/models/metric_names_from_collection.sql b/warehouse/metrics_mesh/models/metric_names_from_collection.sql
new file mode 100644
index 00000000..fe477492
--- /dev/null
+++ b/warehouse/metrics_mesh/models/metric_names_from_collection.sql
@@ -0,0 +1,8 @@
+MODEL (
+  name metrics.metric_names_from_collection,
+  kind FULL
+);
+
+SELECT DISTINCT
+  metric
+FROM metrics.timeseries_metrics_to_collection
\ No newline at end of file
diff --git a/warehouse/metrics_mesh/models/metric_names_from_project.sql b/warehouse/metrics_mesh/models/metric_names_from_project.sql
new file mode 100644
index 00000000..c2e30d94
--- /dev/null
+++ b/warehouse/metrics_mesh/models/metric_names_from_project.sql
@@ -0,0 +1,8 @@
+MODEL (
+  name metrics.metric_names_from_project,
+  kind FULL
+);
+
+SELECT DISTINCT
+  metric
+FROM metrics.timeseries_metrics_to_project
\ No newline at end of file
diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py
index 37bb9aa7..6067ae5f 100644
--- a/warehouse/metrics_mesh/models/metrics_factories.py
+++ b/warehouse/metrics_mesh/models/metrics_factories.py
@@ -70,6 +70,7 @@
                 windows=[30, 90, 180],
                 unit="day",
                 cron="@daily",  # This determines how often this is calculated
+                slots=32,
             ),
             entity_types=["artifact", "project", "collection"],
             is_intermediate=True,
@@ -102,26 +103,28 @@
             rolling=RollingConfig(
                 windows=[30, 90, 180],
                 unit="day",
-                cron="@daily",
+                cron="@monthly",
+                slots=32,
+            ),
+        ),
+        "contributor_classifications": MetricQueryDef(
+            ref="contributor_activity_classification.sql",
+            vars={
+                "full_time_ratio": 10 / 30,
+                "activity_event_types": [
+                    "COMMIT_CODE",
+                    "ISSUE_OPENED",
+                    "PULL_REQUEST_OPENED",
+                    "PULL_REQUEST_MERGED",
+                ],
+            },
+            rolling=RollingConfig(
+                windows=[30, 90, 180],
+                unit="day",
+                cron="@monthly",
+                slots=32,
             ),
         ),
-        # "contributor_classifications": MetricQueryDef(
-        #     ref="contributor_activity_classification.sql",
-        #     vars={
-        #         "full_time_ratio": 10 / 30,
-        #         "activity_event_types": [
-        #             "COMMIT_CODE",
-        #             "ISSUE_OPENED",
-        #             "PULL_REQUEST_OPENED",
-        #             "PULL_REQUEST_MERGED",
-        #         ],
-        #     },
-        #     rolling=RollingConfig(
-        #         windows=[30, 90, 180],
-        #         unit="day",
-        #         cron="@daily",
-        #     ),
-        # ),
         # Currently this query performs really poorly. We need to do some debugging on it
         # "user_retention_classifications": MetricQueryDef(
         #     ref="user_retention_classification.sql",
@@ -135,20 +138,22 @@
         #     ),
         #     entity_types=["artifact", "project", "collection"],
         # ),
-        # "change_in_developer_activity": MetricQueryDef(
-        #     ref="change_in_developers.sql",
-        #     rolling=RollingConfig(
-        #         windows=[30, 90, 180],
-        #         unit="day",
-        #         cron="@daily",
-        #     ),
-        # ),
+        "change_in_developer_activity": MetricQueryDef(
+            ref="change_in_developers.sql",
+            rolling=RollingConfig(
+                windows=[30, 90, 180],
+                unit="day",
+                cron="@monthly",
+                slots=32,
+            ),
+        ),
         "commits_rolling": MetricQueryDef(
             ref="commits.sql",
             rolling=RollingConfig(
                 windows=[10],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -158,6 +163,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -167,6 +173,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
            ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -176,6 +183,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -185,6 +193,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -194,6 +203,7 @@
                 windows=[90, 180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -203,6 +213,7 @@
                 windows=[90, 180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -222,6 +233,7 @@
                 windows=[30, 90, 180],
                 unit="day",
                 cron="@daily",
+                slots=32,
             ),
         ),
         "gas_fees": MetricQueryDef(
@@ -230,7 +242,7 @@
                 windows=[30, 90, 180],
                 unit="day",
                 cron="@daily",
-                slots=8,
+                slots=16,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -244,29 +256,31 @@
             ),
             entity_types=["artifact", "project", "collection"],
         ),
-        # "contributors_lifecycle": MetricQueryDef(
-        #     ref="lifecycle.sql",
-        #     vars={
-        #         "activity_event_types": [
-        #             "COMMIT_CODE",
-        #             "ISSUE_OPENED",
-        #             "PULL_REQUEST_OPENED",
-        #             "PULL_REQUEST_MERGED",
-        #         ],
-        #     },
-        #     rolling=RollingConfig(
-        #         windows=[30, 90, 180],
-        #         unit="day",
-        #         cron="@monthly",
-        #     ),
-        #     entity_types=["artifact", "project", "collection"],
-        # ),
+        "contributors_lifecycle": MetricQueryDef(
+            ref="lifecycle.sql",
+            vars={
+                "activity_event_types": [
+                    "COMMIT_CODE",
+                    "ISSUE_OPENED",
+                    "PULL_REQUEST_OPENED",
+                    "PULL_REQUEST_MERGED",
+                ],
+            },
+            rolling=RollingConfig(
+                windows=[30, 90, 180],
+                unit="day",
+                cron="@monthly",
+                slots=32,
+            ),
+            entity_types=["artifact", "project", "collection"],
+        ),
         "funding_received": MetricQueryDef(
             ref="funding_received.sql",
             rolling=RollingConfig(
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -276,6 +290,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=16,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
diff --git a/warehouse/metrics_mesh/models/metrics_v0.sql b/warehouse/metrics_mesh/models/metrics_v0.sql
index e864fe86..8c9d020c 100644
--- a/warehouse/metrics_mesh/models/metrics_v0.sql
+++ b/warehouse/metrics_mesh/models/metrics_v0.sql
@@ -4,17 +4,14 @@ MODEL (
 );
 
 WITH unioned_metric_names AS (
-  SELECT DISTINCT
-    metric
-  FROM metrics.timeseries_metrics_to_artifact
+  SELECT *
+  FROM metrics.metric_names_from_artifact
   UNION ALL
-  SELECT DISTINCT
-    metric
-  FROM metrics.timeseries_metrics_to_project
+  SELECT *
+  FROM metrics.metric_names_from_project
   UNION ALL
-  SELECT DISTINCT
-    metric
-  FROM metrics.timeseries_metrics_to_collection
+  SELECT *
+  FROM metrics.metric_names_from_collection
 ), all_timeseries_metric_names AS (
   SELECT DISTINCT metric
diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql
index 1b9a89f4..28579835 100644
--- a/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql
+++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql
@@ -1,6 +1,6 @@
 MODEL (
   name metrics.timeseries_metrics_by_artifact_v0,
-  kind VIEW
+  kind FULL
 );
 
 WITH all_timeseries_metrics_by_artifact AS (
diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql
index cac96af9..a4c5ae0e 100644
--- a/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql
+++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql
@@ -1,6 +1,6 @@
 MODEL (
   name metrics.timeseries_metrics_by_collection_v0,
-  kind VIEW
+  kind FULL
 );
 
 WITH all_timeseries_metrics_by_collection AS (
diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql
index 4a0b3d65..b38645d8 100644
--- a/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql
+++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql
@@ -1,6 +1,6 @@
 MODEL (
   name metrics.timeseries_metrics_by_project_v0,
-  kind VIEW
+  kind FULL
 );
 
 WITH all_timeseries_metrics_by_project AS (
diff --git a/warehouse/metrics_tools/compute/client.py b/warehouse/metrics_tools/compute/client.py
index 3e3646c5..292c0da2 100644
--- a/warehouse/metrics_tools/compute/client.py
+++ b/warehouse/metrics_tools/compute/client.py
@@ -117,6 +117,7 @@ def __init__(
 
     def calculate_metrics(
         self,
+        *,
         query_str: str,
         start: datetime,
         end: datetime,
@@ -126,11 +127,11 @@ def calculate_metrics(
         ref: PeerMetricDependencyRef,
         locals: t.Dict[str, t.Any],
         dependent_tables_map: t.Dict[str, str],
+        slots: int,
         progress_handler: t.Optional[t.Callable[[JobStatusResponse], None]] = None,
         cluster_min_size: int = 6,
         cluster_max_size: int = 6,
         job_retries: int = 3,
-        slots: int = 1,
         execution_time: t.Optional[datetime] = None,
     ):
         """Calculate metrics for a given period and write the results to a gcs
@@ -153,6 +154,7 @@
            locals (t.Dict[str, t.Any]): The local variables to use
            dependent_tables_map (t.Dict[str, str]): The dependent tables map
            job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
+           slots (int): The number of slots to use for the job
            execution_time (t.Optional[datetime]): The execution time for the job
 
        Returns:
@@ -165,17 +167,17 @@
        self.logger.info(f"cluster status: {status}")
        job_response = self.submit_job(
-            query_str,
-            start,
-            end,
-            dialect,
-            batch_size,
-            columns,
-            ref,
-            locals,
-            dependent_tables_map,
-            job_retries,
+            query_str=query_str,
+            start=start,
+            end=end,
+            dialect=dialect,
+            batch_size=batch_size,
+            columns=columns,
+            ref=ref,
+            locals=locals,
+            dependent_tables_map=dependent_tables_map,
             slots=slots,
+            job_retries=job_retries,
             execution_time=execution_time,
         )
         job_id = job_response.job_id
@@ -235,6 +237,7 @@ def wait_for_job(
 
     def submit_job(
         self,
+        *,
         query_str: str,
         start: datetime,
         end: datetime,
@@ -244,8 +247,8 @@ def submit_job(
         ref: PeerMetricDependencyRef,
         locals: t.Dict[str, t.Any],
         dependent_tables_map: t.Dict[str, str],
+        slots: int,
         job_retries: t.Optional[int] = None,
-        slots: int = 2,
         execution_time: t.Optional[datetime] = None,
     ):
         """Submit a job to the metrics calculation service
@@ -260,6 +263,7 @@
            ref (PeerMetricDependencyRef): The dependency reference
            locals (t.Dict[str, t.Any]): The local variables to use
            dependent_tables_map (t.Dict[str, str]): The dependent tables map
+           slots (int): The number of slots to use for the job
            job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
 
        Returns:
diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py
index 61e93177..4a72d6f7 100644
--- a/warehouse/metrics_tools/compute/cluster.py
+++ b/warehouse/metrics_tools/compute/cluster.py
@@ -65,10 +65,7 @@ async def start_duckdb_cluster_async(
             resources_str = f'{",".join(resources_to_join)}'
             worker_command.extend(["--resources", resources_str])
 
-    options: t.Dict[str, t.Any] = {
-        "namespace": namespace,
-        "worker_command": worker_command,
-    }
+    options: t.Dict[str, t.Any] = {"namespace": namespace}
     options.update(kwargs)
     if cluster_spec:
         options["custom_cluster_spec"] = cluster_spec
@@ -379,7 +376,18 @@ def make_new_cluster(
     worker_memory_request: str,
     worker_memory_limit: str,
     worker_pool_type: str,
+    worker_resources: t.Dict[str, int],
+    worker_command: t.Optional[t.List[str]] = None,
 ):
+    worker_command = worker_command or ["dask", "worker"]
+    if worker_resources:
+        resources_to_join = []
+        for resource, value in worker_resources.items():
+            resources_to_join.append(f"{resource}={value}")
+        if resources_to_join:
+            resources_str = f'{",".join(resources_to_join)}'
+            worker_command.extend(["--resources", resources_str])
+
     spec = make_cluster_spec(
         name=f"{cluster_id}",
         resources={
             "requests": {"memory": scheduler_memory_request},
             "limits": {"memory": scheduler_memory_limit},
         },
         image=image,
+        # The type for this says string but it accepts a list
+        worker_command=worker_command,  # type: ignore
     )
     spec["spec"]["scheduler"]["spec"]["tolerations"] = [
         {
@@ -464,4 +474,5 @@ def make_new_cluster_with_defaults(config: ClusterConfig):
         worker_memory_limit=config.worker_memory_limit,
         worker_memory_request=config.worker_memory_request,
         worker_pool_type=config.worker_pool_type,
+        worker_resources=config.worker_resources,
     )
diff --git a/warehouse/metrics_tools/compute/manual_testing_utils.py b/warehouse/metrics_tools/compute/manual_testing_utils.py
index cf6fe108..6475472f 100644
--- a/warehouse/metrics_tools/compute/manual_testing_utils.py
+++ b/warehouse/metrics_tools/compute/manual_testing_utils.py
@@ -112,6 +112,7 @@ def run_local_test(
         dependent_tables_map={
             "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958"
         },
+        slots=2,
         batch_size=batch_size,
         cluster_max_size=cluster_size,
         cluster_min_size=cluster_size,
diff --git a/warehouse/metrics_tools/compute/result.py b/warehouse/metrics_tools/compute/result.py
index 67f5e37a..b5fb8ac6 100644
--- a/warehouse/metrics_tools/compute/result.py
+++ b/warehouse/metrics_tools/compute/result.py
@@ -200,10 +200,8 @@ def process_columns_for_import(
         # timestamp. We can cast it downstream.
         column_identifier = exp.to_identifier(column_name)
         processed_column_type = column_type
-        if column_type.this == exp.DataType.Type.DATE:
-            processed_column_type = exp.DataType(
-                this=exp.DataType.Type.TIMESTAMP, nested=False
-            )
+        # Assuming that we use polars or pyarrow at the time of parquet write.
+        # It shouldn't be necessary to cast types.
 
         return (
             column_identifier,
diff --git a/warehouse/metrics_tools/compute/test_app.py b/warehouse/metrics_tools/compute/test_app.py
index dd084296..d6781294 100644
--- a/warehouse/metrics_tools/compute/test_app.py
+++ b/warehouse/metrics_tools/compute/test_app.py
@@ -100,6 +100,7 @@ def test_app_with_all_debugging(app_client_with_all_debugging):
         dependent_tables_map={
             "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958"
         },
+        slots=2,
         batch_size=batch_size,
         cluster_max_size=cluster_size,
         cluster_min_size=cluster_size,
diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py
index 763328fe..c675b013 100644
--- a/warehouse/metrics_tools/compute/worker.py
+++ b/warehouse/metrics_tools/compute/worker.py
@@ -187,7 +187,9 @@ def handle_query(
     ) -> t.Any:
         """Execute a duckdb load on a worker.
 
-        This executes the query with duckdb and writes the results to a gcs path.
+        This executes the query with duckdb and writes the results to a gcs
+        path. We need to use polars or pyarrow here because the pandas parquet
+        writer doesn't write the correct datatypes for trino.
         """
 
         for ref, actual in dependencies.items():
             results.append(result)
         # Concatenate the results
         self.logger.info(f"job[{job_id}][{task_id}]: Concatenating results")
-        results_df = pl.concat(results)
+        combined_results = pl.concat(results)
 
         # Export the results to a parquet file in memory
         self.logger.info(
             f"job[{job_id}][{task_id}]: Uploading to gcs {result_path} with polars"
         )
         with self.fs.open(f"{self._gcs_bucket}/{result_path}", "wb") as f:
-            results_df.write_parquet(f)  # type: ignore
+            combined_results.write_parquet(f)  # type: ignore
+        self.logger.info(f"job[{job_id}][{task_id}]: Upload completed")
 
         return task_id
diff --git a/warehouse/metrics_tools/factory/factory.py b/warehouse/metrics_tools/factory/factory.py
index b379944f..faed0571 100644
--- a/warehouse/metrics_tools/factory/factory.py
+++ b/warehouse/metrics_tools/factory/factory.py
@@ -688,7 +688,7 @@ def generated_rolling_query(
         batch_size=env.ensure_int("SQLMESH_MCS_BATCH_SIZE", 10),
         columns=columns,
         ref=ref,
-        slots=ref.get("slots", 1),
+        slots=ref.get("slots", env.ensure_int("SQLMESH_DEFAULT_MCS_SLOTS", 2)),
         locals=sqlmesh_vars,
         dependent_tables_map=create_dependent_tables_map(
            context, rendered_query_str
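
Note (not part of the patch): a minimal standalone sketch of the "--resources" handling that this change adds to make_new_cluster, so the worker pods advertise an abstract "slots" resource to Dask. The helper name build_worker_command is invented here purely for illustration; the patch inlines this logic inside make_new_cluster.

import typing as t

def build_worker_command(
    worker_resources: t.Dict[str, int],
    worker_command: t.Optional[t.List[str]] = None,
) -> t.List[str]:
    # Mirrors the logic the patch adds: every entry in worker_resources
    # becomes part of a single dask "--resources" flag, so a config of
    # {"slots": 32} yields ["dask", "worker", "--resources", "slots=32"].
    worker_command = worker_command or ["dask", "worker"]
    if worker_resources:
        resources_to_join = [f"{resource}={value}" for resource, value in worker_resources.items()]
        if resources_to_join:
            worker_command.extend(["--resources", ",".join(resources_to_join)])
    return worker_command

if __name__ == "__main__":
    print(build_worker_command({"slots": 32}))
    # ['dask', 'worker', '--resources', 'slots=32']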