Migrated metrics_v0 and timeseries_metrics_by_artifact_v0 to sqlmesh (#2089)

* Move metrics_v0 to sqlmesh

* Works on clickhouse
ravenac95 authored Sep 6, 2024
1 parent 9ef634c commit ab802e9
Showing 11 changed files with 208 additions and 145 deletions.
4 changes: 4 additions & 0 deletions warehouse/metrics_mesh/config.py
@@ -21,6 +21,9 @@
             connection=DuckDBConnectionConfig(
                 database=os.environ.get("SQLMESH_DUCKDB_LOCAL_PATH")
             ),
+            variables={
+                "oso_source": "sources",
+            },
         ),
         "clickhouse": GatewayConfig(
             connection=ClickhouseConnectionConfig(
@@ -37,6 +40,7 @@
                 password=os.environ.get("SQLMESH_POSTGRES_PASSWORD", "placeholder"),
                 db=os.environ.get("SQLMESH_POSTGRES_DB", ""),
             ),
+            variables={"oso_source": "default"},
         ),
     },
     default_gateway="local",
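A minimal sketch (not part of the commit) of what the gateway-scoped oso_source variable buys: models reference @oso_source.<table> (see events_daily_to_artifact.sql below), and sqlmesh substitutes the per-gateway value at render time, so the same model reads from the "sources" schema on the local DuckDB gateway and from "default" on ClickHouse.

from sqlglot import to_table

# Per-gateway values of the oso_source variable, as defined in the config above.
OSO_SOURCE_BY_GATEWAY = {"local": "sources", "clickhouse": "default"}

for gateway, oso_source in OSO_SOURCE_BY_GATEWAY.items():
    # What @oso_source.timeseries_events_by_artifact_v0 resolves to for each gateway.
    table = to_table(f"{oso_source}.timeseries_events_by_artifact_v0")
    print(f"{gateway}: {table.sql()}")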
13 changes: 7 additions & 6 deletions warehouse/metrics_mesh/lib/factories/factory.py
@@ -196,13 +196,14 @@ def daily_timeseries_rolling_window_model(
         },
         dialect="clickhouse",
         columns={
-            "bucket_day": exp.DataType.build("DATE"),
-            "event_source": "String",
-            "to_artifact_id": "String",
-            "from_artifact_id": "String",
-            "metric": "String",
-            "amount": "Int64",
+            "bucket_day": exp.DataType.build("DATE", dialect="clickhouse"),
+            "event_source": exp.DataType.build("String", dialect="clickhouse"),
+            "to_artifact_id": exp.DataType.build("String", dialect="clickhouse"),
+            "from_artifact_id": exp.DataType.build("String", dialect="clickhouse"),
+            "metric": exp.DataType.build("String", dialect="clickhouse"),
+            "amount": exp.DataType.build("Float64", dialect="clickhouse"),
         },
+        grain=["metric", "to_artifact_id", "from_artifact_id", "bucket_day"],
         **(raw_options.get("model_options", {})),
     )
     def generated_model(evaluator: MacroEvaluator):
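For reference, a minimal sketch (not from the repo) of what the sqlglot call used in the new column mapping does: exp.DataType.build parses a type string for a given dialect into a typed expression, so ClickHouse-specific types such as String and Float64 survive when sqlmesh renders the model schema instead of being coerced through generic type names.

from sqlglot import expressions as exp

# Build ClickHouse column types the same way the factory now does.
bucket_day_type = exp.DataType.build("DATE", dialect="clickhouse")
amount_type = exp.DataType.build("Float64", dialect="clickhouse")

# Render them back as ClickHouse type names (output shown as a rough guide).
print(bucket_day_type.sql(dialect="clickhouse"))  # e.g. Date
print(amount_type.sql(dialect="clickhouse"))      # e.g. Float64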
11 changes: 11 additions & 0 deletions warehouse/metrics_mesh/macros/oso_id.py
@@ -0,0 +1,11 @@
from sqlmesh import macro
from sqlmesh.core.macros import MacroEvaluator
from sqlglot import expressions as exp


@macro()
def oso_id(_evaluator: MacroEvaluator, *args: exp.Expression):
    return exp.SHA2(
        this=exp.Concat(expressions=args, safe=True, coalesce=False),
        length=exp.Literal(this=256, is_string=False),
    )
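A small sketch (not part of the commit) of what this macro evaluates to. In a model it is invoked as @oso_id('OSO', 'oso', metric) (see metrics_v0.sql below); the expression it returns is roughly SHA2(CONCAT(...), 256), rendered according to the active dialect.

from sqlglot import expressions as exp

# Mirror @oso_id('OSO', 'oso', metric) by building the same expression the macro
# returns and rendering it as SQL.
args = [exp.Literal.string("OSO"), exp.Literal.string("oso"), exp.column("metric")]
oso_id_expr = exp.SHA2(
    this=exp.Concat(expressions=args, safe=True, coalesce=False),
    length=exp.Literal(this=256, is_string=False),
)
print(oso_id_expr.sql())  # roughly: SHA2(CONCAT('OSO', 'oso', metric), 256)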
218 changes: 109 additions & 109 deletions warehouse/metrics_mesh/macros/source.py
@@ -1,109 +1,109 @@
"""
A source macro that can be used for rewriting a source reference at runtime.
This mimics the sources behavior in dbt except that the source and destination
of the rewrite is infinitely flexible.
"""

from typing import Optional, Dict, List
import os
import glob
import yaml
from sqlmesh import macro
from sqlmesh.core.macros import MacroEvaluator
from sqlglot import to_table
from pydantic import BaseModel, model_validator

CURR_DIR = os.path.abspath(os.path.dirname(__file__))
SOURCE_YAML_DIR = os.path.abspath(os.path.join(CURR_DIR, "../sources"))
SOURCE_YAML_GLOB = os.path.join(SOURCE_YAML_DIR, "*.yml")


class TableReference(BaseModel):
name: str
catalog: Optional[str] = None
table_name: Optional[str] = None
schema_name: str

@model_validator(mode="after")
def ensure_table_name(self):
if self.table_name is None:
self.table_name = self.name
return self


class SourcesFile(BaseModel):
gateway: str
sources: Dict[str, List[TableReference]]


EnvSourceMap = Dict[str, Dict[str, Dict[str, TableReference]]]


def read_yaml_files(glob_pattern) -> List[SourcesFile]:
# Something about the multithread/processing of sqlmesh probably interferes
# with something in pydantic. This is a hack for now to get this working,
# but we should likely just use a typed dict or a simple dataclass to do
# this validation.
from typing import Optional, Dict, List

class TableReference(BaseModel):
name: str
catalog: Optional[str] = None
table_name: Optional[str] = None
schema_name: str

@model_validator(mode="after")
def ensure_table_name(self):
if self.table_name is None:
self.table_name = self.name
return self

class SourcesFile(BaseModel):
gateway: str
sources: Dict[str, List[TableReference]]

SourcesFile.model_rebuild()
sources_files: List[SourcesFile] = []
# Find all files matching the glob pattern
for file_name in glob.glob(glob_pattern):
if os.path.isfile(file_name):
with open(file_name, "r") as file:
try:
data = yaml.safe_load(file)
sources_files.append(SourcesFile.model_validate(data))
except yaml.YAMLError as exc:
print(f"Error parsing {file_name}: {exc}")
return sources_files


def generate_source_map(parsed_sources_files: List[SourcesFile]) -> EnvSourceMap:
env_source_map: EnvSourceMap = {}
for sources_file in parsed_sources_files:
if sources_file.gateway not in env_source_map:
env_source_map[sources_file.gateway] = {}
source_map = env_source_map[sources_file.gateway]
for key, table_refs in sources_file.sources.items():
if key not in source_map:
source_map[key] = {}
for table_ref in table_refs:
if table_ref.name in source_map[key]:
print("WARNING: table annotated multiple times")
source_map[key][table_ref.name] = table_ref
return env_source_map


@macro()
def source(evaluator: MacroEvaluator, ref: str, table: str):
"""Allows us to change the location of a source when the gateway changes."""
source_map = generate_source_map(read_yaml_files(SOURCE_YAML_GLOB))

gateway = evaluator.gateway
if not gateway:
return ""
table_ref = source_map[gateway][ref][table]
if not table_ref.catalog:
return to_table(f'"{table_ref.schema_name}"."{table_ref.table_name}"')
return to_table(
f'"{table_ref.catalog}"."{table_ref.schema_name}"."{table_ref.table_name}"'
)
# """
# A source macro that can be used for rewriting a source reference at runtime.

# This mimics the sources behavior in dbt except that the source and destination
# of the rewrite is infinitely flexible.
# """

# from typing import Optional, Dict, List
# import os
# import glob
# import yaml
# from sqlmesh import macro
# from sqlmesh.core.macros import MacroEvaluator
# from sqlglot import to_table
# from pydantic import BaseModel, model_validator

# CURR_DIR = os.path.abspath(os.path.dirname(__file__))
# SOURCE_YAML_DIR = os.path.abspath(os.path.join(CURR_DIR, "../sources"))
# SOURCE_YAML_GLOB = os.path.join(SOURCE_YAML_DIR, "*.yml")


# class TableReference(BaseModel):
# name: str
# catalog: Optional[str] = None
# table_name: Optional[str] = None
# schema_name: str

# @model_validator(mode="after")
# def ensure_table_name(self):
# if self.table_name is None:
# self.table_name = self.name
# return self


# class SourcesFile(BaseModel):
# gateway: str
# sources: Dict[str, List[TableReference]]


# EnvSourceMap = Dict[str, Dict[str, Dict[str, TableReference]]]


# def read_yaml_files(glob_pattern) -> List[SourcesFile]:
# # Something about the multithread/processing of sqlmesh probably interferes
# # with something in pydantic. This is a hack for now to get this working,
# # but we should likely just use a typed dict or a simple dataclass to do
# # this validation.
# from typing import Optional, Dict, List

# class TableReference(BaseModel):
# name: str
# catalog: Optional[str] = None
# table_name: Optional[str] = None
# schema_name: str

# @model_validator(mode="after")
# def ensure_table_name(self):
# if self.table_name is None:
# self.table_name = self.name
# return self

# class SourcesFile(BaseModel):
# gateway: str
# sources: Dict[str, List[TableReference]]

# SourcesFile.model_rebuild()
# sources_files: List[SourcesFile] = []
# # Find all files matching the glob pattern
# for file_name in glob.glob(glob_pattern):
# if os.path.isfile(file_name):
# with open(file_name, "r") as file:
# try:
# data = yaml.safe_load(file)
# sources_files.append(SourcesFile.model_validate(data))
# except yaml.YAMLError as exc:
# print(f"Error parsing {file_name}: {exc}")
# return sources_files


# def generate_source_map(parsed_sources_files: List[SourcesFile]) -> EnvSourceMap:
# env_source_map: EnvSourceMap = {}
# for sources_file in parsed_sources_files:
# if sources_file.gateway not in env_source_map:
# env_source_map[sources_file.gateway] = {}
# source_map = env_source_map[sources_file.gateway]
# for key, table_refs in sources_file.sources.items():
# if key not in source_map:
# source_map[key] = {}
# for table_ref in table_refs:
# if table_ref.name in source_map[key]:
# print("WARNING: table annotated multiple times")
# source_map[key][table_ref.name] = table_ref
# return env_source_map


# @macro()
# def source(evaluator: MacroEvaluator, ref: str, table: str):
# """Allows us to change the location of a source when the gateway changes."""
# source_map = generate_source_map(read_yaml_files(SOURCE_YAML_GLOB))

# gateway = evaluator.gateway
# if not gateway:
# return ""
# table_ref = source_map[gateway][ref][table]
# if not table_ref.catalog:
# return to_table(f'"{table_ref.schema_name}"."{table_ref.table_name}"')
# return to_table(
# f'"{table_ref.catalog}"."{table_ref.schema_name}"."{table_ref.table_name}"'
# )
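For context, a hypothetical example (file name and contents are illustrative, not from the repo) of the kind of sources/*.yml file the now-disabled macro consumed, validated with the same pydantic models it defined:

from typing import Dict, List, Optional

import yaml
from pydantic import BaseModel, model_validator


class TableReference(BaseModel):
    name: str
    catalog: Optional[str] = None
    table_name: Optional[str] = None
    schema_name: str

    @model_validator(mode="after")
    def ensure_table_name(self):
        # Fall back to the source name when no explicit table name is given.
        if self.table_name is None:
            self.table_name = self.name
        return self


class SourcesFile(BaseModel):
    gateway: str
    sources: Dict[str, List[TableReference]]


# Hypothetical sources file: one "oso" source exposing a table in the
# "sources" schema for the local (DuckDB) gateway.
EXAMPLE_YAML = """
gateway: local
sources:
  oso:
    - name: timeseries_events_by_artifact_v0
      schema_name: sources
"""

parsed = SourcesFile.model_validate(yaml.safe_load(EXAMPLE_YAML))
print(parsed.sources["oso"][0].table_name)  # falls back to: timeseries_events_by_artifact_v0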
Empty file.
26 changes: 13 additions & 13 deletions warehouse/metrics_mesh/models/events_daily_to_artifact.sql
@@ -15,33 +15,33 @@ MODEL (
     to_artifact_id
   ),
   columns (
-    bucket_day Date,
+    bucket_day DATE,
     event_source String,
     event_type String,
     from_artifact_id String,
     to_artifact_id String,
-    amount Int64
-  ),
+    amount Float64
+  )
 );
-with events as (
-  select distinct from_artifact_id,
+WITH events AS (
+  SELECT DISTINCT from_artifact_id,
     to_artifact_id,
     event_source,
     event_type,
     time,
     amount
-  from @source("oso", "timeseries_events_by_artifact_v0")
-  where CAST(time AS DATE) between STR_TO_DATE(@start_ds, '%Y-%m-%d') and STR_TO_DATE(@end_ds, '%Y-%m-%d')
+  from @oso_source.timeseries_events_by_artifact_v0
+  where CAST(time AS DATE) between STR_TO_DATE(@start_ds, '%Y-%m-%d')::Date and STR_TO_DATE(@end_ds, '%Y-%m-%d')::Date
 )
-select from_artifact_id,
+SELECT from_artifact_id,
   to_artifact_id,
   event_source,
   event_type,
-  DATE_TRUNC('day', CAST(time AS DATE)) as bucket_day,
-  SUM(amount) as amount
-from events
-group by from_artifact_id,
+  DATE_TRUNC('DAY', time::DATE) AS bucket_day,
+  SUM(amount) AS amount
+FROM events
+GROUP BY from_artifact_id,
   to_artifact_id,
   event_source,
   event_type,
-  DATE_TRUNC('day', CAST(time AS DATE))
+  DATE_TRUNC('DAY', time::DATE)
@@ -3,8 +3,9 @@
     MetricQuery,
 )
 
+
 daily_timeseries_rolling_window_model(
-    model_name="metrics.timeseries_code_metrics_by_artifact_over_30_days",
+    model_name="metrics.timeseries_metrics_by_artifact_over_30_days",
     metric_queries={
         "developer_active_days": MetricQuery(
             ref="active_days.sql",
@@ -36,4 +37,8 @@
         ),
     },
     trailing_days=30,
+    model_options=dict(
+        start="2015-01-01",
+        cron="@daily",
+    ),
 )
33 changes: 33 additions & 0 deletions warehouse/metrics_mesh/models/metrics_v0.sql
@@ -0,0 +1,33 @@
MODEL (
  name metrics.metrics_v0,
  kind FULL,
  dialect "clickhouse"
);
WITH all_timeseries_metric_names AS (
  SELECT DISTINCT
    metric
  FROM metrics.timeseries_metrics_by_artifact_over_30_days
),
metrics_v0_no_casting AS (
  SELECT
    @oso_id('OSO', 'oso', metric) AS metric_id,
    'OSO' AS metric_source,
    'oso' AS metric_namespace,
    metric AS metric_name,
    metric AS display_name,
    'TODO' AS description,
    NULL AS raw_definition,
    'TODO' AS definition_ref,
    'UNKNOWN' AS aggregation_function
  FROM all_timeseries_metric_names
)
select
  metric_id::String,
  metric_source::String,
  metric_name::String,
  display_name::String,
  description::Nullable(String),
  raw_definition::Nullable(String),
  definition_ref::Nullable(String),
  aggregation_function::Nullable(String)
FROM metrics_v0_no_casting