From 1a9849d76b9899e4a148bfba30ab56de2294eda9 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Wed, 18 Dec 2024 12:41:19 -0800 Subject: [PATCH 01/13] MCS Bugs (#2667) * Found these MCS bugs wrote regression tests to cover them * additional refactoring * remove unused comments --- warehouse/metrics_tools/compute/service.py | 24 +-- .../metrics_tools/compute/test_service.py | 91 +++++++++- warehouse/metrics_tools/compute/test_types.py | 164 ++++++++++++++++++ warehouse/metrics_tools/compute/types.py | 28 +++ warehouse/metrics_tools/test_runner.py | 28 +++ 5 files changed, 312 insertions(+), 23 deletions(-) create mode 100644 warehouse/metrics_tools/compute/test_types.py create mode 100644 warehouse/metrics_tools/test_runner.py diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py index 378c0fd3..89a747a1 100644 --- a/warehouse/metrics_tools/compute/service.py +++ b/warehouse/metrics_tools/compute/service.py @@ -322,9 +322,7 @@ async def _notify_job_pending(self, job_id: str, input: JobSubmitRequest): async def _notify_job_running(self, job_id: str): await self._update_job_state( job_id, - QueryJobUpdate( - time=datetime.now(), - scope=QueryJobUpdateScope.JOB, + QueryJobUpdate.create_job_update( payload=QueryJobStateUpdate( status=QueryJobStatus.RUNNING, has_remaining_tasks=True, @@ -335,9 +333,7 @@ async def _notify_job_running(self, job_id: str): async def _notify_job_task_completed(self, job_id: str, task_id: str): await self._update_job_state( job_id, - QueryJobUpdate( - time=datetime.now(), - scope=QueryJobUpdateScope.TASK, + QueryJobUpdate.create_task_update( payload=QueryJobTaskUpdate( task_id=task_id, status=QueryJobTaskStatus.SUCCEEDED, @@ -350,9 +346,7 @@ async def _notify_job_task_failed( ): await self._update_job_state( job_id, - QueryJobUpdate( - time=datetime.now(), - scope=QueryJobUpdateScope.TASK, + QueryJobUpdate.create_task_update( payload=QueryJobTaskUpdate( task_id=task_id, status=QueryJobTaskStatus.FAILED, @@ -364,9 +358,7 @@ async def _notify_job_task_failed( async def _notify_job_task_cancelled(self, job_id: str, task_id: str): await self._update_job_state( job_id, - QueryJobUpdate( - time=datetime.now(), - scope=QueryJobUpdateScope.TASK, + QueryJobUpdate.create_task_update( payload=QueryJobTaskUpdate( task_id=task_id, status=QueryJobTaskStatus.CANCELLED, @@ -377,9 +369,7 @@ async def _notify_job_task_cancelled(self, job_id: str, task_id: str): async def _notify_job_completed(self, job_id: str): await self._update_job_state( job_id, - QueryJobUpdate( - time=datetime.now(), - scope=QueryJobUpdateScope.JOB, + QueryJobUpdate.create_job_update( payload=QueryJobStateUpdate( status=QueryJobStatus.COMPLETED, has_remaining_tasks=False, @@ -395,9 +385,7 @@ async def _notify_job_failed( ): await self._update_job_state( job_id, - QueryJobUpdate( - time=datetime.now(), - scope=QueryJobUpdateScope.JOB, + QueryJobUpdate.create_job_update( payload=QueryJobStateUpdate( status=QueryJobStatus.FAILED, has_remaining_tasks=has_remaining_tasks, diff --git a/warehouse/metrics_tools/compute/test_service.py b/warehouse/metrics_tools/compute/test_service.py index 0731b678..b71b388a 100644 --- a/warehouse/metrics_tools/compute/test_service.py +++ b/warehouse/metrics_tools/compute/test_service.py @@ -1,4 +1,5 @@ import asyncio +import typing as t from datetime import datetime import pytest @@ -11,6 +12,7 @@ ColumnsDefinition, ExportReference, ExportType, + JobStatusResponse, JobSubmitRequest, QueryJobStatus, ) @@ -62,12 +64,91 @@ async def 
test_metrics_calculation_service(): ) async def wait_for_job_to_complete(): - status = await service.get_job_status(response.job_id) - while status.status in [QueryJobStatus.PENDING, QueryJobStatus.RUNNING]: - status = await service.get_job_status(response.job_id) - await asyncio.sleep(1) + updates: t.List[JobStatusResponse] = [] + future = asyncio.Future() + + async def collect_updates(update: JobStatusResponse): + updates.append(update) + if update.status not in [QueryJobStatus.PENDING, QueryJobStatus.RUNNING]: + future.set_result(updates) + + close = service.listen_for_job_updates(response.job_id, collect_updates) + return (close, future) + + close, updates_future = await asyncio.create_task(wait_for_job_to_complete()) + updates = await updates_future + close() + + assert len(updates) == 5 + + status = await service.get_job_status(response.job_id) + assert status.status == QueryJobStatus.COMPLETED + + await service.close() + + +@pytest.mark.asyncio +async def test_metrics_calculation_service_using_monthly_cron(): + service = MetricsCalculationService.setup( + "someid", + "bucket", + "result_path_prefix", + ClusterManager.with_dummy_metrics_plugin(LocalClusterFactory()), + await CacheExportManager.setup(FakeExportAdapter()), + DummyImportAdapter(), + ) + await service.start_cluster(ClusterStartRequest(min_size=1, max_size=1)) + await service.add_existing_exported_table_references( + { + "source.table123": ExportReference( + table_name="export_table123", + type=ExportType.GCS, + columns=ColumnsDefinition( + columns=[("col1", "INT"), ("col2", "TEXT")], dialect="duckdb" + ), + payload={"gcs_path": "gs://bucket/result_path_prefix/export_table123"}, + ), + } + ) + response = await service.submit_job( + JobSubmitRequest( + query_str="SELECT * FROM ref.table123", + start=datetime(2021, 1, 1), + end=datetime(2021, 4, 1), + dialect="duckdb", + batch_size=1, + columns=[("col1", "int"), ("col2", "string")], + ref=PeerMetricDependencyRef( + name="test", + entity_type="artifact", + window=30, + unit="day", + cron="@monthly", + ), + execution_time=datetime.now(), + locals={}, + dependent_tables_map={"source.table123": "source.table123"}, + ) + ) + + async def wait_for_job_to_complete(): + updates: t.List[JobStatusResponse] = [] + future = asyncio.Future() + + async def collect_updates(update: JobStatusResponse): + updates.append(update) + if update.status not in [QueryJobStatus.PENDING, QueryJobStatus.RUNNING]: + future.set_result(updates) + + close = service.listen_for_job_updates(response.job_id, collect_updates) + return (close, future) + + close, updates_future = await asyncio.create_task(wait_for_job_to_complete()) + updates = await updates_future + close() + + assert len(updates) == 6 - await asyncio.wait_for(asyncio.create_task(wait_for_job_to_complete()), timeout=60) status = await service.get_job_status(response.job_id) assert status.status == QueryJobStatus.COMPLETED diff --git a/warehouse/metrics_tools/compute/test_types.py b/warehouse/metrics_tools/compute/test_types.py new file mode 100644 index 00000000..5e81a726 --- /dev/null +++ b/warehouse/metrics_tools/compute/test_types.py @@ -0,0 +1,164 @@ +import pytest + +from .types import ( + QueryJobState, + QueryJobStateUpdate, + QueryJobStatus, + QueryJobTaskStatus, + QueryJobTaskUpdate, + QueryJobUpdate, +) + + +@pytest.mark.parametrize( + "description,updates,expected_status,expected_has_remaining_tasks,expected_exceptions_count", + [ + ( + "should fail if job failed", + [ + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + 
status=QueryJobStatus.FAILED, + has_remaining_tasks=False, + exception="failed", + ) + ) + ], + QueryJobStatus.FAILED, + False, + 1, + ), + ( + "should still be running if no failure", + [ + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.RUNNING, + has_remaining_tasks=True, + ), + ), + QueryJobUpdate.create_task_update( + QueryJobTaskUpdate( + status=QueryJobTaskStatus.SUCCEEDED, + task_id="task_id", + ) + ), + ], + QueryJobStatus.RUNNING, + True, + 0, + ), + ( + "should fail if task failed and still has remaining tasks", + [ + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.RUNNING, + has_remaining_tasks=True, + ), + ), + QueryJobUpdate.create_task_update( + QueryJobTaskUpdate( + status=QueryJobTaskStatus.FAILED, + task_id="task_id", + exception="failed", + ) + ), + ], + QueryJobStatus.FAILED, + True, + 1, + ), + ( + "should fail if task failed and job failed but no remaining tasks", + [ + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.RUNNING, + has_remaining_tasks=True, + ), + ), + QueryJobUpdate.create_task_update( + QueryJobTaskUpdate( + status=QueryJobTaskStatus.FAILED, + task_id="task_id", + exception="failed", + ) + ), + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.FAILED, + has_remaining_tasks=False, + exception="failed", + ) + ), + ], + QueryJobStatus.FAILED, + False, + 2, + ), + ( + "should fail if task failed and job supposedly completed but no remaining tasks", + [ + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.RUNNING, + has_remaining_tasks=True, + ), + ), + QueryJobUpdate.create_task_update( + QueryJobTaskUpdate( + status=QueryJobTaskStatus.FAILED, + task_id="task_id", + exception="failed", + ) + ), + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.COMPLETED, + has_remaining_tasks=False, + ) + ), + ], + QueryJobStatus.FAILED, + False, + 1, + ), + ( + "should fail if a task is cancelled", + [ + QueryJobUpdate.create_job_update( + QueryJobStateUpdate( + status=QueryJobStatus.RUNNING, + has_remaining_tasks=True, + ), + ), + QueryJobUpdate.create_task_update( + QueryJobTaskUpdate( + status=QueryJobTaskStatus.CANCELLED, + task_id="task_id", + ) + ), + ], + QueryJobStatus.FAILED, + True, + 0, + ), + ], +) +def test_query_job_state( + description, + updates, + expected_status, + expected_has_remaining_tasks, + expected_exceptions_count, +): + state = QueryJobState.start("job_id", 4) + for update in updates: + state.update(update) + assert state.status == expected_status, description + assert state.has_remaining_tasks == expected_has_remaining_tasks, description + + response = state.as_response() + assert response.status == expected_status, description + assert len(response.exceptions) == expected_exceptions_count, description diff --git a/warehouse/metrics_tools/compute/types.py b/warehouse/metrics_tools/compute/types.py index 13d6c21b..b0b2baf4 100644 --- a/warehouse/metrics_tools/compute/types.py +++ b/warehouse/metrics_tools/compute/types.py @@ -149,6 +149,14 @@ class QueryJobUpdate(BaseModel): scope: QueryJobUpdateScope payload: QueryJobUpdateTypes = Field(discriminator="type") + @classmethod + def create_job_update(cls, payload: QueryJobStateUpdate) -> "QueryJobUpdate": + return cls(time=datetime.now(), scope=QueryJobUpdateScope.JOB, payload=payload) + + @classmethod + def create_task_update(cls, payload: QueryJobTaskUpdate) -> "QueryJobUpdate": + return 
cls(time=datetime.now(), scope=QueryJobUpdateScope.TASK, payload=payload) + class ClusterStatus(BaseModel): status: str @@ -219,6 +227,25 @@ class QueryJobState(BaseModel): status: QueryJobStatus = QueryJobStatus.PENDING updates: t.List[QueryJobUpdate] + @classmethod + def start(cls, job_id: str, tasks_count: int) -> "QueryJobState": + now = datetime.now() + return cls( + job_id=job_id, + created_at=now, + tasks_count=tasks_count, + updates=[ + QueryJobUpdate( + time=now, + scope=QueryJobUpdateScope.JOB, + payload=QueryJobStateUpdate( + status=QueryJobStatus.PENDING, + has_remaining_tasks=True, + ), + ) + ], + ) + def latest_update(self) -> QueryJobUpdate: return self.updates[-1] @@ -233,6 +260,7 @@ def update(self, update: QueryJobUpdate): self.has_remaining_tasks = False elif payload.status == QueryJobStatus.FAILED: self.has_remaining_tasks = payload.has_remaining_tasks + self.status = payload.status elif payload.status == QueryJobStatus.RUNNING: self.status = payload.status else: diff --git a/warehouse/metrics_tools/test_runner.py b/warehouse/metrics_tools/test_runner.py new file mode 100644 index 00000000..0d9957c1 --- /dev/null +++ b/warehouse/metrics_tools/test_runner.py @@ -0,0 +1,28 @@ +from datetime import datetime + +import duckdb +from metrics_tools.definition import PeerMetricDependencyRef +from metrics_tools.runner import MetricsRunner + + +def test_runner_rendering(): + runner = MetricsRunner.create_duckdb_execution_context( + conn=duckdb.connect(), + query=""" + select time from foo + where time between @metrics_start('DATE') + and @metrics_end('DATE') + """, + ref=PeerMetricDependencyRef( + name="test", + entity_type="artifact", + window=30, + unit="day", + cron="@monthly", + ), + locals={}, + ) + start = datetime.strptime("2024-01-01", "%Y-%m-%d") + end = datetime.strptime("2024-12-31", "%Y-%m-%d") + rendered = list(runner.render_rolling_queries(start, end)) + assert len(rendered) == 12 From 32b0720061d412e7a2c582c5c5ea7b062531987a Mon Sep 17 00:00:00 2001 From: "Reuven V. 
Gonzales" Date: Wed, 18 Dec 2024 22:10:41 +0000 Subject: [PATCH 02/13] Increase size of workers --- .../metrics-calculation-service/custom-helm-values.yaml | 4 ++-- ops/tf-modules/warehouse-cluster/main.tf | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml b/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml index c40ea324..5b910007 100644 --- a/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml +++ b/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml @@ -32,8 +32,8 @@ spec: worker: threads: "16" memory: - limit: "96Gi" - request: "90Gi" + limit: "206Gi" + request: "200Gi" poolType: "mcs-worker" duckdb_path: "/scratch/mcs-local.db" trino: diff --git a/ops/tf-modules/warehouse-cluster/main.tf b/ops/tf-modules/warehouse-cluster/main.tf index 2589bc87..81931b64 100644 --- a/ops/tf-modules/warehouse-cluster/main.tf +++ b/ops/tf-modules/warehouse-cluster/main.tf @@ -158,10 +158,10 @@ locals { # MCS Workers { name = "${var.cluster_name}-mcs-worker-node-pool" - machine_type = "n1-highmem-16" + machine_type = "n1-highmem-32" node_locations = join(",", var.cluster_zones) min_count = 0 - max_count = 20 + max_count = 50 local_ssd_count = 0 local_ssd_ephemeral_storage_count = 2 spot = true From a5d16c6a26140061ee9597129c305a32f5372b3c Mon Sep 17 00:00:00 2001 From: ccerv1 Date: Wed, 18 Dec 2024 17:16:53 -0500 Subject: [PATCH 03/13] docs/gitcoin-social-graph --- .../docs/tutorials/gitcoin-social-networks.md | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 apps/docs/docs/tutorials/gitcoin-social-networks.md diff --git a/apps/docs/docs/tutorials/gitcoin-social-networks.md b/apps/docs/docs/tutorials/gitcoin-social-networks.md new file mode 100644 index 00000000..fd93915d --- /dev/null +++ b/apps/docs/docs/tutorials/gitcoin-social-networks.md @@ -0,0 +1,42 @@ +--- +title: Funding in a Social Network +sidebar_position: 5 +--- + +Analyze Gitcoin grants funding in a social network. New to OSO? Check out our [Getting Started guide](../get-started/index.md) to set up your BigQuery or API access. + +This tutorial combines Farcaster and Gitcoin data to to identify popular projects within a social network. + +## BigQuery + +If you haven't already, then the first step is to subscribe to OSO public datasets in BigQuery. You can do this by clicking the "Subscribe" button on our [Datasets page](../integrate/datasets/#oso-production-data-pipeline). For this tutorial, you'll need to subscribe to the Gitcoin and Karma3/OpenRank datasets. (You can also use the Farcaster dataset in place of OpenRank.) + +The following queries should work if you copy-paste them into your [BigQuery console](https://console.cloud.google.com/bigquery). 
+ +### Identify popular projects within your social network + +```sql +select distinct + donations.donor_address, + users.user_source_id as fid, + users.user_name as username, + donations.project_name, + amount_in_usd, + timestamp +from `gitcoin.all_donations` as donations +join `oso_production.artifacts_by_user_v1` as users + on lower(donations.donor_address) = users.artifact_name +where + user_source = 'FARCASTER' + and users.user_source_id in ( + with max_date as ( + select max(date) as last_date + from `karma3.localtrust` + ) + select cast(j as string) as fid + from `karma3.localtrust` + where i = 5650 + order by v desc + limit 150 + ) +``` From 98036f5251524b663aa2191e5dde68e55d341c68 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Wed, 18 Dec 2024 14:09:32 -0800 Subject: [PATCH 04/13] More MCS Bugs (#2668) * Ensure the import isn't actually made into trino with failed data * Additional fixes for error handling --- warehouse/metrics_tools/compute/service.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py index 89a747a1..d47bdcf1 100644 --- a/warehouse/metrics_tools/compute/service.py +++ b/warehouse/metrics_tools/compute/service.py @@ -141,8 +141,7 @@ async def _handle_query_job_submit_request( exported_dependent_tables_map = await self.resolve_dependent_tables(input) except Exception as e: self.logger.error(f"job[{job_id}] failed to export dependencies: {e}") - await self._notify_job_failed(job_id, False, e) - return + raise e self.logger.debug(f"job[{job_id}] dependencies exported") tasks = await self._batch_query_to_scheduler( @@ -163,8 +162,15 @@ async def _handle_query_job_submit_request( f"job[{job_id}] task failed with uncaught exception: {e}" ) exceptions.append(e) + # Report failure early for any listening clients. The server + # will collect all errors for any internal reporting needed await self._notify_job_failed(job_id, True, e) + # If there are any exceptions then we report those as failed and short + # circuit this method + if len(exceptions) > 0: + raise JobTasksFailed(job_id, len(exceptions), exceptions) + # Import the final result into the database self.logger.info("job[{job_id}]: importing final result into the database") await self.import_adapter.import_reference(calculation_export, final_export) @@ -172,9 +178,6 @@ async def _handle_query_job_submit_request( self.logger.debug(f"job[{job_id}]: notifying job completed") await self._notify_job_completed(job_id) - if len(exceptions) > 0: - raise JobTasksFailed(job_id, len(exceptions), exceptions) - async def _batch_query_to_scheduler( self, job_id: str, From e70c95af5ceb7607923d152ef2dc052874449574 Mon Sep 17 00:00:00 2001 From: "Reuven V. 
Gonzales" Date: Wed, 18 Dec 2024 22:29:02 +0000 Subject: [PATCH 05/13] Adjust worker size again --- .../metrics-calculation-service/custom-helm-values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml b/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml index 5b910007..c86010c4 100644 --- a/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml +++ b/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml @@ -32,8 +32,8 @@ spec: worker: threads: "16" memory: - limit: "206Gi" - request: "200Gi" + limit: "192Gi" + request: "180Gi" poolType: "mcs-worker" duckdb_path: "/scratch/mcs-local.db" trino: From f106b101707095ab58f9d09ca0621bb9300eb7b6 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Wed, 18 Dec 2024 23:14:53 +0000 Subject: [PATCH 06/13] MCS: Scale up again --- .../metrics-calculation-service/custom-helm-values.yaml | 4 ++-- ops/tf-modules/warehouse-cluster/main.tf | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml b/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml index c86010c4..10d97238 100644 --- a/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml +++ b/ops/k8s-apps/production/metrics-calculation-service/custom-helm-values.yaml @@ -32,8 +32,8 @@ spec: worker: threads: "16" memory: - limit: "192Gi" - request: "180Gi" + limit: "390Gi" + request: "375Gi" poolType: "mcs-worker" duckdb_path: "/scratch/mcs-local.db" trino: diff --git a/ops/tf-modules/warehouse-cluster/main.tf b/ops/tf-modules/warehouse-cluster/main.tf index 81931b64..7073f655 100644 --- a/ops/tf-modules/warehouse-cluster/main.tf +++ b/ops/tf-modules/warehouse-cluster/main.tf @@ -158,7 +158,7 @@ locals { # MCS Workers { name = "${var.cluster_name}-mcs-worker-node-pool" - machine_type = "n1-highmem-32" + machine_type = "n1-highmem-64" node_locations = join(",", var.cluster_zones) min_count = 0 max_count = 50 From 3cd8faa6f5ebdc0ac1a573c7accda9b1aaff8c9d Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Wed, 18 Dec 2024 15:34:47 -0800 Subject: [PATCH 07/13] MCS: More error handling improvements (#2669) --- warehouse/metrics_tools/compute/service.py | 43 +++++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py index d47bdcf1..f3871911 100644 --- a/warehouse/metrics_tools/compute/service.py +++ b/warehouse/metrics_tools/compute/service.py @@ -38,22 +38,46 @@ logger.setLevel(logging.DEBUG) -class JobFailed(Exception): +class JobError(Exception): pass -class JobTasksFailed(JobFailed): +class JobFailed(JobError): exceptions: t.List[Exception] + cancellations: t.List[str] failures: int - def __init__(self, job_id: str, failures: int, exceptions: t.List[Exception]): + def __init__( + self, + job_id: str, + failures: int, + exceptions: t.List[Exception], + cancellations: t.List[str], + ): self.failures = failures self.exceptions = exceptions + self.cancellations = cancellations super().__init__( - f"job[{job_id}] failed with {failures} failures and {len(exceptions)} exceptions" + f"job[{job_id}] failed with {failures} failures and {len(exceptions)} exceptions and {len(cancellations)} cancellations" ) +class JobTaskCancelled(JobError): + task_id: str + + def __init__(self, task_id: str): + 
self.task_id = task_id + super().__init__(f"task {task_id} was cancelled") + + +class JobTaskFailed(JobError): + exception: Exception + + def __init__(self, exception: Exception): + self.exception = exception + super().__init__(f"task failed with exception: {exception}") + + class MetricsCalculationService: id: str gcs_bucket: str @@ -153,10 +177,15 @@ async def _handle_query_job_submit_request( self.logger.warning("job[{job_id}] batch count mismatch") exceptions = [] + cancellations = [] for next_task in asyncio.as_completed(tasks): try: await next_task + except JobTaskCancelled as e: + cancellations.append(e.task_id) + except JobTaskFailed as e: + exceptions.append(e.exception) except Exception as e: self.logger.error( f"job[{job_id}] task failed with uncaught exception: {e}" @@ -168,8 +197,8 @@ async def _handle_query_job_submit_request( # If there are any exceptions then we report those as failed and short # circuit this method - if len(exceptions) > 0: - raise JobTasksFailed(job_id, len(exceptions), exceptions) + if len(exceptions) > 0 or len(cancellations) > 0: + raise JobFailed(job_id, len(exceptions), exceptions, cancellations) # Import the final result into the database self.logger.info("job[{job_id}]: importing final result into the database") @@ -246,9 +275,11 @@ async def _submit_query_task_to_scheduler( except CancelledError as e: self.logger.error(f"job[{job_id}] task cancelled {e.args}") await self._notify_job_task_cancelled(job_id, task_id) + raise JobTaskCancelled(task_id) except Exception as e: self.logger.error(f"job[{job_id}] task failed with exception: {e}") await self._notify_job_task_failed(job_id, task_id, e) + raise JobTaskFailed(e) return task_id async def close(self): From e58c3b43b65258a7bef53b97ce4a01a2b42ad9f6 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Wed, 18 Dec 2024 18:58:57 -0800 Subject: [PATCH 08/13] MCS: Increase disk (#2671) MCS: Increase size of disk --- ops/tf-modules/warehouse-cluster/main.tf | 2 +- warehouse/metrics_tools/compute/service.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ops/tf-modules/warehouse-cluster/main.tf b/ops/tf-modules/warehouse-cluster/main.tf index 7073f655..01b367ed 100644 --- a/ops/tf-modules/warehouse-cluster/main.tf +++ b/ops/tf-modules/warehouse-cluster/main.tf @@ -163,7 +163,7 @@ locals { min_count = 0 max_count = 50 local_ssd_count = 0 - local_ssd_ephemeral_storage_count = 2 + local_ssd_ephemeral_storage_count = 3 spot = true disk_size_gb = 100 disk_type = "pd-standard" diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py index f3871911..e3579c62 100644 --- a/warehouse/metrics_tools/compute/service.py +++ b/warehouse/metrics_tools/compute/service.py @@ -174,7 +174,7 @@ async def _handle_query_job_submit_request( total = len(tasks) if total != input.batch_count(): - self.logger.warning("job[{job_id}] batch count mismatch") + self.logger.warning(f"job[{job_id}] batch count mismatch") exceptions = [] cancellations = [] @@ -201,7 +201,7 @@ async def _handle_query_job_submit_request( raise JobFailed(job_id, len(exceptions), exceptions, cancellations) # Import the final result into the database - self.logger.info("job[{job_id}]: importing final result into the database") + self.logger.info(f"job[{job_id}]: importing final result into the database") await self.import_adapter.import_reference(calculation_export, final_export) self.logger.debug(f"job[{job_id}]: notifying job completed") From 8b6c0129a92874d54517f6ea126674b24cba6149 Mon Sep 17 
00:00:00 2001 From: Reuven Gonzales Date: Wed, 18 Dec 2024 19:00:34 -0800 Subject: [PATCH 09/13] Use polars and do an async write to gcs (#2672) --- warehouse/metrics_tools/compute/worker.py | 31 +++++++++++++---------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py index a9818e30..763328fe 100644 --- a/warehouse/metrics_tools/compute/worker.py +++ b/warehouse/metrics_tools/compute/worker.py @@ -1,5 +1,4 @@ # The worker initialization -import io import logging import os import time @@ -9,7 +8,8 @@ from threading import Lock import duckdb -import pandas as pd +import gcsfs +import polars as pl from dask.distributed import Worker, WorkerPlugin, get_worker from google.cloud import storage from metrics_tools.compute.types import ExportReference, ExportType @@ -72,6 +72,7 @@ def __init__( self._gcs_secret = gcs_secret self._duckdb_path = duckdb_path self._conn = None + self._fs = None self._cache_status: t.Dict[str, bool] = {} self._catalog = None self._mode = "duckdb" @@ -93,6 +94,7 @@ def setup(self, worker: Worker): ); """ self._conn.sql(sql) + self._fs = gcsfs.GCSFileSystem() def teardown(self, worker: Worker): if self._conn: @@ -170,6 +172,11 @@ def upload_to_gcs_bucket(self, blob_path: str, file: t.IO): blob = bucket.blob(blob_path) blob.upload_from_file(file) + @property + def fs(self): + assert self._fs is not None, "GCSFS not initialized" + return self._fs + def handle_query( self, job_id: str, @@ -182,30 +189,28 @@ def handle_query( This executes the query with duckdb and writes the results to a gcs path. """ + for ref, actual in dependencies.items(): self.logger.info( f"job[{job_id}][{task_id}] Loading cache for {ref}:{actual}" ) self.get_for_cache(ref, actual) conn = self.connection - results: t.List[pd.DataFrame] = [] + results: t.List[pl.DataFrame] = [] for query in queries: self.logger.info(f"job[{job_id}][{task_id}]: Executing query {query}") - result = conn.execute(query).df() + result = conn.execute(query).pl() results.append(result) # Concatenate the results self.logger.info(f"job[{job_id}][{task_id}]: Concatenating results") - results_df = pd.concat(results) + results_df = pl.concat(results) # Export the results to a parquet file in memory - self.logger.info(f"job[{job_id}][{task_id}]: Writing to in memory parquet") - inmem_file = io.BytesIO() - results_df.to_parquet(inmem_file) - inmem_file.seek(0) - - # Upload the parquet to gcs - self.logger.info(f"job[{job_id}][{task_id}]: Uploading to gcs {result_path}") - self.upload_to_gcs_bucket(result_path, inmem_file) + self.logger.info( + f"job[{job_id}][{task_id}]: Uploading to gcs {result_path} with polars" + ) + with self.fs.open(f"{self._gcs_bucket}/{result_path}", "wb") as f: + results_df.write_parquet(f) # type: ignore return task_id From bba3097efca86ccf02b90a4ee80daf7c1230330e Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Wed, 18 Dec 2024 23:58:21 -0800 Subject: [PATCH 10/13] Use slots for large models (#2673) * Use slots for large models * Use the slots * Fix tests * fixed for kubecluster resource config * fix * disable change in devs test * disable tests! 
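The "slots" in this commit are Dask worker resources: in the diff below each worker advertises a `slots` capacity via `--resources`, and every submitted metrics task reserves part of that capacity with `resources={"slots": ...}`, so only a few large queries can occupy a worker at once. A minimal sketch of that scheduling pattern, using a local cluster and illustrative sizes rather than the KubeCluster configuration in this patch:

```python
# Hypothetical illustration of Dask resource "slots"; the worker count and
# slot sizes are made up and are not the values used in this patch.
from dask.distributed import Client, LocalCluster


def main() -> None:
    # Each worker advertises 32 "slots" of capacity.
    cluster = LocalCluster(n_workers=2, resources={"slots": 32})
    client = Client(cluster)

    def heavy_query(batch: int) -> int:
        # Stand-in for a large duckdb/metrics query.
        return batch * 2

    # Each task reserves 16 slots, so at most two of them run on a worker at
    # the same time; the rest wait in the scheduler queue until slots free up.
    futures = [
        client.submit(heavy_query, i, resources={"slots": 16}, key=f"batch-{i}")
        for i in range(8)
    ]
    print(client.gather(futures))

    client.close()
    cluster.close()


if __name__ == "__main__":
    main()
```

Because the scheduler only places a task on a worker with enough unreserved slots, the per-model `slots` value threaded through the metrics factory and compute service below effectively caps how many batches of a heavy model run concurrently.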
--- .../test_change_in_developers_over_window.yml | 0 .../metrics_mesh/models/metrics_factories.py | 90 ++++++++++--------- warehouse/metrics_tools/compute/app.py | 1 + warehouse/metrics_tools/compute/client.py | 10 ++- warehouse/metrics_tools/compute/cluster.py | 34 +++++-- warehouse/metrics_tools/compute/debug.py | 1 + warehouse/metrics_tools/compute/service.py | 3 + warehouse/metrics_tools/compute/types.py | 2 + warehouse/metrics_tools/definition.py | 31 +++++-- warehouse/metrics_tools/factory/factory.py | 17 +++- 10 files changed, 124 insertions(+), 65 deletions(-) rename warehouse/metrics_mesh/{tests => disabled-tests}/test_change_in_developers_over_window.yml (100%) diff --git a/warehouse/metrics_mesh/tests/test_change_in_developers_over_window.yml b/warehouse/metrics_mesh/disabled-tests/test_change_in_developers_over_window.yml similarity index 100% rename from warehouse/metrics_mesh/tests/test_change_in_developers_over_window.yml rename to warehouse/metrics_mesh/disabled-tests/test_change_in_developers_over_window.yml diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py index 8b25f538..37bb9aa7 100644 --- a/warehouse/metrics_mesh/models/metrics_factories.py +++ b/warehouse/metrics_mesh/models/metrics_factories.py @@ -88,6 +88,8 @@ windows=[30, 90, 180], unit="day", cron="@daily", # This determines how often this is calculated + model_batch_size=90, + slots=32, ), entity_types=["artifact", "project", "collection"], is_intermediate=True, @@ -103,23 +105,23 @@ cron="@daily", ), ), - "contributor_classifications": MetricQueryDef( - ref="contributor_activity_classification.sql", - vars={ - "full_time_ratio": 10 / 30, - "activity_event_types": [ - "COMMIT_CODE", - "ISSUE_OPENED", - "PULL_REQUEST_OPENED", - "PULL_REQUEST_MERGED", - ], - }, - rolling=RollingConfig( - windows=[30, 90, 180], - unit="day", - cron="@daily", - ), - ), + # "contributor_classifications": MetricQueryDef( + # ref="contributor_activity_classification.sql", + # vars={ + # "full_time_ratio": 10 / 30, + # "activity_event_types": [ + # "COMMIT_CODE", + # "ISSUE_OPENED", + # "PULL_REQUEST_OPENED", + # "PULL_REQUEST_MERGED", + # ], + # }, + # rolling=RollingConfig( + # windows=[30, 90, 180], + # unit="day", + # cron="@daily", + # ), + # ), # Currently this query performs really poorly. 
We need to do some debugging on it # "user_retention_classifications": MetricQueryDef( # ref="user_retention_classification.sql", @@ -133,14 +135,14 @@ # ), # entity_types=["artifact", "project", "collection"], # ), - "change_in_developer_activity": MetricQueryDef( - ref="change_in_developers.sql", - rolling=RollingConfig( - windows=[30, 90, 180], - unit="day", - cron="@daily", - ), - ), + # "change_in_developer_activity": MetricQueryDef( + # ref="change_in_developers.sql", + # rolling=RollingConfig( + # windows=[30, 90, 180], + # unit="day", + # cron="@daily", + # ), + # ), "commits_rolling": MetricQueryDef( ref="commits.sql", rolling=RollingConfig( @@ -228,6 +230,7 @@ windows=[30, 90, 180], unit="day", cron="@daily", + slots=8, ), entity_types=["artifact", "project", "collection"], ), @@ -236,27 +239,28 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@monthly", - ), - entity_types=["artifact", "project", "collection"], - ), - "contributors_lifecycle": MetricQueryDef( - ref="lifecycle.sql", - vars={ - "activity_event_types": [ - "COMMIT_CODE", - "ISSUE_OPENED", - "PULL_REQUEST_OPENED", - "PULL_REQUEST_MERGED", - ], - }, - rolling=RollingConfig( - windows=[30, 90, 180], - unit="day", - cron="@monthly", + cron="@daily", + slots=32, ), entity_types=["artifact", "project", "collection"], ), + # "contributors_lifecycle": MetricQueryDef( + # ref="lifecycle.sql", + # vars={ + # "activity_event_types": [ + # "COMMIT_CODE", + # "ISSUE_OPENED", + # "PULL_REQUEST_OPENED", + # "PULL_REQUEST_MERGED", + # ], + # }, + # rolling=RollingConfig( + # windows=[30, 90, 180], + # unit="day", + # cron="@monthly", + # ), + # entity_types=["artifact", "project", "collection"], + # ), "funding_received": MetricQueryDef( ref="funding_received.sql", rolling=RollingConfig( diff --git a/warehouse/metrics_tools/compute/app.py b/warehouse/metrics_tools/compute/app.py index bd472b37..70ab2042 100644 --- a/warehouse/metrics_tools/compute/app.py +++ b/warehouse/metrics_tools/compute/app.py @@ -91,6 +91,7 @@ async def initialize_app(app: FastAPI): cluster_spec = make_new_cluster_with_defaults(config) cluster_factory = KubeClusterFactory( config.cluster_namespace, + config.worker_resources, cluster_spec=cluster_spec, shutdown_on_close=not config.debug_cluster_no_shutdown, ) diff --git a/warehouse/metrics_tools/compute/client.py b/warehouse/metrics_tools/compute/client.py index 1da49f1e..3e3646c5 100644 --- a/warehouse/metrics_tools/compute/client.py +++ b/warehouse/metrics_tools/compute/client.py @@ -130,6 +130,8 @@ def calculate_metrics( cluster_min_size: int = 6, cluster_max_size: int = 6, job_retries: int = 3, + slots: int = 1, + execution_time: t.Optional[datetime] = None, ): """Calculate metrics for a given period and write the results to a gcs folder. This method is a high level method that triggers all of the @@ -151,6 +153,7 @@ def calculate_metrics( locals (t.Dict[str, t.Any]): The local variables to use dependent_tables_map (t.Dict[str, str]): The dependent tables map job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3. 
+ execution_time (t.Optional[datetime]): The execution time for the job Returns: ExportReference: The export reference for the resulting calculation @@ -172,6 +175,8 @@ def calculate_metrics( locals, dependent_tables_map, job_retries, + slots=slots, + execution_time=execution_time, ) job_id = job_response.job_id export_reference = job_response.export_reference @@ -240,6 +245,8 @@ def submit_job( locals: t.Dict[str, t.Any], dependent_tables_map: t.Dict[str, str], job_retries: t.Optional[int] = None, + slots: int = 2, + execution_time: t.Optional[datetime] = None, ): """Submit a job to the metrics calculation service @@ -268,8 +275,9 @@ def submit_job( ref=ref, locals=locals, dependent_tables_map=dependent_tables_map, + slots=slots, retries=job_retries, - execution_time=datetime.now(), + execution_time=execution_time or datetime.now(), ) job_response = self.service_post_with_input( JobSubmitResponse, "/job/submit", request diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py index b8a502c6..61e93177 100644 --- a/warehouse/metrics_tools/compute/cluster.py +++ b/warehouse/metrics_tools/compute/cluster.py @@ -46,6 +46,7 @@ def start_duckdb_cluster( async def start_duckdb_cluster_async( namespace: str, + resources: t.Dict[str, int], cluster_spec: t.Optional[dict] = None, min_size: int = 6, max_size: int = 6, @@ -55,24 +56,30 @@ async def start_duckdb_cluster_async( a thread. The "async" version of dask's KubeCluster doesn't work as expected. So for now we do this.""" - options: t.Dict[str, t.Any] = {"namespace": namespace} + worker_command = ["dask", "worker"] + resources_to_join = [] + + for resource, value in resources.items(): + resources_to_join.append(f"{resource}={value}") + if resources_to_join: + resources_str = f'{",".join(resources_to_join)}' + worker_command.extend(["--resources", resources_str]) + + options: t.Dict[str, t.Any] = { + "namespace": namespace, + "worker_command": worker_command, + } options.update(kwargs) if cluster_spec: options["custom_cluster_spec"] = cluster_spec # loop = asyncio.get_running_loop() cluster = await KubeCluster(asynchronous=True, **options) - print(f"is cluster awaitable?: {inspect.isawaitable(cluster)}") adapt_response = cluster.adapt(minimum=min_size, maximum=max_size) - print(f"is adapt_response awaitable?: {inspect.isawaitable(adapt_response)}") if inspect.isawaitable(adapt_response): await adapt_response return cluster - # return await asyncio.to_thread( - # start_duckdb_cluster, namespace, cluster_spec, min_size, max_size - # ) - class ClusterProxy(abc.ABC): async def client(self) -> Client: @@ -155,7 +162,9 @@ def workers(self): class LocalClusterFactory(ClusterFactory): async def create_cluster(self, min_size: int, max_size: int) -> ClusterProxy: return LocalClusterProxy( - await LocalCluster(n_workers=max_size, asynchronous=True) + await LocalCluster( + n_workers=max_size, resources={"slots": 10}, asynchronous=True + ) ) @@ -163,6 +172,7 @@ class KubeClusterFactory(ClusterFactory): def __init__( self, namespace: str, + resources: t.Dict[str, int], cluster_spec: t.Optional[dict] = None, log_override: t.Optional[logging.Logger] = None, **kwargs: t.Any, @@ -170,11 +180,17 @@ def __init__( self._namespace = namespace self.logger = log_override or logger self._cluster_spec = cluster_spec + self._resources = resources self.kwargs = kwargs async def create_cluster(self, min_size: int, max_size: int): cluster = await start_duckdb_cluster_async( - self._namespace, self._cluster_spec, min_size, max_size, 
**self.kwargs + self._namespace, + self._resources, + self._cluster_spec, + min_size, + max_size, + **self.kwargs, ) return KubeClusterProxy(cluster) diff --git a/warehouse/metrics_tools/compute/debug.py b/warehouse/metrics_tools/compute/debug.py index b344b045..6bc4c6fd 100644 --- a/warehouse/metrics_tools/compute/debug.py +++ b/warehouse/metrics_tools/compute/debug.py @@ -17,6 +17,7 @@ def async_test_setup_cluster(config: AppConfig): cluster_factory = KubeClusterFactory( config.cluster_namespace, + config.worker_resources, cluster_spec=cluster_spec, log_override=logger, ) diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py index e3579c62..47e5ff39 100644 --- a/warehouse/metrics_tools/compute/service.py +++ b/warehouse/metrics_tools/compute/service.py @@ -235,6 +235,7 @@ async def _batch_query_to_scheduler( task_id, result_path, batch, + input.slots, exported_dependent_tables_map, retries=3, ) @@ -251,6 +252,7 @@ async def _submit_query_task_to_scheduler( task_id: str, result_path: str, batch: t.List[str], + slots: int, exported_dependent_tables_map: t.Dict[str, ExportReference], retries: int, ): @@ -266,6 +268,7 @@ async def _submit_query_task_to_scheduler( exported_dependent_tables_map, retries=retries, key=task_id, + resources={"slots": slots}, ) try: diff --git a/warehouse/metrics_tools/compute/types.py b/warehouse/metrics_tools/compute/types.py index b0b2baf4..2e5a44ee 100644 --- a/warehouse/metrics_tools/compute/types.py +++ b/warehouse/metrics_tools/compute/types.py @@ -182,6 +182,7 @@ class JobSubmitRequest(BaseModel): locals: t.Dict[str, t.Any] dependent_tables_map: t.Dict[str, str] retries: t.Optional[int] = None + slots: int = 2 execution_time: datetime def query_as(self, dialect: str) -> str: @@ -420,6 +421,7 @@ class ClusterConfig(BaseSettings): scheduler_memory_request: str = "85000Mi" scheduler_pool_type: str = "sqlmesh-scheduler" + worker_resources: t.Dict[str, int] = Field(default_factory=lambda: {"slots": "32"}) worker_threads: int = 16 worker_memory_limit: str = "90000Mi" worker_memory_request: str = "85000Mi" diff --git a/warehouse/metrics_tools/definition.py b/warehouse/metrics_tools/definition.py index 4ab84b42..e5f0bb89 100644 --- a/warehouse/metrics_tools/definition.py +++ b/warehouse/metrics_tools/definition.py @@ -23,6 +23,14 @@ class RollingConfig(t.TypedDict): unit: str cron: RollingCronOptions + # How many days do we process at once. This is useful to set for very large + # datasets but will default to a year if not set. + model_batch_size: t.NotRequired[int] + + # The number of required slots for a given model. 
This is also very useful + # for large datasets + slots: t.NotRequired[int] + class TimeseriesBucket(Enum): HOUR = "hour" @@ -78,6 +86,8 @@ class PeerMetricDependencyRef(t.TypedDict): unit: t.NotRequired[t.Optional[str]] time_aggregation: t.NotRequired[t.Optional[str]] cron: t.NotRequired[RollingCronOptions] + batch_size: t.NotRequired[int] + slots: t.NotRequired[int] class MetricModelRef(t.TypedDict): @@ -345,15 +355,20 @@ def generate_dependency_refs_for_name(self, name: str): for entity in self._source.entity_types or DEFAULT_ENTITY_TYPES: if self._source.rolling: for window in self._source.rolling["windows"]: - refs.append( - PeerMetricDependencyRef( - name=name, - entity_type=entity, - window=window, - unit=self._source.rolling.get("unit"), - cron=self._source.rolling.get("cron"), - ) + ref = PeerMetricDependencyRef( + name=name, + entity_type=entity, + window=window, + unit=self._source.rolling.get("unit"), + cron=self._source.rolling.get("cron"), ) + model_batch_size = self._source.rolling.get("model_batch_size") + slots = self._source.rolling.get("slots") + if model_batch_size: + ref["batch_size"] = model_batch_size + if slots: + ref["slots"] = slots + refs.append(ref) for time_aggregation in self._source.time_aggregations or []: refs.append( PeerMetricDependencyRef( diff --git a/warehouse/metrics_tools/factory/factory.py b/warehouse/metrics_tools/factory/factory.py index a9fc1af8..b379944f 100644 --- a/warehouse/metrics_tools/factory/factory.py +++ b/warehouse/metrics_tools/factory/factory.py @@ -415,7 +415,12 @@ def generate_rolling_python_model_for_rendered_query( columns = METRICS_COLUMNS_BY_ENTITY[ref["entity_type"]] - kind_common = {"batch_size": 365, "batch_concurrency": 1} + kind_common = { + "batch_size": ref.get("batch_size", 365), + "batch_concurrency": 1, + "lookback": 10, + "forward_only": True, + } partitioned_by = ("day(metrics_sample_date)",) window = ref.get("window") assert window is not None @@ -468,12 +473,15 @@ def generate_time_aggregation_model_for_rendered_query( time_aggregation = ref.get("time_aggregation") assert time_aggregation is not None - kind_common = {"batch_concurrency": 1} - kind_options = {"lookback": 7, **kind_common} + kind_common = { + "batch_concurrency": 1, + "forward_only": True, + } + kind_options = {"lookback": 10, **kind_common} partitioned_by = ("day(metrics_sample_date)",) if time_aggregation == "weekly": - kind_options = {"lookback": 7, **kind_common} + kind_options = {"lookback": 10, **kind_common} if time_aggregation == "monthly": kind_options = {"lookback": 1, **kind_common} partitioned_by = ("month(metrics_sample_date)",) @@ -680,6 +688,7 @@ def generated_rolling_query( batch_size=env.ensure_int("SQLMESH_MCS_BATCH_SIZE", 10), columns=columns, ref=ref, + slots=ref.get("slots", 1), locals=sqlmesh_vars, dependent_tables_map=create_dependent_tables_map( context, rendered_query_str From dc907a072ddaa1371188c1a3e89a454d0aa86e80 Mon Sep 17 00:00:00 2001 From: "Reuven V. 
Gonzales" Date: Thu, 19 Dec 2024 23:05:36 +0000 Subject: [PATCH 11/13] test long timeouts for trino + hive --- ops/k8s-apps/base/trino/trino.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ops/k8s-apps/base/trino/trino.yaml b/ops/k8s-apps/base/trino/trino.yaml index efba1e1a..3f46db0e 100644 --- a/ops/k8s-apps/base/trino/trino.yaml +++ b/ops/k8s-apps/base/trino/trino.yaml @@ -112,7 +112,7 @@ spec: hive.metastore-cache-ttl=0s hive.metastore-refresh-interval=5s hive.metastore.thrift.client.connect-timeout=10s - hive.metastore.thrift.client.read-timeout=10s + hive.metastore.thrift.client.read-timeout=30s iceberg.use-file-size-from-metadata=false fs.native-gcs.enabled=true fs.cache.enabled=true From 778fb155287f2e4939e52ec548bbbe973e494e3a Mon Sep 17 00:00:00 2001 From: Carl Cervone <42869436+ccerv1@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:25:13 -0500 Subject: [PATCH 12/13] feat(dbt): open collective deposits to github projects (#2660) * feat(dbt): open collective deposits to github projects * wip: expanded deposits model * feat(dbt): add int model to identify open collective project names * fix(dbt): refact deposit events model * feat(dbt): pass open collective deposits into oss_funding mart model * fix(dbt): include unlabeled projects from oss-funding csv --- .../int_open_collective_projects.sql | 59 +++++++++++++++ .../funding/int_open_collective_deposits.sql | 24 ++++++ .../int_oss_funding_grants_to_project.sql | 74 +++++++++++++++---- 3 files changed, 144 insertions(+), 13 deletions(-) create mode 100644 warehouse/dbt/models/intermediate/directory/int_open_collective_projects.sql create mode 100644 warehouse/dbt/models/intermediate/funding/int_open_collective_deposits.sql diff --git a/warehouse/dbt/models/intermediate/directory/int_open_collective_projects.sql b/warehouse/dbt/models/intermediate/directory/int_open_collective_projects.sql new file mode 100644 index 00000000..5718a7e9 --- /dev/null +++ b/warehouse/dbt/models/intermediate/directory/int_open_collective_projects.sql @@ -0,0 +1,59 @@ +with url_registry as ( + select + LOWER(JSON_EXTRACT_SCALAR(to_account, '$.slug')) as project_slug, + LOWER(JSON_EXTRACT_SCALAR(link, '$.url')) as github_url, + REGEXP_EXTRACT(LOWER(JSON_EXTRACT_SCALAR(link, '$.url')), r'github\.com/([a-z0-9-]+)') + as artifact_namespace, + REGEXP_EXTRACT( + LOWER(JSON_EXTRACT_SCALAR(link, '$.url')), r'github\.com/[a-z0-9-]+/([a-z0-9-._]+)' + ) as artifact_name + from + {{ ref('stg_open_collective__deposits') }}, + UNNEST(JSON_EXTRACT_ARRAY(to_account, '$.socialLinks')) as link + where + JSON_EXTRACT_SCALAR(link, '$.url') like '%github.com%' +), + +oso_projects as ( + select + project_id, + artifact_namespace, + artifact_name + from {{ ref('repositories_v0') }} + where artifact_source = 'GITHUB' +), + +namespace_counts as ( + select + artifact_namespace, + COUNT(distinct project_id) as project_count, + MIN(project_id) as project_id + from oso_projects + group by artifact_namespace +), + +matched_projects as ( + select + ur.*, + case + when op.project_id is not null then op.project_id + when nc.project_count = 1 then nc.project_id + end as project_id + from url_registry as ur + left join oso_projects as op + on + ur.artifact_namespace = op.artifact_namespace + and ur.artifact_name = op.artifact_name + left join namespace_counts as nc + on ur.artifact_namespace = nc.artifact_namespace +) + +select distinct + project_id as project_id, + {{ oso_id("'OPEN_COLLECTIVE'", 'project_slug') }} as artifact_id, + project_slug as 
artifact_source_id, + 'OPEN_COLLECTIVE' as artifact_source, + '' as artifact_namespace, + project_slug as artifact_name, + 'https://opencollective.com/' || project_slug as artifact_url +from matched_projects diff --git a/warehouse/dbt/models/intermediate/funding/int_open_collective_deposits.sql b/warehouse/dbt/models/intermediate/funding/int_open_collective_deposits.sql new file mode 100644 index 00000000..43882486 --- /dev/null +++ b/warehouse/dbt/models/intermediate/funding/int_open_collective_deposits.sql @@ -0,0 +1,24 @@ +with open_collective_deposits as ( + select + id as event_source_id, + created_at as `time`, + JSON_EXTRACT_SCALAR(to_account, '$.name') as project_name, + LOWER(JSON_EXTRACT_SCALAR(to_account, '$.slug')) as project_slug, + UPPER(JSON_EXTRACT_SCALAR(to_account, '$.type')) as project_type, + UPPER(JSON_EXTRACT_SCALAR(amount, '$.currency')) as currency, + CAST(JSON_EXTRACT_SCALAR(amount, '$.value') as NUMERIC) as amount + from {{ ref('stg_open_collective__deposits') }} +) + +select + open_collective_deposits.event_source_id, + open_collective_deposits.`time`, + projects.project_id, + open_collective_deposits.project_name, + open_collective_deposits.project_slug, + open_collective_deposits.project_type, + open_collective_deposits.currency, + open_collective_deposits.amount +from open_collective_deposits +left join {{ ref('int_open_collective_projects') }} as projects + on open_collective_deposits.project_slug = projects.artifact_source_id diff --git a/warehouse/dbt/models/intermediate/funding/int_oss_funding_grants_to_project.sql b/warehouse/dbt/models/intermediate/funding/int_oss_funding_grants_to_project.sql index ac2bc517..2c71e251 100644 --- a/warehouse/dbt/models/intermediate/funding/int_oss_funding_grants_to_project.sql +++ b/warehouse/dbt/models/intermediate/funding/int_oss_funding_grants_to_project.sql @@ -17,8 +17,7 @@ with oss_funding_data as ( parse_json(metadata_json) as metadata_json from {{ source('static_data_sources', 'oss_funding_v1') }} where - to_project_name is not null - and from_funder_name is not null + from_funder_name is not null and amount is not null ), @@ -52,6 +51,55 @@ gitcoin_data as ( from {{ ref('int_gitcoin_funding_events') }} ), +open_collective_data as ( + select -- noqa: ST06 + ocd.`time`, + 'GRANT_RECEIVED_USD' as event_type, + ocd.event_source_id, + 'OPEN_COLLECTIVE' as event_source, + projects.project_name as to_project_name, + 'opencollective' as from_project_name, + ocd.amount, + 'contributions' as grant_pool_name, + to_json(struct( + ocd.project_name as open_collective_project_name, + ocd.project_slug as open_collective_project_slug, + ocd.project_type as open_collective_project_type, + ocd.currency as open_collective_currency, + ocd.amount as open_collective_amount + )) as metadata_json + from {{ ref('int_open_collective_deposits') }} as ocd + left join {{ ref('projects_v1') }} as projects + on ocd.project_id = projects.project_id + where ocd.currency = 'USD' +), + +oso_indexed_data as ( + select + gitcoin_data.time, + gitcoin_data.event_type, + gitcoin_data.event_source_id, + gitcoin_data.event_source, + gitcoin_data.to_project_name, + gitcoin_data.from_project_name, + gitcoin_data.amount, + gitcoin_data.grant_pool_name, + gitcoin_data.metadata_json + from gitcoin_data + union all + select + open_collective_data.time, + open_collective_data.event_type, + open_collective_data.event_source_id, + open_collective_data.event_source, + open_collective_data.to_project_name, + open_collective_data.from_project_name, + 
open_collective_data.amount, + open_collective_data.grant_pool_name, + open_collective_data.metadata_json + from open_collective_data +), + grants as ( select oss_funding_data.time, @@ -68,16 +116,16 @@ grants as ( union all select - gitcoin_data.time, - gitcoin_data.event_type, - gitcoin_data.event_source_id, - gitcoin_data.event_source, - gitcoin_data.to_project_name, - gitcoin_data.from_project_name, - gitcoin_data.amount, - gitcoin_data.grant_pool_name, - gitcoin_data.metadata_json - from gitcoin_data + oso_indexed_data.time, + oso_indexed_data.event_type, + oso_indexed_data.event_source_id, + oso_indexed_data.event_source, + oso_indexed_data.to_project_name, + oso_indexed_data.from_project_name, + oso_indexed_data.amount, + oso_indexed_data.grant_pool_name, + oso_indexed_data.metadata_json + from oso_indexed_data ) select @@ -87,7 +135,7 @@ select grants.event_source, grants.to_project_name, to_projects.project_id as to_project_id, - 'WALLET' as to_type, + 'FISCAL_HOST' as to_type, grants.from_project_name, from_projects.project_id as from_project_id, 'WALLET' as from_type, From a8e6d458bf60baafbc661dacfd7489d1eabfafbf Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Thu, 19 Dec 2024 19:17:10 -0800 Subject: [PATCH 13/13] fix kubecluster slots (#2676) * fixes * Slots not getting applied due to forward-only * re-enable all * fix slots * Fix reimport * comments * use monthly * materialize * materialize more * attempt to reduce stages * fix --- .../models/metric_names_from_artifact.sql | 8 ++ .../models/metric_names_from_collection.sql | 8 ++ .../models/metric_names_from_project.sql | 8 ++ .../metrics_mesh/models/metrics_factories.py | 103 ++++++++++-------- warehouse/metrics_mesh/models/metrics_v0.sql | 15 +-- .../timeseries_metrics_by_artifact_v0.sql | 2 +- .../timeseries_metrics_by_collection_v0.sql | 2 +- .../timeseries_metrics_by_project_v0.sql | 2 +- warehouse/metrics_tools/compute/client.py | 28 +++-- warehouse/metrics_tools/compute/cluster.py | 19 +++- .../compute/manual_testing_utils.py | 1 + warehouse/metrics_tools/compute/result.py | 6 +- warehouse/metrics_tools/compute/test_app.py | 1 + warehouse/metrics_tools/compute/worker.py | 9 +- warehouse/metrics_tools/factory/factory.py | 2 +- 15 files changed, 134 insertions(+), 80 deletions(-) create mode 100644 warehouse/metrics_mesh/models/metric_names_from_artifact.sql create mode 100644 warehouse/metrics_mesh/models/metric_names_from_collection.sql create mode 100644 warehouse/metrics_mesh/models/metric_names_from_project.sql diff --git a/warehouse/metrics_mesh/models/metric_names_from_artifact.sql b/warehouse/metrics_mesh/models/metric_names_from_artifact.sql new file mode 100644 index 00000000..55efa30f --- /dev/null +++ b/warehouse/metrics_mesh/models/metric_names_from_artifact.sql @@ -0,0 +1,8 @@ +MODEL ( + name metrics.metric_names_from_artifact, + kind FULL +); + +SELECT DISTINCT + metric +FROM metrics.timeseries_metrics_to_artifact \ No newline at end of file diff --git a/warehouse/metrics_mesh/models/metric_names_from_collection.sql b/warehouse/metrics_mesh/models/metric_names_from_collection.sql new file mode 100644 index 00000000..fe477492 --- /dev/null +++ b/warehouse/metrics_mesh/models/metric_names_from_collection.sql @@ -0,0 +1,8 @@ +MODEL ( + name metrics.metric_names_from_collection, + kind FULL +); + +SELECT DISTINCT + metric +FROM metrics.timeseries_metrics_to_collection \ No newline at end of file diff --git a/warehouse/metrics_mesh/models/metric_names_from_project.sql 
b/warehouse/metrics_mesh/models/metric_names_from_project.sql
new file mode 100644
index 00000000..c2e30d94
--- /dev/null
+++ b/warehouse/metrics_mesh/models/metric_names_from_project.sql
@@ -0,0 +1,8 @@
+MODEL (
+  name metrics.metric_names_from_project,
+  kind FULL
+);
+
+SELECT DISTINCT
+  metric
+FROM metrics.timeseries_metrics_to_project
\ No newline at end of file
diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py
index 37bb9aa7..6067ae5f 100644
--- a/warehouse/metrics_mesh/models/metrics_factories.py
+++ b/warehouse/metrics_mesh/models/metrics_factories.py
@@ -70,6 +70,7 @@
                 windows=[30, 90, 180],
                 unit="day",
                 cron="@daily",  # This determines how often this is calculated
+                slots=32,
             ),
             entity_types=["artifact", "project", "collection"],
             is_intermediate=True,
@@ -102,26 +103,28 @@
             rolling=RollingConfig(
                 windows=[30, 90, 180],
                 unit="day",
-                cron="@daily",
+                cron="@monthly",
+                slots=32,
+            ),
+        ),
+        "contributor_classifications": MetricQueryDef(
+            ref="contributor_activity_classification.sql",
+            vars={
+                "full_time_ratio": 10 / 30,
+                "activity_event_types": [
+                    "COMMIT_CODE",
+                    "ISSUE_OPENED",
+                    "PULL_REQUEST_OPENED",
+                    "PULL_REQUEST_MERGED",
+                ],
+            },
+            rolling=RollingConfig(
+                windows=[30, 90, 180],
+                unit="day",
+                cron="@monthly",
+                slots=32,
             ),
         ),
-        # "contributor_classifications": MetricQueryDef(
-        #     ref="contributor_activity_classification.sql",
-        #     vars={
-        #         "full_time_ratio": 10 / 30,
-        #         "activity_event_types": [
-        #             "COMMIT_CODE",
-        #             "ISSUE_OPENED",
-        #             "PULL_REQUEST_OPENED",
-        #             "PULL_REQUEST_MERGED",
-        #         ],
-        #     },
-        #     rolling=RollingConfig(
-        #         windows=[30, 90, 180],
-        #         unit="day",
-        #         cron="@daily",
-        #     ),
-        # ),
         # Currently this query performs really poorly. We need to do some debugging on it
         # "user_retention_classifications": MetricQueryDef(
         #     ref="user_retention_classification.sql",
@@ -135,20 +138,22 @@
         #     ),
         #     entity_types=["artifact", "project", "collection"],
         # ),
-        # "change_in_developer_activity": MetricQueryDef(
-        #     ref="change_in_developers.sql",
-        #     rolling=RollingConfig(
-        #         windows=[30, 90, 180],
-        #         unit="day",
-        #         cron="@daily",
-        #     ),
-        # ),
+        "change_in_developer_activity": MetricQueryDef(
+            ref="change_in_developers.sql",
+            rolling=RollingConfig(
+                windows=[30, 90, 180],
+                unit="day",
+                cron="@monthly",
+                slots=32,
+            ),
+        ),
         "commits_rolling": MetricQueryDef(
             ref="commits.sql",
             rolling=RollingConfig(
                 windows=[10],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -158,6 +163,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -167,6 +173,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -176,6 +183,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -185,6 +193,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -194,6 +203,7 @@
                 windows=[90, 180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -203,6 +213,7 @@
                 windows=[90, 180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -222,6 +233,7 @@
                 windows=[30, 90, 180],
                 unit="day",
                 cron="@daily",
+                slots=32,
             ),
         ),
         "gas_fees": MetricQueryDef(
@@ -230,7 +242,7 @@
                 windows=[30, 90, 180],
                 unit="day",
                 cron="@daily",
-                slots=8,
+                slots=16,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -244,29 +256,31 @@
             ),
             entity_types=["artifact", "project", "collection"],
         ),
-        # "contributors_lifecycle": MetricQueryDef(
-        #     ref="lifecycle.sql",
-        #     vars={
-        #         "activity_event_types": [
-        #             "COMMIT_CODE",
-        #             "ISSUE_OPENED",
-        #             "PULL_REQUEST_OPENED",
-        #             "PULL_REQUEST_MERGED",
-        #         ],
-        #     },
-        #     rolling=RollingConfig(
-        #         windows=[30, 90, 180],
-        #         unit="day",
-        #         cron="@monthly",
-        #     ),
-        #     entity_types=["artifact", "project", "collection"],
-        # ),
+        "contributors_lifecycle": MetricQueryDef(
+            ref="lifecycle.sql",
+            vars={
+                "activity_event_types": [
+                    "COMMIT_CODE",
+                    "ISSUE_OPENED",
+                    "PULL_REQUEST_OPENED",
+                    "PULL_REQUEST_MERGED",
+                ],
+            },
+            rolling=RollingConfig(
+                windows=[30, 90, 180],
+                unit="day",
+                cron="@monthly",
+                slots=32,
+            ),
+            entity_types=["artifact", "project", "collection"],
+        ),
         "funding_received": MetricQueryDef(
             ref="funding_received.sql",
             rolling=RollingConfig(
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=8,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
@@ -276,6 +290,7 @@
                 windows=[180],
                 unit="day",
                 cron="@daily",
+                slots=16,
             ),
             entity_types=["artifact", "project", "collection"],
         ),
diff --git a/warehouse/metrics_mesh/models/metrics_v0.sql b/warehouse/metrics_mesh/models/metrics_v0.sql
index e864fe86..8c9d020c 100644
--- a/warehouse/metrics_mesh/models/metrics_v0.sql
+++ b/warehouse/metrics_mesh/models/metrics_v0.sql
@@ -4,17 +4,14 @@ MODEL (
 );
 
 WITH unioned_metric_names AS (
-  SELECT DISTINCT
-    metric
-  FROM metrics.timeseries_metrics_to_artifact
+  SELECT *
+  FROM metrics.metric_names_from_artifact
   UNION ALL
-  SELECT DISTINCT
-    metric
-  FROM metrics.timeseries_metrics_to_project
+  SELECT *
+  FROM metrics.metric_names_from_project
   UNION ALL
-  SELECT DISTINCT
-    metric
-  FROM metrics.timeseries_metrics_to_collection
+  SELECT *
+  FROM metrics.metric_names_from_collection
 ), all_timeseries_metric_names AS (
   SELECT DISTINCT
     metric
diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql
index 1b9a89f4..28579835 100644
--- a/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql
+++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_artifact_v0.sql
@@ -1,6 +1,6 @@
 MODEL (
   name metrics.timeseries_metrics_by_artifact_v0,
-  kind VIEW
+  kind FULL
 );
 
 WITH all_timeseries_metrics_by_artifact AS (
diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql
index cac96af9..a4c5ae0e 100644
--- a/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql
+++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_collection_v0.sql
@@ -1,6 +1,6 @@
 MODEL (
   name metrics.timeseries_metrics_by_collection_v0,
-  kind VIEW
+  kind FULL
 );
 
 WITH all_timeseries_metrics_by_collection AS (
diff --git a/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql b/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql
index 4a0b3d65..b38645d8 100644
--- a/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql
+++ b/warehouse/metrics_mesh/models/timeseries_metrics_by_project_v0.sql
@@ -1,6 +1,6 @@
 MODEL (
   name metrics.timeseries_metrics_by_project_v0,
-  kind VIEW
+  kind FULL
 );
 
 WITH all_timeseries_metrics_by_project AS (
diff --git a/warehouse/metrics_tools/compute/client.py b/warehouse/metrics_tools/compute/client.py
index 3e3646c5..292c0da2 100644
--- a/warehouse/metrics_tools/compute/client.py
+++ b/warehouse/metrics_tools/compute/client.py
@@ -117,6 +117,7 @@ def __init__(
 
     def calculate_metrics(
         self,
+        *,
         query_str: str,
         start: datetime,
         end: datetime,
@@ -126,11 +127,11 @@ def calculate_metrics(
         ref: PeerMetricDependencyRef,
         locals: t.Dict[str, t.Any],
         dependent_tables_map: t.Dict[str, str],
+        slots: int,
         progress_handler: t.Optional[t.Callable[[JobStatusResponse], None]] = None,
         cluster_min_size: int = 6,
         cluster_max_size: int = 6,
         job_retries: int = 3,
-        slots: int = 1,
         execution_time: t.Optional[datetime] = None,
     ):
         """Calculate metrics for a given period and write the results to a gcs
@@ -153,6 +154,7 @@ def calculate_metrics(
             locals (t.Dict[str, t.Any]): The local variables to use
             dependent_tables_map (t.Dict[str, str]): The dependent tables map
             job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
+            slots (int): The number of slots to use for the job
             execution_time (t.Optional[datetime]): The execution time for the job
 
         Returns:
@@ -165,17 +167,17 @@ def calculate_metrics(
         self.logger.info(f"cluster status: {status}")
 
         job_response = self.submit_job(
-            query_str,
-            start,
-            end,
-            dialect,
-            batch_size,
-            columns,
-            ref,
-            locals,
-            dependent_tables_map,
-            job_retries,
+            query_str=query_str,
+            start=start,
+            end=end,
+            dialect=dialect,
+            batch_size=batch_size,
+            columns=columns,
+            ref=ref,
+            locals=locals,
+            dependent_tables_map=dependent_tables_map,
             slots=slots,
+            job_retries=job_retries,
             execution_time=execution_time,
         )
         job_id = job_response.job_id
@@ -235,6 +237,7 @@ def wait_for_job(
 
     def submit_job(
         self,
+        *,
         query_str: str,
         start: datetime,
         end: datetime,
@@ -244,8 +247,8 @@
         ref: PeerMetricDependencyRef,
         locals: t.Dict[str, t.Any],
         dependent_tables_map: t.Dict[str, str],
+        slots: int,
         job_retries: t.Optional[int] = None,
-        slots: int = 2,
         execution_time: t.Optional[datetime] = None,
     ):
         """Submit a job to the metrics calculation service
@@ -260,6 +263,7 @@
             ref (PeerMetricDependencyRef): The dependency reference
             locals (t.Dict[str, t.Any]): The local variables to use
             dependent_tables_map (t.Dict[str, str]): The dependent tables map
+            slots (int): The number of slots to use for the job
             job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
 
         Returns:
diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py
index 61e93177..4a72d6f7 100644
--- a/warehouse/metrics_tools/compute/cluster.py
+++ b/warehouse/metrics_tools/compute/cluster.py
@@ -65,10 +65,7 @@ async def start_duckdb_cluster_async(
             resources_str = f'{",".join(resources_to_join)}'
             worker_command.extend(["--resources", resources_str])
 
-    options: t.Dict[str, t.Any] = {
-        "namespace": namespace,
-        "worker_command": worker_command,
-    }
+    options: t.Dict[str, t.Any] = {"namespace": namespace}
     options.update(kwargs)
     if cluster_spec:
         options["custom_cluster_spec"] = cluster_spec
@@ -379,7 +376,18 @@ def make_new_cluster(
     worker_memory_request: str,
     worker_memory_limit: str,
     worker_pool_type: str,
+    worker_resources: t.Dict[str, int],
+    worker_command: t.Optional[t.List[str]] = None,
 ):
+    worker_command = worker_command or ["dask", "worker"]
+    if worker_resources:
+        resources_to_join = []
+        for resource, value in worker_resources.items():
+            resources_to_join.append(f"{resource}={value}")
+        if resources_to_join:
+            resources_str = f'{",".join(resources_to_join)}'
+            worker_command.extend(["--resources", resources_str])
+
     spec = make_cluster_spec(
         name=f"{cluster_id}",
         resources={
@@ -387,6 +395,8 @@
             "limits": {"memory": scheduler_memory_limit},
         },
         image=image,
+        # The type for this says string but it accepts a list
+        worker_command=worker_command,  # type: ignore
     )
     spec["spec"]["scheduler"]["spec"]["tolerations"] = [
         {
@@ -464,4 +474,5 @@ def make_new_cluster_with_defaults(config: ClusterConfig):
         worker_memory_limit=config.worker_memory_limit,
         worker_memory_request=config.worker_memory_request,
         worker_pool_type=config.worker_pool_type,
+        worker_resources=config.worker_resources,
     )
diff --git a/warehouse/metrics_tools/compute/manual_testing_utils.py b/warehouse/metrics_tools/compute/manual_testing_utils.py
index cf6fe108..6475472f 100644
--- a/warehouse/metrics_tools/compute/manual_testing_utils.py
+++ b/warehouse/metrics_tools/compute/manual_testing_utils.py
@@ -112,6 +112,7 @@ def run_local_test(
         dependent_tables_map={
             "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958"
         },
+        slots=2,
        batch_size=batch_size,
        cluster_max_size=cluster_size,
        cluster_min_size=cluster_size,
diff --git a/warehouse/metrics_tools/compute/result.py b/warehouse/metrics_tools/compute/result.py
index 67f5e37a..b5fb8ac6 100644
--- a/warehouse/metrics_tools/compute/result.py
+++ b/warehouse/metrics_tools/compute/result.py
@@ -200,10 +200,8 @@ def process_columns_for_import(
     # timestamp. We can cast it downstream.
     column_identifier = exp.to_identifier(column_name)
     processed_column_type = column_type
-    if column_type.this == exp.DataType.Type.DATE:
-        processed_column_type = exp.DataType(
-            this=exp.DataType.Type.TIMESTAMP, nested=False
-        )
+    # Assuming that we use polars or pyarrow at the time of parquet write.
+    # It shouldn't be necessary to cast types.
 
     return (
         column_identifier,
diff --git a/warehouse/metrics_tools/compute/test_app.py b/warehouse/metrics_tools/compute/test_app.py
index dd084296..d6781294 100644
--- a/warehouse/metrics_tools/compute/test_app.py
+++ b/warehouse/metrics_tools/compute/test_app.py
@@ -100,6 +100,7 @@ def test_app_with_all_debugging(app_client_with_all_debugging):
         dependent_tables_map={
             "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958"
         },
+        slots=2,
        batch_size=batch_size,
        cluster_max_size=cluster_size,
        cluster_min_size=cluster_size,
diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py
index 763328fe..c675b013 100644
--- a/warehouse/metrics_tools/compute/worker.py
+++ b/warehouse/metrics_tools/compute/worker.py
@@ -187,7 +187,9 @@ def handle_query(
     ) -> t.Any:
         """Execute a duckdb load on a worker.
 
-        This executes the query with duckdb and writes the results to a gcs path.
+        This executes the query with duckdb and writes the results to a gcs
+        path. We need to use polars or pyarrow here because the pandas parquet
+        writer doesn't write the correct datatypes for trino.
         """
 
         for ref, actual in dependencies.items():
@@ -203,14 +205,15 @@ def handle_query(
             results.append(result)
 
         # Concatenate the results
         self.logger.info(f"job[{job_id}][{task_id}]: Concatenating results")
-        results_df = pl.concat(results)
+        combined_results = pl.concat(results)
 
         # Export the results to a parquet file in memory
         self.logger.info(
             f"job[{job_id}][{task_id}]: Uploading to gcs {result_path} with polars"
         )
         with self.fs.open(f"{self._gcs_bucket}/{result_path}", "wb") as f:
-            results_df.write_parquet(f)  # type: ignore
+            combined_results.write_parquet(f)  # type: ignore
+        self.logger.info(f"job[{job_id}][{task_id}]: Upload completed")
 
         return task_id
diff --git a/warehouse/metrics_tools/factory/factory.py b/warehouse/metrics_tools/factory/factory.py
index b379944f..faed0571 100644
--- a/warehouse/metrics_tools/factory/factory.py
+++ b/warehouse/metrics_tools/factory/factory.py
@@ -688,7 +688,7 @@ def generated_rolling_query(
         batch_size=env.ensure_int("SQLMESH_MCS_BATCH_SIZE", 10),
         columns=columns,
         ref=ref,
-        slots=ref.get("slots", 1),
+        slots=ref.get("slots", env.ensure_int("SQLMESH_DEFAULT_MCS_SLOTS", 2)),
         locals=sqlmesh_vars,
         dependent_tables_map=create_dependent_tables_map(
            context, rendered_query_str