Skip to content

Commit

Permalink
remove unnecessary
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Dec 10, 2024
1 parent 5cb078b commit 3bd5c3e
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 111 deletions.
90 changes: 0 additions & 90 deletions ops/k8s-apps/base/metrics-calc-service/metrics-calc-service.yaml

This file was deleted.

21 changes: 0 additions & 21 deletions warehouse/metrics_tools/compute/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from dask.distributed import CancelledError, Future
from metrics_tools.compute.worker import execute_duckdb_load
from metrics_tools.runner import FakeEngineAdapter, MetricsRunner
from pydantic import BaseModel

from .cache import CacheExportManager
from .cluster import ClusterManager
Expand All @@ -32,20 +31,6 @@
logger.setLevel(logging.DEBUG)


class QueryJobComputeSubmitArgsQueueItem(BaseModel):
job_id: str
task_id: str
result_path: str
batch: str
dependencies: t.Dict[str, str]


class QueryJobSubmitRequestQueueItem(BaseModel):
job_id: str
result_path: str
input: QueryJobSubmitRequest


class MetricsCalculationService:
id: str
gcs_bucket: str
Expand All @@ -55,8 +40,6 @@ class MetricsCalculationService:
job_tasks: t.Dict[str, asyncio.Task]
job_state_lock: asyncio.Lock
logger: logging.Logger
stop_event: asyncio.Event
submit_job_request_queue: asyncio.Queue[QueryJobSubmitRequestQueueItem]

@classmethod
def setup(
Expand Down Expand Up @@ -97,9 +80,6 @@ def __init__(
self.job_tasks = {}
self.job_state_lock = asyncio.Lock()
self.logger = log_override or logger
self.stop_event = asyncio.Event()

self.submit_job_request_queue = asyncio.Queue()

async def handle_query_job_submit_request(
self, job_id: str, result_path_base: str, input: QueryJobSubmitRequest
Expand Down Expand Up @@ -198,7 +178,6 @@ async def _handle_query_job_submit_request(
async def close(self):
await self.cluster_manager.close()
await self.cache_manager.stop()
self.stop_event.set()

async def start_cluster(self, start_request: ClusterStartRequest) -> ClusterStatus:
self.logger.debug("starting cluster")
Expand Down

0 comments on commit 3bd5c3e

Please sign in to comment.