Some manual loading fixes (#2437)
* fixes

* speed update attempts

* Just use duckdb

* cut batch_size

* adds sqlmesh worker with more disk

* go big
ravenac95 authored Oct 31, 2024
1 parent 461f5ab commit 5afdab5
Showing 4 changed files with 213 additions and 64 deletions.
36 changes: 36 additions & 0 deletions ops/tf-modules/warehouse-cluster/main.tf
@@ -132,6 +132,28 @@ locals {
preemptible = false
initial_node_count = 0
},
# SQLMesh Workers
{
name = "${var.cluster_name}-sqlmesh-worker-node-pool"
machine_type = "n2-highmem-32"
node_locations = join(",", var.cluster_zones)
min_count = 0
max_count = 10
local_ssd_count = 0
local_ssd_ephemeral_storage_count = 2
spot = false
disk_size_gb = 100
disk_type = "pd-standard"
image_type = "COS_CONTAINERD"
enable_gcfs = false
enable_gvnic = false
logging_variant = "DEFAULT"
auto_repair = true
auto_upgrade = true
service_account = local.node_service_account_email
preemptible = false
initial_node_count = 0
},

], var.extra_node_pools)

@@ -160,6 +182,10 @@ locals {
default_node_pool = false
pool_type = "trino-coordinator"
}
"${var.cluster_name}-sqlmesh-worker-node-pool" = {
default_node_pool = false
pool_type = "sqlmesh-worker"
}
}, var.extra_node_labels)

node_pool_metadata = merge({
@@ -204,6 +230,13 @@ locals {
effect = "NO_SCHEDULE"
},
]
"${var.cluster_name}-sqlmesh-worker-node-pool" = [
{
key = "pool_type"
value = "sqlmesh-worker"
effect = "NO_SCHEDULE"
},
]
}, var.extra_node_taints)

node_pool_tags = merge({
@@ -225,6 +258,9 @@ locals {
"${var.cluster_name}-trino-coordinator-pool" = [
"trino-coordinator",
]
"${var.cluster_name}-sqlmesh-worker-pool" = [
"sqlmesh-worker",
]
}, var.extra_node_tags)

node_pool_oauth_scopes = merge({
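
The new "${var.cluster_name}-sqlmesh-worker-node-pool" is both labeled and tainted with pool_type = "sqlmesh-worker", so only workloads that select that node label and tolerate the NO_SCHEDULE taint will land on the n2-highmem-32 nodes. The SQLMesh worker deployment itself is not part of this commit; the snippet below is only a hedged illustration of the pod-side counterpart using the official Kubernetes Python client, with the container name and image as placeholders.

# Illustrative pod spec targeting the sqlmesh-worker pool (name/image are placeholders).
from kubernetes import client

pod_spec = client.V1PodSpec(
    containers=[client.V1Container(name="sqlmesh-worker", image="example/sqlmesh:latest")],
    # Match the node label added in the labels block above.
    node_selector={"pool_type": "sqlmesh-worker"},
    # Tolerate the NO_SCHEDULE taint added in the taints block above.
    tolerations=[
        client.V1Toleration(
            key="pool_type",
            operator="Equal",
            value="sqlmesh-worker",
            effect="NoSchedule",
        )
    ],
)
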
2 changes: 1 addition & 1 deletion warehouse/metrics_mesh/models/events_daily_to_artifact.sql
@@ -2,7 +2,7 @@ MODEL (
name metrics.events_daily_to_artifact,
kind INCREMENTAL_BY_TIME_RANGE (
time_column bucket_day,
batch_size 90,
batch_size 45,
batch_concurrency 1
),
start '2015-01-01',
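
For an INCREMENTAL_BY_TIME_RANGE model, batch_size caps how many time intervals (days of bucket_day here) SQLMesh evaluates in a single backfill query, so cutting it from 90 to 45 roughly halves the data each query materializes at once in exchange for roughly twice as many queries; batch_concurrency 1 keeps those batches sequential. A rough back-of-the-envelope sketch, where the end date is only an assumption for illustration:

# Rough batch-count estimate for the backfill window (end date assumed).
from datetime import date

start = date(2015, 1, 1)
end = date(2024, 10, 31)
days = (end - start).days + 1

for batch_size in (90, 45):
    batches = -(-days // batch_size)  # ceiling division
    print(f"batch_size={batch_size}: about {batches} batches of up to {batch_size} days")
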
233 changes: 172 additions & 61 deletions warehouse/metrics_tools/hack/load_sources.py
@@ -2,50 +2,147 @@
import os
import click
import duckdb
import queue
from dataclasses import dataclass
from boltons import fileutils
from google.cloud import bigquery, storage
from oso_dagster.utils.bq import export_to_gcs, BigQueryTableConfig


def download_parquet_files_from_gcs(
@dataclass(kw_only=True)
class Work:
bucket_name: str
blob_path: str
destination_path: str
table_name: str


@dataclass(kw_only=True)
class ParquetLoaded:
path: str
table_name: str


class Cancel(Exception):
pass


class Noop:
pass


def get_or_cancel[T](queue: queue.Queue, expected_type: t.Type[T]) -> T:
item = queue.get()
if isinstance(item, expected_type):
return t.cast(T, item)
raise Cancel("cancel queue")


def download_parquet_from_gcs(
work_queue: queue.Queue,
result_queue: queue.Queue,
):
print("starting parquet worker")
client = storage.Client()
while True:
try:
work = get_or_cancel(work_queue, Work)
except Cancel:
break
print(f"starting download to {work.destination_path}")
# Download
with open(work.destination_path, "wb") as file_obj:
blob_uri = os.path.join("gs://", work.bucket_name, work.blob_path)
client.download_blob_to_file(
blob_uri,
file_obj,
)
print(f"downloaded: {blob_uri} to {work.destination_path}")
result_queue.put(
ParquetLoaded(
path=work.destination_path,
table_name=work.table_name,
)
)
# Enqueue the result
work_queue.task_done()
print("download worker done")


def load_into_duckdb(conn: duckdb.DuckDBPyConnection, queue: queue.Queue):
already_created: t.Dict[str, bool] = {}

print("starting duckdb worker")
while True:
try:
loaded = get_or_cancel(queue, ParquetLoaded)
except Cancel:
break
print("hi")
# Load the file into duckdb
if loaded.table_name not in already_created:
already_created[loaded.table_name] = True
conn.execute(
f"CREATE TABLE {loaded.table_name} AS SELECT * FROM read_parquet('{loaded.path}');"
)
print(f"Loaded {loaded.path} into new DuckDB table {loaded.table_name}")
else:
conn.execute(
f"INSERT INTO {loaded.table_name} SELECT * FROM read_parquet('{loaded.path}');"
)
print(f"Appended {loaded.path} to DuckDB table {loaded.table_name}")
# Delete the file
os.remove(loaded.path)
queue.task_done()
print("duckdb done")


def get_parquet_work_list(
gcs_client: storage.Client,
bucket_name: str,
prefix: str,
local_folder: str,
):
bucket = gcs_client.bucket(bucket_name)
table_name: str,
) -> t.List[Work]:
bucket = gcs_client.get_bucket(bucket_name)
blobs = bucket.list_blobs(prefix=prefix)

work_list: t.List[Work] = []

for blob in blobs:
if blob.name.endswith(".parquet"):
local_file_path = os.path.join(local_folder, os.path.basename(blob.name))
blob.download_to_filename(local_file_path)
print(f"Downloaded {blob.name} to {local_file_path}")
work = Work(
bucket_name=bucket_name,
blob_path=blob.name,
destination_path=local_file_path,
table_name=table_name,
)
work_list.append(work)
return work_list
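
The worker functions above communicate over plain queue.Queue objects and rely on the Cancel/Noop convention in get_or_cancel: a worker exits as soon as it pulls an item that is not the type it expects. This commit leaves the call into get_parquet_work_list commented out and never shows the thread wiring, so the sketch below is only an assumption of how the pieces could be driven, reusing this module's imports and helpers; the bucket, prefix, paths, thread count, and shutdown order are all placeholders.

# Hypothetical wiring for the queue-based pipeline above (not part of this commit).
import threading

work_queue: queue.Queue = queue.Queue()
result_queue: queue.Queue = queue.Queue()
conn = duckdb.connect("metrics.db")
fileutils.mkdir_p("/tmp/downloads")

# A single DuckDB writer keeps table creation and inserts serialized.
loader = threading.Thread(target=load_into_duckdb, args=(conn, result_queue))
loader.start()

# A few parallel GCS download workers.
downloaders = [
    threading.Thread(target=download_parquet_from_gcs, args=(work_queue, result_queue))
    for _ in range(4)
]
for d in downloaders:
    d.start()

# Produce work, wait for it to drain, then wake every worker with a non-matching
# sentinel so get_or_cancel raises Cancel and the loops exit.
work_list = get_parquet_work_list(
    storage.Client(), "example-bucket", "example/prefix", "/tmp/downloads", "events"
)
for work in work_list:
    work_queue.put(work)
work_queue.join()
for _ in downloaders:
    work_queue.put(Noop())
result_queue.join()
result_queue.put(Noop())
for d in downloaders:
    d.join()
loader.join()
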


def load_parquet_to_duckdb(
    con: duckdb.DuckDBPyConnection, parquet_folder: str, table_name: str
):
    parquet_files = [f for f in os.listdir(parquet_folder) if f.endswith(".parquet")]

    table_exists = False
    for parquet_file in parquet_files:
        file_path = os.path.join(parquet_folder, parquet_file)

        if table_exists:
            # If table exists, append the data
            con.execute(
                f"INSERT INTO {table_name} SELECT * FROM read_parquet('{file_path}');"
            )
            print(f"Appended {parquet_file} to DuckDB table {table_name}")
        else:
            # If table does not exist, create it with the Parquet data
            con.execute(
                f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}');"
            )
            print(f"Loaded {parquet_file} into new DuckDB table {table_name}")
            table_exists = True

    con.close()


def bq_to_duckdb(table_mapping: t.Dict[str, str], conn: duckdb.DuckDBPyConnection):
    """Copies the tables in table_mapping to tables in duckdb

    The table_mapping is in the form { "bigquery_table_fqn": "duckdb_table_fqn" }
    """
    bqclient = bigquery.Client()

    conn.sql("CREATE SCHEMA IF NOT EXISTS sources;")

    for bq_table, duckdb_table in table_mapping.items():
        table = bigquery.TableReference.from_string(bq_table)
        rows = bqclient.list_rows(table)

        table_as_arrow = rows.to_arrow(create_bqstorage_client=True)  # noqa: F841

        conn.sql(
            f"""
            CREATE TABLE IF NOT EXISTS {duckdb_table} AS
            SELECT * FROM table_as_arrow
            """
        )
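
In bq_to_duckdb, table_as_arrow looks unused (hence the noqa: F841), but DuckDB's Python client resolves the bare table_as_arrow name in the SQL through its replacement scan of the caller's variables, so each BigQuery table is pulled through the BigQuery Storage API as an Arrow table and materialized directly into DuckDB without an intermediate file. A hedged usage sketch follows; the table names are illustrative compositions of the project, dataset, and table names used elsewhere in this script.

# Example invocation of bq_to_duckdb (table names are illustrative).
conn = duckdb.connect("metrics.db")
bq_to_duckdb(
    {
        "opensource-observer.oso.timeseries_events_by_artifact_v0": "sources.timeseries_events_by_artifact_v0",
        "opensource-observer.oso.artifacts_by_project_v1": "sources.artifacts_by_project_v1",
    },
    conn,
)
# The copied tables are now queryable locally.
print(conn.sql("SELECT count(*) FROM sources.artifacts_by_project_v1").fetchone())
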


class ExporterLoader:
@@ -72,58 +169,70 @@ def __init__(
def gcs_path(self):
return os.path.join("gs://", self._gcs_bucket_name, self._gcs_bucket_path)

def run(self, tables: t.List[str]):
def run(self, tables: t.List[str], resume: bool = False):
self._db_conn.execute("SET memory_limit = '64GB';")
self._db_conn.execute("CREATE SCHEMA IF NOT EXISTS sources;")
self._db_conn.execute(
"""
CREATE SECRET IF NOT EXISTS secret1 (
TYPE GCS,
PROVIDER CREDENTIAL_CHAIN
);
"""
)

for table in tables:
self._export_and_load(table)
print(f"loading work for {table}")
prefix = self._export_and_load(table, resume)
print(f"creating table from {prefix}/*.parquet")

self._db_conn.sql(
f"""
CREATE TABLE sources.{table} AS
SELECT *
FROM read_parquet('{prefix}/*.parquet');
"""
)
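
Because run() registers a GCS secret with PROVIDER CREDENTIAL_CHAIN, DuckDB (via httpfs) can read the exported parquet straight from gs:// URIs, which is what lets the loader skip the local download path entirely. A small standalone sanity check along the same lines might look like this; the version component of the path is a placeholder to fill in.

# Standalone sanity check of a GCS-backed parquet read (version is a placeholder).
import duckdb

conn = duckdb.connect()
conn.execute(
    """
    CREATE SECRET IF NOT EXISTS secret1 (
        TYPE GCS,
        PROVIDER CREDENTIAL_CHAIN
    );
    """
)
gcs_glob = (
    "gs://oso-dataset-transfer-bucket/metrics-backstop/"
    "<version>/timeseries_events_by_artifact_v0/*.parquet"
)
print(conn.sql(f"SELECT count(*) FROM read_parquet('{gcs_glob}')").fetchone())
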

def make_download_path(self, table_name: str):
return os.path.join(self._download_path, self._version, table_name)
download_path = os.path.join(self._download_path, self._version, table_name)
fileutils.mkdir_p(download_path)
return download_path

def _export_and_load(self, table: str):
events_gcs_path = os.path.join(
def _export_and_load(self, table: str, resume: bool = False):
gcs_path_uri = os.path.join(
self.gcs_path,
self._version,
"timeseries_events_by_artifact_v0",
)
export_to_gcs(
self._bq_client,
BigQueryTableConfig(
project_id="opensource-observer",
dataset_name="oso",
service_account=None,
table_name=table,
),
gcs_path=events_gcs_path,
table,
)
if not resume:
export_to_gcs(
self._bq_client,
BigQueryTableConfig(
project_id="opensource-observer",
dataset_name="oso",
service_account=None,
table_name=table,
),
gcs_path=gcs_path_uri,
)
print("gcs exported")

download_path = self.make_download_path(table)

# Download the gcs stuff to a local working directory
prefix = os.path.join(self._gcs_bucket_path, self._version)
download_parquet_files_from_gcs(
self._gcs_client,
self._gcs_bucket_name,
prefix,
download_path,
)
# download_path = self.make_download_path(table)

# Load the data into duckdb
load_parquet_to_duckdb(
self._db_conn,
download_path,
f"sources.{table}",
)

# Delete the downloaded files
os.rmdir(download_path)
# get_parquet_work_list(
# self._gcs_client, self._gcs_bucket_name, prefix, download_path, table
# )
return gcs_path_uri
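
export_to_gcs and BigQueryTableConfig come from oso_dagster.utils.bq and their implementation is outside this diff; conceptually the helper runs a BigQuery extract job that writes the table as sharded parquet under the given gcs_path. The sketch below is a rough stand-in using the BigQuery client directly, offered as an assumption about that behavior rather than the helper's actual code.

# Assumed behavior of export_to_gcs, approximated with a plain extract job.
from google.cloud import bigquery

def export_table_to_parquet(client: bigquery.Client, table_fqn: str, gcs_prefix: str) -> None:
    job_config = bigquery.ExtractJobConfig(
        destination_format=bigquery.DestinationFormat.PARQUET
    )
    # The wildcard lets BigQuery shard large tables across many parquet files.
    destination_uri = f"{gcs_prefix}/*.parquet"
    extract_job = client.extract_table(
        bigquery.TableReference.from_string(table_fqn),
        destination_uri,
        job_config=job_config,
    )
    extract_job.result()  # block until the export job finishes
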


@click.command()
@click.option("--db-path", envvar="DB_PATH", required=True)
@click.option("--gcs-bucket-name", envvar="GCS_BUCKET", required=True)
@click.option("--gcs-bucket-path", envvar="GCS_BUCKET_PATH", required=True)
@click.option("--download-path", envvar="DOWNLOAD_PATH", required=True)
@click.option("--resume/--no-resume", default=False)
@click.option(
"--version",
envvar="VERSION",
@@ -135,6 +244,7 @@ def main(
gcs_bucket_name: str,
gcs_bucket_path: str,
download_path: str,
resume: bool,
version: str,
):
duckdb_conn = duckdb.connect(db_path)
@@ -156,7 +266,8 @@ def main(
"timeseries_events_by_artifact_v0",
"artifacts_by_project_v1",
"projects_by_collection_v1",
]
],
resume=resume,
)


6 changes: 4 additions & 2 deletions warehouse/metrics_tools/hack/run_metrics.sh
@@ -4,10 +4,12 @@
version=$1

mkdir -p /scratch/duckdb
mkdir -p /scratch/downloads
mkdir -p /scratch/downloads/${version}

apt-get install -y vim tmux

# Download everything to duckdb
python load_sources.py \
python3 load_sources.py \
--gcs-bucket-name oso-dataset-transfer-bucket \
--gcs-bucket-path metrics-backstop \
--db-path /scratch/duckdb/metrics.db \
