Skip to content

Commit

Permalink
fix kubecluster slots (#2676)
Browse files Browse the repository at this point in the history
* fixes

* Slots not getting applied due to forward-only

* re-enable all

* fix slots

* Fix reimport

* comments

* use monthly

* materialize

* materialize more

* attempt to reduce stages

* fix
  • Loading branch information
ravenac95 authored Dec 20, 2024
1 parent 778fb15 commit a8e6d45
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 80 deletions.
8 changes: 8 additions & 0 deletions warehouse/metrics_mesh/models/metric_names_from_artifact.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Materializes the set of distinct metric names present in the
-- artifact-level timeseries metrics table. Rebuilt in full on each run.
MODEL (
  name metrics.metric_names_from_artifact,
  kind FULL
);

-- GROUP BY yields the same deduplicated set as SELECT DISTINCT.
SELECT
  metric
FROM metrics.timeseries_metrics_to_artifact
GROUP BY metric
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Materializes the set of distinct metric names present in the
-- collection-level timeseries metrics table. Rebuilt in full on each run.
MODEL (
  name metrics.metric_names_from_collection,
  kind FULL
);

-- GROUP BY yields the same deduplicated set as SELECT DISTINCT.
SELECT
  metric
FROM metrics.timeseries_metrics_to_collection
GROUP BY metric
8 changes: 8 additions & 0 deletions warehouse/metrics_mesh/models/metric_names_from_project.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Materializes the set of distinct metric names present in the
-- project-level timeseries metrics table. Rebuilt in full on each run.
MODEL (
  name metrics.metric_names_from_project,
  kind FULL
);

-- GROUP BY yields the same deduplicated set as SELECT DISTINCT.
SELECT
  metric
FROM metrics.timeseries_metrics_to_project
GROUP BY metric
103 changes: 59 additions & 44 deletions warehouse/metrics_mesh/models/metrics_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
windows=[30, 90, 180],
unit="day",
cron="@daily", # This determines how often this is calculated
slots=32,
),
entity_types=["artifact", "project", "collection"],
is_intermediate=True,
Expand Down Expand Up @@ -102,26 +103,28 @@
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@daily",
cron="@monthly",
slots=32,
),
),
"contributor_classifications": MetricQueryDef(
ref="contributor_activity_classification.sql",
vars={
"full_time_ratio": 10 / 30,
"activity_event_types": [
"COMMIT_CODE",
"ISSUE_OPENED",
"PULL_REQUEST_OPENED",
"PULL_REQUEST_MERGED",
],
},
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@monthly",
slots=32,
),
),
# "contributor_classifications": MetricQueryDef(
# ref="contributor_activity_classification.sql",
# vars={
# "full_time_ratio": 10 / 30,
# "activity_event_types": [
# "COMMIT_CODE",
# "ISSUE_OPENED",
# "PULL_REQUEST_OPENED",
# "PULL_REQUEST_MERGED",
# ],
# },
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@daily",
# ),
# ),
# Currently this query performs really poorly. We need to do some debugging on it
# "user_retention_classifications": MetricQueryDef(
# ref="user_retention_classification.sql",
Expand All @@ -135,20 +138,22 @@
# ),
# entity_types=["artifact", "project", "collection"],
# ),
# "change_in_developer_activity": MetricQueryDef(
# ref="change_in_developers.sql",
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@daily",
# ),
# ),
"change_in_developer_activity": MetricQueryDef(
ref="change_in_developers.sql",
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@monthly",
slots=32,
),
),
"commits_rolling": MetricQueryDef(
ref="commits.sql",
rolling=RollingConfig(
windows=[10],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -158,6 +163,7 @@
windows=[180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -167,6 +173,7 @@
windows=[180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -176,6 +183,7 @@
windows=[180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -185,6 +193,7 @@
windows=[180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -194,6 +203,7 @@
windows=[90, 180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -203,6 +213,7 @@
windows=[90, 180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -222,6 +233,7 @@
windows=[30, 90, 180],
unit="day",
cron="@daily",
slots=32,
),
),
"gas_fees": MetricQueryDef(
Expand All @@ -230,7 +242,7 @@
windows=[30, 90, 180],
unit="day",
cron="@daily",
slots=8,
slots=16,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -244,29 +256,31 @@
),
entity_types=["artifact", "project", "collection"],
),
# "contributors_lifecycle": MetricQueryDef(
# ref="lifecycle.sql",
# vars={
# "activity_event_types": [
# "COMMIT_CODE",
# "ISSUE_OPENED",
# "PULL_REQUEST_OPENED",
# "PULL_REQUEST_MERGED",
# ],
# },
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@monthly",
# ),
# entity_types=["artifact", "project", "collection"],
# ),
"contributors_lifecycle": MetricQueryDef(
ref="lifecycle.sql",
vars={
"activity_event_types": [
"COMMIT_CODE",
"ISSUE_OPENED",
"PULL_REQUEST_OPENED",
"PULL_REQUEST_MERGED",
],
},
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@monthly",
slots=32,
),
entity_types=["artifact", "project", "collection"],
),
"funding_received": MetricQueryDef(
ref="funding_received.sql",
rolling=RollingConfig(
windows=[180],
unit="day",
cron="@daily",
slots=8,
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -276,6 +290,7 @@
windows=[180],
unit="day",
cron="@daily",
slots=16,
),
entity_types=["artifact", "project", "collection"],
),
Expand Down
15 changes: 6 additions & 9 deletions warehouse/metrics_mesh/models/metrics_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ MODEL (
);

WITH unioned_metric_names AS (
SELECT DISTINCT
metric
FROM metrics.timeseries_metrics_to_artifact
SELECT *
FROM metrics.metric_names_from_artifact
UNION ALL
SELECT DISTINCT
metric
FROM metrics.timeseries_metrics_to_project
SELECT *
FROM metrics.metric_names_from_project
UNION ALL
SELECT DISTINCT
metric
FROM metrics.timeseries_metrics_to_collection
SELECT *
FROM metrics.metric_names_from_collection
), all_timeseries_metric_names AS (
SELECT DISTINCT
metric
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MODEL (
name metrics.timeseries_metrics_by_artifact_v0,
kind VIEW
kind FULL
);

WITH all_timeseries_metrics_by_artifact AS (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MODEL (
name metrics.timeseries_metrics_by_collection_v0,
kind VIEW
kind FULL
);

WITH all_timeseries_metrics_by_collection AS (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MODEL (
name metrics.timeseries_metrics_by_project_v0,
kind VIEW
kind FULL
);

WITH all_timeseries_metrics_by_project AS (
Expand Down
28 changes: 16 additions & 12 deletions warehouse/metrics_tools/compute/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __init__(

def calculate_metrics(
self,
*,
query_str: str,
start: datetime,
end: datetime,
Expand All @@ -126,11 +127,11 @@ def calculate_metrics(
ref: PeerMetricDependencyRef,
locals: t.Dict[str, t.Any],
dependent_tables_map: t.Dict[str, str],
slots: int,
progress_handler: t.Optional[t.Callable[[JobStatusResponse], None]] = None,
cluster_min_size: int = 6,
cluster_max_size: int = 6,
job_retries: int = 3,
slots: int = 1,
execution_time: t.Optional[datetime] = None,
):
"""Calculate metrics for a given period and write the results to a gcs
Expand All @@ -153,6 +154,7 @@ def calculate_metrics(
locals (t.Dict[str, t.Any]): The local variables to use
dependent_tables_map (t.Dict[str, str]): The dependent tables map
job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
slots (int): The number of slots to use for the job
execution_time (t.Optional[datetime]): The execution time for the job
Returns:
Expand All @@ -165,17 +167,17 @@ def calculate_metrics(
self.logger.info(f"cluster status: {status}")

job_response = self.submit_job(
query_str,
start,
end,
dialect,
batch_size,
columns,
ref,
locals,
dependent_tables_map,
job_retries,
query_str=query_str,
start=start,
end=end,
dialect=dialect,
batch_size=batch_size,
columns=columns,
ref=ref,
locals=locals,
dependent_tables_map=dependent_tables_map,
slots=slots,
job_retries=job_retries,
execution_time=execution_time,
)
job_id = job_response.job_id
Expand Down Expand Up @@ -235,6 +237,7 @@ def wait_for_job(

def submit_job(
self,
*,
query_str: str,
start: datetime,
end: datetime,
Expand All @@ -244,8 +247,8 @@ def submit_job(
ref: PeerMetricDependencyRef,
locals: t.Dict[str, t.Any],
dependent_tables_map: t.Dict[str, str],
slots: int,
job_retries: t.Optional[int] = None,
slots: int = 2,
execution_time: t.Optional[datetime] = None,
):
"""Submit a job to the metrics calculation service
Expand All @@ -260,6 +263,7 @@ def submit_job(
ref (PeerMetricDependencyRef): The dependency reference
locals (t.Dict[str, t.Any]): The local variables to use
dependent_tables_map (t.Dict[str, str]): The dependent tables map
slots (int): The number of slots to use for the job
job_retries (int): The number of retries for a given job in the worker queue. Defaults to 3.
Returns:
Expand Down
Loading

0 comments on commit a8e6d45

Please sign in to comment.