Skip to content

Commit

Permalink
fix: use dagster retry mechanism (#2184)
Browse files Browse the repository at this point in the history
* fix: use dagster `retry` mechanism

* feat: implement debounced `retry` mechanism

* fix: use `retry` mechanism with `open_collective` and `eas` assets

* feat: implement `generic` retry mechanism

* fix: remove `ShallowAccount` in favor of `Account`
  • Loading branch information
Jabolol authored Sep 24, 2024
1 parent 4572f93 commit c328ee8
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 159 deletions.
80 changes: 53 additions & 27 deletions warehouse/oso_dagster/assets/eas_optimism.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from datetime import datetime, timedelta
from typing import Any, Dict

import dlt
from dagster import AssetExecutionContext, WeeklyPartitionsDefinition
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns
from pydantic import BaseModel
from pydantic import BaseModel, Field

from .open_collective import generate_steps
from ..utils.common import QueryArguments, QueryConfig, query_with_retry


class Attestation(BaseModel):
Expand All @@ -29,12 +30,35 @@ class Attestation(BaseModel):
isOffchain: bool


class EASOptimismParameters(QueryArguments):
take: int = Field(
...,
gt=0,
description="The number of nodes to fetch per query.",
)
skip: int = Field(
...,
ge=0,
description="The number of nodes to skip.",
)
where: Dict[str, Any] = Field(
...,
description="The where clause for the query.",
)


# The first attestation on EAS Optimism was created on the 07/28/2023 9:22:35 am
EAS_OPTIMISM_FIRST_ATTESTATION = datetime.fromtimestamp(1690557755)

# A sensible limit for the number of nodes to fetch per page
EAS_OPTIMISM_STEP_NODES_PER_PAGE = 10_000

# Minimum limit for the query, after which the query will fail
EAS_OPTIMISM_MINIMUM_LIMIT = 100

# The rate limit wait time in seconds
EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS = 65

# Kubernetes configuration for the asset materialization
K8S_CONFIG = {
"merge_behavior": "SHALLOW",
Expand Down Expand Up @@ -132,31 +156,32 @@ def get_optimism_eas_data(
"""
)

for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE):
try:
query = client.execute(
attestations_query,
variable_values={
"take": EAS_OPTIMISM_STEP_NODES_PER_PAGE,
"skip": step,
"where": {
"time": {
"gte": date_from,
"lte": date_to,
},
},
},
)
context.log.info(
f"Fetching attestation {step}/{total_count}",
)
yield query["attestations"]
except Exception as exception:
context.log.warning(
f"An error occurred while fetching EAS data: '{exception}'. "
"We will stop this materialization instead of retrying for now."
)
return []
config = QueryConfig(
limit=EAS_OPTIMISM_STEP_NODES_PER_PAGE,
minimum_limit=EAS_OPTIMISM_MINIMUM_LIMIT,
query_arguments=EASOptimismParameters(
take=EAS_OPTIMISM_STEP_NODES_PER_PAGE,
offset=0,
skip=0,
where={
"time": {
"gte": date_from,
"lte": date_to,
}
},
),
step_key="skip",
extract_fn=lambda data: data["attestations"],
ratelimit_wait_seconds=EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS,
)

yield from query_with_retry(
client,
context,
attestations_query,
total_count,
config,
)


def get_optimism_eas(context: AssetExecutionContext, client: Client):
Expand Down Expand Up @@ -208,4 +233,5 @@ def attestations(context: AssetExecutionContext):
name="attestations",
columns=pydantic_to_dlt_nullable_columns(Attestation),
primary_key="id",
write_disposition="merge",
)
Loading

0 comments on commit c328ee8

Please sign in to comment.