From 0a79d92d90a83eb35f698430b6e7b9f20e2740c0 Mon Sep 17 00:00:00 2001
From: Reuven Gonzales
Date: Tue, 28 May 2024 17:27:05 -0400
Subject: [PATCH] Goldsky Backfill and Automated Block number data checks (#1551)

* goldsky backfills and cleanup

* Working backfill job

* Adds automated data check for block number
---
 warehouse/oso_dagster/assets.py               |  25 +-
 warehouse/oso_dagster/cbt/__init__.py         |  25 +-
 warehouse/oso_dagster/constants.py            |   1 +
 warehouse/oso_dagster/definitions.py          |   1 +
 warehouse/oso_dagster/factories/common.py     |  14 +-
 warehouse/oso_dagster/factories/gcs.py        |   8 +-
 .../oso_dagster/factories/goldsky/__init__.py |   3 +
 .../goldsky/assets.py}                        | 447 +++++++++++++-----
 .../oso_dagster/factories/goldsky/checks.py   | 107 +++++
 .../oso_dagster/factories/goldsky/config.py   |  70 +++
 .../goldsky/dask.py}                          |   0
 .../goldsky/queries/block_number_check.sql    |   5 +
 warehouse/oso_dagster/utils/gcs.py            |  16 +
 13 files changed, 582 insertions(+), 140 deletions(-)
 create mode 100644 warehouse/oso_dagster/factories/goldsky/__init__.py
 rename warehouse/oso_dagster/{goldsky.py => factories/goldsky/assets.py} (75%)
 create mode 100644 warehouse/oso_dagster/factories/goldsky/checks.py
 create mode 100644 warehouse/oso_dagster/factories/goldsky/config.py
 rename warehouse/oso_dagster/{goldsky_dask.py => factories/goldsky/dask.py} (100%)
 create mode 100644 warehouse/oso_dagster/factories/goldsky/queries/block_number_check.sql
 create mode 100644 warehouse/oso_dagster/utils/gcs.py

diff --git a/warehouse/oso_dagster/assets.py b/warehouse/oso_dagster/assets.py
index bb9da8367..b9895dd60 100644
--- a/warehouse/oso_dagster/assets.py
+++ b/warehouse/oso_dagster/assets.py
@@ -5,9 +5,12 @@
 from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator
 from google.cloud.bigquery.schema import SchemaField
 from .constants import main_dbt_manifests, main_dbt_project_dir
-from .goldsky import (
+from .factories.goldsky import (
     GoldskyConfig,
     goldsky_asset,
+    traces_checks,
+    transactions_checks,
+    blocks_checks,
 )
 from .factories import interval_gcs_import_asset, SourceMode, Interval, IntervalGCSAsset
 
@@ -105,7 +108,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
-        # uncomment the following value to test
+        checks=[blocks_checks()],
     ),
 )
 
@@ -121,7 +124,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
         schema_overrides=[SchemaField(name="value", field_type="BYTES")],
-        # uncomment the following value to test
+        checks=[transactions_checks()],
     ),
 )
 
@@ -136,8 +139,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
-        # uncomment the following value to test
-        # max_objects_to_load=2,
+        checks=[traces_checks()],
     ),
 )
 
@@ -152,6 +154,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[blocks_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -169,6 +172,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", schema_overrides=[SchemaField(name="value", field_type="BYTES")], + checks=[transactions_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -185,6 +189,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[traces_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -201,6 +206,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[blocks_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -218,6 +224,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", schema_overrides=[SchemaField(name="value", field_type="BYTES")], + checks=[transactions_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -234,6 +241,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[traces_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -250,6 +258,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="block_timestamp", dedupe_model="optimism_dedupe.sql", + checks=[traces_checks()], # uncomment the following value to test # max_objects_to_load=2000, ), @@ -266,6 +275,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[blocks_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -283,6 +293,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", schema_overrides=[SchemaField(name="value", field_type="BYTES")], + checks=[transactions_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -299,6 +310,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[traces_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -315,6 +327,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[blocks_checks()], # uncomment the following value to test # max_objects_to_load=1, ), @@ -332,6 +345,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", schema_overrides=[SchemaField(name="value", field_type="BYTES")], + checks=[transactions_checks()], # uncomment the 
following value to test # max_objects_to_load=1, ), @@ -348,6 +362,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs): destination_dataset_name="superchain", partition_column_name="block_timestamp", partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)", + checks=[traces_checks()], # uncomment the following value to test # max_objects_to_load=1, ), diff --git a/warehouse/oso_dagster/cbt/__init__.py b/warehouse/oso_dagster/cbt/__init__.py index 3654ce073..9895d65e0 100644 --- a/warehouse/oso_dagster/cbt/__init__.py +++ b/warehouse/oso_dagster/cbt/__init__.py @@ -56,14 +56,31 @@ def __init__( search_paths: List[str], ): self.bigquery = bigquery - search_paths.append( + self.search_paths = [ os.path.join(os.path.abspath(os.path.dirname(__file__)), "operations"), - ) - loader = FileSystemLoader(search_paths) + ] + self.add_search_paths(search_paths) + + self.log = log + self.load_env() + + def load_env(self): + loader = FileSystemLoader(self.search_paths) self.env = Environment( loader=loader, ) - self.log = log + + def add_search_paths(self, search_paths: List[str]): + for p in search_paths: + if not p in self.search_paths: + self.search_paths.append(p) + self.load_env() + + def query(self, model_file: str, timeout: float = 300, **vars): + with self.bigquery.get_client() as client: + rendered = self.render_model(model_file, **vars) + job = client.query(rendered) + return job.result() def transform( self, diff --git a/warehouse/oso_dagster/constants.py b/warehouse/oso_dagster/constants.py index fe9eb45fb..fbf1b6615 100644 --- a/warehouse/oso_dagster/constants.py +++ b/warehouse/oso_dagster/constants.py @@ -4,6 +4,7 @@ import pathlib import requests +from dagster import DefaultSensorStatus from dagster_dbt import DbtCliResource main_dbt_project_dir = Path(__file__).joinpath("..", "..", "..").resolve() diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py index 500145597..73590f629 100644 --- a/warehouse/oso_dagster/definitions.py +++ b/warehouse/oso_dagster/definitions.py @@ -42,6 +42,7 @@ def load_definitions(): assets=asset_defs + asset_factories.assets, schedules=schedules, jobs=asset_factories.jobs, + asset_checks=asset_factories.checks, sensors=asset_factories.sensors, resources=resources, ) diff --git a/warehouse/oso_dagster/factories/common.py b/warehouse/oso_dagster/factories/common.py index 7f8aebdda..9f2e6cdb9 100644 --- a/warehouse/oso_dagster/factories/common.py +++ b/warehouse/oso_dagster/factories/common.py @@ -2,7 +2,12 @@ from typing import List from dataclasses import dataclass, field -from dagster import SensorDefinition, AssetsDefinition, JobDefinition +from dagster import ( + SensorDefinition, + AssetsDefinition, + JobDefinition, + AssetChecksDefinition, +) class GenericGCSAsset: @@ -18,6 +23,7 @@ class AssetFactoryResponse: assets: List[AssetsDefinition] sensors: List[SensorDefinition] = field(default_factory=lambda: []) jobs: List[JobDefinition] = field(default_factory=lambda: []) + checks: List[AssetChecksDefinition] = field(default_factory=lambda: []) def load_assets_factories_from_modules( @@ -26,10 +32,14 @@ def load_assets_factories_from_modules( assets: List[AssetsDefinition] = [] sensors: List[SensorDefinition] = [] jobs: List[JobDefinition] = [] + checks: List[AssetChecksDefinition] = [] for module in modules: for _, obj in module.__dict__.items(): if type(obj) == AssetFactoryResponse: assets.extend(obj.assets) sensors.extend(obj.sensors) jobs.extend(obj.jobs) - return 
AssetFactoryResponse(assets=assets, sensors=sensors, jobs=jobs) + checks.extend(obj.checks) + return AssetFactoryResponse( + assets=assets, sensors=sensors, jobs=jobs, checks=checks + ) diff --git a/warehouse/oso_dagster/factories/gcs.py b/warehouse/oso_dagster/factories/gcs.py index c75d25ef8..72a2c72b0 100644 --- a/warehouse/oso_dagster/factories/gcs.py +++ b/warehouse/oso_dagster/factories/gcs.py @@ -196,9 +196,11 @@ def gcs_asset( } ) + asset_config = config + @op(name=f"{config.name}_clean_up_op") def gcs_clean_up_op(context: OpExecutionContext, config: dict): - context.log.info(f"Running clean up for {key}") + context.log.info(f"Running clean up for {asset_config.name}") print(config) @job(name=f"{config.name}_clean_up_job") @@ -209,7 +211,7 @@ def gcs_clean_up_job(): asset_key=gcs_asset.key, name=f"{config.name}_clean_up_sensor", job=gcs_clean_up_job, - default_status=DefaultSensorStatus.RUNNING, + default_status=DefaultSensorStatus.STOPPED, ) def gcs_clean_up_sensor( context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry @@ -220,7 +222,7 @@ def gcs_clean_up_sensor( run_config=RunConfig( ops={ f"{config.name}_clean_up_op": { - "config": {"asset_event": asset_event} + "op_config": {"asset_event": asset_event} } } ), diff --git a/warehouse/oso_dagster/factories/goldsky/__init__.py b/warehouse/oso_dagster/factories/goldsky/__init__.py new file mode 100644 index 000000000..1712123ae --- /dev/null +++ b/warehouse/oso_dagster/factories/goldsky/__init__.py @@ -0,0 +1,3 @@ +from .assets import * +from .dask import * +from .checks import * diff --git a/warehouse/oso_dagster/goldsky.py b/warehouse/oso_dagster/factories/goldsky/assets.py similarity index 75% rename from warehouse/oso_dagster/goldsky.py rename to warehouse/oso_dagster/factories/goldsky/assets.py index b977f4ec0..aecc71d39 100644 --- a/warehouse/oso_dagster/goldsky.py +++ b/warehouse/oso_dagster/factories/goldsky/assets.py @@ -12,15 +12,29 @@ from dask.distributed import get_worker from dask_kubernetes.operator import make_cluster_spec from dataclasses import dataclass, field -from typing import List, Mapping, Tuple, Callable, Optional, Sequence +from typing import List, Mapping, Tuple, Dict, Callable, Optional, Sequence import heapq -from dagster import asset, AssetExecutionContext +from dagster import ( + asset, + op, + job, + asset_sensor, + asset_check, + AssetExecutionContext, + RunRequest, + SensorEvaluationContext, + EventLogEntry, + RunConfig, + OpExecutionContext, + DagsterLogManager, + DefaultSensorStatus, + AssetsDefinition, + AssetChecksDefinition, +) from dagster_gcp import BigQueryResource, GCSResource from google.api_core.exceptions import ( NotFound, InternalServerError, - BadRequest, - MethodNotAllowed, ClientError, ) from google.cloud.bigquery import ( @@ -30,66 +44,11 @@ Client as BQClient, ) from google.cloud.bigquery.schema import SchemaField -from .goldsky_dask import setup_kube_cluster_client, DuckDBGCSPlugin, RetryTaskManager -from .cbt import CBTResource, UpdateStrategy, TimePartitioning -from .factories import AssetFactoryResponse - - -@dataclass(kw_only=True) -class GoldskyConfig: - # This is the name of the asset within the goldsky directory path in gcs - name: str - key_prefix: Optional[str | Sequence[str]] = "" - project_id: str - source_name: str - destination_table_name: str - - # Maximum number of objects we can load into a load job is 10000 so the - # largest this can be is 10000. 
- pointer_size: int = int(os.environ.get("GOLDSKY_CHECKPOINT_SIZE", "5000")) - - max_objects_to_load: int = 200_000 - - destination_dataset_name: str = "oso_sources" - destination_bucket_name: str = "oso-dataset-transfer-bucket" - - source_bucket_name: str = "oso-dataset-transfer-bucket" - source_goldsky_dir: str = "goldsky" - - dask_worker_memory: str = "4096Mi" - dask_scheduler_memory: str = "2560Mi" - dask_image: str = "ghcr.io/opensource-observer/dagster-dask:distributed-test-10" - dask_is_enabled: bool = False - dask_bucket_key_id: str = "" - dask_bucket_secret: str = "" - - # Allow 15 minute load table jobs - load_table_timeout_seconds: float = 3600 - transform_timeout_seconds: float = 3600 - - working_destination_dataset_name: str = "oso_raw_sources" - working_destination_preload_path: str = "_temp" - - dedupe_model: str = "goldsky_dedupe.sql" - dedupe_unique_column: str = "id" - dedupe_order_column: str = "ingestion_time" - merge_workers_model: str = "goldsky_merge_workers.sql" - - partition_column_name: str = "" - partition_column_type: str = "DAY" - partition_column_transform: Callable = lambda a: a - - schema_overrides: List[SchemaField] = field(default_factory=lambda: []) - - @property - def destination_table_fqdn(self): - return f"{self.project_id}.{self.destination_dataset_name}.{self.destination_table_name}" - - def worker_raw_table_fqdn(self, worker: str): - return f"{self.project_id}.{self.working_destination_dataset_name}.{self.destination_table_name}_{worker}" - - def worker_deduped_table_fqdn(self, worker: str): - return f"{self.project_id}.{self.working_destination_dataset_name}.{self.destination_table_name}_deduped_{worker}" +from .dask import setup_kube_cluster_client, DuckDBGCSPlugin, RetryTaskManager +from ...cbt import CBTResource, UpdateStrategy, TimePartitioning +from .. 
import AssetFactoryResponse +from ...utils.gcs import batch_delete_blobs +from .config import GoldskyConfig @dataclass @@ -134,6 +93,25 @@ def __ge__(self, other): return self > other +class GoldskyCheckpointRange: + def __init__( + self, + start: Optional[GoldskyCheckpoint] = None, + end: Optional[GoldskyCheckpoint] = None, + ): + self._start = start or GoldskyCheckpoint("0", 0, 0) + self._end = end + + def in_range(self, checkpoint: GoldskyCheckpoint) -> bool: + if checkpoint >= self._start: + if self._end is None: + return True + else: + return checkpoint < self._end + else: + return False + + @dataclass class GoldskyQueueItem: checkpoint: GoldskyCheckpoint @@ -617,24 +595,6 @@ def blocking_update_pointer_table( context.log.info(rows) -def goldsky_asset(config: GoldskyConfig) -> AssetFactoryResponse: - @asset(name=config.name, key_prefix=config.key_prefix) - def generated_asset( - context: AssetExecutionContext, - bigquery: BigQueryResource, - gcs: GCSResource, - cbt: CBTResource, - ): - loop = asyncio.new_event_loop() - context.log.info(f"Run ID: {context.run_id}") - gs_asset = GoldskyAsset(gcs, bigquery, cbt, config) - loop.run_until_complete(gs_asset.materialize(loop, context)) - - return AssetFactoryResponse( - assets=[generated_asset], - ) - - def decimal_convert(name: str, field: polars.Decimal): if field.precision == 100 and field.scale == 0: return SchemaField(name, field_type="NUMERIC") @@ -678,6 +638,7 @@ def __init__( bigquery: BigQueryResource, cbt: CBTResource, config: GoldskyConfig, + pointer_table_suffix: str = "", ): self.config = config self.gcs = gcs @@ -687,16 +648,20 @@ def __init__( self._job_id = arrow.now().format("YYYYMMDDHHmm") self.cached_blobs_to_process: List[re.Match[str]] | None = None self.schema = None + self.pointer_table_suffix = pointer_table_suffix async def materialize( - self, loop: asyncio.AbstractEventLoop, context: AssetExecutionContext + self, + loop: asyncio.AbstractEventLoop, + context: AssetExecutionContext, + checkpoint_range: Optional[GoldskyCheckpointRange] = None, ): context.log.info( {"info": "starting goldsky asset load", "name": self.config.source_name} ) self.ensure_datasets(context) - workers = await self.load_worker_tables(loop, context) + workers = await self.load_worker_tables(loop, context, checkpoint_range) # Dedupe and partition the current worker table into a deduped and partitioned table await self.dedupe_worker_tables(context, workers) @@ -744,19 +709,26 @@ def ensure_dataset(self, context: AssetExecutionContext, dataset_id: str): client.create_dataset(dataset_id) async def load_worker_tables( - self, loop: asyncio.AbstractEventLoop, context: AssetExecutionContext + self, + loop: asyncio.AbstractEventLoop, + context: AssetExecutionContext, + checkpoint_range: GoldskyCheckpointRange, ): self.ensure_pointer_table(context) if self.config.dask_is_enabled: return await self.dask_load_worker_tables(loop, context) - return await self.direct_load_worker_tables(context) + return await self.direct_load_worker_tables(context, checkpoint_range) async def direct_load_worker_tables( - self, context: AssetExecutionContext + self, + context: AssetExecutionContext, + checkpoint_range: GoldskyCheckpointRange, ) -> GoldskyWorker: worker_coroutines = [] workers: List[GoldskyWorker] = [] - worker_status, queues = self.load_queues(context) + worker_status, queues = self.load_queues_to_process( + context.log, checkpoint_range + ) if len(self.config.schema_overrides) > 0: self.load_schema(queues) @@ -782,7 +754,10 @@ async def 
direct_load_worker_tables( return workers async def dask_load_worker_tables( - self, loop: asyncio.AbstractEventLoop, context: AssetExecutionContext + self, + loop: asyncio.AbstractEventLoop, + context: AssetExecutionContext, + checkpoint_range: Optional[GoldskyCheckpointRange], ) -> List[GoldskyWorker]: context.log.info("loading worker tables for goldsky asset") last_restart = time.time() @@ -797,7 +772,9 @@ async def dask_load_worker_tables( context.log, ) try: - return await self.parallel_load_worker_tables(task_manager, context) + return await self.parallel_load_worker_tables( + task_manager, context, checkpoint_range + ) finally: task_manager.close() except Exception as e: @@ -814,9 +791,14 @@ async def dask_load_worker_tables( retries += 1 async def parallel_load_worker_tables( - self, task_manager: RetryTaskManager, context: AssetExecutionContext + self, + task_manager: RetryTaskManager, + context: AssetExecutionContext, + checkpoint_range: Optional[GoldskyCheckpointRange], ): - worker_status, queues = self.load_queues(context) + worker_status, queues = self.load_queues_to_process( + context.log, checkpoint_range + ) context.log.debug(f"spec: ${json.dumps(self.cluster_spec)}") @@ -912,7 +894,7 @@ async def clean_working_destination( client.delete_table(worker.raw_table) client.delete_table(worker.deduped_table) - def get_worker_status(self, context: AssetExecutionContext): + def get_worker_status(self, log: DagsterLogManager): worker_status: Mapping[str, GoldskyCheckpoint] = {} # Get the current state with self.bigquery.get_client() as client: @@ -924,26 +906,35 @@ def get_worker_status(self, context: AssetExecutionContext): """ ) for row in rows: - context.log.debug(row) worker_status[row.worker] = GoldskyCheckpoint( job_id=row.job_id, timestamp=row.timestamp, worker_checkpoint=row.checkpoint, ) + log.info( + f"Worker[{row.worker}]: Last checkpoint @ TS:{row.timestamp} JOB:{row.job_id} CHK:{row.checkpoint}" + ) except NotFound: - context.log.info( + log.info( f"No pointer status found at {self.pointer_table}. 
Will create the table later"
             )
         return worker_status
 
     @property
     def pointer_table(self):
-        return f"{self.config.project_id}.{self.config.working_destination_dataset_name}.{self.config.destination_table_name}_pointer_state"
+        return f"{self.config.project_id}.{self.config.working_destination_dataset_name}.{self.pointer_table_name}"
+
+    @property
+    def pointer_table_name(self):
+        pointer_table_suffix = self.pointer_table_suffix
+        if pointer_table_suffix != "" and not pointer_table_suffix.startswith("_"):
+            pointer_table_suffix = f"_{pointer_table_suffix}"
+        return f"{self.config.destination_table_name}_pointer_state{pointer_table_suffix}"
 
     def ensure_pointer_table(self, context: AssetExecutionContext):
         config = self.config
-        pointer_table_name = f"{config.destination_table_name}_pointer_state"
-        pointer_table = f"{config.project_id}.{config.working_destination_dataset_name}.{pointer_table_name}"
+        pointer_table_name = self.pointer_table_name
+        pointer_table = self.pointer_table
         context.log.info(
             f"ensuring that the sync pointer table exists at {pointer_table}"
         )
@@ -1011,44 +1002,110 @@ def goldsky_re(self):
             + r"/(?P<timestamp>\d+)-(?P<job_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-(?P<worker>\d+)-(?P<checkpoint>\d+).parquet"
         )
 
-    def load_queues(
-        self, context: AssetExecutionContext
-    ) -> Tuple[dict[str, GoldskyCheckpoint], GoldskyQueues]:
+    def clean_up(self, log: DagsterLogManager):
+        worker_status = self.get_worker_status(log)
+
+        end_checkpoint = worker_status.get("0")
+        for worker, checkpoint in worker_status.items():
+            if checkpoint < end_checkpoint:
+                end_checkpoint = checkpoint
+
+        queues = self.load_queues(
+            log,
+            checkpoint_range=GoldskyCheckpointRange(end=end_checkpoint),
+            max_objects_to_load=100_000,
+            blobs_loader=self._uncached_blobs_loader,
+        )
+
+        for worker, queue in queues.worker_queues():
+            cleaning_count = queue.len() - self.config.retention_files
+            if cleaning_count <= 0:
+                log.info(f"Worker[{worker}]: nothing to clean")
+                continue
+            log.info(f"Worker[{worker}]: cleaning {cleaning_count} files")
+
+            blobs: List[str] = []
+            for i in range(cleaning_count):
+                item = queue.dequeue()
+                blobs.append(item.blob_name)
+            last_blob = blobs[-1]
+            log.info(f"would delete up to {last_blob}")
+            gcs_client = self.gcs.get_client()
+            # batch_delete_blobs(gcs_client, self.config.source_bucket_name, blobs, 1000)
+
+    def _uncached_blobs_loader(self, log: DagsterLogManager):
+        log.info("Loading blobs list for processing")
+        gcs_client = self.gcs.get_client()
+        blobs = gcs_client.list_blobs(
+            self.config.source_bucket_name,
+            prefix=f"{self.config.source_goldsky_dir}/{self.config.source_name}",
+        )
+        blobs_to_process = []
+        for blob in blobs:
+            match = self.goldsky_re.match(blob.name)
+            if not match:
+                continue
+            blobs_to_process.append(match)
+        return blobs_to_process
 
+    def _cached_blobs_loader(self, log: DagsterLogManager):
         if self.cached_blobs_to_process is None:
-            context.log.info("Caching blob list for processing")
-            blobs = gcs_client.list_blobs(
-                self.config.source_bucket_name,
-                prefix=f"{self.config.source_goldsky_dir}/{self.config.source_name}",
-            )
-            self.cached_blobs_to_process = []
-            for blob in blobs:
-                match = self.goldsky_re.match(blob.name)
-                if not match:
-                    continue
-                self.cached_blobs_to_process.append(match)
+            self.cached_blobs_to_process = self._uncached_blobs_loader(log)
         else:
-            context.log.info("Using cached blob list for processing")
-
-        examples = dict()
-        queues = GoldskyQueues(max_size=self.config.max_objects_to_load)
-
-        # We should not cache the worker status as we may add unnecessary
duplicate work - worker_status = self.get_worker_status(context) + log.info("using cached blobs") + return self.cached_blobs_to_process + def load_queues( + self, + log: DagsterLogManager, + worker_status: Dict[str, GoldskyCheckpoint] = None, + max_objects_to_load: Optional[int] = None, + blobs_loader: Callable[[DagsterLogManager], List[re.Match[str]]] = None, + checkpoint_range: Optional[GoldskyCheckpointRange] = None, + ) -> GoldskyQueues: latest_timestamp = 0 + if not max_objects_to_load: + max_objects_to_load = self.config.max_objects_to_load + queues = GoldskyQueues(max_size=max_objects_to_load) + + if not blobs_loader: + blobs_loader = self._cached_blobs_loader + + # The default filter condition is to skip things that are _before_ the worker + blobs_to_process = blobs_loader(log) + + if checkpoint_range: + log.info( + { + "message": "Using a checkpoint range", + "end": checkpoint_range._end, + "start": checkpoint_range._start, + } + ) - for match in self.cached_blobs_to_process: + for match in blobs_to_process: worker = match.group("worker") job_id = match.group("job_id") timestamp = int(match.group("timestamp")) - examples[job_id] = match + if timestamp > latest_timestamp: + latest_timestamp = timestamp worker_checkpoint = int(match.group("checkpoint")) checkpoint = GoldskyCheckpoint(job_id, timestamp, worker_checkpoint) - if checkpoint <= worker_status.get(worker, GoldskyCheckpoint("", 0, 0)): - continue - context.log.debug(f"Queuing {match.group()}") + + # If there's a checkpoint range only queue checkpoints within that range + if checkpoint_range: + if not checkpoint_range.in_range(checkpoint): + continue + + # If there's a worker status then queue if the current checkpoint is + # greater than or equal to it + if worker_status: + worker_checkpoint = worker_status.get( + worker, GoldskyCheckpoint("", 0, 0) + ) + if worker_checkpoint >= checkpoint: + continue + + # log.debug(f"Queueing {match.group()}") queues.enqueue( worker, GoldskyQueueItem( @@ -1060,8 +1117,12 @@ def load_queues( keys = list(worker_status.keys()) if len(keys) > 0: expected_timestamp_of_worker_status = worker_status.get(keys[0]) + # Originally multiple timestamp values keys was considered an error + # but it turns out that this is a normal part of the process. This + # check is just to get a log for when it does change which might be + # useful for our own tracing/debugging purposes. if expected_timestamp_of_worker_status.timestamp != latest_timestamp: - context.log.info( + log.info( { "message": ( "Pipeline timestamp changed." 
@@ -1073,7 +1134,141 @@ def load_queues( } ) + return queues + + def load_queues_to_process( + self, + log: DagsterLogManager, + checkpoint_range: Optional[GoldskyCheckpointRange], + ) -> Tuple[dict[str, GoldskyCheckpoint], GoldskyQueues]: + worker_status = self.get_worker_status(log) + + queues = self.load_queues( + log, worker_status=worker_status, checkpoint_range=checkpoint_range + ) + for worker, queue in queues.worker_queues(): - context.log.info(f"Worker[{worker}] queue size: {queue.len()}") + log.info(f"Worker[{worker}] queue size: {queue.len()}") return (worker_status, queues) + + +@dataclass +class GoldskyBackfillOpInput: + backfill_label: str + start_checkpoint: Optional[GoldskyCheckpoint] + end_checkpoint: Optional[GoldskyCheckpoint] + + +def goldsky_asset(asset_config: GoldskyConfig) -> AssetFactoryResponse: + def materialize_asset( + context: OpExecutionContext, + bigquery: BigQueryResource, + gcs: GCSResource, + cbt: CBTResource, + checkpoint_range: Optional[GoldskyCheckpointRange] = None, + pointer_table_suffix: str = "", + ): + loop = asyncio.new_event_loop() + gs_asset = GoldskyAsset( + gcs, bigquery, cbt, asset_config, pointer_table_suffix=pointer_table_suffix + ) + loop.run_until_complete( + gs_asset.materialize(loop, context, checkpoint_range=checkpoint_range) + ) + + @asset(name=asset_config.name, key_prefix=asset_config.key_prefix) + def generated_asset( + context: AssetExecutionContext, + bigquery: BigQueryResource, + gcs: GCSResource, + cbt: CBTResource, + ): + context.log.info(f"Run ID: {context.run_id} AssetKey: {context.asset_key}") + materialize_asset(context, bigquery, gcs, cbt) + + related_ops_prefix = "_".join(generated_asset.key.path) + + @op(name=f"{related_ops_prefix}_clean_up_op") + def goldsky_clean_up_op( + context: OpExecutionContext, + bigquery: BigQueryResource, + gcs: GCSResource, + cbt: CBTResource, + config: dict, + ): + print(config) + gs_asset = GoldskyAsset(gcs, bigquery, cbt, asset_config) + gs_asset.clean_up(context.log) + + @op(name=f"{related_ops_prefix}_backfill_op") + def goldsky_backfill_op( + context: OpExecutionContext, + bigquery: BigQueryResource, + gcs: GCSResource, + cbt: CBTResource, + config: dict, + ): + start_checkpoint = None + end_checkpoint = None + if "start" in config: + start_checkpoint = GoldskyCheckpoint(*config["start"]) + if "end" in config: + end_checkpoint = GoldskyCheckpoint(*config["end"]) + op_input = GoldskyBackfillOpInput( + backfill_label=config["backfill_label"], + start_checkpoint=start_checkpoint, + end_checkpoint=end_checkpoint, + ) + context.log.info("Starting a backfill") + materialize_asset( + context, + bigquery, + gcs, + cbt, + checkpoint_range=GoldskyCheckpointRange( + start=op_input.start_checkpoint, end=op_input.end_checkpoint + ), + pointer_table_suffix=op_input.backfill_label, + ) + # Hack for now. 
+ return "Done" + + @job(name=f"{related_ops_prefix}_clean_up_job") + def goldsky_clean_up_job(): + goldsky_clean_up_op() + + @job(name=f"{related_ops_prefix}_backfill_job") + def goldsky_backfill_job(): + goldsky_backfill_op() + + @asset_sensor( + asset_key=generated_asset.key, + name=f"{related_ops_prefix}_clean_up_sensor", + job=goldsky_clean_up_job, + default_status=DefaultSensorStatus.STOPPED, + ) + def goldsky_clean_up_sensor( + context: SensorEvaluationContext, asset_event: EventLogEntry + ): + yield RunRequest( + run_key=context.cursor, + run_config=RunConfig( + ops={ + f"{related_ops_prefix}_clean_up_op": { + "config": {"asset_event": asset_event} + } + } + ), + ) + + checks: List[AssetChecksDefinition] = [] + for check in asset_config.checks: + checks.extend(check(asset_config, generated_asset)) + + return AssetFactoryResponse( + assets=[generated_asset], + sensors=[goldsky_clean_up_sensor], + jobs=[goldsky_clean_up_job, goldsky_backfill_job], + checks=checks, + ) diff --git a/warehouse/oso_dagster/factories/goldsky/checks.py b/warehouse/oso_dagster/factories/goldsky/checks.py new file mode 100644 index 000000000..008c06d21 --- /dev/null +++ b/warehouse/oso_dagster/factories/goldsky/checks.py @@ -0,0 +1,107 @@ +import os +from typing import List +from dataclasses import dataclass + +from .config import GoldskyConfig, CheckFactory +from dagster import ( + AssetsDefinition, + asset_check, + AssetChecksDefinition, + AssetCheckExecutionContext, + AssetCheckResult, + AssetCheckSeverity, +) +from dagster_gcp import BigQueryResource +from ...cbt import CBTResource + + +def generated_asset_prefix(asset: AssetsDefinition): + return "_".join(asset.key.path) + + +def block_number_check( + block_number_column_name: str, config: GoldskyConfig, asset: AssetsDefinition +) -> AssetChecksDefinition: + prefix = generated_asset_prefix(asset) + + @asset_check(name=f"{prefix}_block_number_check", asset=asset) + def _block_number_check( + context: AssetCheckExecutionContext, + cbt: CBTResource, + ): + c = cbt.get(context.log) + c.add_search_paths( + [os.path.join(os.path.abspath(os.path.dirname(__file__)), "queries")] + ) + + block_number_results = list( + c.query( + "block_number_check.sql", + block_number_column_name=block_number_column_name, + table=config.destination_table_fqdn, + ) + ) + if len(block_number_results) != 1: + return AssetCheckResult( + passed=False, + severity=AssetCheckSeverity.ERROR, + description="No block results. 
Is this table empty?", + ) + min_block_number = block_number_results[0].min_block_number + max_block_number = block_number_results[0].max_block_number + blocks_count = block_number_results[0].blocks_count + # Add one to include the min block in the count + expected_count = max_block_number - min_block_number + 1 + metadata = dict( + min_block_number=min_block_number, + max_block_number=max_block_number, + blocks_count=blocks_count, + expected_count=expected_count, + ) + if min_block_number not in [0, 1]: + return AssetCheckResult( + passed=False, + severity=AssetCheckSeverity.WARN, + description="Minimum block number is not 0 or 1 for a full scan", + metadata=metadata, + ) + + return AssetCheckResult( + passed=blocks_count == expected_count, + severity=AssetCheckSeverity.WARN, + description="Did not get the expected number of blocks", + metadata=metadata, + ) + + return _block_number_check + + +def traces_checks( + block_number_column_name: str = "block_number", +) -> CheckFactory[GoldskyConfig]: + def check_factory( + config: GoldskyConfig, asset: AssetsDefinition + ) -> List[AssetChecksDefinition]: + # TODO add a check to check traces exist for all transaction_ids + return [block_number_check(block_number_column_name, config, asset)] + + return check_factory + + +def transactions_checks( + block_number_column_name: str = "block_number", +) -> CheckFactory[GoldskyConfig]: + def check_factory(config: GoldskyConfig, asset: AssetsDefinition): + # TODO add a check to check ensure that transactions exist for all blocks + return [block_number_check(block_number_column_name, config, asset)] + + return check_factory + + +def blocks_checks( + block_number_column_name: str = "number", +) -> CheckFactory[GoldskyConfig]: + def check_factory(config: GoldskyConfig, asset: AssetsDefinition): + return [block_number_check(block_number_column_name, config, asset)] + + return check_factory diff --git a/warehouse/oso_dagster/factories/goldsky/config.py b/warehouse/oso_dagster/factories/goldsky/config.py new file mode 100644 index 000000000..78e65588f --- /dev/null +++ b/warehouse/oso_dagster/factories/goldsky/config.py @@ -0,0 +1,70 @@ +import os +from typing import Optional, Callable, List, Sequence, TypeVar +from dataclasses import dataclass, field + +from google.cloud.bigquery.schema import SchemaField +from dagster import AssetChecksDefinition, AssetsDefinition + +T = TypeVar("T") +CheckFactory = Callable[[T, AssetsDefinition], List[AssetChecksDefinition]] + + +@dataclass(kw_only=True) +class GoldskyConfig: + # This is the name of the asset within the goldsky directory path in gcs + name: str + key_prefix: Optional[str | Sequence[str]] = "" + project_id: str + source_name: str + destination_table_name: str + + # Maximum number of objects we can load into a load job is 10000 so the + # largest this can be is 10000. 
+    pointer_size: int = int(os.environ.get("GOLDSKY_CHECKPOINT_SIZE", "5000"))
+
+    max_objects_to_load: int = 200_000
+
+    destination_dataset_name: str = "oso_sources"
+    destination_bucket_name: str = "oso-dataset-transfer-bucket"
+
+    source_bucket_name: str = "oso-dataset-transfer-bucket"
+    source_goldsky_dir: str = "goldsky"
+
+    dask_worker_memory: str = "4096Mi"
+    dask_scheduler_memory: str = "2560Mi"
+    dask_image: str = "ghcr.io/opensource-observer/dagster-dask:distributed-test-10"
+    dask_is_enabled: bool = False
+    dask_bucket_key_id: str = ""
+    dask_bucket_secret: str = ""
+
+    # Allow 15 minute load table jobs
+    load_table_timeout_seconds: float = 3600
+    transform_timeout_seconds: float = 3600
+
+    working_destination_dataset_name: str = "oso_raw_sources"
+    working_destination_preload_path: str = "_temp"
+
+    dedupe_model: str = "goldsky_dedupe.sql"
+    dedupe_unique_column: str = "id"
+    dedupe_order_column: str = "ingestion_time"
+    merge_workers_model: str = "goldsky_merge_workers.sql"
+
+    partition_column_name: str = ""
+    partition_column_type: str = "DAY"
+    partition_column_transform: Callable = lambda a: a
+
+    schema_overrides: List[SchemaField] = field(default_factory=lambda: [])
+
+    retention_files: int = 10000
+
+    checks: List[CheckFactory["GoldskyConfig"]] = field(default_factory=lambda: [])
+
+    @property
+    def destination_table_fqdn(self):
+        return f"{self.project_id}.{self.destination_dataset_name}.{self.destination_table_name}"
+
+    def worker_raw_table_fqdn(self, worker: str):
+        return f"{self.project_id}.{self.working_destination_dataset_name}.{self.destination_table_name}_{worker}"
+
+    def worker_deduped_table_fqdn(self, worker: str):
+        return f"{self.project_id}.{self.working_destination_dataset_name}.{self.destination_table_name}_deduped_{worker}"
diff --git a/warehouse/oso_dagster/goldsky_dask.py b/warehouse/oso_dagster/factories/goldsky/dask.py
similarity index 100%
rename from warehouse/oso_dagster/goldsky_dask.py
rename to warehouse/oso_dagster/factories/goldsky/dask.py
diff --git a/warehouse/oso_dagster/factories/goldsky/queries/block_number_check.sql b/warehouse/oso_dagster/factories/goldsky/queries/block_number_check.sql
new file mode 100644
index 000000000..dc41da891
--- /dev/null
+++ b/warehouse/oso_dagster/factories/goldsky/queries/block_number_check.sql
@@ -0,0 +1,5 @@
+SELECT
+  MIN({{ block_number_column_name }}) as min_block_number,
+  MAX({{ block_number_column_name }}) as max_block_number,
+  COUNT(DISTINCT {{ block_number_column_name }}) as blocks_count
+FROM {{ table }}
\ No newline at end of file
diff --git a/warehouse/oso_dagster/utils/gcs.py b/warehouse/oso_dagster/utils/gcs.py
new file mode 100644
index 000000000..f1e1c3b14
--- /dev/null
+++ b/warehouse/oso_dagster/utils/gcs.py
@@ -0,0 +1,18 @@
+from google.cloud.storage import Client
+from typing import List
+
+
+def batch_delete_blobs(
+    gcs_client: Client, bucket_name: str, blobs: List[str], batch_size: int
+):
+    bucket = gcs_client.bucket(bucket_name)
+
+    batch: List[str] = []
+    for blob in blobs:
+        batch.append(blob)
+        if len(batch) == batch_size:
+            bucket.delete_blobs(blobs=batch)
+            # start a new batch once this one has been deleted
+            batch = []
+    if len(batch) > 0:
+        bucket.delete_blobs(blobs=batch)
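
Note: the generated backfill op reads an untyped config dict, so the run config below is a sketch inferred from the op code above rather than a documented schema. The op/job names follow the "{asset_key}_backfill_op" / "{asset_key}_backfill_job" pattern produced by goldsky_asset(); the asset name, label, and checkpoint values are hypothetical. Each checkpoint is a (job_id, timestamp, worker_checkpoint) tuple splatted into GoldskyCheckpoint, with "start" treated as inclusive and "end" as exclusive by GoldskyCheckpointRange.

# Hypothetical run config for a generated backfill job such as
# "base_transactions_backfill_job" (exact names depend on the asset key).
backfill_run_config = {
    "ops": {
        "base_transactions_backfill_op": {
            "config": {
                # Used as the pointer table suffix so the backfill keeps its own checkpoints.
                "backfill_label": "backfill_2024_05",
                # Optional bounds; omitting a key means "from the beginning" / "to the end".
                "start": ["00000000-0000-0000-0000-000000000000", 0, 0],
                "end": ["11111111-2222-3333-4444-555555555555", 1716900000, 500],
            }
        }
    }
}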
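
For reference, a minimal standalone sketch of the completeness rule that block_number_check applies to the results of block_number_check.sql (min, max, and distinct count of the block number column). The function and argument names are illustrative only; the real check returns AssetCheckResult objects with warn/error severities rather than booleans.

def block_range_is_complete(min_block: int, max_block: int, distinct_blocks: int) -> bool:
    # A full scan is expected to start at block 0 or 1.
    if min_block not in (0, 1):
        return False
    # The range is inclusive, so add one to count the minimum block itself.
    expected_count = max_block - min_block + 1
    return distinct_blocks == expected_count


# A contiguous range 0..99 yields 100 distinct blocks and passes;
# the same range with two missing blocks does not.
assert block_range_is_complete(0, 99, 100)
assert not block_range_is_complete(0, 99, 98)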