diff --git a/warehouse/oso_dagster/goldsky.py b/warehouse/oso_dagster/goldsky.py index 915069f4e..32ff0d4fe 100644 --- a/warehouse/oso_dagster/goldsky.py +++ b/warehouse/oso_dagster/goldsky.py @@ -16,7 +16,13 @@ import heapq from dagster import asset, AssetExecutionContext from dagster_gcp import BigQueryResource, GCSResource -from google.api_core.exceptions import NotFound +from google.api_core.exceptions import ( + NotFound, + InternalServerError, + BadRequest, + MethodNotAllowed, + ClientError, +) from google.cloud.bigquery import ( TableReference, LoadJobConfig, @@ -283,6 +289,21 @@ async def process(self, context: AssetExecutionContext): raise NotImplementedError("process not implemented on the base class") +def bq_retry( + context: AssetExecutionContext, f: Callable, retries: int = 5, min_wait: float = 1.0 +): + retry_wait = min_wait + for i in range(retries): + try: + return f() + except InternalServerError: + context.log.info("Server error encountered. waiting to retry") + time.sleep(retry_wait) + retry_wait += min_wait + except ClientError as e: + raise e + + class DirectGoldskyWorker(GoldskyWorker): async def process( self, @@ -311,13 +332,17 @@ def commit_pointer( context.log.debug("schema being overridden") job_config_options["schema"] = self.schema job_config = LoadJobConfig(**job_config_options) - load_job = client.load_table_from_uri( - files_to_load, - self.raw_table, - job_config=job_config, - timeout=self.config.load_table_timeout_seconds, - ) - load_job.result() + + def load_retry(): + load_job = client.load_table_from_uri( + files_to_load, + self.raw_table, + job_config=job_config, + timeout=self.config.load_table_timeout_seconds, + ) + return load_job.result() + + bq_retry(load_retry) context.log.info(f"Worker[{self.name}] Data loaded into bigquery") self.update_pointer_table(client, context, checkpoint, pointer_table_mutex)