MCS Deployment Fixes/Tweaks #2646

Merged Dec 15, 2024 · 9 commits
2 changes: 1 addition & 1 deletion ops/helm-charts/metrics-calculation-service/Chart.yaml
@@ -3,5 +3,5 @@ name: metrics-calculation-service
description: The metrics calculation service

type: application
version: 0.2.1
version: 0.7.0
appVersion: 0.1.0
@@ -79,4 +79,5 @@ app.kubernetes.io/name: {{ include "mcs.fullname" . }}

{{- define "mcs.selectorLabels" -}}
{{ include "mcs.labels" . }}
component: mcs-frontend
{{- end -}}
12 changes: 9 additions & 3 deletions ops/helm-charts/metrics-calculation-service/templates/app.yaml
@@ -10,11 +10,11 @@ spec:
replicas: 1
selector:
matchLabels:
{{- include "mcs.labels" . | nindent 6 }}
component: mcs-frontend
template:
metadata:
labels:
{{- include "mcs.labels" . | nindent 8 }}
{{- include "mcs.selectorLabels" . | nindent 8 }}
spec:
serviceAccountName: {{ include "mcs.fullname" . }}
containers:
@@ -39,6 +39,8 @@ spec:
env:
- name: METRICS_CLUSTER_NAMESPACE
value: "{{ .Values.mcs.cluster.namespace }}"
- name: METRICS_CLUSTER_SERVICE_ACCOUNT
value: "{{ .Values.mcs.cluster.serviceAccount }}"
- name: METRICS_CLUSTER_NAME
value: "{{ .Values.mcs.cluster.name }}"
- name: METRICS_CLUSTER_IMAGE_REPO
@@ -49,14 +51,18 @@
value: "{{ .Values.mcs.cluster.scheduler.memory.limit }}"
- name: METRICS_SCHEDULER_MEMORY_REQUEST
value: "{{ .Values.mcs.cluster.scheduler.memory.request }}"
- name: METRICS_CLUSTER_WORKER_THREADS
- name: METRICS_SCHEDULER_POOL_TYPE
value: "{{ .Values.mcs.cluster.scheduler.poolType }}"
- name: METRICS_WORKER_THREADS
value: "{{ .Values.mcs.cluster.worker.threads }}"
- name: METRICS_WORKER_MEMORY_LIMIT
value: "{{ .Values.mcs.cluster.worker.memory.limit }}"
- name: METRICS_WORKER_MEMORY_REQUEST
value: "{{ .Values.mcs.cluster.worker.memory.request }}"
- name: METRICS_WORKER_DUCKDB_PATH
value: "{{ .Values.mcs.cluster.worker.duckdb_path }}"
- name: METRICS_WORKER_POOL_TYPE
value: "{{ .Values.mcs.cluster.worker.poolType }}"
- name: METRICS_GCS_BUCKET
value: "{{ .Values.mcs.gcs.bucket }}"
- name: METRICS_GCS_KEY_ID
31 changes: 23 additions & 8 deletions ops/helm-charts/metrics-calculation-service/templates/rbac.yaml
@@ -4,12 +4,27 @@ kind: ClusterRole
metadata:
name: {{ include "mcs.fullname" . }}
rules:
- apiGroups: [""]
resources: ["pods", "services"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Application: watching & handling of the custom resources we declare.
- apiGroups: [kubernetes.dask.org]
resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
verbs: [get, list, watch, patch, create, delete]

# Application: other resources it needs to watch and get information from.
- apiGroups:
- "" # indicates the core API group
resources: [pods, pods/status]
verbs:
- "get"
- "list"
- "watch"

- apiGroups:
- "" # indicates the core API group
resources: [services]
verbs:
- "get"
- "list"
- "watch"

---
apiVersion: rbac.authorization.k8s.io/v1
@@ -18,9 +33,9 @@ metadata:
name: {{ include "mcs.fullname" . }}
subjects:
- kind: ServiceAccount
name: {{ include "mcs.fullname" . }}-sa
name: {{ include "mcs.fullname" . }}
namespace: {{ .Release.Namespace }}
roleRef:
kind: ClusterRole
name: {{ include "mcs.fullname" . }}-role
name: {{ include "mcs.fullname" . }}
apiGroup: rbac.authorization.k8s.io
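
The rewritten rules narrow the ClusterRole to the dask-kubernetes operator's custom resources plus read-only access to pods and services, and the binding now points at the chart's actual ServiceAccount and ClusterRole names. As a sanity check, here is a minimal sketch that lists `daskclusters` from inside a pod running under the bound ServiceAccount, assuming the dask-kubernetes CRDs are installed and the official `kubernetes` Python client is available (the namespace is illustrative):

```python
# Sketch: confirm the ClusterRoleBinding lets this ServiceAccount list DaskClusters.
from kubernetes import client, config

config.load_incluster_config()  # uses the pod's mounted ServiceAccount token
api = client.CustomObjectsApi()

clusters = api.list_namespaced_custom_object(
    group="kubernetes.dask.org",
    version="v1",
    plural="daskclusters",
    namespace="production-mcs",  # hypothetical; substitute the deployed namespace
)
for item in clusters.get("items", []):
    print(item["metadata"]["name"])
```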
3 changes: 3 additions & 0 deletions ops/helm-charts/metrics-calculation-service/values.yaml
@@ -15,6 +15,7 @@ mcs:
repo: ghcr.io/opensource-observer/oso
tag: latest
cluster:
serviceAccount: "default"
namespace: "default"
name: "default"
image:
@@ -24,11 +25,13 @@
memory:
limit: "2Gi"
request: "2Gi"
poolType: ""
worker:
threads: "4"
memory:
limit: "2Gi"
request: "2Gi"
poolType: ""
duckdb_path: "/scratch/mcs-local.db"
gcs:
bucket: "oso-playground-dataset-transfer-bucket"
@@ -5,6 +5,25 @@ metadata:
spec:
values:
mcs:
cluster:
name: mcs-worker
namespace: production-mcs
serviceAccount: production-mcs
image:
repo: "ghcr.io/opensource-observer/oso"
tag: "latest"
scheduler:
memory:
limit: "23Gi"
request: "20Gi"
poolType: "mcs-scheduler"
worker:
threads: "16"
memory:
limit: "96Gi"
request: "90Gi"
poolType: "mcs-worker"
duckdb_path: "/scratch/mcs-local.db"
trino:
host: production-trino-trino.production-trino.svc.cluster.local
port: 8080
1 change: 1 addition & 0 deletions warehouse/metrics_mesh/config.py
@@ -35,6 +35,7 @@
concurrent_tasks=int(
os.environ.get("SQLMESH_TRINO_CONCURRENT_TASKS", "64")
),
retries=int(os.environ.get("SQLMESH_TRINO_RETRIES", "5")),
),
state_connection=GCPPostgresConnectionConfig(
instance_connection_string=os.environ.get(
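
The new `retries` knob sits alongside `concurrent_tasks` on the Trino gateway connection, so transient query failures are retried instead of failing the run. A minimal sketch of the surrounding configuration, assuming SQLMesh's `TrinoConnectionConfig` accepts `retries` as the diff implies; the host, user, and catalog values are placeholders, not the project's real settings:

```python
import os

from sqlmesh.core.config.connection import TrinoConnectionConfig

# Sketch: a Trino connection that retries transient failures.
trino_connection = TrinoConnectionConfig(
    host=os.environ.get("SQLMESH_TRINO_HOST", "localhost"),      # placeholder
    port=int(os.environ.get("SQLMESH_TRINO_PORT", "8080")),
    user=os.environ.get("SQLMESH_TRINO_USER", "sqlmesh"),        # placeholder
    catalog=os.environ.get("SQLMESH_TRINO_CATALOG", "metrics"),  # placeholder
    concurrent_tasks=int(os.environ.get("SQLMESH_TRINO_CONCURRENT_TASKS", "64")),
    retries=int(os.environ.get("SQLMESH_TRINO_RETRIES", "5")),
)
```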
@@ -3,7 +3,9 @@
MODEL (
name metrics.issue_event_time_deltas,
kind INCREMENTAL_BY_TIME_RANGE (
time_column time
time_column time,
batch_size 365,
batch_concurrency 1
),
start '2015-01-01',
cron '@daily',
4 changes: 3 additions & 1 deletion warehouse/metrics_mesh/models/events_daily_to_artifact.sql
@@ -1,7 +1,9 @@
MODEL (
name metrics.events_daily_to_artifact,
kind INCREMENTAL_BY_TIME_RANGE (
time_column bucket_day
time_column bucket_day,
batch_size 365,
batch_concurrency 1
),
start '2015-01-01',
cron '@daily',
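
For intuition on both model changes above: in SQLMesh, `batch_size` caps how many time intervals an incremental backfill groups into a single evaluation, and `batch_concurrency` caps how many of those batches run at once. A rough sketch of the resulting arithmetic for a daily model starting 2015-01-01 (the cutoff date is illustrative):

```python
import math
from datetime import date

# Sketch: batch count for a daily incremental model with batch_size 365.
start = date(2015, 1, 1)
end = date(2024, 12, 15)  # illustrative backfill cutoff
batch_size = 365

intervals = (end - start).days + 1           # one interval per day
batches = math.ceil(intervals / batch_size)  # roughly year-sized batches
print(f"{intervals} daily intervals -> {batches} batches")
# With batch_concurrency 1, those batches run strictly one at a time,
# which bounds the memory each backfill query demands from Trino.
```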
122 changes: 101 additions & 21 deletions warehouse/metrics_tools/compute/cache.py
@@ -28,6 +28,12 @@ class ExportCacheQueueItem(BaseModel):
table: str


class ExportError(Exception):
def __init__(self, table: str, error: Exception):
self.table = table
self.error = error


class DBExportAdapter(abc.ABC):
async def export_table(
self, table: str, execution_time: datetime
@@ -104,19 +110,20 @@ async def export_table(
)
"""

# Trino's Hive connector has issues with certain column types, so we
# forcibly cast those columns to types the connector can handle
processed_columns: t.List[
t.Tuple[exp.Identifier, exp.ColumnDef, exp.Expression]
] = [
self.process_columns(column_name, parse_one(column_type, into=exp.DataType))
for column_name, column_type in columns
]

# Parse the create query
create_query = parse_one(base_create_query)
# Rewrite the column definitions we need to rewrite
create_query.this.set(
"expressions",
[
exp.ColumnDef(
this=exp.to_identifier(column_name),
kind=parse_one(column_type, into=exp.DataType),
)
for column_name, column_type in columns
],
)
# Rewrite the column definitions we need to rewrite.

create_query.this.set("expressions", [row[1] for row in processed_columns])

# Execute the create query which will create the export table
await self.run_query(create_query.sql(dialect="trino"))
@@ -129,9 +136,8 @@ async def export_table(
FROM {table_exp}
"""

column_identifiers = [
exp.to_identifier(column_name) for column_name, _ in columns
]
column_identifiers = [row[0] for row in processed_columns]
column_selects = [row[2] for row in processed_columns]

# Rewrite the column identifiers in the insert into statement
insert_query = parse_one(base_insert_query)
@@ -142,7 +148,7 @@

# Rewrite the column identifiers in the select statement
select = t.cast(exp.Select, insert_query.expression)
select.set("expressions", column_identifiers)
select.set("expressions", column_selects)

# Execute the insert query which will populate the export table
await self.run_query(insert_query.sql(dialect="trino"))
Expand All @@ -156,10 +162,49 @@ async def export_table(

async def run_query(self, query: str):
cursor = await self.db.cursor()
self.logger.info(f"EXECUTING: {query}")
self.logger.info(f"Executing SQL: {query}")
await cursor.execute(query)
return await cursor.fetchall()

def process_columns(
self, column_name: str, column_type: exp.Expression
) -> t.Tuple[exp.Identifier, exp.ColumnDef, exp.Expression]:
assert isinstance(
column_type, exp.DataType
), "column_type must parse into DataType"

self.logger.debug(
f"creating column def for column_name: {column_name} column_type: {column_type}"
)
column_select = exp.to_identifier(column_name)
column_identifier = exp.to_identifier(column_name)

if column_type.this == exp.DataType.Type.TIMESTAMPTZ:
# We need to cast the timestamptz to a timestamp without time zone that is
# compatible with the hive connector
column_type = exp.DataType(this=exp.DataType.Type.TIMESTAMP, nested=False)
column_select = exp.Cast(
this=exp.Anonymous(
this="at_timezone",
expressions=[
exp.to_identifier(column_name),
exp.Literal(this="UTC", is_string=True),
],
),
to=column_type,
)
elif column_type.this == exp.DataType.Type.TIMESTAMP:
column_type = exp.DataType(this=exp.DataType.Type.TIMESTAMP, nested=False)
column_select = exp.Cast(
this=exp.to_identifier(column_name),
to=column_type,
)
return (
column_identifier,
exp.ColumnDef(this=column_identifier, kind=column_type),
column_select,
)

async def clean_export_table(self, table: str):
pass

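The `process_columns` rewrite can be reproduced in isolation with sqlglot. A minimal sketch (the column name and the Trino-reported type string are illustrative) showing the plain-`TIMESTAMP` column definition the export table receives and the `at_timezone` cast the insert selects through:

```python
from sqlglot import exp, parse_one

column_name = "event_time"  # illustrative
column_type = parse_one("timestamp(6) with time zone", into=exp.DataType)
assert column_type.this == exp.DataType.Type.TIMESTAMPTZ

# The export table gets a plain TIMESTAMP column definition...
new_type = exp.DataType(this=exp.DataType.Type.TIMESTAMP, nested=False)
column_def = exp.ColumnDef(this=exp.to_identifier(column_name), kind=new_type)

# ...and the INSERT selects the column through at_timezone, pinned to UTC.
column_select = exp.Cast(
    this=exp.Anonymous(
        this="at_timezone",
        expressions=[
            exp.to_identifier(column_name),
            exp.Literal(this="UTC", is_string=True),
        ],
    ),
    to=new_type,
)

print(column_def.sql(dialect="trino"))     # event_time TIMESTAMP
print(column_select.sql(dialect="trino"))  # CAST(AT_TIMEZONE(event_time, 'UTC') AS TIMESTAMP)
```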
@@ -252,24 +297,47 @@ async def stop(self):

async def export_queue_loop(self):
in_progress: t.Set[str] = set()
errors: t.Dict[str, t.List[Exception]] = {}

async def export_table(table: str, execution_time: datetime):
async def export_table(table: str, execution_time: datetime) -> ExportReference:
try:
return await self._export_table_for_cache(table, execution_time)
except Exception as e:
self.logger.error(f"Error exporting table {table}: {e}")
in_progress.remove(table)
raise ExportError(table, e)

while not self.stop_signal.is_set():
try:
item = await asyncio.wait_for(self.export_queue.get(), timeout=1)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
continue
except RuntimeError:
break
if item.table in in_progress:
# The table is already being exported. Skip this in the queue
continue
if item.table in errors:
# The table has already errored. Skip this in the queue
continue

in_progress.add(item.table)
export_reference = await export_table(item.table, item.execution_time)
try:
export_reference = await export_table(item.table, item.execution_time)
except ExportError as e:
table = e.table
error = e.error
self.logger.error(f"Error exporting table {table}: {error}")

# Save the error for later
table_errors = errors.get(table, [])
table_errors.append(error)
errors[table] = table_errors

in_progress.remove(table)
self.event_emitter.emit("exported_table", table=table, error=error)
self.export_queue.task_done()
continue
self.event_emitter.emit(
"exported_table", table=item.table, export_reference=export_reference
)
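
Structurally, the loop is a single-consumer queue with de-duplication and error memoization: a table already in flight or already failed is skipped, and a failure is recorded and emitted rather than killing the loop. A stripped-down sketch of the same shape; the worker function is a stub, not the service's exporter:

```python
import asyncio
import random

async def do_work(item: str) -> None:
    # Stub standing in for the real export; fails some of the time.
    await asyncio.sleep(0.1)
    if random.random() < 0.3:
        raise RuntimeError(f"export failed for {item}")

async def consumer(queue: asyncio.Queue, stop: asyncio.Event) -> None:
    in_progress: set[str] = set()            # items running or already done
    errors: dict[str, list[Exception]] = {}  # items that failed, with errors

    while not stop.is_set():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            continue  # wake periodically to re-check the stop signal
        if item in in_progress or item in errors:
            queue.task_done()
            continue  # duplicate request: already running or already failed
        in_progress.add(item)
        try:
            await do_work(item)
        except Exception as e:
            errors.setdefault(item, []).append(e)  # record instead of crashing
            in_progress.discard(item)
        finally:
            queue.task_done()

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    stop = asyncio.Event()
    for name in ["tbl_a", "tbl_b", "tbl_a"]:  # duplicate on purpose
        queue.put_nowait(name)
    worker = asyncio.create_task(consumer(queue, stop))
    await queue.join()
    stop.set()
    await worker

asyncio.run(main())
```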
@@ -333,8 +401,20 @@ async def resolve_export_references(
self.logger.info(f"unknown tables to export: {tables_to_export}")

async def handle_exported_table(
*, table: str, export_reference: ExportReference
*,
table: str,
export_reference: t.Optional[ExportReference] = None,
error: t.Optional[Exception] = None,
):
if not export_reference and not error:
raise RuntimeError("export_reference or error must be provided")

# If there was an error send it back to the listener
if not export_reference:
assert error is not None
future.set_exception(error)
return

self.logger.info(f"exported table ready: {table} -> {export_reference}")
if table in tables_to_export:
tables_to_export.remove(table)
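
The handler completes the caller's future in one of two ways: `set_exception` propagates an export failure to whoever is awaiting the reference, and `set_result` (further down in the diff) delivers it on success. A condensed, self-contained sketch of that handshake; the emitter wiring is omitted and a plain string stands in for `ExportReference`:

```python
import asyncio
import typing as t

async def demo() -> None:
    loop = asyncio.get_running_loop()
    future: asyncio.Future = loop.create_future()

    def handle_exported_table(
        *,
        table: str,
        export_reference: t.Optional[str] = None,  # stand-in for ExportReference
        error: t.Optional[Exception] = None,
    ) -> None:
        if export_reference is None and error is None:
            raise RuntimeError("export_reference or error must be provided")
        if export_reference is None:
            assert error is not None
            future.set_exception(error)  # the awaiting caller sees the failure
        else:
            future.set_result(export_reference)

    # Simulate the "exported_table" event arriving with an error payload:
    handle_exported_table(table="tbl_a", error=RuntimeError("export failed"))
    try:
        await future
    except RuntimeError as e:
        print(f"caller observed the export error: {e}")

asyncio.run(demo())
```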
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/compute/cluster.py
@@ -438,7 +438,7 @@ def make_new_cluster_with_defaults():
return make_new_cluster(
image=f"{constants.cluster_image_repo}:{constants.cluster_image_tag}",
cluster_id=constants.cluster_name,
service_account_name=constants.cluster_namespace,
service_account_name=constants.cluster_service_account,
threads=constants.worker_threads,
scheduler_memory_limit=constants.scheduler_memory_limit,
scheduler_memory_request=constants.scheduler_memory_request,
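
This one-line fix is the Python half of the Helm changes above: the chart now exports `METRICS_CLUSTER_SERVICE_ACCOUNT` separately from the namespace, so the cluster factory must read the matching constant rather than reusing `cluster_namespace` (which only worked when the ServiceAccount happened to share the namespace's name). A sketch of how such constants are plausibly derived from the environment; the real `constants` module may differ:

```python
import os

# Sketch: env-var plumbing matching the chart's METRICS_* variables.
# Defaults mirror values.yaml; the actual constants module may differ.
cluster_namespace = os.environ.get("METRICS_CLUSTER_NAMESPACE", "default")
cluster_service_account = os.environ.get("METRICS_CLUSTER_SERVICE_ACCOUNT", "default")
cluster_name = os.environ.get("METRICS_CLUSTER_NAME", "default")
worker_threads = int(os.environ.get("METRICS_WORKER_THREADS", "4"))
```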