Skip to content

Commit

Permalink
fixed for kubecluster resource config
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Dec 19, 2024
1 parent e33f61d commit a14388f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
15 changes: 10 additions & 5 deletions warehouse/metrics_tools/compute/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def start_duckdb_cluster(

async def start_duckdb_cluster_async(
namespace: str,
resources: t.Dict[str, str],
resources: t.Dict[str, int],
cluster_spec: t.Optional[dict] = None,
min_size: int = 6,
max_size: int = 6,
Expand All @@ -58,17 +58,22 @@ async def start_duckdb_cluster_async(

options: t.Dict[str, t.Any] = {
"namespace": namespace,
"resources": resources,
}
options.update(kwargs)
if cluster_spec:
options["custom_cluster_spec"] = cluster_spec
worker_command = ["dask", "worker"]
resources_to_join = []

for resource, value in resources.items():
resources_to_join.append(f"{resource}={value}")
if resources_to_join:
resources_str = f'{",".join(resources_to_join)}'
worker_command.extend(["--resources", resources_str])

# loop = asyncio.get_running_loop()
cluster = await KubeCluster(asynchronous=True, **options)
print(f"is cluster awaitable?: {inspect.isawaitable(cluster)}")
adapt_response = cluster.adapt(minimum=min_size, maximum=max_size)
print(f"is adapt_response awaitable?: {inspect.isawaitable(adapt_response)}")
if inspect.isawaitable(adapt_response):
await adapt_response
return cluster
Expand Down Expand Up @@ -165,7 +170,7 @@ class KubeClusterFactory(ClusterFactory):
def __init__(
self,
namespace: str,
resources: t.Dict[str, str],
resources: t.Dict[str, int],
cluster_spec: t.Optional[dict] = None,
log_override: t.Optional[logging.Logger] = None,
**kwargs: t.Any,
Expand Down
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/compute/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class ClusterConfig(BaseSettings):
scheduler_memory_request: str = "85000Mi"
scheduler_pool_type: str = "sqlmesh-scheduler"

worker_resources: t.Dict[str, str] = Field(default_factory=lambda: {"slots": "32"})
worker_resources: t.Dict[str, int] = Field(default_factory=lambda: {"slots": "32"})
worker_threads: int = 16
worker_memory_limit: str = "90000Mi"
worker_memory_request: str = "85000Mi"
Expand Down

0 comments on commit a14388f

Please sign in to comment.