Skip to content

Commit

Permalink
Use the cron to determine materialization interval (#2661)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 authored Dec 18, 2024
1 parent c90133c commit e1a8786
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 17 deletions.
4 changes: 2 additions & 2 deletions warehouse/metrics_mesh/models/metrics_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@daily",
cron="@monthly",
),
entity_types=["artifact", "project", "collection"],
),
Expand All @@ -253,7 +253,7 @@
rolling=RollingConfig(
windows=[30, 90, 180],
unit="day",
cron="@daily",
cron="@monthly",
),
entity_types=["artifact", "project", "collection"],
),
Expand Down
6 changes: 5 additions & 1 deletion warehouse/metrics_tools/compute/manual_testing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ def run_local_test(
("amount", "NUMERIC"),
],
ref=PeerMetricDependencyRef(
name="", entity_type="artifact", window=30, unit="day"
name="",
entity_type="artifact",
window=30,
unit="day",
cron="@daily",
),
locals={},
dependent_tables_map={
Expand Down
6 changes: 5 additions & 1 deletion warehouse/metrics_tools/compute/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def test_app_with_all_debugging(app_client_with_all_debugging):
("amount", "NUMERIC"),
],
ref=PeerMetricDependencyRef(
name="", entity_type="artifact", window=30, unit="day"
name="",
entity_type="artifact",
window=30,
unit="day",
cron="@daily",
),
locals={},
dependent_tables_map={
Expand Down
1 change: 1 addition & 0 deletions warehouse/metrics_tools/compute/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async def test_metrics_calculation_service():
entity_type="artifact",
window=30,
unit="day",
cron="@daily",
),
execution_time=datetime.now(),
locals={},
Expand Down
11 changes: 5 additions & 6 deletions warehouse/metrics_tools/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
type ExtraVarBaseType = str | int | float
type ExtraVarType = ExtraVarBaseType | t.List[ExtraVarBaseType]

RollingCronOptions = t.Literal["@daily", "@weekly", "@monthly", "@yearly"]


class RollingConfig(t.TypedDict):
windows: t.List[int]
unit: str
cron: str


@dataclass
class RollingWindow:
trailing_days: int
cron: RollingCronOptions


class TimeseriesBucket(Enum):
Expand Down Expand Up @@ -80,6 +77,7 @@ class PeerMetricDependencyRef(t.TypedDict):
window: t.NotRequired[t.Optional[int]]
unit: t.NotRequired[t.Optional[str]]
time_aggregation: t.NotRequired[t.Optional[str]]
cron: t.NotRequired[RollingCronOptions]


class MetricModelRef(t.TypedDict):
Expand Down Expand Up @@ -353,6 +351,7 @@ def generate_dependency_refs_for_name(self, name: str):
entity_type=entity,
window=window,
unit=self._source.rolling.get("unit"),
cron=self._source.rolling.get("cron"),
)
)
for time_aggregation in self._source.time_aggregations or []:
Expand Down
35 changes: 28 additions & 7 deletions warehouse/metrics_tools/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import arrow
import duckdb
import pandas as pd
from metrics_tools.definition import PeerMetricDependencyRef
from metrics_tools.definition import PeerMetricDependencyRef, RollingCronOptions
from metrics_tools.intermediate import run_macro_evaluator
from metrics_tools.macros import metrics_end, metrics_sample_date, metrics_start
from metrics_tools.models import create_unregistered_macro_registry
Expand Down Expand Up @@ -81,6 +81,16 @@ def __init__(self, dialect: str):
self.dialect = dialect


ROLLING_CRON_TO_ARROW_UNIT: t.Dict[
RollingCronOptions, t.Literal["day", "month", "year", "week"]
] = {
"@daily": "day",
"@weekly": "week",
"@monthly": "month",
"@yearly": "year",
}


class MetricsRunner:
@classmethod
def create_duckdb_execution_context(
Expand Down Expand Up @@ -206,18 +216,29 @@ def render_query(self, start: datetime, end: datetime) -> str:
def render_rolling_queries(self, start: datetime, end: datetime) -> t.Iterator[str]:
# Given a rolling input render all the rolling queries
logger.debug(f"render_rolling_queries called with start={start} and end={end}")
for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
rendered_query = self.render_query(day.datetime, day.datetime)
for day in self.iter_query_days(start, end):
rendered_query = self.render_query(day, day)
yield rendered_query

def iter_query_days(self, start: datetime, end: datetime):
cron = self._ref.get("cron")
assert cron is not None
arrow_interval = ROLLING_CRON_TO_ARROW_UNIT[cron]
if arrow_interval == "week":
# We want to start weeks on sunday so we need to adjust the start time to the next sunday
start_arrow = arrow.get(start)
day_of_week = start_arrow.weekday()
if day_of_week != 6:
start = start_arrow.shift(days=6 - day_of_week).datetime
for day in arrow.Arrow.range(arrow_interval, arrow.get(start), arrow.get(end)):
yield day.datetime

async def render_rolling_queries_async(self, start: datetime, end: datetime):
logger.debug(
f"render_rolling_queries_async called with start={start} and end={end}"
)
for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
rendered_query = await asyncio.to_thread(
self.render_query, day.datetime, day.datetime
)
for day in self.iter_query_days(start, end):
rendered_query = await asyncio.to_thread(self.render_query, day, day)
yield rendered_query

def commit(self, start: datetime, end: datetime, destination: str):
Expand Down

0 comments on commit e1a8786

Please sign in to comment.