Dagster Checks for Traces and Transactions (#1633)
* Starting SQL-aware checks

* Got time constraining working

* Add all checks for blockchain artifacts
ravenac95 authored Jun 13, 2024
1 parent d899213 commit ade9548
Showing 23 changed files with 2,048 additions and 1,382 deletions.
2,599 changes: 1,253 additions & 1,346 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -13,7 +13,6 @@ packages = [

[tool.poetry.dependencies]
python = "^3.11,<3.13"
example-plugin = { path = "warehouse/cloudquery-example-plugin", develop = true }
google-cloud-bigquery = "^3.17.1"
pendulum = "^3.0.0"
google-api-python-client = "^2.116.0"
@@ -40,6 +39,8 @@ arrow = "^1.3.0"
polars = "^0.20.23"
requests = "^2.31.0"
dagster-postgres = "^0.23.6"
pytest = "^8.2.1"
dbt-core = "^1.8.2"


[tool.poetry.scripts]
2 changes: 1 addition & 1 deletion warehouse/cloudquery-example-plugin/pyproject.toml
@@ -9,7 +9,7 @@ packages = [{ include = "example_plugin" }]

[tool.poetry.dependencies]
python = "^3.11"
cloudquery-plugin-sdk = "^0.1.12"
cloudquery-plugin-sdk = "^0.1.26"

[tool.poetry.scripts]
example_plugin = 'example_plugin.serve:run'
43 changes: 30 additions & 13 deletions warehouse/oso_dagster/assets.py
@@ -115,8 +115,9 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
schema_overrides=[SchemaField(name="value", field_type="BYTES")],
checks=[transactions_checks()],
checks=[transactions_checks("opensource-observer.superchain.base_blocks")],
),
deps=base_blocks.assets,
)

base_traces = goldsky_asset(
@@ -130,8 +131,9 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
checks=[traces_checks()],
checks=[traces_checks("opensource-observer.superchain.base_transactions")],
),
deps=base_transactions.assets,
)

frax_blocks = goldsky_asset(
@@ -163,10 +165,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
schema_overrides=[SchemaField(name="value", field_type="BYTES")],
checks=[transactions_checks()],
checks=[transactions_checks("opensource-observer.superchain.frax_blocks")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=frax_blocks.assets,
)

frax_traces = goldsky_asset(
@@ -180,10 +183,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
checks=[traces_checks()],
checks=[traces_checks("opensource-observer.superchain.frax_transactions")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=frax_transactions.assets,
)

mode_blocks = goldsky_asset(
@@ -215,10 +219,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
schema_overrides=[SchemaField(name="value", field_type="BYTES")],
checks=[transactions_checks()],
checks=[transactions_checks("opensource-observer.superchain.mode_blocks")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=mode_blocks.assets,
)

mode_traces = goldsky_asset(
@@ -232,10 +237,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
checks=[traces_checks()],
checks=[traces_checks("opensource-observer.superchain.mode_transactions")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=mode_transactions.assets,
)

metal_blocks = goldsky_asset(
@@ -265,8 +271,9 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
schema_overrides=[SchemaField(name="value", field_type="BYTES")],
checks=[transactions_checks()],
checks=[transactions_checks("opensource-observer.superchain.metal_blocks")],
),
deps=metal_blocks.assets,
)


@@ -281,8 +288,9 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
checks=[traces_checks()],
checks=[traces_checks("opensource-observer.superchain.metal_transactions")],
),
deps=metal_transactions.assets,
)

optimism_traces = goldsky_asset(
@@ -296,7 +304,12 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
dedupe_model="optimism_dedupe.sql",
checks=[traces_checks()],
checks=[
traces_checks(
"bigquery-public-data.goog_blockchain_optimism_mainnet_us.transactions",
transactions_transaction_hash_column_name="transaction_hash",
)
],
# uncomment the following value to test
# max_objects_to_load=2000,
),
@@ -331,10 +344,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
schema_overrides=[SchemaField(name="value", field_type="BYTES")],
checks=[transactions_checks()],
checks=[transactions_checks("opensource-observer.superchain.pgn_blocks")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=pgn_blocks.assets,
)

pgn_traces = goldsky_asset(
@@ -348,10 +362,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
checks=[traces_checks()],
checks=[traces_checks("opensource-observer.superchain.pgn_transactions")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=pgn_transactions.assets,
)

zora_blocks = goldsky_asset(
@@ -383,10 +398,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
schema_overrides=[SchemaField(name="value", field_type="BYTES")],
checks=[transactions_checks()],
checks=[transactions_checks("opensource-observer.superchain.zora_blocks")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=zora_blocks.assets,
)

zora_traces = goldsky_asset(
@@ -400,10 +416,11 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
destination_dataset_name="superchain",
partition_column_name="block_timestamp",
partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
checks=[traces_checks()],
checks=[traces_checks("opensource-observer.superchain.zora_transactions")],
# uncomment the following value to test
# max_objects_to_load=1,
),
deps=zora_transactions.assets,
)


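Each checks factory above now receives the fully qualified BigQuery name of its companion table (transactions checks point at the chain's blocks table, traces checks at its transactions table), and each asset lists that upstream asset in deps so the check only runs against materialized data. The factories themselves are not rendered in this excerpt; the following is only a rough sketch of what such a SQL-aware, time-constrained check could look like. The asset key, the column names (transaction_hash, hash), the one-day window, and the direct BigQuery client are illustrative assumptions, not this PR's implementation.

# Hypothetical sketch only -- not the traces_checks factory from this PR.
from dagster import AssetCheckResult, asset_check
from google.cloud import bigquery


def sketch_traces_check(traces_table: str, transactions_table: str):
    @asset_check(asset="base_traces", name="traces_reference_known_transactions")
    def _check() -> AssetCheckResult:
        client = bigquery.Client()
        # Constrain the comparison to a recent window so the check stays cheap
        # (the "time constraining" the commit message refers to); the one-day
        # window and the column names are assumptions.
        query = f"""
        SELECT COUNT(*) AS missing
        FROM `{traces_table}` AS traces
        LEFT JOIN `{transactions_table}` AS txs
          ON traces.transaction_hash = txs.hash
        WHERE traces.block_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
          AND txs.hash IS NULL
        """
        missing = list(client.query_and_wait(query))[0]["missing"]
        return AssetCheckResult(passed=missing == 0, metadata={"missing": missing})

    return _check
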
29 changes: 27 additions & 2 deletions warehouse/oso_dagster/cbt/__init__.py
@@ -3,7 +3,7 @@
# A poor excuse for a dbt replacement when calling sql as a library
import os
import arrow
from typing import List, Optional
from typing import List, Optional, Sequence
from dataclasses import dataclass
from enum import Enum
from functools import cache
@@ -18,8 +18,10 @@
)
from google.cloud.exceptions import NotFound
from jinja2 import Environment, FileSystemLoader, select_autoescape, meta
from sqlglot import expressions as exp

from .bq import BigQueryTableQueryHelper
from .bq import BigQueryTableQueryHelper, BigQueryConnector
from .context import DataContext, ContextQuery, Transformation


class UpdateStrategy(Enum):
@@ -82,6 +84,29 @@ def query(self, model_file: str, timeout: float = 300, **vars):
job = client.query(rendered)
return job.result()

# we should transition to this instead of using jinja
def query_with_sqlglot(
self,
query: ContextQuery,
transformations: Optional[Sequence[Transformation]] = None,
):
with self.bigquery.get_client() as client:
connector = BigQueryConnector(client)
context = DataContext(connector)
return context.execute_query(query, transformations)

def hybrid_query(
self,
model_file: str,
transformations: Optional[Sequence[Transformation]] = None,
**vars,
):
with self.bigquery.get_client() as client:
rendered = self.render_model(model_file, **vars)
connector = BigQueryConnector(client)
context = DataContext(connector)
return context.execute_query(rendered, transformations)

def transform(
self,
model_file: str,
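
A rough usage sketch of the new query_with_sqlglot entry point. It assumes cbt is an already-configured CBT instance (wired to a Dagster BigQuery resource as elsewhere in this module), and the table name is illustrative. The ContextQuery uses the connector's INFORMATION_SCHEMA lookup to select explicit columns instead of SELECT *.

# Sketch only: cbt is assumed to be a configured CBT instance.
import sqlglot as sql
from sqlglot import expressions as exp


def select_all_columns(ctx) -> exp.Expression:
    # The connector resolves the table's columns from INFORMATION_SCHEMA.COLUMNS.
    columns = ctx.get_table_columns_from_str(
        "opensource-observer.superchain.base_traces"
    )
    names = ", ".join(f"`{name}`" for name, _data_type in columns)
    return sql.parse_one(
        f"SELECT {names} FROM `opensource-observer`.`superchain`.`base_traces`",
        dialect="bigquery",
    )


rows = cbt.query_with_sqlglot(select_all_columns)
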
36 changes: 35 additions & 1 deletion warehouse/oso_dagster/cbt/bq.py
@@ -1,9 +1,12 @@
# Query tools for bigquery tables
from typing import List
from typing import List, Dict, Optional
from functools import cache
from dataclasses import dataclass

from google.cloud.bigquery import Client, Table, TableReference
from google.cloud.bigquery.table import RowIterator
from sqlglot import expressions as exp
from .context import Connector, ColumnList


class TableLoader:
@@ -110,3 +113,34 @@ def update_columns_with(
lambda c: f"`{self_prefix}`.`{c}` = `{other_prefix}`.`{c}`", ordered_columns
)
return ", ".join(set_columns)


class BigQueryConnector(Connector[RowIterator]):
dialect = "bigquery"

def __init__(self, bq: Client):
self._bq = bq
self._table_columns: Optional[ColumnList] = None

def get_table_columns(self, table: exp.Table) -> ColumnList:
project = table.catalog or self._bq.project

return self._cached_get_table_columns(project, table.db, table.name)

@cache
def _cached_get_table_columns(
self, project: str, dataset: str, table_name: str
) -> ColumnList:

column_list_query = f"""
SELECT column_name, data_type
FROM `{project}`.`{dataset}`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = '{table_name}'
"""

result = self._bq.query_and_wait(column_list_query)
return list(result)

def execute_expression(self, exp: exp.Expression):
query = exp.sql(self.dialect)
return self._bq.query_and_wait(query)
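
A minimal sketch of using BigQueryConnector with the new DataContext directly, outside of the CBT resource. The module paths, ambient credentials, and the table name are assumptions. Note that plain query strings are parsed with sqlglot's default dialect (see context_query_from_str below), so identifiers are double-quoted here and only rendered with BigQuery backticks when the expression is generated for execution.

# Sketch only; module paths and credentials handling are assumptions.
from google.cloud.bigquery import Client

from oso_dagster.cbt.bq import BigQueryConnector
from oso_dagster.cbt.context import DataContext

client = Client()  # assumes ambient GCP credentials
context = DataContext(BigQueryConnector(client))

# Column metadata is resolved from the dataset's INFORMATION_SCHEMA.COLUMNS view.
columns = context.get_table_columns_from_str(
    "opensource-observer.superchain.base_transactions"
)

# Plain strings are parsed into sqlglot expressions and generated back out in
# the connector's dialect (BigQuery) before execution.
rows = context.execute_query(
    'SELECT COUNT(*) AS total FROM "opensource-observer"."superchain"."base_transactions"'
)
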
110 changes: 110 additions & 0 deletions warehouse/oso_dagster/cbt/context.py
@@ -0,0 +1,110 @@
from functools import cache
from typing import Callable, TypeVar, List, Sequence, Tuple, Optional, cast
from collections import OrderedDict

import sqlglot as sql
from sqlglot import expressions as exp


T = TypeVar("T")
ColumnList = List[Tuple[str, str]]


class Connector[T]:
dialect: str

def get_table_columns(self, table: exp.Table) -> ColumnList:
raise NotImplementedError()

def execute_expression(self, exp: exp.Expression) -> T:
raise NotImplementedError()


ContextQuery = Callable[["DataContext"], exp.Expression]
Transformation = Callable[[ContextQuery], ContextQuery]


class Columns:
def __init__(self, column_list: ColumnList):
self._column_list = column_list
self._column_dict = OrderedDict(self._column_list)

def as_dict(self) -> OrderedDict:
return self._column_dict

def __contains__(self, key):
return key in self.as_dict()

def __iter__(self):
for col in self._column_list:
yield col


class DataContext[T]:
def __init__(self, connector: Connector[T]):
self._connector = connector

def get_table_columns(self, table: exp.Table) -> Columns:
return self._connector.get_table_columns(table)

def get_table_columns_from_str(self, table_str: str):
table_expr = sql.to_table(
table_str,
dialect=self._connector.dialect,
)
return self.get_table_columns(table_expr)

def transform_query(
self,
query: ContextQuery | str | exp.Expression,
transformations: Optional[Sequence[Transformation]] = None,
):
transformations = transformations or []
if type(query) == str:
query = context_query_from_str(query)
elif isinstance(query, exp.Expression):
query = context_query_from_expr(query)
for transformation in transformations:
query = transformation(query)
query = cast(ContextQuery, query)
return query(self)

def execute_query(
self,
query: ContextQuery | str | exp.Expression,
transformations: Optional[Sequence[Transformation]] = None,
) -> T:
exp = self.transform_query(query, transformations)
print(exp.sql())
return self._connector.execute_expression(exp)

# def examine_query(self, query: ContextQuery):
# result = executor.execute(exp, self.schema)
# result.columns


def context_query_from_str(s: str) -> ContextQuery:
def _context(_ctx: DataContext):
return sql.parse_one(s)

return _context


def context_query_from_expr(e: exp.Expression) -> ContextQuery:
def _context(_ctx: DataContext):
return e

return _context


def wrap_basic_transform(
transform: Callable[[exp.Expression], exp.Expression]
) -> Transformation:
def _transform(query: ContextQuery) -> ContextQuery:
def _cq(ctx: DataContext):
expression = query(ctx)
return expression.transform(transform)

return _cq

return _transform
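
wrap_basic_transform lifts a plain node-level sqlglot rewrite into a Transformation over ContextQuerys. As a rough sketch of the kind of time-constraining transformation the commit message mentions (the column name, the window, and the choice to rewrite every SELECT, including nested ones, are assumptions rather than this PR's actual check logic):

# Sketch only -- illustrative use of wrap_basic_transform from this module.
def constrain_time(column: str, start: str, end: str) -> Transformation:
    def _add_window(node: exp.Expression) -> exp.Expression:
        # Only SELECT nodes get the extra predicate; everything else passes
        # through unchanged. This simple version also rewrites nested SELECTs.
        if isinstance(node, exp.Select):
            return node.where(
                f"{column} >= TIMESTAMP '{start}' AND {column} < TIMESTAMP '{end}'",
                dialect="bigquery",
            )
        return node

    return wrap_basic_transform(_add_window)


# Applied via the transformations argument, the predicate is appended before
# the query is generated and executed, e.g.:
#   context.execute_query(
#       'SELECT * FROM "superchain"."base_traces"',
#       [constrain_time("block_timestamp", "2024-06-01", "2024-06-02")],
#   )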