diff --git a/warehouse/metrics_mesh/tests/test_change_in_developers_over_window.yml b/warehouse/metrics_mesh/disabled-tests/test_change_in_developers_over_window.yml similarity index 100% rename from warehouse/metrics_mesh/tests/test_change_in_developers_over_window.yml rename to warehouse/metrics_mesh/disabled-tests/test_change_in_developers_over_window.yml diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py index 8b25f538a..37bb9aa7b 100644 --- a/warehouse/metrics_mesh/models/metrics_factories.py +++ b/warehouse/metrics_mesh/models/metrics_factories.py @@ -88,6 +88,8 @@ windows=[30, 90, 180], unit="day", cron="@daily", # This determines how often this is calculated + model_batch_size=90, + slots=32, ), entity_types=["artifact", "project", "collection"], is_intermediate=True, @@ -103,23 +105,23 @@ cron="@daily", ), ), - "contributor_classifications": MetricQueryDef( - ref="contributor_activity_classification.sql", - vars={ - "full_time_ratio": 10 / 30, - "activity_event_types": [ - "COMMIT_CODE", - "ISSUE_OPENED", - "PULL_REQUEST_OPENED", - "PULL_REQUEST_MERGED", - ], - }, - rolling=RollingConfig( - windows=[30, 90, 180], - unit="day", - cron="@daily", - ), - ), + # "contributor_classifications": MetricQueryDef( + # ref="contributor_activity_classification.sql", + # vars={ + # "full_time_ratio": 10 / 30, + # "activity_event_types": [ + # "COMMIT_CODE", + # "ISSUE_OPENED", + # "PULL_REQUEST_OPENED", + # "PULL_REQUEST_MERGED", + # ], + # }, + # rolling=RollingConfig( + # windows=[30, 90, 180], + # unit="day", + # cron="@daily", + # ), + # ), # Currently this query performs really poorly. We need to do some debugging on it # "user_retention_classifications": MetricQueryDef( # ref="user_retention_classification.sql", @@ -133,14 +135,14 @@ # ), # entity_types=["artifact", "project", "collection"], # ), - "change_in_developer_activity": MetricQueryDef( - ref="change_in_developers.sql", - rolling=RollingConfig( - windows=[30, 90, 180], - unit="day", - cron="@daily", - ), - ), + # "change_in_developer_activity": MetricQueryDef( + # ref="change_in_developers.sql", + # rolling=RollingConfig( + # windows=[30, 90, 180], + # unit="day", + # cron="@daily", + # ), + # ), "commits_rolling": MetricQueryDef( ref="commits.sql", rolling=RollingConfig( @@ -228,6 +230,7 @@ windows=[30, 90, 180], unit="day", cron="@daily", + slots=8, ), entity_types=["artifact", "project", "collection"], ), @@ -236,27 +239,28 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@monthly", - ), - entity_types=["artifact", "project", "collection"], - ), - "contributors_lifecycle": MetricQueryDef( - ref="lifecycle.sql", - vars={ - "activity_event_types": [ - "COMMIT_CODE", - "ISSUE_OPENED", - "PULL_REQUEST_OPENED", - "PULL_REQUEST_MERGED", - ], - }, - rolling=RollingConfig( - windows=[30, 90, 180], - unit="day", - cron="@monthly", + cron="@daily", + slots=32, ), entity_types=["artifact", "project", "collection"], ), + # "contributors_lifecycle": MetricQueryDef( + # ref="lifecycle.sql", + # vars={ + # "activity_event_types": [ + # "COMMIT_CODE", + # "ISSUE_OPENED", + # "PULL_REQUEST_OPENED", + # "PULL_REQUEST_MERGED", + # ], + # }, + # rolling=RollingConfig( + # windows=[30, 90, 180], + # unit="day", + # cron="@monthly", + # ), + # entity_types=["artifact", "project", "collection"], + # ), "funding_received": MetricQueryDef( ref="funding_received.sql", rolling=RollingConfig( diff --git a/warehouse/metrics_tools/compute/app.py b/warehouse/metrics_tools/compute/app.py index bd472b370..70ab20420 100644 --- a/warehouse/metrics_tools/compute/app.py +++ b/warehouse/metrics_tools/compute/app.py @@ -91,6 +91,7 @@ async def initialize_app(app: FastAPI): cluster_spec = make_new_cluster_with_defaults(config) cluster_factory = KubeClusterFactory( config.cluster_namespace, + config.worker_resources, cluster_spec=cluster_spec, shutdown_on_close=not config.debug_cluster_no_shutdown, ) diff --git a/warehouse/metrics_tools/compute/client.py b/warehouse/metrics_tools/compute/client.py index 1da49f1e2..3e3646c50 100644 --- a/warehouse/metrics_tools/compute/client.py +++ b/warehouse/metrics_tools/compute/client.py @@ -130,6 +130,8 @@ def calculate_metrics( cluster_min_size: int = 6, cluster_max_size: int = 6, job_retries: int = 3, + slots: int = 1, + execution_time: t.Optional[datetime] = None, ): """Calculate metrics for a given period and write the results to a gcs folder. This method is a high level method that triggers all of the @@ -151,6 +153,7 @@ def calculate_metrics( locals (t.Dict[str, t.Any]): The local variables to use dependent_tables_map (t.Dict[str, str]): The dependent tables map job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3. + execution_time (t.Optional[datetime]): The execution time for the job Returns: ExportReference: The export reference for the resulting calculation @@ -172,6 +175,8 @@ def calculate_metrics( locals, dependent_tables_map, job_retries, + slots=slots, + execution_time=execution_time, ) job_id = job_response.job_id export_reference = job_response.export_reference @@ -240,6 +245,8 @@ def submit_job( locals: t.Dict[str, t.Any], dependent_tables_map: t.Dict[str, str], job_retries: t.Optional[int] = None, + slots: int = 2, + execution_time: t.Optional[datetime] = None, ): """Submit a job to the metrics calculation service @@ -268,8 +275,9 @@ def submit_job( ref=ref, locals=locals, dependent_tables_map=dependent_tables_map, + slots=slots, retries=job_retries, - execution_time=datetime.now(), + execution_time=execution_time or datetime.now(), ) job_response = self.service_post_with_input( JobSubmitResponse, "/job/submit", request diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py index b8a502c6e..61e93177d 100644 --- a/warehouse/metrics_tools/compute/cluster.py +++ b/warehouse/metrics_tools/compute/cluster.py @@ -46,6 +46,7 @@ def start_duckdb_cluster( async def start_duckdb_cluster_async( namespace: str, + resources: t.Dict[str, int], cluster_spec: t.Optional[dict] = None, min_size: int = 6, max_size: int = 6, @@ -55,24 +56,30 @@ async def start_duckdb_cluster_async( a thread. The "async" version of dask's KubeCluster doesn't work as expected. So for now we do this.""" - options: t.Dict[str, t.Any] = {"namespace": namespace} + worker_command = ["dask", "worker"] + resources_to_join = [] + + for resource, value in resources.items(): + resources_to_join.append(f"{resource}={value}") + if resources_to_join: + resources_str = f'{",".join(resources_to_join)}' + worker_command.extend(["--resources", resources_str]) + + options: t.Dict[str, t.Any] = { + "namespace": namespace, + "worker_command": worker_command, + } options.update(kwargs) if cluster_spec: options["custom_cluster_spec"] = cluster_spec # loop = asyncio.get_running_loop() cluster = await KubeCluster(asynchronous=True, **options) - print(f"is cluster awaitable?: {inspect.isawaitable(cluster)}") adapt_response = cluster.adapt(minimum=min_size, maximum=max_size) - print(f"is adapt_response awaitable?: {inspect.isawaitable(adapt_response)}") if inspect.isawaitable(adapt_response): await adapt_response return cluster - # return await asyncio.to_thread( - # start_duckdb_cluster, namespace, cluster_spec, min_size, max_size - # ) - class ClusterProxy(abc.ABC): async def client(self) -> Client: @@ -155,7 +162,9 @@ def workers(self): class LocalClusterFactory(ClusterFactory): async def create_cluster(self, min_size: int, max_size: int) -> ClusterProxy: return LocalClusterProxy( - await LocalCluster(n_workers=max_size, asynchronous=True) + await LocalCluster( + n_workers=max_size, resources={"slots": 10}, asynchronous=True + ) ) @@ -163,6 +172,7 @@ class KubeClusterFactory(ClusterFactory): def __init__( self, namespace: str, + resources: t.Dict[str, int], cluster_spec: t.Optional[dict] = None, log_override: t.Optional[logging.Logger] = None, **kwargs: t.Any, @@ -170,11 +180,17 @@ def __init__( self._namespace = namespace self.logger = log_override or logger self._cluster_spec = cluster_spec + self._resources = resources self.kwargs = kwargs async def create_cluster(self, min_size: int, max_size: int): cluster = await start_duckdb_cluster_async( - self._namespace, self._cluster_spec, min_size, max_size, **self.kwargs + self._namespace, + self._resources, + self._cluster_spec, + min_size, + max_size, + **self.kwargs, ) return KubeClusterProxy(cluster) diff --git a/warehouse/metrics_tools/compute/debug.py b/warehouse/metrics_tools/compute/debug.py index b344b0451..6bc4c6fd9 100644 --- a/warehouse/metrics_tools/compute/debug.py +++ b/warehouse/metrics_tools/compute/debug.py @@ -17,6 +17,7 @@ def async_test_setup_cluster(config: AppConfig): cluster_factory = KubeClusterFactory( config.cluster_namespace, + config.worker_resources, cluster_spec=cluster_spec, log_override=logger, ) diff --git a/warehouse/metrics_tools/compute/service.py b/warehouse/metrics_tools/compute/service.py index e3579c629..47e5ff390 100644 --- a/warehouse/metrics_tools/compute/service.py +++ b/warehouse/metrics_tools/compute/service.py @@ -235,6 +235,7 @@ async def _batch_query_to_scheduler( task_id, result_path, batch, + input.slots, exported_dependent_tables_map, retries=3, ) @@ -251,6 +252,7 @@ async def _submit_query_task_to_scheduler( task_id: str, result_path: str, batch: t.List[str], + slots: int, exported_dependent_tables_map: t.Dict[str, ExportReference], retries: int, ): @@ -266,6 +268,7 @@ async def _submit_query_task_to_scheduler( exported_dependent_tables_map, retries=retries, key=task_id, + resources={"slots": slots}, ) try: diff --git a/warehouse/metrics_tools/compute/types.py b/warehouse/metrics_tools/compute/types.py index b0b2baf47..2e5a44eed 100644 --- a/warehouse/metrics_tools/compute/types.py +++ b/warehouse/metrics_tools/compute/types.py @@ -182,6 +182,7 @@ class JobSubmitRequest(BaseModel): locals: t.Dict[str, t.Any] dependent_tables_map: t.Dict[str, str] retries: t.Optional[int] = None + slots: int = 2 execution_time: datetime def query_as(self, dialect: str) -> str: @@ -420,6 +421,7 @@ class ClusterConfig(BaseSettings): scheduler_memory_request: str = "85000Mi" scheduler_pool_type: str = "sqlmesh-scheduler" + worker_resources: t.Dict[str, int] = Field(default_factory=lambda: {"slots": "32"}) worker_threads: int = 16 worker_memory_limit: str = "90000Mi" worker_memory_request: str = "85000Mi" diff --git a/warehouse/metrics_tools/definition.py b/warehouse/metrics_tools/definition.py index 4ab84b42d..e5f0bb89e 100644 --- a/warehouse/metrics_tools/definition.py +++ b/warehouse/metrics_tools/definition.py @@ -23,6 +23,14 @@ class RollingConfig(t.TypedDict): unit: str cron: RollingCronOptions + # How many days do we process at once. This is useful to set for very large + # datasets but will default to a year if not set. + model_batch_size: t.NotRequired[int] + + # The number of required slots for a given model. This is also very useful + # for large datasets + slots: t.NotRequired[int] + class TimeseriesBucket(Enum): HOUR = "hour" @@ -78,6 +86,8 @@ class PeerMetricDependencyRef(t.TypedDict): unit: t.NotRequired[t.Optional[str]] time_aggregation: t.NotRequired[t.Optional[str]] cron: t.NotRequired[RollingCronOptions] + batch_size: t.NotRequired[int] + slots: t.NotRequired[int] class MetricModelRef(t.TypedDict): @@ -345,15 +355,20 @@ def generate_dependency_refs_for_name(self, name: str): for entity in self._source.entity_types or DEFAULT_ENTITY_TYPES: if self._source.rolling: for window in self._source.rolling["windows"]: - refs.append( - PeerMetricDependencyRef( - name=name, - entity_type=entity, - window=window, - unit=self._source.rolling.get("unit"), - cron=self._source.rolling.get("cron"), - ) + ref = PeerMetricDependencyRef( + name=name, + entity_type=entity, + window=window, + unit=self._source.rolling.get("unit"), + cron=self._source.rolling.get("cron"), ) + model_batch_size = self._source.rolling.get("model_batch_size") + slots = self._source.rolling.get("slots") + if model_batch_size: + ref["batch_size"] = model_batch_size + if slots: + ref["slots"] = slots + refs.append(ref) for time_aggregation in self._source.time_aggregations or []: refs.append( PeerMetricDependencyRef( diff --git a/warehouse/metrics_tools/factory/factory.py b/warehouse/metrics_tools/factory/factory.py index a9fc1af83..b379944f2 100644 --- a/warehouse/metrics_tools/factory/factory.py +++ b/warehouse/metrics_tools/factory/factory.py @@ -415,7 +415,12 @@ def generate_rolling_python_model_for_rendered_query( columns = METRICS_COLUMNS_BY_ENTITY[ref["entity_type"]] - kind_common = {"batch_size": 365, "batch_concurrency": 1} + kind_common = { + "batch_size": ref.get("batch_size", 365), + "batch_concurrency": 1, + "lookback": 10, + "forward_only": True, + } partitioned_by = ("day(metrics_sample_date)",) window = ref.get("window") assert window is not None @@ -468,12 +473,15 @@ def generate_time_aggregation_model_for_rendered_query( time_aggregation = ref.get("time_aggregation") assert time_aggregation is not None - kind_common = {"batch_concurrency": 1} - kind_options = {"lookback": 7, **kind_common} + kind_common = { + "batch_concurrency": 1, + "forward_only": True, + } + kind_options = {"lookback": 10, **kind_common} partitioned_by = ("day(metrics_sample_date)",) if time_aggregation == "weekly": - kind_options = {"lookback": 7, **kind_common} + kind_options = {"lookback": 10, **kind_common} if time_aggregation == "monthly": kind_options = {"lookback": 1, **kind_common} partitioned_by = ("month(metrics_sample_date)",) @@ -680,6 +688,7 @@ def generated_rolling_query( batch_size=env.ensure_int("SQLMESH_MCS_BATCH_SIZE", 10), columns=columns, ref=ref, + slots=ref.get("slots", 1), locals=sqlmesh_vars, dependent_tables_map=create_dependent_tables_map( context, rendered_query_str