diff --git a/ops/tf-modules/warehouse-cluster/main.tf b/ops/tf-modules/warehouse-cluster/main.tf
index c23fa0eca..0b8248dd6 100644
--- a/ops/tf-modules/warehouse-cluster/main.tf
+++ b/ops/tf-modules/warehouse-cluster/main.tf
@@ -132,6 +132,28 @@ locals {
       preemptible = false
       initial_node_count = 0
     },
+    # SQLMesh Workers
+    {
+      name = "${var.cluster_name}-sqlmesh-worker-node-pool"
+      machine_type = "n2-highmem-32"
+      node_locations = join(",", var.cluster_zones)
+      min_count = 0
+      max_count = 10
+      local_ssd_count = 0
+      local_ssd_ephemeral_storage_count = 2
+      spot = false
+      disk_size_gb = 100
+      disk_type = "pd-standard"
+      image_type = "COS_CONTAINERD"
+      enable_gcfs = false
+      enable_gvnic = false
+      logging_variant = "DEFAULT"
+      auto_repair = true
+      auto_upgrade = true
+      service_account = local.node_service_account_email
+      preemptible = false
+      initial_node_count = 0
+    },
   ], var.extra_node_pools)
 
@@ -160,6 +182,10 @@ locals {
       default_node_pool = false
       pool_type = "trino-coordinator"
     }
+    "${var.cluster_name}-sqlmesh-worker-node-pool" = {
+      default_node_pool = false
+      pool_type = "sqlmesh-worker"
+    }
   }, var.extra_node_labels)
 
   node_pool_metadata = merge({
@@ -204,6 +230,13 @@ locals {
         effect = "NO_SCHEDULE"
       },
     ]
+    "${var.cluster_name}-sqlmesh-worker-node-pool" = [
+      {
+        key = "pool_type"
+        value = "sqlmesh-worker"
+        effect = "NO_SCHEDULE"
+      },
+    ]
   }, var.extra_node_taints)
 
   node_pool_tags = merge({
@@ -225,6 +258,9 @@ locals {
     "${var.cluster_name}-trino-coordinator-pool" = [
       "trino-coordinator",
     ]
+    "${var.cluster_name}-sqlmesh-worker-node-pool" = [
+      "sqlmesh-worker",
+    ]
   }, var.extra_node_tags)
 
   node_pool_oauth_scopes = merge({
diff --git a/warehouse/metrics_mesh/models/events_daily_to_artifact.sql b/warehouse/metrics_mesh/models/events_daily_to_artifact.sql
index 5e6fc81d1..38432336c 100644
--- a/warehouse/metrics_mesh/models/events_daily_to_artifact.sql
+++ b/warehouse/metrics_mesh/models/events_daily_to_artifact.sql
@@ -2,7 +2,7 @@ MODEL (
   name metrics.events_daily_to_artifact,
   kind INCREMENTAL_BY_TIME_RANGE (
     time_column bucket_day,
-    batch_size 90,
+    batch_size 45,
     batch_concurrency 1
   ),
   start '2015-01-01',
diff --git a/warehouse/metrics_tools/hack/load_sources.py b/warehouse/metrics_tools/hack/load_sources.py
index 68ec00c84..a6fc93707 100644
--- a/warehouse/metrics_tools/hack/load_sources.py
+++ b/warehouse/metrics_tools/hack/load_sources.py
@@ -2,50 +2,147 @@
 import os
 import click
 import duckdb
+import queue
+from dataclasses import dataclass
+from boltons import fileutils
 from google.cloud import bigquery, storage
 from oso_dagster.utils.bq import export_to_gcs, BigQueryTableConfig
 
 
-def download_parquet_files_from_gcs(
+@dataclass(kw_only=True)
+class Work:
+    bucket_name: str
+    blob_path: str
+    destination_path: str
+    table_name: str
+
+
+@dataclass(kw_only=True)
+class ParquetLoaded:
+    path: str
+    table_name: str
+
+
+class Cancel(Exception):
+    pass
+
+
+class Noop:
+    pass
+
+
+def get_or_cancel[T](queue: queue.Queue, expected_type: t.Type[T]) -> T:
+    item = queue.get()
+    if isinstance(item, expected_type):
+        return t.cast(T, item)
+    raise Cancel("cancel queue")
+
+
+def download_parquet_from_gcs(
+    work_queue: queue.Queue,
+    result_queue: queue.Queue,
+):
+    print("starting parquet worker")
+    client = storage.Client()
+    while True:
+        try:
+            work = get_or_cancel(work_queue, Work)
+        except Cancel:
+            break
+        print(f"starting download to {work.destination_path}")
+        # Download
+        with open(work.destination_path, "wb") as file_obj:
+            blob_uri = os.path.join("gs://", work.bucket_name, work.blob_path)
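+            # download_blob_to_file accepts either a Blob or a "gs://" URI
+            # string and streams the object into the open local file handle.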
+            client.download_blob_to_file(
+                blob_uri,
+                file_obj,
+            )
+        print(f"downloaded: {blob_uri} to {work.destination_path}")
+        # Enqueue the result
+        result_queue.put(
+            ParquetLoaded(
+                path=work.destination_path,
+                table_name=work.table_name,
+            )
+        )
+        work_queue.task_done()
+    print("download worker done")
+
+
+def load_into_duckdb(conn: duckdb.DuckDBPyConnection, queue: queue.Queue):
+    already_created: t.Dict[str, bool] = {}
+
+    print("starting duckdb worker")
+    while True:
+        try:
+            loaded = get_or_cancel(queue, ParquetLoaded)
+        except Cancel:
+            break
+        print(f"loading {loaded.path} into {loaded.table_name}")
+        # Load the file into duckdb
+        if loaded.table_name not in already_created:
+            already_created[loaded.table_name] = True
+            conn.execute(
+                f"CREATE TABLE {loaded.table_name} AS SELECT * FROM read_parquet('{loaded.path}');"
+            )
+            print(f"Loaded {loaded.path} into new DuckDB table {loaded.table_name}")
+        else:
+            conn.execute(
+                f"INSERT INTO {loaded.table_name} SELECT * FROM read_parquet('{loaded.path}');"
+            )
+            print(f"Appended {loaded.path} to DuckDB table {loaded.table_name}")
+        # Delete the file
+        os.remove(loaded.path)
+        queue.task_done()
+    print("duckdb done")
+
+
+def get_parquet_work_list(
     gcs_client: storage.Client,
     bucket_name: str,
     prefix: str,
     local_folder: str,
-):
-    bucket = gcs_client.bucket(bucket_name)
+    table_name: str,
+) -> t.List[Work]:
+    bucket = gcs_client.get_bucket(bucket_name)
     blobs = bucket.list_blobs(prefix=prefix)
 
+    work_list: t.List[Work] = []
+
     for blob in blobs:
         if blob.name.endswith(".parquet"):
             local_file_path = os.path.join(local_folder, os.path.basename(blob.name))
-            blob.download_to_filename(local_file_path)
-            print(f"Downloaded {blob.name} to {local_file_path}")
+            work = Work(
+                bucket_name=bucket_name,
+                blob_path=blob.name,
+                destination_path=local_file_path,
+                table_name=table_name,
+            )
+            work_list.append(work)
+    return work_list
 
 
-def load_parquet_to_duckdb(
-    con: duckdb.DuckDBPyConnection, parquet_folder: str, table_name: str
-):
-    parquet_files = [f for f in os.listdir(parquet_folder) if f.endswith(".parquet")]
+def bq_to_duckdb(table_mapping: t.Dict[str, str], conn: duckdb.DuckDBPyConnection):
+    """Copies the tables in table_mapping to tables in duckdb
 
-    table_exists = False
-    for parquet_file in parquet_files:
-        file_path = os.path.join(parquet_folder, parquet_file)
+    The table_mapping is in the form { "bigquery_table_fqn": "duckdb_table_fqn" }
+    """
+    bqclient = bigquery.Client()
 
-        if table_exists:
-            # If table exists, append the data
-            con.execute(
-                f"INSERT INTO {table_name} SELECT * FROM read_parquet('{file_path}');"
-            )
-            print(f"Appended {parquet_file} to DuckDB table {table_name}")
-        else:
-            # If table does not exist, create it with the Parquet data
-            con.execute(
-                f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}');"
-            )
-            print(f"Loaded {parquet_file} into new DuckDB table {table_name}")
-            table_exists = True
+    conn.sql("CREATE SCHEMA IF NOT EXISTS sources;")
+
+    for bq_table, duckdb_table in table_mapping.items():
+        table = bigquery.TableReference.from_string(bq_table)
+        rows = bqclient.list_rows(table)
 
-    con.close()
+        table_as_arrow = rows.to_arrow(create_bqstorage_client=True)  # noqa: F841
+
+        conn.sql(
+            f"""
+            CREATE TABLE IF NOT EXISTS {duckdb_table} AS
+            SELECT * FROM table_as_arrow
+            """
+        )
 
 
 class ExporterLoader:
@@ -72,51 +169,62 @@ def __init__(
     def gcs_path(self):
         return os.path.join("gs://", self._gcs_bucket_name, self._gcs_bucket_path)
 
-    def run(self, tables: t.List[str]):
+    def run(self, tables: t.List[str], resume: bool = False):
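+        # Give DuckDB a generous memory cap for the bulk load; 64GB assumes
+        # this runs on a high-memory host (e.g. the n2-highmem-32 pool above).
+        # Tune this to the machine actually running the script.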
+        self._db_conn.execute("SET memory_limit = '64GB';")
         self._db_conn.execute("CREATE SCHEMA IF NOT EXISTS sources;")
+        self._db_conn.execute(
+            """
+            CREATE SECRET IF NOT EXISTS secret1 (
+                TYPE GCS,
+                PROVIDER CREDENTIAL_CHAIN
+            );
+            """
+        )
+
         for table in tables:
-            self._export_and_load(table)
+            print(f"loading work for {table}")
+            prefix = self._export_and_load(table, resume)
+            print(f"creating table from {prefix}/*.parquet")
+
+            self._db_conn.sql(
+                f"""
+                CREATE TABLE sources.{table} AS
+                SELECT *
+                FROM read_parquet('{prefix}/*.parquet');
+                """
+            )
 
     def make_download_path(self, table_name: str):
-        return os.path.join(self._download_path, self._version, table_name)
+        download_path = os.path.join(self._download_path, self._version, table_name)
+        fileutils.mkdir_p(download_path)
+        return download_path
 
-    def _export_and_load(self, table: str):
-        events_gcs_path = os.path.join(
+    def _export_and_load(self, table: str, resume: bool = False):
+        gcs_path_uri = os.path.join(
             self.gcs_path,
             self._version,
-            "timeseries_events_by_artifact_v0",
-        )
-        export_to_gcs(
-            self._bq_client,
-            BigQueryTableConfig(
-                project_id="opensource-observer",
-                dataset_name="oso",
-                service_account=None,
-                table_name=table,
-            ),
-            gcs_path=events_gcs_path,
+            table,
         )
+        if not resume:
+            export_to_gcs(
+                self._bq_client,
+                BigQueryTableConfig(
+                    project_id="opensource-observer",
+                    dataset_name="oso",
+                    service_account=None,
+                    table_name=table,
+                ),
+                gcs_path=gcs_path_uri,
+            )
+            print("gcs exported")
 
-        download_path = self.make_download_path(table)
-
-        # Download the gcs stuff to a local working directory
-        prefix = os.path.join(self._gcs_bucket_path, self._version)
-        download_parquet_files_from_gcs(
-            self._gcs_client,
-            self._gcs_bucket_name,
-            prefix,
-            download_path,
-        )
+        # download_path = self.make_download_path(table)
 
         # Load the data into duckdb
-        load_parquet_to_duckdb(
-            self._db_conn,
-            download_path,
-            f"sources.{table}",
-        )
-
-        # Delete the downloaded files
-        os.rmdir(download_path)
+        # get_parquet_work_list(
+        #     self._gcs_client, self._gcs_bucket_name, prefix, download_path, table
+        # )
+        return gcs_path_uri
 
 
 @click.command()
@@ -124,6 +232,7 @@ def _export_and_load(self, table: str):
 @click.option("--gcs-bucket-name", envvar="GCS_BUCKET", required=True)
 @click.option("--gcs-bucket-path", envvar="GCS_BUCKET_PATH", required=True)
 @click.option("--download-path", envvar="DOWNLOAD_PATH", required=True)
+@click.option("--resume/--no-resume", default=False)
 @click.option(
     "--version",
     envvar="VERSION",
@@ -135,6 +244,7 @@ def main(
     gcs_bucket_name: str,
     gcs_bucket_path: str,
     download_path: str,
+    resume: bool,
     version: str,
 ):
     duckdb_conn = duckdb.connect(db_path)
@@ -156,7 +266,8 @@ def main(
             "timeseries_events_by_artifact_v0",
             "artifacts_by_project_v1",
             "projects_by_collection_v1",
-        ]
+        ],
+        resume=resume,
     )
diff --git a/warehouse/metrics_tools/hack/run_metrics.sh b/warehouse/metrics_tools/hack/run_metrics.sh
index 6c13aa458..99fe62aea 100644
--- a/warehouse/metrics_tools/hack/run_metrics.sh
+++ b/warehouse/metrics_tools/hack/run_metrics.sh
@@ -4,10 +4,12 @@ version=$1
 
 mkdir -p /scratch/duckdb
-mkdir -p /scratch/downloads
+mkdir -p /scratch/downloads/${version}
+
+apt-get install -y vim tmux
 
 # Download everything to duckdb
-python load_sources.py \
+python3 load_sources.py \
     --gcs-bucket-name oso-dataset-transfer-bucket \
     --gcs-bucket-path metrics-backstop \
     --db-path /scratch/duckdb/metrics.db \