From c7e1f6370d0f937a7998b3b6e2547d988d14fdcf Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Thu, 19 Dec 2024 07:25:13 +0000 Subject: [PATCH] fix --- warehouse/metrics_tools/compute/cluster.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py index ef8d1d47..61e93177 100644 --- a/warehouse/metrics_tools/compute/cluster.py +++ b/warehouse/metrics_tools/compute/cluster.py @@ -56,12 +56,6 @@ async def start_duckdb_cluster_async( a thread. The "async" version of dask's KubeCluster doesn't work as expected. So for now we do this.""" - options: t.Dict[str, t.Any] = { - "namespace": namespace, - } - options.update(kwargs) - if cluster_spec: - options["custom_cluster_spec"] = cluster_spec worker_command = ["dask", "worker"] resources_to_join = [] @@ -71,6 +65,14 @@ async def start_duckdb_cluster_async( resources_str = f'{",".join(resources_to_join)}' worker_command.extend(["--resources", resources_str]) + options: t.Dict[str, t.Any] = { + "namespace": namespace, + "worker_command": worker_command, + } + options.update(kwargs) + if cluster_spec: + options["custom_cluster_spec"] = cluster_spec + # loop = asyncio.get_running_loop() cluster = await KubeCluster(asynchronous=True, **options) adapt_response = cluster.adapt(minimum=min_size, maximum=max_size)