Connect Trino to BigQuery #2640

Merged 7 commits on Dec 14, 2024
2 changes: 1 addition & 1 deletion .github/scripts/publish-docker-containers.sh
@@ -23,7 +23,7 @@ for path in $images_to_build; do
   latest_image="${image_repo}:latest"


-  echo "Building ${image_name} plugin"
+  echo "Building ${image_name} image"
   docker build \
     -t ${sha_image} \
     -t ${latest_image} \
docker/images/oso-debug/Dockerfile
@@ -5,12 +5,16 @@ FROM ghcr.io/opensource-observer/oso-base:${REPO_SHA}
 ARG PYTHON_VERSION=3.12

 RUN apt-get update \
-    && apt-get install -y vim curl git htop postgresql-client && \
+    && apt-get install -y vim curl git htop tmux postgresql-client && \
     curl -sL https://deb.nodesource.com/setup_20.x -o nodesource_setup.sh && \
     bash nodesource_setup.sh && \
     mkdir -p /usr/src/app && \
     apt-get install nodejs && \
     npm install -g pnpm

+COPY ./docker/images/oso-debug/build.sh /build.sh
+RUN chmod +x /build.sh
+
 # A docker image for debugging
-ENTRYPOINT ["/bin/bash"]
+ENTRYPOINT ["/bin/bash"]
+CMD ["/build.sh"]
docker/images/oso-debug/build.sh
@@ -2,12 +2,8 @@

 set -euxo pipefail

-mkdir -p /usr/src/app
-cd /usr/src/app

-curl -sL https://deb.nodesource.com/setup_20.x -o nodesource_setup.sh
-bash nodesource_setup.sh

 git clone https://github.com/opensource-observer/oso.git
 cd oso
2 changes: 1 addition & 1 deletion ops/clusters/warehouse/flux-system/gotk-sync.yaml
@@ -6,7 +6,7 @@ metadata:
   name: flux-system
   namespace: flux-system
 spec:
-  interval: 15s
+  interval: 60s
   ref:
     branch: main
   url: https://github.com/opensource-observer/oso.git
4 changes: 2 additions & 2 deletions ops/helm-charts/metrics-calculation-service/values.yaml
@@ -12,13 +12,13 @@ mcs:
     uvicorn:
       level: "INFO"
   image:
-    repo: ghcr.io/opensource-observer/dagster-dask
+    repo: ghcr.io/opensource-observer/oso
     tag: latest
 cluster:
   namespace: "default"
   name: "default"
   image:
-    repo: "ghcr.io/opensource-observer/dagster-dask"
+    repo: "ghcr.io/opensource-observer/oso"
     tag: "latest"
   scheduler:
     memory:
49 changes: 21 additions & 28 deletions ops/k8s-apps/base/trino/trino.yaml
@@ -34,6 +34,16 @@ spec:
     remediation:
       retries: 3
   values:
+    additionalConfigProperties:
+      - retry-policy=QUERY
+    additionalExchangeManagerProperties:
+      - "exchange.sink-buffers-per-partition=6"
+      - "exchange.sink-buffer-pool-min-size=6"
+      - "exchange.source-concurrent-readers=6"
+      - "exchange.s3.region=us"
+      - "exchange.s3.aws-access-key=${ENV:TRINO_GCS_KEY_ID}"
+      - "exchange.s3.aws-secret-key=${ENV:TRINO_GCS_SECRET}"
+      - "exchange.s3.endpoint=https://storage.googleapis.com"
     serviceAccount:
       create: true
       name: base-trino
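`retry-policy=QUERY` switches Trino to fault-tolerant execution, which requires a spooling exchange; the `exchange.s3.*` properties point that exchange at GCS through its S3-compatible endpoint, with HMAC credentials injected from the environment. As a hedged aside, those credentials can be sanity-checked outside Trino; the bucket and prefix below mirror the `baseDir` configured later in this file, the env var names match the production values, and everything else is an assumption:

```python
# Sketch: verify the HMAC credentials Trino's exchange manager will use.
# Assumes TRINO_GCS_KEY_ID / TRINO_GCS_SECRET are set in the environment.
import os

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.googleapis.com",  # GCS S3-compat XML API
    aws_access_key_id=os.environ["TRINO_GCS_KEY_ID"],
    aws_secret_access_key=os.environ["TRINO_GCS_SECRET"],
)
# List a few spooled exchange objects (empty until a query actually spools).
resp = s3.list_objects_v2(
    Bucket="oso-dataset-transfer-bucket", Prefix="trino-exchange/", MaxKeys=5
)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```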
@@ -46,17 +56,8 @@ spec:
         mountPath: /metrics-cache
     jvm:
       maxHeapSize: "17G"
-    resources:
-      requests:
-        cpu: 2000m
-        memory: 17000Mi
-    tolerations:
-      - key: pool_type
-        operator: Equal
-        value: trino-coordinator
-        effect: NoSchedule
-    nodeSelector:
-      pool_type: trino-coordinator
+    additionalJVMConfig:
+      - "--add-opens=java.base/java.nio=ALL-UNNAMED"

   worker:
     additionalVolumes:
@@ -68,29 +69,23 @@ spec:

     config:
       query:
-        maxMemoryPerNode: 15GB
+        maxMemoryPerNode: 140GB
     jvm:
-      maxHeapSize: "70G"
+      maxHeapSize: "350G"
-    resources:
-      requests:
-        cpu: 15000m
-        memory: 71000Mi
-    tolerations:
-      - key: pool_type
-        operator: Equal
-        value: trino-worker
-        effect: NoSchedule
-    nodeSelector:
-      pool_type: trino-worker
+    additionalJVMConfig:
+      - "--add-opens=java.base/java.nio=ALL-UNNAMED"

   server:
     exchangeManager:
       name: filesystem
       baseDir: gs://oso-dataset-transfer-bucket/trino-exchange/
     config:
       query:
-        maxMemory: "500GB"
+        maxMemory: "1400GB"
     workers: 1
     autoscaling:
       enabled: true
-      maxReplicas: 12
+      maxReplicas: 9
       targetCPUUtilizationPercentage: 20
       behavior:
         scaleDown:

@@ -125,14 +120,12 @@ spec:
       fs.cache.directories=/metrics-cache
       gcs.project-id=opensource-observer
       iceberg.max-partitions-per-writer=1000
-      # gcs.use-access-token=true
     source: |
       connector.name=hive
       hive.metastore.uri=thrift://10.145.192.27:9083
       fs.native-gcs.enabled=true
       gcs.project-id=opensource-observer
       hive.non-managed-table-writes-enabled=true
-      # gcs.use-access-token=true
+    bigquery: |
+      connector.name=bigquery
+      bigquery.project-id=opensource-observer
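The new `bigquery` catalog is the crux of this PR: Trino can now federate queries to BigQuery alongside the existing `iceberg` and `source` (Hive) catalogs. A minimal sketch with the `trino` Python client; the host, port, user, schema, and table name are illustrative assumptions, not values from this change:

```python
# Sketch: query BigQuery through the new Trino catalog.
from trino.dbapi import connect

conn = connect(
    host="localhost",    # assumed coordinator address
    port=8080,
    user="oso",
    catalog="bigquery",  # catalog name defined in the values above
    schema="oso",        # hypothetical BigQuery dataset
)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM some_table")  # hypothetical table
print(cur.fetchone())
```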
31 changes: 30 additions & 1 deletion ops/k8s-apps/production/trino/custom-helm-values.yaml
@@ -4,5 +4,34 @@ metadata:
   name: production-trino
 spec:
   values:
+    env:
+      - name: TRINO_GCS_KEY_ID
+        value: gcp:secretmanager:production-mcs-gcs-key-id/versions/latest
+      - name: TRINO_GCS_SECRET
+        value: gcp:secretmanager:production-mcs-gcs-secret/versions/latest
     serviceAccount:
-      name: production-trino
+      name: production-trino
+    coordinator:
+      resources:
+        requests:
+          cpu: 7500m
+          memory: 46000Mi
+      tolerations:
+        - key: pool_type
+          operator: Equal
+          value: trino-coordinator
+          effect: NoSchedule
+      nodeSelector:
+        pool_type: trino-coordinator
+    worker:
+      resources:
+        requests:
+          cpu: 63000m
+          memory: 390000Mi
+      tolerations:
+        - key: pool_type
+          operator: Equal
+          value: trino-worker
+          effect: NoSchedule
+      nodeSelector:
+        pool_type: trino-worker
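These requests are sized to nearly fill the resized node pools in `main.tf` below; a hedged back-of-envelope check (the machine shapes come from GCP's published specs, not from this diff):

```python
# Rough headroom check against the resized pools in main.tf.
# Assumed shapes: n1-highmem-8 = 8 vCPU / 52 GB, n1-highmem-64 = 64 vCPU / 416 GB.
pools = {
    "coordinator": {"node": (8_000, 52 * 1024), "request": (7_500, 46_000)},
    "worker": {"node": (64_000, 416 * 1024), "request": (63_000, 390_000)},
}
for role, p in pools.items():
    cpu_left = p["node"][0] - p["request"][0]
    mem_left = p["node"][1] - p["request"][1]
    print(f"{role}: {cpu_left}m CPU and {mem_left} Mi left for system pods")
# coordinator: 500m CPU and ~7 GiB left; worker: 1000m CPU and ~35 GiB left.
```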
57 changes: 47 additions & 10 deletions ops/tf-modules/warehouse-cluster/main.tf
@@ -91,7 +91,7 @@ locals {
     # TRINO COORDINATOR POOL
     {
       name         = "${var.cluster_name}-trino-coordinator-node-pool"
-      machine_type = "n1-highmem-4"
+      machine_type = "n1-highmem-8"
       node_locations = join(",", var.cluster_zones)
       min_count    = 0
       max_count    = 1
@@ -113,7 +113,7 @@
     # Trino worker pool
     {
       name         = "${var.cluster_name}-trino-worker-node-pool"
-      machine_type = "n1-highmem-16"
+      machine_type = "n1-highmem-64"
       node_locations = join(",", var.cluster_zones)
       min_count    = 0
       max_count    = 10
@@ -132,9 +132,32 @@
       preemptible        = false
       initial_node_count = 0
     },
-    # SQLMesh Workers
+    # MCS (Metrics Calculation Service) scheduler
     {
-      name         = "${var.cluster_name}-sqlmesh-worker-node-pool"
+      name                              = "${var.cluster_name}-mcs-scheduler-node-pool"
+      machine_type                      = "n1-highmem-4"
+      node_locations                    = join(",", var.cluster_zones)
+      min_count                         = 0
+      max_count                         = 4
+      local_ssd_count                   = 0
+      local_ssd_ephemeral_storage_count = 0
+      spot                              = false
+      disk_size_gb                      = 100
+      disk_type                         = "pd-standard"
+      image_type                        = "COS_CONTAINERD"
+      enable_gcfs                       = false
+      enable_gvnic                      = false
+      logging_variant                   = "DEFAULT"
+      auto_repair                       = true
+      auto_upgrade                      = true
+      service_account                   = local.node_service_account_email
+      preemptible                       = false
+      initial_node_count                = 0
+    },
+
+    # MCS Workers
+    {
+      name         = "${var.cluster_name}-mcs-worker-node-pool"
       machine_type = "n1-highmem-16"
       node_locations = join(",", var.cluster_zones)
       min_count    = 0
@@ -182,9 +205,13 @@ locals {
       default_node_pool = false
       pool_type         = "trino-coordinator"
     }
-    "${var.cluster_name}-sqlmesh-worker-node-pool" = {
-      default_node_pool = false
-      pool_type         = "sqlmesh-worker"
+    "${var.cluster_name}-mcs-scheduler-node-pool" = {
+      default_node_pool = false
+      pool_type         = "mcs-scheduler"
+    }
+    "${var.cluster_name}-mcs-worker-node-pool" = {
+      default_node_pool = false
+      pool_type         = "mcs-worker"
     }
   }, var.extra_node_labels)

@@ -230,10 +257,17 @@ locals {
         effect = "NO_SCHEDULE"
       },
     ]
-    "${var.cluster_name}-sqlmesh-worker-node-pool" = [
-      {
-        key    = "pool_type"
-        value  = "sqlmesh-worker"
-        effect = "NO_SCHEDULE"
-      },
-    ]
+    "${var.cluster_name}-mcs-scheduler-node-pool" = [
+      {
+        key    = "pool_type"
+        value  = "mcs-scheduler"
+        effect = "NO_SCHEDULE"
+      },
+    ]
+    "${var.cluster_name}-mcs-worker-node-pool" = [
+      {
+        key    = "pool_type"
+        value  = "mcs-worker"
+        effect = "NO_SCHEDULE"
+      },
+    ]
@@ -258,8 +292,11 @@
"${var.cluster_name}-trino-coordinator-pool" = [
"trino-coordinator",
]
"${var.cluster_name}-sqlmesh-worker-pool" = [
"sqlmesh-worker",
"${var.cluster_name}-mcs-scheduler-pool" = [
"mcs-scheduler",
]
"${var.cluster_name}-mcs-worker-pool" = [
"mcs-worker",
]
}, var.extra_node_tags)

44 changes: 4 additions & 40 deletions warehouse/metrics_mesh/config.py
@@ -8,32 +8,21 @@
     ModelDefaultsConfig,
 )
 from sqlmesh.core.config.connection import (
-    ClickhouseConnectionConfig,
     GCPPostgresConnectionConfig,
     TrinoConnectionConfig,
 )

 dotenv.load_dotenv()


-def pool_manager_factory(config: ClickhouseConnectionConfig):
-    from clickhouse_connect.driver import httputil
-
-    return httputil.get_pool_manager(
-        num_pools=config.concurrent_tasks,
-        max_size=config.concurrent_tasks,
-    )
-
-
 config = Config(
-    model_defaults=ModelDefaultsConfig(dialect="clickhouse", start="2024-08-01"),
+    model_defaults=ModelDefaultsConfig(dialect="duckdb", start="2024-08-01"),
     gateways={
         "local": GatewayConfig(
             connection=DuckDBConnectionConfig(
                 database=os.environ.get("SQLMESH_DUCKDB_LOCAL_PATH")
             ),
             variables={
-                "oso_source": "sources",
+                "oso_source_db": "sources",
             },
         ),
         "trino": GatewayConfig(
@@ -56,35 +45,10 @@
                 db=os.environ.get("SQLMESH_POSTGRES_DB", ""),
             ),
             variables={
-                "oso_source": "default",
+                "oso_source_db": "oso",
+                "oso_source_catalog": "bigquery",
             },
         ),
-        "clickhouse": GatewayConfig(
-            connection=ClickhouseConnectionConfig(
-                host=os.environ.get("SQLMESH_CLICKHOUSE_HOST", ""),
-                username=os.environ.get("SQLMESH_CLICKHOUSE_USERNAME", ""),
-                password=os.environ.get("SQLMESH_CLICKHOUSE_PASSWORD", ""),
-                port=int(os.environ.get("SQLMESH_CLICKHOUSE_PORT", "443")),
-                concurrent_tasks=int(
-                    os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "16")
-                ),
-                send_receive_timeout=1800,
-                # connection_settings={"allow_nondeterministic_mutations": 1},
-                connection_pool_options={
-                    "maxsize": 24,
-                    "retries": 0,
-                },
-            ),
-            state_connection=GCPPostgresConnectionConfig(
-                instance_connection_string=os.environ.get(
-                    "SQLMESH_POSTGRES_INSTANCE_CONNECTION_STRING", ""
-                ),
-                user=os.environ.get("SQLMESH_POSTGRES_USER", ""),
-                password=os.environ.get("SQLMESH_POSTGRES_PASSWORD", "placeholder"),
-                db=os.environ.get("SQLMESH_POSTGRES_DB", ""),
-            ),
-            variables={"oso_source": "default"},
-        ),
     },
     default_gateway="local",
     variables={"fulltime_dev_days": 10, "activity_window": 30},
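One way to confirm the slimmed-down config is simply to load it; a sketch, assuming `warehouse/` is on `PYTHONPATH` so the file imports as `metrics_mesh.config` (the import path is a guess from the repo layout):

```python
# Sketch: inspect the SQLMesh config after the ClickHouse gateway removal.
from metrics_mesh.config import config  # assumed module path

assert config.default_gateway == "local"
assert "clickhouse" not in config.gateways  # gateway dropped by this PR
print(sorted(config.gateways))              # expected: ['local', 'trino']
print(config.gateways["trino"].variables)
# expected: {'oso_source_db': 'oso', 'oso_source_catalog': 'bigquery'}
```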
11 changes: 9 additions & 2 deletions warehouse/metrics_mesh/macros/oso_id.py
@@ -1,12 +1,19 @@
+from sqlglot import expressions as exp
 from sqlmesh import macro
 from sqlmesh.core.macros import MacroEvaluator
-from sqlglot import expressions as exp


 @macro()
 def oso_id(evaluator: MacroEvaluator, *args: exp.Expression):
+    if evaluator.runtime_stage in ["loading", "creating"]:
+        return exp.Literal(this="someid", is_string=True)
+    concatenated = exp.Concat(expressions=args, safe=True, coalesce=False)
+    if evaluator.engine_adapter.dialect == "trino":
+        # Trino's SHA256 function only accepts type `varbinary`. So we convert
+        # the varchar to varbinary with trino's to_utf8.
+        concatenated = exp.Anonymous(this="to_utf8", expressions=[concatenated])
     sha = exp.SHA2(
-        this=exp.Concat(expressions=args, safe=True, coalesce=False),
+        this=concatenated,
         length=exp.Literal(this=256, is_string=False),
     )
     if evaluator.runtime_stage in ["loading", "creating"]:
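The Trino branch above wraps the concatenation in `to_utf8` before hashing, since Trino's `sha256` takes `varbinary` rather than `varchar`. A sketch of what that expression tree renders to, built by hand with sqlglot; the column names are made up and the rendered SQL is approximate:

```python
# Sketch: hand-build the expression the macro produces on Trino.
from sqlglot import expressions as exp

args = [exp.column("artifact_source"), exp.column("artifact_name")]
concatenated = exp.Concat(expressions=args, safe=True, coalesce=False)
wrapped = exp.Anonymous(this="to_utf8", expressions=[concatenated])
sha = exp.SHA2(this=wrapped, length=exp.Literal(this=256, is_string=False))

# Expected to render roughly as:
#   SHA256(TO_UTF8(CONCAT(artifact_source, artifact_name)))
print(sha.sql(dialect="trino"))
```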