Debugging Metrics Data Quality [Unfinished] #2590

Draft
wants to merge 2 commits into base: main
3 changes: 2 additions & 1 deletion .gitignore
@@ -77,4 +77,5 @@ production/

# cube.dev
core.1
.cubecloud
warehouse/metrics_tools/builder/lib
1,288 changes: 1,278 additions & 10 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pyproject.toml
@@ -59,13 +59,17 @@ sqlmesh = { extras = ["trino"], version = "^0.129.0" }
dagster-duckdb = "^0.24.0"
dagster-duckdb-polars = "^0.24.0"
google-cloud-bigquery-storage = "^2.25.0"
dagster-sqlmesh = "0.2.0.dev3"
dagster-sqlmesh = {path = "../dagster-sqlmesh", develop = true}
google-auth = "^2.34.0"
pillow = "^10.4.0"
dagster-k8s = "^0.24.6"
pyiceberg = { extras = ["hive"], version = "^0.7.1" }
connectorx = "^0.4.0"
bokeh = "^3.6.1"
pyvis = "^0.3.2"
networkx = "^3.4.2"
jupyter = "^1.1.1"
matplotlib = "^3.9.3"


[tool.poetry.scripts]
10 changes: 4 additions & 6 deletions warehouse/metrics_mesh/models/metrics_factories.py
@@ -1,11 +1,9 @@
from metrics_tools.factory import (
timeseries_metrics,
MetricQueryDef,
RollingConfig,
)
import os

from metrics_tools.factory import MetricQueryDef, RollingConfig, timeseries_metrics

timeseries_metrics(
start="2015-01-01",
start=os.environ.get("SQLMESH_TIMESERIES_METRICS_START", "2015-01-01"),
catalog="metrics",
model_prefix="timeseries",
timeseries_sources=[
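Side note on this change: reading the start date from SQLMESH_TIMESERIES_METRICS_START lets tooling shrink the backfill window for local debugging. A minimal sketch of the intended usage, assuming the variable is set before sqlmesh parses the project (the date mirrors what the builder controller below sets):

```python
import os

# Must be set before the sqlmesh project is loaded so the metrics factory
# sees it; models fall back to the original "2015-01-01" default otherwise
os.environ["SQLMESH_TIMESERIES_METRICS_START"] = "2024-07-01"
```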
1 change: 1 addition & 0 deletions warehouse/metrics_tools/builder/__init__.py
@@ -0,0 +1 @@
"""A tool to help build metrics"""
576 changes: 576 additions & 0 deletions warehouse/metrics_tools/builder/builder.ipynb

Large diffs are not rendered by default.

100 changes: 100 additions & 0 deletions warehouse/metrics_tools/builder/controller.py
@@ -0,0 +1,100 @@
"""Wraps the sqlmesh controller from dagster sqlmesh to provide a richer
experience specifically for creating metrics models"""

import logging
import os
import typing as t
import sqlglot as sql
from sqlglot import exp
import matplotlib.pyplot as plt
import matplotlib.dates as mdates


from dagster_sqlmesh.controller.base import SQLMeshController, SQLMeshInstance

logger = logging.getLogger(__name__)


class MetricsController:
@classmethod
def setup(
cls,
path: str,
environment: str = "test",
gateway: str = "local",
enable_logging: bool = False,
log_override: t.Optional[logging.Logger] = None,
):
if enable_logging:
import sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
controller = SQLMeshController.setup(
path=path,
gateway=gateway,
log_override=log_override,
)
os.environ["SQLMESH_TIMESERIES_METRICS_START"] = "2024-07-01"
return cls(environment, controller)

def __init__(self, environment: str, sqlmesh_controller: SQLMeshController):
self._environment = environment
self._sqlmesh_controller = sqlmesh_controller

def load(self):
# Debug console
with self._sqlmesh_controller.instance(self._environment) as mesh:
for event in mesh.plan_and_run(
plan_options={"skip_tests": True},
):
logger.debug(f"event received: {event.__class__.__name__}")
logger.debug(event)

def models_dag(self):
with self._sqlmesh_controller.instance(self._environment) as mesh:
return mesh.models_dag()

def plot(self, query: str, group_by: str):
with self._sqlmesh_controller.instance(self._environment) as mesh:
df = self._fetchdf(mesh, query)
for entity_id, group in df.groupby(group_by):
print(group.dtypes)
print(entity_id)
plt.plot(
group["metrics_sample_date"],
group["amount"],
label=f"Entity {entity_id}",
)

# Add labels, title, and legend
plt.xlabel("Time")
dt_fmt = mdates.DateFormatter("%Y-%m-%d")
plt.locator_params(nbins=5)
plt.gca().xaxis.set_major_formatter(dt_fmt)
plt.ylabel("Amount")
plt.title("Metrics Over Time by Project ID")
plt.grid(True)
plt.show()
return df

def fetchdf(self, query: str):
with self._sqlmesh_controller.instance(self._environment) as mesh:
return self._fetchdf(mesh, query)

def _fetchdf(self, mesh: SQLMeshInstance, query: str):
parsed = sql.parse_one(query)
for table in parsed.find_all(exp.Table):
table_name = table.this
db_name = table.db
try:
rewritten_table = mesh.context.table(f"{db_name}.{table_name}")
except KeyError:
continue
print(f"rewriting: {rewritten_table}")
rewritten = exp.to_table(rewritten_table)
if table.alias:
rewritten = rewritten.as_(table.alias, table=True)
table.replace(rewritten)

print(parsed.sql(dialect="duckdb"))
return mesh.context.fetchdf(parsed.sql(dialect="duckdb"))
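For context, a hypothetical notebook session driving this controller could look like the sketch below; the project path, query, and model/column names are illustrative rather than taken from this diff:

```python
from metrics_tools.builder.controller import MetricsController

# Illustrative path; adjust to the local checkout
controller = MetricsController.setup(path="warehouse/metrics_mesh")
controller.load()  # plans and runs the "test" environment

# _fetchdf rewrites logical references like metrics.<model> to the physical
# tables sqlmesh manages, so plain SQL works against the dev environment
df = controller.plot(
    "SELECT metrics_sample_date, to_artifact_id, amount "
    "FROM metrics.timeseries_stars_to_artifact_over_30_days",
    group_by="to_artifact_id",
)
```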
464 changes: 464 additions & 0 deletions warehouse/metrics_tools/builder/dag.html

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions warehouse/metrics_tools/builder/dag_widget.py
@@ -0,0 +1,130 @@
import typing as t

import networkx as nx
from ipywidgets import HTML, Output, VBox, interact
from IPython.display import display
from pyvis.network import Network


def create_dag_widget(dag_dict: t.Dict[str, t.Set[str]]) -> None:
    """
    Create an interactive DAG widget using IPyWidgets and Pyvis.

    Parameters:
        dag_dict (Dict[str, Set[str]]): A dictionary representing the DAG.
            Keys are node names, and values are sets of parent node names.
    """
    # Create a directed graph
    G = nx.DiGraph()
    for node, parents in dag_dict.items():
        for parent in parents:
            G.add_edge(parent, node)
        if not parents:  # Add nodes with no parents
            G.add_node(node)

    # Create Pyvis Network
    net = Network(notebook=True, directed=True, cdn_resources="in_line")

    net.from_nx(G)

    # Add click event handlers
    node_html: t.Dict[str, str] = {}
    for node in G.nodes():
        node_html[node] = f"Clicked on node: {node}"
        net.add_node(node, title=f"Click to select {node}")

    # Render to HTML file
    net.show("dag.html")

    # Create widgets for interaction
    output = Output()
    html_view = HTML(value=net.html)
    html_view.layout.height = "500px"

    # Display output on click
    @output.capture(clear_output=True)
    def on_node_click(node: str) -> None:
        if node in node_html:
            print(node_html[node])

    # Interactive handling of clicks
    def handle_node_click(node_name: str) -> None:
        on_node_click(node_name)

    interact(handle_node_click, node_name=list(G.nodes()))

    # Combine the widgets
    display(VBox([html_view, output]))


def create_dag_map_widget(dag_dict: t.Dict[str, t.Set[str]]) -> None:
    """
    Create an interactive DAG widget displayed like a map using Pyvis.

    Parameters:
        dag_dict (Dict[str, Set[str]]): A dictionary representing the DAG.
            Keys are node names, and values are sets of parent node names.
    """
    # Create a directed graph
    G = nx.DiGraph()
    for node, parents in dag_dict.items():
        for parent in parents:
            G.add_edge(parent, node)
        if not parents:  # Add nodes with no parents
            G.add_node(node)

    # Create Pyvis Network
    net = Network(height="600px", width="100%", directed=True, cdn_resources="in_line")
    net.from_nx(G)

    # Use physics for map-like layout
    net.set_options(
        """
        var options = {
          "physics": {
            "enabled": true,
            "barnesHut": {
              "gravitationalConstant": -20000,
              "springLength": 150,
              "springConstant": 0.05,
              "damping": 0.1
            }
          },
          "nodes": {
            "shape": "box",
            "font": {
              "size": 16,
              "align": "center"
            },
            "color": {
              "background": "#D2E5FF",
              "border": "#2B7CE9",
              "highlight": {
                "background": "#FFAA55",
                "border": "#FF5722"
              }
            },
            "borderWidth": 2
          },
          "edges": {
            "arrows": {
              "to": {
                "enabled": true
              }
            },
            "color": {
              "color": "#848484",
              "highlight": "#FFAA55"
            },
            "smooth": true
          }
        }
        """
    )

    # Add custom labels and tooltips for nodes
    for node in G.nodes():
        net.add_node(node, label=node, title=f"Node: {node}")

    # Generate and display the HTML for the graph
    net_html = net.generate_html()
    display(HTML(net_html))
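To try these widgets in a notebook, a small hand-written DAG is enough; the dict shape matches the parent-set mapping the docstrings describe (pairing it with the controller's models_dag() is an assumption about that method's return shape):

```python
from metrics_tools.builder.dag_widget import create_dag_map_widget

# Illustrative DAG: b and c depend on a; d depends on both
dag = {
    "a": set(),
    "b": {"a"},
    "c": {"a"},
    "d": {"b", "c"},
}
create_dag_map_widget(dag)

# Assumed pairing with the controller, if models_dag() returns the same shape:
# create_dag_map_widget(controller.models_dag())
```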
85 changes: 85 additions & 0 deletions warehouse/metrics_tools/compute/cache.py
@@ -0,0 +1,85 @@
import logging
import typing as t
import uuid

from sqlglot import exp
from sqlmesh.core.dialect import parse_one
from trino.dbapi import Connection

logger = logging.getLogger(__name__)


class TrinoCacheExportManager:
    def __init__(self, db: Connection, gcs_bucket: str):
        self.exported_map: t.Dict[str, str] = {}
        self.gcs_bucket = gcs_bucket
        self.db = db

    def run_query(self, query: str):
        cursor = self.db.cursor()
        logger.info(f"EXECUTING: {query}")
        return cursor.execute(query)

    def export_table_for_cache(self, table: str):
        # Export the table to GCS via Trino, reusing a prior export when possible
        if table in self.exported_map:
            logger.debug(f"CACHE HIT FOR {table}")
            return self.exported_map[table]

        columns: t.List[t.Tuple[str, str]] = []

        col_result = self.run_query(f"SHOW COLUMNS FROM {table}").fetchall()
        for row in col_result:
            column_name = row[0]
            column_type = row[1]
            columns.append((column_name, column_type))

        table_exp = exp.to_table(table)
        logger.info(f"RETRIEVED COLUMNS: {columns}")
        export_table_name = f"export_{table_exp.this.this}_{uuid.uuid4().hex}"

        # Create an empty external table backed by GCS; the placeholder
        # column is swapped out for the real schema below
        base_create_query = f"""
            CREATE TABLE "source"."export"."{export_table_name}" (
                placeholder VARCHAR
            ) WITH (
                format = 'PARQUET',
                external_location = 'gs://{self.gcs_bucket}/trino-export/{export_table_name}/'
            )
        """
        create_query = parse_one(base_create_query)
        create_query.this.set(
            "expressions",
            [
                exp.ColumnDef(
                    this=exp.to_identifier(column_name),
                    kind=parse_one(column_type, into=exp.DataType),
                )
                for column_name, column_type in columns
            ],
        )

        self.run_query(create_query.sql(dialect="trino"))

        # Copy the source table into the external table, again replacing
        # the placeholder column list with the real one
        base_insert_query = f"""
            INSERT INTO "source"."export"."{export_table_name}" (placeholder)
            SELECT placeholder
            FROM {table_exp}
        """

        column_identifiers = [
            exp.to_identifier(column_name) for column_name, _ in columns
        ]

        insert_query = parse_one(base_insert_query)
        insert_query.this.set(
            "expressions",
            column_identifiers,
        )
        select = t.cast(exp.Select, insert_query.expression)
        select.set("expressions", column_identifiers)

        self.run_query(insert_query.sql(dialect="trino"))

        self.exported_map[table] = export_table_name
        return self.exported_map[table]
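A minimal sketch of driving the export manager; the host, credentials, bucket, and table name are placeholders, not values from this PR:

```python
from trino.dbapi import connect

from metrics_tools.compute.cache import TrinoCacheExportManager

# Placeholder connection details for a Trino deployment with a "source" catalog
db = connect(host="localhost", port=8080, user="debug", catalog="source")
manager = TrinoCacheExportManager(db, gcs_bucket="example-metrics-cache")

# First call creates source.export.export_<table>_<uuid> backed by
# gs://example-metrics-cache/trino-export/... and copies the rows in;
# repeat calls hit exported_map and return the same export table name
name = manager.export_table_for_cache("metrics.events_daily_to_artifact")
```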