From e1a8786dd9bc596347c7e2585d1a04789051aff2 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Tue, 17 Dec 2024 22:17:41 -0800 Subject: [PATCH] Use the cron to determine materialization interval (#2661) --- .../metrics_mesh/models/metrics_factories.py | 4 +-- .../compute/manual_testing_utils.py | 6 +++- warehouse/metrics_tools/compute/test_app.py | 6 +++- .../metrics_tools/compute/test_service.py | 1 + warehouse/metrics_tools/definition.py | 11 +++--- warehouse/metrics_tools/runner.py | 35 +++++++++++++++---- 6 files changed, 46 insertions(+), 17 deletions(-) diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py index 2af63c518..8b25f538a 100644 --- a/warehouse/metrics_mesh/models/metrics_factories.py +++ b/warehouse/metrics_mesh/models/metrics_factories.py @@ -236,7 +236,7 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@daily", + cron="@monthly", ), entity_types=["artifact", "project", "collection"], ), @@ -253,7 +253,7 @@ rolling=RollingConfig( windows=[30, 90, 180], unit="day", - cron="@daily", + cron="@monthly", ), entity_types=["artifact", "project", "collection"], ), diff --git a/warehouse/metrics_tools/compute/manual_testing_utils.py b/warehouse/metrics_tools/compute/manual_testing_utils.py index 3724d8f78..cf6fe1087 100644 --- a/warehouse/metrics_tools/compute/manual_testing_utils.py +++ b/warehouse/metrics_tools/compute/manual_testing_utils.py @@ -102,7 +102,11 @@ def run_local_test( ("amount", "NUMERIC"), ], ref=PeerMetricDependencyRef( - name="", entity_type="artifact", window=30, unit="day" + name="", + entity_type="artifact", + window=30, + unit="day", + cron="@daily", ), locals={}, dependent_tables_map={ diff --git a/warehouse/metrics_tools/compute/test_app.py b/warehouse/metrics_tools/compute/test_app.py index 00983b9a0..dd084296f 100644 --- a/warehouse/metrics_tools/compute/test_app.py +++ b/warehouse/metrics_tools/compute/test_app.py @@ -90,7 +90,11 @@ def test_app_with_all_debugging(app_client_with_all_debugging): ("amount", "NUMERIC"), ], ref=PeerMetricDependencyRef( - name="", entity_type="artifact", window=30, unit="day" + name="", + entity_type="artifact", + window=30, + unit="day", + cron="@daily", ), locals={}, dependent_tables_map={ diff --git a/warehouse/metrics_tools/compute/test_service.py b/warehouse/metrics_tools/compute/test_service.py index f87dba30f..0731b678c 100644 --- a/warehouse/metrics_tools/compute/test_service.py +++ b/warehouse/metrics_tools/compute/test_service.py @@ -53,6 +53,7 @@ async def test_metrics_calculation_service(): entity_type="artifact", window=30, unit="day", + cron="@daily", ), execution_time=datetime.now(), locals={}, diff --git a/warehouse/metrics_tools/definition.py b/warehouse/metrics_tools/definition.py index 5c07830ee..4ab84b42d 100644 --- a/warehouse/metrics_tools/definition.py +++ b/warehouse/metrics_tools/definition.py @@ -15,16 +15,13 @@ type ExtraVarBaseType = str | int | float type ExtraVarType = ExtraVarBaseType | t.List[ExtraVarBaseType] +RollingCronOptions = t.Literal["@daily", "@weekly", "@monthly", "@yearly"] + class RollingConfig(t.TypedDict): windows: t.List[int] unit: str - cron: str - - -@dataclass -class RollingWindow: - trailing_days: int + cron: RollingCronOptions class TimeseriesBucket(Enum): @@ -80,6 +77,7 @@ class PeerMetricDependencyRef(t.TypedDict): window: t.NotRequired[t.Optional[int]] unit: t.NotRequired[t.Optional[str]] time_aggregation: t.NotRequired[t.Optional[str]] + cron: t.NotRequired[RollingCronOptions] class MetricModelRef(t.TypedDict): @@ -353,6 +351,7 @@ def generate_dependency_refs_for_name(self, name: str): entity_type=entity, window=window, unit=self._source.rolling.get("unit"), + cron=self._source.rolling.get("cron"), ) ) for time_aggregation in self._source.time_aggregations or []: diff --git a/warehouse/metrics_tools/runner.py b/warehouse/metrics_tools/runner.py index 264dcabd6..288ead328 100644 --- a/warehouse/metrics_tools/runner.py +++ b/warehouse/metrics_tools/runner.py @@ -9,7 +9,7 @@ import arrow import duckdb import pandas as pd -from metrics_tools.definition import PeerMetricDependencyRef +from metrics_tools.definition import PeerMetricDependencyRef, RollingCronOptions from metrics_tools.intermediate import run_macro_evaluator from metrics_tools.macros import metrics_end, metrics_sample_date, metrics_start from metrics_tools.models import create_unregistered_macro_registry @@ -81,6 +81,16 @@ def __init__(self, dialect: str): self.dialect = dialect +ROLLING_CRON_TO_ARROW_UNIT: t.Dict[ + RollingCronOptions, t.Literal["day", "month", "year", "week"] +] = { + "@daily": "day", + "@weekly": "week", + "@monthly": "month", + "@yearly": "year", +} + + class MetricsRunner: @classmethod def create_duckdb_execution_context( @@ -206,18 +216,29 @@ def render_query(self, start: datetime, end: datetime) -> str: def render_rolling_queries(self, start: datetime, end: datetime) -> t.Iterator[str]: # Given a rolling input render all the rolling queries logger.debug(f"render_rolling_queries called with start={start} and end={end}") - for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)): - rendered_query = self.render_query(day.datetime, day.datetime) + for day in self.iter_query_days(start, end): + rendered_query = self.render_query(day, day) yield rendered_query + def iter_query_days(self, start: datetime, end: datetime): + cron = self._ref.get("cron") + assert cron is not None + arrow_interval = ROLLING_CRON_TO_ARROW_UNIT[cron] + if arrow_interval == "week": + # We want to start weeks on sunday so we need to adjust the start time to the next sunday + start_arrow = arrow.get(start) + day_of_week = start_arrow.weekday() + if day_of_week != 6: + start = start_arrow.shift(days=6 - day_of_week).datetime + for day in arrow.Arrow.range(arrow_interval, arrow.get(start), arrow.get(end)): + yield day.datetime + async def render_rolling_queries_async(self, start: datetime, end: datetime): logger.debug( f"render_rolling_queries_async called with start={start} and end={end}" ) - for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)): - rendered_query = await asyncio.to_thread( - self.render_query, day.datetime, day.datetime - ) + for day in self.iter_query_days(start, end): + rendered_query = await asyncio.to_thread(self.render_query, day, day) yield rendered_query def commit(self, start: datetime, end: datetime, destination: str):