Skip to content

Commit

Permalink
more clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Dec 11, 2024
1 parent 3bd5c3e commit 484228a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 50 deletions.
6 changes: 3 additions & 3 deletions warehouse/metrics_tools/compute/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def calculate_metrics(
retries (t.Optional[int], optional): The number of retries. Defaults to None.
Returns:
str: The gcs result path from the metrics calculation service
ExportReference: The export reference for the resulting calculation
"""
# Trigger the cluster start
status = self.start_cluster(
Expand All @@ -97,7 +97,7 @@ def calculate_metrics(
retries,
)
job_id = job_response.job_id
result_path = job_response.result_path
export_reference = job_response.export_reference

# Wait for the job to be completed
status_response = self.get_job_status(job_id)
Expand All @@ -122,7 +122,7 @@ def calculate_metrics(
f"job[{job_id}] completed with status {status_response.status}"
)

return result_path
return export_reference

def start_cluster(self, min_size: int, max_size: int):
"""Start a compute cluster with the given min and max size"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Testing script
"""Manual testing scripts for the metrics calculation service.
Eventually we should replace this with a larger end-to-end test
"""

import logging
from datetime import datetime

Expand All @@ -13,55 +17,11 @@
ExportedTableLoadRequest,
ExportReference,
ExportType,
QueryJobSubmitRequest,
)

logger = logging.getLogger(__name__)


def run_get(
    url: str,
    start: str,
    end: str,
    batch_size: int = 1,
):
    """Submit a daily-events aggregation job to the metrics calculation service.

    Builds a ``QueryJobSubmitRequest`` for the given date range and POSTs it to
    the service's ``/job/submit`` endpoint, printing the JSON response.

    Args:
        url: Base URL of the metrics calculation service (no trailing slash).
        start: Inclusive start date in ``YYYY-MM-DD`` format.
        end: Inclusive end date in ``YYYY-MM-DD`` format.
        batch_size: Number of batches for the job. Defaults to 1.
    """
    # NOTE: named `request` (not `input`) to avoid shadowing the builtin.
    request = QueryJobSubmitRequest(
        query_str="""
        SELECT bucket_day, to_artifact_id, from_artifact_id, event_source, event_type, SUM(amount) as amount
        FROM metrics.events_daily_to_artifact
        where bucket_day >= strptime(@start_ds, '%Y-%m-%d') and bucket_day <= strptime(@end_ds, '%Y-%m-%d')
        group by
            bucket_day,
            to_artifact_id,
            from_artifact_id,
            event_source,
            event_type
        """,
        start=datetime.strptime(start, "%Y-%m-%d"),
        end=datetime.strptime(end, "%Y-%m-%d"),
        dialect="duckdb",
        columns=[
            ("bucket_day", "TIMESTAMP"),
            ("to_artifact_id", "VARCHAR"),
            ("from_artifact_id", "VARCHAR"),
            ("event_source", "VARCHAR"),
            ("event_type", "VARCHAR"),
            ("amount", "NUMERIC"),
        ],
        ref=PeerMetricDependencyRef(
            name="", entity_type="artifact", window=30, unit="day"
        ),
        locals={},
        # Maps the logical table name to its physical sqlmesh-managed table.
        dependent_tables_map={
            "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958"
        },
        batch_size=batch_size,
    )
    response = requests.post(f"{url}/job/submit", json=to_jsonable_python(request))
    print(response.json())


def run_start(url: str, min=6, max=10):
req = ClusterStartRequest(min_size=min, max_size=max)
response = requests.post(f"{url}/cluster/start", json=to_jsonable_python(req))
Expand Down
10 changes: 9 additions & 1 deletion warehouse/metrics_tools/compute/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ClusterStartRequest,
ClusterStatus,
ExportReference,
ExportType,
QueryJobProgress,
QueryJobState,
QueryJobStatus,
Expand Down Expand Up @@ -205,7 +206,14 @@ async def submit_job(self, input: QueryJobSubmitRequest):
async with self.job_state_lock:
self.job_tasks[job_id] = task

return QueryJobSubmitResponse(job_id=job_id, result_path=result_path)
return QueryJobSubmitResponse(
job_id=job_id,
export_reference=ExportReference(
table=job_id,
type=ExportType.GCS,
payload={"gcs_path": result_path},
),
)

async def _notify_job_pending(self, job_id: str, total: int):
await self._set_job_state(
Expand Down
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/compute/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def query_as(self, dialect: str) -> str:

class QueryJobSubmitResponse(BaseModel):
    """Response payload returned by the service when a query job is submitted."""

    # Identifier of the submitted job; callers use it to poll job status.
    job_id: str
    # Reference to where the job's result will be exported
    # (e.g. a GCS path payload) — see the service's submit_job handler.
    export_reference: ExportReference


class QueryJobStatusResponse(BaseModel):
Expand Down

0 comments on commit 484228a

Please sign in to comment.