Use slots for large models (#2673)
* Use slots for large models

* Use the slots

* Fix tests

* fixed for kubecluster resource config

* fix

* disable change in devs test

* disable tests!
ravenac95 authored Dec 19, 2024
1 parent 8b6c012 commit bba3097
Showing 10 changed files with 124 additions and 65 deletions.
90 changes: 47 additions & 43 deletions warehouse/metrics_mesh/models/metrics_factories.py
@@ -88,6 +88,8 @@
windows=[30, 90, 180],
unit="day",
cron="@daily", # This determines how often this is calculated
model_batch_size=90,
slots=32,
),
entity_types=["artifact", "project", "collection"],
is_intermediate=True,
@@ -103,23 +105,23 @@
cron="@daily",
),
),
"contributor_classifications": MetricQueryDef(
ref="contributor_activity_classification.sql",
vars={
"full_time_ratio": 10 / 30,
"activity_event_types": [
"COMMIT_CODE",
"ISSUE_OPENED",
"PULL_REQUEST_OPENED",
"PULL_REQUEST_MERGED",
],
},
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@daily",
),
),
# "contributor_classifications": MetricQueryDef(
# ref="contributor_activity_classification.sql",
# vars={
# "full_time_ratio": 10 / 30,
# "activity_event_types": [
# "COMMIT_CODE",
# "ISSUE_OPENED",
# "PULL_REQUEST_OPENED",
# "PULL_REQUEST_MERGED",
# ],
# },
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@daily",
# ),
# ),
# Currently this query performs really poorly. We need to do some debugging on it
# "user_retention_classifications": MetricQueryDef(
# ref="user_retention_classification.sql",
@@ -133,14 +135,14 @@
# ),
# entity_types=["artifact", "project", "collection"],
# ),
"change_in_developer_activity": MetricQueryDef(
ref="change_in_developers.sql",
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@daily",
),
),
# "change_in_developer_activity": MetricQueryDef(
# ref="change_in_developers.sql",
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@daily",
# ),
# ),
"commits_rolling": MetricQueryDef(
ref="commits.sql",
rolling=RollingConfig(
@@ -228,6 +230,7 @@
windows=[30, 90, 180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
@@ -236,27 +239,28 @@
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@monthly",
),
entity_types=["artifact", "project", "collection"],
),
"contributors_lifecycle": MetricQueryDef(
ref="lifecycle.sql",
vars={
"activity_event_types": [
"COMMIT_CODE",
"ISSUE_OPENED",
"PULL_REQUEST_OPENED",
"PULL_REQUEST_MERGED",
],
},
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@monthly",
cron="@daily",
slots=32,
),
entity_types=["artifact", "project", "collection"],
),
# "contributors_lifecycle": MetricQueryDef(
# ref="lifecycle.sql",
# vars={
# "activity_event_types": [
# "COMMIT_CODE",
# "ISSUE_OPENED",
# "PULL_REQUEST_OPENED",
# "PULL_REQUEST_MERGED",
# ],
# },
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@monthly",
# ),
# entity_types=["artifact", "project", "collection"],
# ),
"funding_received": MetricQueryDef(
ref="funding_received.sql",
rolling=RollingConfig(
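For reference, the two new `RollingConfig` keys exercised above can be combined in a single metric definition. A minimal sketch with an illustrative metric name and SQL ref (neither is part of this commit); the import path is assumed from this repository's layout:

```python
from metrics_tools.definition import MetricQueryDef, RollingConfig  # import path assumed

# Hypothetical metric entry combining both new keys.
example_metric = MetricQueryDef(
    ref="example.sql",  # illustrative ref, not part of this commit
    rolling=RollingConfig(
        windows=[30, 90, 180],
        unit="day",
        cron="@daily",
        model_batch_size=90,  # process at most ~90 days per evaluation batch
        slots=32,             # each task reserves 32 dask resource slots
    ),
    entity_types=["artifact", "project", "collection"],
)
```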
1 change: 1 addition & 0 deletions warehouse/metrics_tools/compute/app.py
@@ -91,6 +91,7 @@ async def initialize_app(app: FastAPI):
cluster_spec = make_new_cluster_with_defaults(config)
cluster_factory = KubeClusterFactory(
config.cluster_namespace,
config.worker_resources,
cluster_spec=cluster_spec,
shutdown_on_close=not config.debug_cluster_no_shutdown,
)
10 changes: 9 additions & 1 deletion warehouse/metrics_tools/compute/client.py
@@ -130,6 +130,8 @@ def calculate_metrics(
cluster_min_size: int = 6,
cluster_max_size: int = 6,
job_retries: int = 3,
slots: int = 1,
execution_time: t.Optional[datetime] = None,
):
"""Calculate metrics for a given period and write the results to a gcs
folder. This method is a high level method that triggers all of the
@@ -151,6 +153,7 @@
locals (t.Dict[str, t.Any]): The local variables to use
dependent_tables_map (t.Dict[str, str]): The dependent tables map
job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
execution_time (t.Optional[datetime]): The execution time for the job
Returns:
ExportReference: The export reference for the resulting calculation
@@ -172,6 +175,8 @@
locals,
dependent_tables_map,
job_retries,
slots=slots,
execution_time=execution_time,
)
job_id = job_response.job_id
export_reference = job_response.export_reference
@@ -240,6 +245,8 @@ def submit_job(
locals: t.Dict[str, t.Any],
dependent_tables_map: t.Dict[str, str],
job_retries: t.Optional[int] = None,
slots: int = 2,
execution_time: t.Optional[datetime] = None,
):
"""Submit a job to the metrics calculation service
@@ -268,8 +275,9 @@
ref=ref,
locals=locals,
dependent_tables_map=dependent_tables_map,
slots=slots,
retries=job_retries,
execution_time=datetime.now(),
execution_time=execution_time or datetime.now(),
)
job_response = self.service_post_with_input(
JobSubmitResponse, "/job/submit", request
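A hedged sketch of a caller threading the new parameters through `calculate_metrics`; only `slots` and `execution_time` are confirmed by this hunk, and the remaining arguments are elided because they sit outside the diff:

```python
from datetime import datetime

export_reference = client.calculate_metrics(
    # query/ref/locals/dependent-tables arguments elided; they are outside this hunk
    job_retries=3,
    slots=32,                       # forwarded to JobSubmitRequest.slots
    execution_time=datetime.now(),  # optional; submit_job falls back to now()
)
```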
34 changes: 25 additions & 9 deletions warehouse/metrics_tools/compute/cluster.py
@@ -46,6 +46,7 @@ def start_duckdb_cluster(

async def start_duckdb_cluster_async(
namespace: str,
resources: t.Dict[str, int],
cluster_spec: t.Optional[dict] = None,
min_size: int = 6,
max_size: int = 6,
@@ -55,24 +56,30 @@ async def start_duckdb_cluster_async(
a thread. The "async" version of dask's KubeCluster doesn't work as
expected. So for now we do this."""

options: t.Dict[str, t.Any] = {"namespace": namespace}
worker_command = ["dask", "worker"]
resources_to_join = []

for resource, value in resources.items():
resources_to_join.append(f"{resource}={value}")
if resources_to_join:
resources_str = ",".join(resources_to_join)
worker_command.extend(["--resources", resources_str])

options: t.Dict[str, t.Any] = {
"namespace": namespace,
"worker_command": worker_command,
}
options.update(kwargs)
if cluster_spec:
options["custom_cluster_spec"] = cluster_spec

# loop = asyncio.get_running_loop()
cluster = await KubeCluster(asynchronous=True, **options)
print(f"is cluster awaitable?: {inspect.isawaitable(cluster)}")
adapt_response = cluster.adapt(minimum=min_size, maximum=max_size)
print(f"is adapt_response awaitable?: {inspect.isawaitable(adapt_response)}")
if inspect.isawaitable(adapt_response):
await adapt_response
return cluster

# return await asyncio.to_thread(
# start_duckdb_cluster, namespace, cluster_spec, min_size, max_size
# )


class ClusterProxy(abc.ABC):
async def client(self) -> Client:
@@ -155,26 +162,35 @@ def workers(self):
class LocalClusterFactory(ClusterFactory):
async def create_cluster(self, min_size: int, max_size: int) -> ClusterProxy:
return LocalClusterProxy(
await LocalCluster(n_workers=max_size, asynchronous=True)
await LocalCluster(
n_workers=max_size, resources={"slots": 10}, asynchronous=True
)
)


class KubeClusterFactory(ClusterFactory):
def __init__(
self,
namespace: str,
resources: t.Dict[str, int],
cluster_spec: t.Optional[dict] = None,
log_override: t.Optional[logging.Logger] = None,
**kwargs: t.Any,
):
self._namespace = namespace
self.logger = log_override or logger
self._cluster_spec = cluster_spec
self._resources = resources
self.kwargs = kwargs

async def create_cluster(self, min_size: int, max_size: int):
cluster = await start_duckdb_cluster_async(
self._namespace, self._cluster_spec, min_size, max_size, **self.kwargs
self._namespace,
self._resources,
self._cluster_spec,
min_size,
max_size,
**self.kwargs,
)
return KubeClusterProxy(cluster)

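The `resources` mapping ultimately becomes a `--resources` flag on the dask worker command line, which is how workers advertise the abstract `slots` resource. A self-contained sketch of the same string assembly:

```python
resources = {"slots": 32}

worker_command = ["dask", "worker"]
resources_to_join = [f"{name}={value}" for name, value in resources.items()]
if resources_to_join:
    # Workers started this way advertise 32 "slots" to the scheduler,
    # equivalent to: dask worker --resources "slots=32"
    worker_command.extend(["--resources", ",".join(resources_to_join)])

print(worker_command)  # ['dask', 'worker', '--resources', 'slots=32']
```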
1 change: 1 addition & 0 deletions warehouse/metrics_tools/compute/debug.py
@@ -17,6 +17,7 @@ def async_test_setup_cluster(config: AppConfig):

cluster_factory = KubeClusterFactory(
config.cluster_namespace,
config.worker_resources,
cluster_spec=cluster_spec,
log_override=logger,
)
3 changes: 3 additions & 0 deletions warehouse/metrics_tools/compute/service.py
@@ -235,6 +235,7 @@ async def _batch_query_to_scheduler(
task_id,
result_path,
batch,
input.slots,
exported_dependent_tables_map,
retries=3,
)
@@ -251,6 +252,7 @@
task_id: str,
result_path: str,
batch: t.List[str],
slots: int,
exported_dependent_tables_map: t.Dict[str, ExportReference],
retries: int,
):
@@ -266,6 +268,7 @@
exported_dependent_tables_map,
retries=retries,
key=task_id,
resources={"slots": slots},
)

try:
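Dask only schedules a task onto a worker whose advertised resources cover the task's `resources` annotation, so `slots` acts as a per-worker concurrency cap for heavy queries. A minimal local demonstration of that mechanism (the sleep-based tasks are purely illustrative):

```python
import time
from dask.distributed import Client, LocalCluster

# One worker advertising 32 slots; tasks that each request 8 slots can
# therefore run at most four at a time on it.
cluster = LocalCluster(n_workers=1, resources={"slots": 32})
client = Client(cluster)

def work(i: int) -> int:
    time.sleep(1)
    return i

futures = [
    client.submit(work, i, key=f"task-{i}", resources={"slots": 8})
    for i in range(8)
]
print(client.gather(futures))  # completes in roughly two 1-second waves
```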
2 changes: 2 additions & 0 deletions warehouse/metrics_tools/compute/types.py
@@ -182,6 +182,7 @@ class JobSubmitRequest(BaseModel):
locals: t.Dict[str, t.Any]
dependent_tables_map: t.Dict[str, str]
retries: t.Optional[int] = None
slots: int = 2
execution_time: datetime

def query_as(self, dialect: str) -> str:
@@ -420,6 +421,7 @@ class ClusterConfig(BaseSettings):
scheduler_memory_request: str = "85000Mi"
scheduler_pool_type: str = "sqlmesh-scheduler"

worker_resources: t.Dict[str, int] = Field(default_factory=lambda: {"slots": 32})
worker_threads: int = 16
worker_memory_limit: str = "90000Mi"
worker_memory_request: str = "85000Mi"
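Since `ClusterConfig` is a pydantic `BaseSettings`, the new `worker_resources` default should be overridable like any other field; a hedged sketch assuming the remaining fields all carry defaults:

```python
from metrics_tools.compute.types import ClusterConfig  # import path assumed

# Direct override at construction time; pydantic validates the mapping
# against t.Dict[str, int].
config = ClusterConfig(worker_resources={"slots": 64})
assert config.worker_resources == {"slots": 64}
```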
31 changes: 23 additions & 8 deletions warehouse/metrics_tools/definition.py
@@ -23,6 +23,14 @@ class RollingConfig(t.TypedDict):
unit: str
cron: RollingCronOptions

# How many days we process at once. Useful to set for very large
# datasets; defaults to a year if not set.
model_batch_size: t.NotRequired[int]

# The number of slots a given model requires. This is also very useful
# for large datasets.
slots: t.NotRequired[int]


class TimeseriesBucket(Enum):
HOUR = "hour"
@@ -78,6 +86,8 @@ class PeerMetricDependencyRef(t.TypedDict):
unit: t.NotRequired[t.Optional[str]]
time_aggregation: t.NotRequired[t.Optional[str]]
cron: t.NotRequired[RollingCronOptions]
batch_size: t.NotRequired[int]
slots: t.NotRequired[int]


class MetricModelRef(t.TypedDict):
@@ -345,15 +355,20 @@ def generate_dependency_refs_for_name(self, name: str):
for entity in self._source.entity_types or DEFAULT_ENTITY_TYPES:
if self._source.rolling:
for window in self._source.rolling["windows"]:
refs.append(
PeerMetricDependencyRef(
name=name,
entity_type=entity,
window=window,
unit=self._source.rolling.get("unit"),
cron=self._source.rolling.get("cron"),
)
ref = PeerMetricDependencyRef(
name=name,
entity_type=entity,
window=window,
unit=self._source.rolling.get("unit"),
cron=self._source.rolling.get("cron"),
)
model_batch_size = self._source.rolling.get("model_batch_size")
slots = self._source.rolling.get("slots")
if model_batch_size:
ref["batch_size"] = model_batch_size
if slots:
ref["slots"] = slots
refs.append(ref)
for time_aggregation in self._source.time_aggregations or []:
refs.append(
PeerMetricDependencyRef(
Expand Down