SQLMesh + Metrics Calculation Service #2628

Merged · 9 commits · Dec 12, 2024
12 changes: 6 additions & 6 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -75,6 +75,7 @@ pyee = "^12.1.1"
aiotrino = "^0.2.3"
pytest-asyncio = "^0.24.0"
isort = "^5.13.2"
uvicorn = { extras = ["standard"], version = "^0.32.1" }


[tool.poetry.scripts]
Expand Down
25 changes: 2 additions & 23 deletions warehouse/metrics_mesh/models/metrics_factories.py
@@ -1,8 +1,4 @@
from metrics_tools.factory import (
timeseries_metrics,
MetricQueryDef,
RollingConfig,
)
from metrics_tools.factory import MetricQueryDef, RollingConfig, timeseries_metrics

timeseries_metrics(
start="2015-01-01",
@@ -148,7 +144,7 @@
"commits_rolling": MetricQueryDef(
ref="commits.sql",
rolling=RollingConfig(
windows=[180],
windows=[10],
unit="day",
cron="@daily",
),
@@ -261,23 +257,6 @@
),
entity_types=["artifact", "project", "collection"],
),
# "libin": MetricQueryDef(
# ref="libin.sql",
# vars={
# "activity_event_types": [
# "COMMIT_CODE",
# "ISSUE_OPENED",
# "PULL_REQUEST_OPENED",
# "PULL_REQUEST_MERGED",
# ],
# },
# rolling=RollingConfig(
# windows=[30, 90, 180],
# unit="day",
# cron="@daily",
# ),
# entity_types=["artifact"],
# ),
"funding_received": MetricQueryDef(
ref="funding_received.sql",
rolling=RollingConfig(
48 changes: 34 additions & 14 deletions warehouse/metrics_tools/compute/cache.py
@@ -5,14 +5,15 @@
import queue
import typing as t
import uuid
from datetime import datetime

from aiotrino.dbapi import Connection
from pydantic import BaseModel
from pyee.asyncio import AsyncIOEventEmitter
from sqlglot import exp
from sqlmesh.core.dialect import parse_one

from .types import ExportReference, ExportType
from .types import ColumnsDefinition, ExportReference, ExportType

logger = logging.getLogger(__name__)

@@ -23,11 +24,14 @@ class ExportCacheCompletedQueueItem(BaseModel):


class ExportCacheQueueItem(BaseModel):
execution_time: datetime
table: str


class DBExportAdapter(abc.ABC):
async def export_table(self, table: str) -> ExportReference:
async def export_table(
self, table: str, execution_time: datetime
) -> ExportReference:
raise NotImplementedError()

async def clean_export_table(self, table: str):
@@ -38,10 +42,15 @@ class FakeExportAdapter(DBExportAdapter):
def __init__(self, log_override: t.Optional[logging.Logger] = None):
self.logger = log_override or logger

async def export_table(self, table: str) -> ExportReference:
async def export_table(
self, table: str, execution_time: datetime
) -> ExportReference:
self.logger.info(f"fake exporting table: {table}")
return ExportReference(
table=table, type=ExportType.GCS, payload={"gcs_path": "fake_path:{table}"}
table_name=table,
type=ExportType.GCS,
payload={"gcs_path": "fake_path:{table}"},
columns=ColumnsDefinition(columns=[]),
)

async def clean_export_table(self, table: str):
@@ -63,7 +72,9 @@ def __init__(
self.hive_schema = hive_schema
self.logger = log_override or logger

async def export_table(self, table: str) -> ExportReference:
async def export_table(
self, table: str, execution_time: datetime
) -> ExportReference:
columns: t.List[t.Tuple[str, str]] = []

col_result = await self.run_query(f"SHOW COLUMNS FROM {table}")
@@ -77,7 +88,9 @@ async def export_table(self, table: str) -> ExportReference:
self.logger.debug(f"retrieved columns for {table} export: {columns}")
export_table_name = f"export_{table_exp.this.this}_{uuid.uuid4().hex}"

gcs_path = f"gs://{self.gcs_bucket}/trino-export/{export_table_name}/"
# Including the execution time in the path makes cleanup easier, since
# export tables can be listed by time prefix
gcs_path = f"gs://{self.gcs_bucket}/trino-export/{execution_time.strftime('%Y/%m/%d/%H')}/{export_table_name}/"

# We use a little bit of a hybrid templating+sqlglot magic to generate
# the create and insert queries. This saves us having to figure out the
@@ -135,7 +148,10 @@ async def export_table(self, table: str) -> ExportReference:
await self.run_query(insert_query.sql(dialect="trino"))

return ExportReference(
table=table, type=ExportType.GCS, payload={"gcs_path": gcs_path}
table_name=table,
type=ExportType.GCS,
payload={"gcs_path": gcs_path},
columns=ColumnsDefinition(columns=columns, dialect="trino"),
)

async def run_query(self, query: str):
@@ -237,9 +253,9 @@ async def stop(self):
async def export_queue_loop(self):
in_progress: t.Set[str] = set()

async def export_table(table: str):
async def export_table(table: str, execution_time: datetime):
try:
return await self._export_table_for_cache(table)
return await self._export_table_for_cache(table, execution_time)
except Exception as e:
self.logger.error(f"Error exporting table {table}: {e}")
in_progress.remove(table)
@@ -253,7 +269,7 @@ async def export_table(table: str):
# The table is already being exported. Skip this in the queue
continue
in_progress.add(item.table)
export_reference = await export_table(item.table)
export_reference = await export_table(item.table, item.execution_time)
self.event_emitter.emit(
"exported_table", table=item.table, export_reference=export_reference
)
@@ -281,17 +297,19 @@ async def get_export_table_reference(self, table: str):
return None
return copy.deepcopy(reference)

async def _export_table_for_cache(self, table: str):
async def _export_table_for_cache(self, table: str, execution_time: datetime):
"""Triggers an export of a table to a cache location in GCS. This does
this by using the Hive catalog in trino to create a new table with the
same schema as the original table, but with a different name. This new
table is then used as the cache location for the original table."""

export_reference = await self.export_adapter.export_table(table)
export_reference = await self.export_adapter.export_table(table, execution_time)
self.logger.info(f"exported table: {table} -> {export_reference}")
return export_reference

async def resolve_export_references(self, tables: t.List[str]):
async def resolve_export_references(
self, tables: t.List[str], execution_time: datetime
):
"""Resolves any required export table references or queues up a list of
tables to be exported to a cache location. Once ready, the map of tables
is resolved."""
@@ -331,5 +349,7 @@ async def handle_exported_table(
"exported_table", handle_exported_table
)
for table in tables_to_export:
self.export_queue.put_nowait(ExportCacheQueueItem(table=table))
self.export_queue.put_nowait(
ExportCacheQueueItem(table=table, execution_time=execution_time)
)
return await future
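
A minimal sketch of the time-partitioned GCS export path that the cache changes above introduce. The bucket name, table name, and helper function here are placeholders for illustration; only the `trino-export/<YYYY/MM/DD/HH>/export_<table>_<uuid>/` layout and the `resolve_export_references(tables, execution_time)` signature come from the diff.

```python
# Hypothetical illustration of the export path layout, assuming a plain
# table name rather than the sqlglot-parsed identifier used in the PR.
import uuid
from datetime import datetime


def export_gcs_path(gcs_bucket: str, table_name: str, execution_time: datetime) -> str:
    # Partitioning by execution time lets cleanup jobs list and delete all
    # exports produced for a given hour in a single prefix scan.
    export_table_name = f"export_{table_name}_{uuid.uuid4().hex}"
    time_prefix = execution_time.strftime("%Y/%m/%d/%H")
    return f"gs://{gcs_bucket}/trino-export/{time_prefix}/{export_table_name}/"


print(export_gcs_path("example-bucket", "metrics__dev", datetime(2024, 12, 12, 10)))
# -> gs://example-bucket/trino-export/2024/12/12/10/export_metrics__dev_<hex>/
```

Callers pass the same timestamp when resolving references, e.g. `await cache.resolve_export_references(tables, execution_time)`, so every export produced for one job lands under a single time prefix.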
1 change: 1 addition & 0 deletions warehouse/metrics_tools/compute/client.py
@@ -173,6 +173,7 @@ def submit_job(
locals=locals,
dependent_tables_map=dependent_tables_map,
retries=retries,
execution_time=datetime.now(),
)
job_response = self.service_post_with_input(
QueryJobSubmitResponse, "/job/submit", request
4 changes: 2 additions & 2 deletions warehouse/metrics_tools/compute/cluster.py
@@ -431,10 +431,10 @@ def make_new_cluster_with_defaults():
from . import constants

return make_new_cluster(
f"{constants.cluster_worker_image_repo}:{constants.cluster_worker_image_tag}",
f"{constants.cluster_image_repo}:{constants.cluster_image_tag}",
constants.cluster_name,
constants.cluster_namespace,
threads=constants.cluster_worker_threads,
threads=constants.worker_threads,
scheduler_memory_limit=constants.scheduler_memory_limit,
scheduler_memory_request=constants.scheduler_memory_request,
worker_memory_limit=constants.worker_memory_limit,
13 changes: 6 additions & 7 deletions warehouse/metrics_tools/compute/constants.py
@@ -5,25 +5,23 @@

cluster_namespace = env.required_str("METRICS_CLUSTER_NAMESPACE")
cluster_name = env.required_str("METRICS_CLUSTER_NAME")
cluster_worker_threads = env.required_int("METRICS_CLUSTER_WORKER_THREADS", 16)
cluster_worker_image_repo = env.required_str(
"METRICS_CLUSTER_WORKER_IMAGE_REPO", "ghcr.io/opensource-observer/dagster-dask"
cluster_image_repo = env.required_str(
"METRICS_CLUSTER_IMAGE_REPO", "ghcr.io/opensource-observer/dagster-dask"
)
cluster_worker_image_tag = env.required_str("METRICS_CLUSTER_WORKER_IMAGE_TAG")
cluster_image_tag = env.required_str("METRICS_CLUSTER_IMAGE_TAG")
scheduler_memory_limit = env.required_str("METRICS_SCHEDULER_MEMORY_LIMIT", "90000Mi")
scheduler_memory_request = env.required_str(
"METRICS_SCHEDULER_MEMORY_REQUEST", "85000Mi"
)
worker_threads = env.required_int("METRICS_WORKER_THREADS", 16)
worker_memory_limit = env.required_str("METRICS_WORKER_MEMORY_LIMIT", "90000Mi")
worker_memory_request = env.required_str("METRICS_WORKER_MEMORY_REQUEST", "85000Mi")

gcs_bucket = env.required_str("METRICS_GCS_BUCKET")
gcs_key_id = env.required_str("METRICS_GCS_KEY_ID")
gcs_secret = env.required_str("METRICS_GCS_SECRET")

results_path_prefix = env.required_str(
"METRICS_GCS_RESULTS_PATH_PREFIX", "metrics-calc-service-results"
)
results_path_prefix = env.required_str("METRICS_GCS_RESULTS_PATH_PREFIX", "mcs-results")

worker_duckdb_path = env.required_str("METRICS_WORKER_DUCKDB_PATH")

@@ -36,6 +34,7 @@
hive_schema = env.required_str("METRICS_HIVE_SCHEMA", "export")

debug_all = env.ensure_bool("METRICS_DEBUG_ALL", False)
debug_with_duckdb = env.ensure_bool("METRICS_DEBUG_WITH_DUCKDB", False)
if not debug_all:
debug_cache = env.ensure_bool("METRICS_DEBUG_CACHE", False)
debug_cluster = env.ensure_bool("METRICS_DEBUG_CLUSTER", False)
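
The constants above rely on a small internal `env` helper module. As a hedged sketch only, the semantics assumed here are: `required_str`/`required_int` read an environment variable, fall back to an optional default, and raise if neither is set, while `ensure_bool` coerces common truthy strings. The real helpers in the repo may differ.

```python
# Hypothetical approximation of the env helpers used above; not the PR's code.
import os
import typing as t


def required_str(name: str, default: t.Optional[str] = None) -> str:
    value = os.environ.get(name, default)
    if value is None:
        raise KeyError(f"required environment variable {name} is not set")
    return value


def required_int(name: str, default: t.Optional[int] = None) -> int:
    value = os.environ.get(name)
    if value is None:
        if default is None:
            raise KeyError(f"required environment variable {name} is not set")
        return default
    return int(value)


def ensure_bool(name: str, default: bool = False) -> bool:
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}
```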
43 changes: 43 additions & 0 deletions warehouse/metrics_tools/compute/log_config.yaml
@@ -0,0 +1,43 @@
# Default log configuration for the metrics calculation service. This can be
# used by uvicorn. Thanks to
# https://gist.github.com/liviaerxin/d320e33cbcddcc5df76dd92948e5be3b for a
# starting point.
version: 1
disable_existing_loggers: False
formatters:
default:
# "()": uvicorn.logging.DefaultFormatter
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
access:
# "()": uvicorn.logging.AccessFormatter
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
default:
formatter: default
class: logging.StreamHandler
stream: ext://sys.stderr
access:
formatter: access
class: logging.StreamHandler
stream: ext://sys.stdout
loggers:
uvicorn.error:
level: INFO
handlers:
- default
propagate: no
uvicorn.access:
level: INFO
handlers:
- access
propagate: no
metrics_tools:
level: DEBUG
handlers:
- default
propagate: no
root:
level: ERROR
handlers:
- default
propagate: no
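
As a usage sketch only: uvicorn can be pointed at the log configuration above through its `log_config` option. The `metrics_tools.compute.server:app` module path and port are assumptions for illustration, not taken from this PR; substitute the service's real ASGI app target.

```python
# Hedged example of launching the metrics calculation service with the
# log config above. The app path and port are placeholders.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "metrics_tools.compute.server:app",
        host="0.0.0.0",
        port=8000,
        log_config="warehouse/metrics_tools/compute/log_config.yaml",
    )
```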
7 changes: 5 additions & 2 deletions warehouse/metrics_tools/compute/manual_testing_utils.py
@@ -14,6 +14,7 @@
from ..definition import PeerMetricDependencyRef
from .types import (
ClusterStartRequest,
ColumnsDefinition,
ExportedTableLoadRequest,
ExportReference,
ExportType,
@@ -32,11 +33,12 @@ def run_cache_load(url: str):
req = ExportedTableLoadRequest(
map={
"sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958": ExportReference(
table="export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958",
table_name="export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958",
type=ExportType.GCS,
payload={
"gcs_path": "gs://oso-dataset-transfer-bucket/trino-export/export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958"
},
columns=ColumnsDefinition(columns=[]),
),
}
)
@@ -66,11 +68,12 @@ def run_local_test(
client.run_cache_manual_load(
{
"sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958": ExportReference(
table="export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958",
table_name="export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958",
type=ExportType.GCS,
payload={
"gcs_path": "gs://oso-dataset-transfer-bucket/trino-export/export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958"
},
columns=ColumnsDefinition(columns=[]),
),
}
)