Connect Trino to Bigquery (#2640)
* Adds additional jvm options for bigquery

* Remove clickhouse on sqlmesh and various deployment fixes

* fixes for trino + bigquery

* reset to main

* remove use of clickhouse dialect

* ops fixes before deployment

* Update warehouse/metrics_mesh/models/projects_by_collection_v1.sql
ravenac95 authored Dec 14, 2024
1 parent fa9dac4 commit 497e92f
Showing 29 changed files with 307 additions and 182 deletions.
2 changes: 1 addition & 1 deletion .github/scripts/publish-docker-containers.sh
@@ -23,7 +23,7 @@ for path in $images_to_build; do
latest_image="${image_repo}:latest"


echo "Building ${image_name} plugin"
echo "Building ${image_name} image"
docker build \
-t ${sha_image} \
-t ${latest_image} \
docker/images/oso-debug/Dockerfile
@@ -5,12 +5,16 @@ FROM ghcr.io/opensource-observer/oso-base:${REPO_SHA}
ARG PYTHON_VERSION=3.12

RUN apt-get update \
&& apt-get install -y vim curl git htop postgresql-client && \
&& apt-get install -y vim curl git htop tmux postgresql-client && \
curl -sL https://deb.nodesource.com/setup_20.x -o nodesource_setup.sh && \
bash nodesource_setup.sh && \
mkdir -p /usr/src/app && \
apt-get install nodejs && \
npm install -g pnpm

+COPY ./docker/images/oso-debug/build.sh /build.sh
+RUN chmod +x /build.sh

# A docker image for debugging
-ENTRYPOINT ["/bin/bash"]
+ENTRYPOINT ["/bin/bash"]
+CMD ["/build.sh"]
docker/images/oso-debug/build.sh
@@ -2,12 +2,8 @@

set -euxo pipefail

-mkdir -p /usr/src/app
-cd /usr/src/app

-curl -sL https://deb.nodesource.com/setup_20.x -o nodesource_setup.sh
-bash nodesource_setup.sh

git clone https://github.com/opensource-observer/oso.git
cd oso

2 changes: 1 addition & 1 deletion ops/clusters/warehouse/flux-system/gotk-sync.yaml
@@ -6,7 +6,7 @@ metadata:
name: flux-system
namespace: flux-system
spec:
-interval: 15s
+interval: 60s
ref:
branch: main
url: https://github.com/opensource-observer/oso.git
4 changes: 2 additions & 2 deletions ops/helm-charts/metrics-calculation-service/values.yaml
@@ -12,13 +12,13 @@ mcs:
uvicorn:
level: "INFO"
image:
-repo: ghcr.io/opensource-observer/dagster-dask
+repo: ghcr.io/opensource-observer/oso
tag: latest
cluster:
namespace: "default"
name: "default"
image:
repo: "ghcr.io/opensource-observer/dagster-dask"
repo: "ghcr.io/opensource-observer/oso"
tag: "latest"
scheduler:
memory:
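The metrics-calculation-service chart runs a scheduler/worker cluster; the old `dagster-dask` image name hints at the Dask machinery underneath, and both tiers now ship in the consolidated `oso` image so scheduler, workers, and submitting code agree on installed dependencies. A hedged sketch of reaching such a cluster, assuming a standard Dask distributed deployment and a placeholder in-cluster scheduler address:

```python
from dask.distributed import Client

# Placeholder address: Dask schedulers conventionally listen on port 8786.
client = Client("tcp://mcs-scheduler.default.svc.cluster.local:8786")

# Submitted callables deserialize on the workers, which is why running a
# single shared image across scheduler and workers matters.
future = client.submit(lambda x: x + 1, 41)
print(future.result())  # 42
```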
49 changes: 21 additions & 28 deletions ops/k8s-apps/base/trino/trino.yaml
@@ -34,6 +34,16 @@ spec:
remediation:
retries: 3
values:
+additionalConfigProperties:
+- retry-policy=QUERY
+additionalExchangeManagerProperties:
+- "exchange.sink-buffers-per-partition=6"
+- "exchange.sink-buffer-pool-min-size=6"
+- "exchange.source-concurrent-readers=6"
+- "exchange.s3.region=us"
+- "exchange.s3.aws-access-key=${ENV:TRINO_GCS_KEY_ID}"
+- "exchange.s3.aws-secret-key=${ENV:TRINO_GCS_SECRET}"
+- "exchange.s3.endpoint=https://storage.googleapis.com"
serviceAccount:
create: true
name: base-trino
@@ -46,17 +56,8 @@ spec:
mountPath: /metrics-cache
jvm:
maxHeapSize: "17G"
-resources:
-requests:
-cpu: 2000m
-memory: 17000Mi
-tolerations:
-- key: pool_type
-operator: Equal
-value: trino-coordinator
-effect: NoSchedule
-nodeSelector:
-pool_type: trino-coordinator
+additionalJVMConfig:
+- "--add-opens=java.base/java.nio=ALL-UNNAMED"

worker:
additionalVolumes:
@@ -68,29 +69,23 @@

config:
query:
-maxMemoryPerNode: 15GB
+maxMemoryPerNode: 140GB
jvm:
maxHeapSize: "70G"
resources:
requests:
cpu: 15000m
memory: 71000Mi
tolerations:
- key: pool_type
operator: Equal
value: trino-worker
effect: NoSchedule
nodeSelector:
pool_type: trino-worker
maxHeapSize: "350G"
additionalJVMConfig:
- "--add-opens=java.base/java.nio=ALL-UNNAMED"

server:
exchangeManager:
name: filesystem
baseDir: gs://oso-dataset-transfer-bucket/trino-exchange/
config:
query:
maxMemory: "500GB"
maxMemory: "1400GB"
workers: 1
autoscaling:
enabled: true
-maxReplicas: 12
+maxReplicas: 9
targetCPUUtilizationPercentage: 20
behavior:
scaleDown:
@@ -125,14 +120,12 @@ spec:
fs.cache.directories=/metrics-cache
gcs.project-id=opensource-observer
iceberg.max-partitions-per-writer=1000
-# gcs.use-access-token=true
source: |
connector.name=hive
hive.metastore.uri=thrift://10.145.192.27:9083
fs.native-gcs.enabled=true
gcs.project-id=opensource-observer
hive.non-managed-table-writes-enabled=true
-# gcs.use-access-token=true
bigquery: |
connector.name=bigquery
bigquery.project-id=opensource-observer
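Two changes land in this file: `retry-policy=QUERY` switches Trino to fault-tolerant execution, with the exchange manager spooling intermediate data to the GCS bucket through its S3-compatible endpoint (hence the HMAC key and secret passed via `${ENV:...}`), and the hard-coded coordinator/worker resources move out to the per-environment overlays below. As a minimal sketch of exercising the setup from Python, assuming the `trino` client package, a port-forwarded coordinator on `localhost:8080`, and illustrative schema/table names:

```python
import trino  # pip install trino

# Placeholder connection details, e.g. kubectl port-forward svc/trino 8080.
conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="debug",
    catalog="bigquery",  # the connector configured above
    schema="oso",        # illustrative schema name
)
cur = conn.cursor()
# Under retry-policy=QUERY, a worker lost mid-query is retried from the
# exchange data spooled to gs://oso-dataset-transfer-bucket/trino-exchange/.
cur.execute("SELECT COUNT(*) FROM events")  # illustrative table
print(cur.fetchone())
```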
31 changes: 30 additions & 1 deletion ops/k8s-apps/production/trino/custom-helm-values.yaml
@@ -4,5 +4,34 @@ metadata:
name: production-trino
spec:
values:
+env:
+- name: TRINO_GCS_KEY_ID
+value: gcp:secretmanager:production-mcs-gcs-key-id/versions/latest
+- name: TRINO_GCS_SECRET
+value: gcp:secretmanager:production-mcs-gcs-secret/versions/latest
serviceAccount:
-name: production-trino
+name: production-trino
+coordinator:
+resources:
+requests:
+cpu: 7500m
+memory: 46000Mi
+tolerations:
+- key: pool_type
+operator: Equal
+value: trino-coordinator
+effect: NoSchedule
+nodeSelector:
+pool_type: trino-coordinator
+worker:
+resources:
+requests:
+cpu: 63000m
+memory: 390000Mi
+tolerations:
+- key: pool_type
+operator: Equal
+value: trino-worker
+effect: NoSchedule
+nodeSelector:
+pool_type: trino-worker
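These requests are sized to nearly fill the node pools defined in the Terraform change below (n1-highmem-8 coordinators, n1-highmem-64 workers), leaving headroom for system daemons. A quick arithmetic check, using the n1-highmem ratio of 6.5 GB of memory per vCPU:

```python
# n1-highmem machines provision 6.5 GB of memory per vCPU.
GB, GIB = 1000**3, 1024**3

for name, vcpus, req_cpu_m, req_mem_mi in [
    ("coordinator (n1-highmem-8)", 8, 7500, 46000),
    ("worker (n1-highmem-64)", 64, 63000, 390000),
]:
    node_mem_gib = vcpus * 6.5 * GB / GIB
    print(
        f"{name}: requests {req_cpu_m / 1000:g}/{vcpus} vCPU, "
        f"{req_mem_mi / 1024:.0f}Gi of ~{node_mem_gib:.0f}Gi"
    )
# coordinator (n1-highmem-8): requests 7.5/8 vCPU, 45Gi of ~48Gi
# worker (n1-highmem-64): requests 63/64 vCPU, 381Gi of ~387Gi
```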
57 changes: 47 additions & 10 deletions ops/tf-modules/warehouse-cluster/main.tf
@@ -91,7 +91,7 @@ locals {
# TRINO COORIDNATOR POOL
{
name = "${var.cluster_name}-trino-coordinator-node-pool"
-machine_type = "n1-highmem-4"
+machine_type = "n1-highmem-8"
node_locations = join(",", var.cluster_zones)
min_count = 0
max_count = 1
@@ -113,7 +113,7 @@
# Trino worker pool
{
name = "${var.cluster_name}-trino-worker-node-pool"
-machine_type = "n1-highmem-16"
+machine_type = "n1-highmem-64"
node_locations = join(",", var.cluster_zones)
min_count = 0
max_count = 10
@@ -132,9 +132,32 @@
preemptible = false
initial_node_count = 0
},
-# SQLMesh Workers
+# MCS (Metrics Calculation Service) scheduler
{
name = "${var.cluster_name}-sqlmesh-worker-node-pool"
name = "${var.cluster_name}-mcs-scheduler-node-pool"
+machine_type = "n1-highmem-4"
+node_locations = join(",", var.cluster_zones)
+min_count = 0
+max_count = 4
+local_ssd_count = 0
+local_ssd_ephemeral_storage_count = 0
+spot = false
+disk_size_gb = 100
+disk_type = "pd-standard"
+image_type = "COS_CONTAINERD"
+enable_gcfs = false
+enable_gvnic = false
+logging_variant = "DEFAULT"
+auto_repair = true
+auto_upgrade = true
+service_account = local.node_service_account_email
+preemptible = false
+initial_node_count = 0
+},
+
+# MCS Workers
+{
+name = "${var.cluster_name}-mcs-worker-node-pool"
machine_type = "n1-highmem-16"
node_locations = join(",", var.cluster_zones)
min_count = 0
@@ -182,9 +205,13 @@ locals {
default_node_pool = false
pool_type = "trino-coordinator"
}
"${var.cluster_name}-sqlmesh-worker-node-pool" = {
"${var.cluster_name}-mcs-scheduler-node-pool" = {
default_node_pool = false
pool_type = "mcs-scheduler"
}
"${var.cluster_name}-mcs-worker-node-pool" = {
default_node_pool = false
pool_type = "sqlmesh-worker"
pool_type = "mcs-worker"
}
}, var.extra_node_labels)

@@ -230,10 +257,17 @@
effect = "NO_SCHEDULE"
},
]
"${var.cluster_name}-sqlmesh-worker-node-pool" = [
"${var.cluster_name}-mcs-scheduler-node-pool" = [
{
key = "pool_type"
-value = "sqlmesh-worker"
+value = "mcs-scheduler"
effect = "NO_SCHEDULE"
},
]
"${var.cluster_name}-mcs-worker-node-pool" = [
{
key = "pool_type"
value = "mcs-worker"
effect = "NO_SCHEDULE"
},
]
@@ -258,8 +292,11 @@
"${var.cluster_name}-trino-coordinator-pool" = [
"trino-coordinator",
]
"${var.cluster_name}-sqlmesh-worker-pool" = [
"sqlmesh-worker",
"${var.cluster_name}-mcs-scheduler-pool" = [
"mcs-scheduler",
]
"${var.cluster_name}-mcs-worker-pool" = [
"mcs-worker",
]
}, var.extra_node_tags)

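The renamed pools keep the established pattern: each pool is tainted with its `pool_type` and labeled to match, so only pods that both tolerate the taint and select the label (as the Helm values above do) can land on it. A small sketch of the `Equal`/`NoSchedule` matching rule Kubernetes applies, with the values from this change inlined:

```python
# Taints from the Terraform locals; the toleration mirrors the Helm values.
taints = {
    "trino-coordinator-node-pool": {"key": "pool_type", "value": "trino-coordinator"},
    "trino-worker-node-pool": {"key": "pool_type", "value": "trino-worker"},
    "mcs-scheduler-node-pool": {"key": "pool_type", "value": "mcs-scheduler"},
    "mcs-worker-node-pool": {"key": "pool_type", "value": "mcs-worker"},
}

def tolerates(toleration: dict, taint: dict) -> bool:
    # operator: Equal with effect NoSchedule, as used throughout this change.
    return toleration["key"] == taint["key"] and toleration["value"] == taint["value"]

worker_toleration = {"key": "pool_type", "value": "trino-worker"}
for pool, taint in taints.items():
    print(f"trino worker pod on {pool}: {tolerates(worker_toleration, taint)}")
# Only trino-worker-node-pool admits the Trino worker pods.
```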
44 changes: 4 additions & 40 deletions warehouse/metrics_mesh/config.py
@@ -8,32 +8,21 @@
ModelDefaultsConfig,
)
from sqlmesh.core.config.connection import (
-ClickhouseConnectionConfig,
GCPPostgresConnectionConfig,
TrinoConnectionConfig,
)

dotenv.load_dotenv()


-def pool_manager_factory(config: ClickhouseConnectionConfig):
-from clickhouse_connect.driver import httputil
-
-return httputil.get_pool_manager(
-num_pools=config.concurrent_tasks,
-max_size=config.concurrent_tasks,
-)


config = Config(
model_defaults=ModelDefaultsConfig(dialect="clickhouse", start="2024-08-01"),
model_defaults=ModelDefaultsConfig(dialect="duckdb", start="2024-08-01"),
gateways={
"local": GatewayConfig(
connection=DuckDBConnectionConfig(
database=os.environ.get("SQLMESH_DUCKDB_LOCAL_PATH")
),
variables={
"oso_source": "sources",
"oso_source_db": "sources",
},
),
"trino": GatewayConfig(
@@ -56,35 +45,10 @@ def pool_manager_factory(config: ClickhouseConnectionConfig):
db=os.environ.get("SQLMESH_POSTGRES_DB", ""),
),
variables={
"oso_source": "default",
"oso_source_db": "oso",
"oso_source_catalog": "bigquery",
},
),
"clickhouse": GatewayConfig(
connection=ClickhouseConnectionConfig(
host=os.environ.get("SQLMESH_CLICKHOUSE_HOST", ""),
username=os.environ.get("SQLMESH_CLICKHOUSE_USERNAME", ""),
password=os.environ.get("SQLMESH_CLICKHOUSE_PASSWORD", ""),
port=int(os.environ.get("SQLMESH_CLICKHOUSE_PORT", "443")),
concurrent_tasks=int(
os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "16")
),
send_receive_timeout=1800,
# connection_settings={"allow_nondeterministic_mutations": 1},
connection_pool_options={
"maxsize": 24,
"retries": 0,
},
),
state_connection=GCPPostgresConnectionConfig(
instance_connection_string=os.environ.get(
"SQLMESH_POSTGRES_INSTANCE_CONNECTION_STRING", ""
),
user=os.environ.get("SQLMESH_POSTGRES_USER", ""),
password=os.environ.get("SQLMESH_POSTGRES_PASSWORD", "placeholder"),
db=os.environ.get("SQLMESH_POSTGRES_DB", ""),
),
variables={"oso_source": "default"},
),
},
default_gateway="local",
variables={"fulltime_dev_days": 10, "activity_window": 30},
11 changes: 9 additions & 2 deletions warehouse/metrics_mesh/macros/oso_id.py
@@ -1,12 +1,19 @@
+from sqlglot import expressions as exp
from sqlmesh import macro
from sqlmesh.core.macros import MacroEvaluator
-from sqlglot import expressions as exp


@macro()
def oso_id(evaluator: MacroEvaluator, *args: exp.Expression):
+if evaluator.runtime_stage in ["loading", "creating"]:
+return exp.Literal(this="someid", is_string=True)
+concatenated = exp.Concat(expressions=args, safe=True, coalesce=False)
+if evaluator.engine_adapter.dialect == "trino":
+# Trino's SHA256 function only accepts type `varbinary`. So we convert
+# the varchar to varbinary with trino's to_utf8.
+concatenated = exp.Anonymous(this="to_utf8", expressions=[concatenated])
sha = exp.SHA2(
-this=exp.Concat(expressions=args, safe=True, coalesce=False),
+this=concatenated,
length=exp.Literal(this=256, is_string=False),
)
if evaluator.runtime_stage in ["loading", "creating"]:
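The macro now branches on the engine: Trino's `sha256` only accepts `varbinary`, so the concatenated `varchar` is routed through `to_utf8` first, while other engines hash the string directly. A quick sketch of the SQL each branch renders, built from the same sqlglot expressions the macro uses (column names are illustrative):

```python
from sqlglot import expressions as exp

args = [exp.column("event_source"), exp.column("artifact_name")]
concatenated = exp.Concat(expressions=args, safe=True, coalesce=False)
# The Trino branch wraps the varchar concat in to_utf8 to produce varbinary.
trino_arg = exp.Anonymous(this="to_utf8", expressions=[concatenated])

for dialect, arg in [("trino", trino_arg), ("duckdb", concatenated)]:
    sha = exp.SHA2(this=arg.copy(), length=exp.Literal(this=256, is_string=False))
    print(dialect, "->", sha.sql(dialect=dialect))
```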