From 216d39fe3f6cc98c635011fda4ca438a7417c26b Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Sun, 15 Oct 2023 17:39:50 +0300 Subject: [PATCH 01/13] Initial blockscout extractor implementation --- eth-bytecode-db-extractors/Cargo.toml | 3 + .../blockscout-entity/Cargo.toml | 11 + .../src/contract_addresses.rs | 38 ++ .../blockscout-entity/src/contract_details.rs | 52 +++ .../blockscout-entity/src/lib.rs | 7 + .../blockscout-entity/src/prelude.rs | 5 + .../src/sea_orm_active_enums.rs | 26 ++ .../blockscout-migration/Cargo.toml | 22 ++ .../blockscout-migration/README.md | 41 ++ .../blockscout-migration/src/lib.rs | 44 +++ .../src/m20230426_170496_create_functions.rs | 28 ++ .../m20230426_170508_create_language_enum.rs | 25 ++ .../m20230426_170520_create_status_enum.rs | 26 ++ ..._170541_create_contract_addresses_table.rs | 44 +++ ...26_170553_create_contract_details_table.rs | 46 +++ .../blockscout-migration/src/main.rs | 6 + .../blockscout/.gitignore | 1 + .../blockscout/Cargo.toml | 35 ++ .../blockscout/src/blockscout.rs | 374 ++++++++++++++++++ .../blockscout/src/client.rs | 374 ++++++++++++++++++ .../blockscout/src/eth_bytecode_db.rs | 120 ++++++ .../blockscout/src/lib.rs | 7 + .../blockscout/src/main.rs | 51 +++ .../blockscout/src/settings.rs | 104 +++++ 24 files changed, 1490 insertions(+) create mode 100644 eth-bytecode-db-extractors/blockscout-entity/Cargo.toml create mode 100644 eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs create mode 100644 eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs create mode 100644 eth-bytecode-db-extractors/blockscout-entity/src/lib.rs create mode 100644 eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs create mode 100644 eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/Cargo.toml create mode 100644 eth-bytecode-db-extractors/blockscout-migration/README.md create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/lib.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170496_create_functions.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170508_create_language_enum.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs create mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/main.rs create mode 100644 eth-bytecode-db-extractors/blockscout/.gitignore create mode 100644 eth-bytecode-db-extractors/blockscout/Cargo.toml create mode 100644 eth-bytecode-db-extractors/blockscout/src/blockscout.rs create mode 100644 eth-bytecode-db-extractors/blockscout/src/client.rs create mode 100644 eth-bytecode-db-extractors/blockscout/src/eth_bytecode_db.rs create mode 100644 eth-bytecode-db-extractors/blockscout/src/lib.rs create mode 100644 eth-bytecode-db-extractors/blockscout/src/main.rs create mode 100644 eth-bytecode-db-extractors/blockscout/src/settings.rs diff --git a/eth-bytecode-db-extractors/Cargo.toml b/eth-bytecode-db-extractors/Cargo.toml index c928908a1..841ffa829 100644 --- a/eth-bytecode-db-extractors/Cargo.toml +++ b/eth-bytecode-db-extractors/Cargo.toml @@ -1,6 +1,9 @@ [workspace] resolver = "2" members = [ + "blockscout", + "blockscout-entity", + "blockscout-migration", "smart-contract-fiesta", "smart-contract-fiesta-entity", "smart-contract-fiesta-migration", diff --git a/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml b/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml new file mode 100644 index 000000000..9d95a9e98 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "blockscout-entity" +version = "0.1.0" +edition = "2021" + +[lib] +name = "entity" +path = "src/lib.rs" + +[dependencies] +sea-orm = { version = "^0" } \ No newline at end of file diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs new file mode 100644 index 000000000..6865b818c --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs @@ -0,0 +1,38 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use super::sea_orm_active_enums::{Language, Status}; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "contract_addresses")] +pub struct Model { + #[sea_orm( + primary_key, + auto_increment = false, + column_type = "Binary(BlobSize::Blob(None))" + )] + pub contract_address: Vec, + #[sea_orm(primary_key, auto_increment = false)] + pub chain_id: Decimal, + pub created_at: DateTime, + pub modified_at: DateTime, + pub verified_at: DateTimeWithTimeZone, + pub language: Language, + pub compiler_version: String, + pub status: Status, + pub log: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm(has_many = "super::contract_details::Entity")] + ContractDetails, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::ContractDetails.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs new file mode 100644 index 000000000..c6b81d9f7 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs @@ -0,0 +1,52 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "contract_details")] +pub struct Model { + pub created_at: DateTime, + pub modified_at: DateTime, + #[sea_orm( + primary_key, + auto_increment = false, + column_type = "Binary(BlobSize::Blob(None))" + )] + pub contract_address: Vec, + #[sea_orm(primary_key, auto_increment = false)] + pub chain_id: Decimal, + #[sea_orm(column_type = "JsonBinary")] + pub sources: Json, + #[sea_orm(column_type = "JsonBinary")] + pub settings: Json, + #[sea_orm(column_type = "Binary(BlobSize::Blob(None))", nullable)] + pub creation_code: Option>, + #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")] + pub runtime_code: Vec, + #[sea_orm(column_type = "Binary(BlobSize::Blob(None))", nullable)] + pub transaction_hash: Option>, + pub block_number: Decimal, + pub transaction_index: Option, + #[sea_orm(column_type = "Binary(BlobSize::Blob(None))", nullable)] + pub deployer: Option>, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::contract_addresses::Entity", + from = "Column::ContractAddress", + to = "super::contract_addresses::Column::ChainId", + on_update = "NoAction", + on_delete = "NoAction" + )] + ContractAddresses, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::ContractAddresses.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs b/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs new file mode 100644 index 000000000..bb13562fa --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs @@ -0,0 +1,7 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +pub mod prelude; + +pub mod contract_addresses; +pub mod contract_details; +pub mod sea_orm_active_enums; diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs b/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs new file mode 100644 index 000000000..c30e3bab7 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs @@ -0,0 +1,5 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +pub use super::{ + contract_addresses::Entity as ContractAddresses, contract_details::Entity as ContractDetails, +}; diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs b/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs new file mode 100644 index 000000000..bf8f359fa --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs @@ -0,0 +1,26 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use sea_orm::entity::prelude::*; + +#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "language")] +pub enum Language { + #[sea_orm(string_value = "solidity")] + Solidity, + #[sea_orm(string_value = "vyper")] + Vyper, + #[sea_orm(string_value = "yul")] + Yul, +} +#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "status")] +pub enum Status { + #[sea_orm(string_value = "error")] + Error, + #[sea_orm(string_value = "in_process")] + InProcess, + #[sea_orm(string_value = "success")] + Success, + #[sea_orm(string_value = "waiting")] + Waiting, +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml b/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml new file mode 100644 index 000000000..9d467b9e1 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "blockscout-migration" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +name = "migration" +path = "src/lib.rs" + +[dependencies] +async-std = { version = "1", features = ["attributes", "tokio1"] } + +[dependencies.sea-orm-migration] +version = "^0" +features = [ + # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. + # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. + # e.g. + "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature + "sqlx-postgres", # `DATABASE_DRIVER` feature +] diff --git a/eth-bytecode-db-extractors/blockscout-migration/README.md b/eth-bytecode-db-extractors/blockscout-migration/README.md new file mode 100644 index 000000000..b3ea53eb4 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/README.md @@ -0,0 +1,41 @@ +# Running Migrator CLI + +- Generate a new migration file + ```sh + cargo run -- migrate generate MIGRATION_NAME + ``` +- Apply all pending migrations + ```sh + cargo run + ``` + ```sh + cargo run -- up + ``` +- Apply first 10 pending migrations + ```sh + cargo run -- up -n 10 + ``` +- Rollback last applied migrations + ```sh + cargo run -- down + ``` +- Rollback last 10 applied migrations + ```sh + cargo run -- down -n 10 + ``` +- Drop all tables from the database, then reapply all migrations + ```sh + cargo run -- fresh + ``` +- Rollback all applied migrations, then reapply all migrations + ```sh + cargo run -- refresh + ``` +- Rollback all applied migrations + ```sh + cargo run -- reset + ``` +- Check the status of all migrations + ```sh + cargo run -- status + ``` diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs b/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs new file mode 100644 index 000000000..6dfc6d7bd --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs @@ -0,0 +1,44 @@ +pub use sea_orm_migration::prelude::*; +use sea_orm_migration::sea_orm::{Statement, TransactionTrait}; + +mod m20230426_170496_create_functions; +mod m20230426_170508_create_language_enum; +mod m20230426_170520_create_status_enum; +mod m20230426_170541_create_contract_addresses_table; +mod m20230426_170553_create_contract_details_table; + +pub struct Migrator; + +#[async_trait::async_trait] +impl MigratorTrait for Migrator { + fn migrations() -> Vec> { + vec![ + Box::new(m20230426_170496_create_functions::Migration), + Box::new(m20230426_170508_create_language_enum::Migration), + Box::new(m20230426_170520_create_status_enum::Migration), + Box::new(m20230426_170541_create_contract_addresses_table::Migration), + Box::new(m20230426_170553_create_contract_details_table::Migration), + ] + } +} + +pub async fn from_statements( + manager: &SchemaManager<'_>, + statements: &[&str], +) -> Result<(), DbErr> { + let txn = manager.get_connection().begin().await?; + for statement in statements { + txn.execute(Statement::from_string( + manager.get_database_backend(), + statement.to_string(), + )) + .await + .map_err(|err| DbErr::Migration(format!("{err}\nQuery: {statement}")))?; + } + txn.commit().await +} + +pub async fn from_sql(manager: &SchemaManager<'_>, content: &str) -> Result<(), DbErr> { + let statements: Vec<&str> = content.split(';').collect(); + from_statements(manager, statements.as_slice()).await +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170496_create_functions.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170496_create_functions.rs new file mode 100644 index 000000000..47150ec13 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170496_create_functions.rs @@ -0,0 +1,28 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let set_modified_at = r#" + CREATE OR REPLACE FUNCTION set_modified_at() + RETURNS TRIGGER AS + $$ + BEGIN + NEW.modified_at = now(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + "#; + + crate::from_statements(manager, &[set_modified_at]).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let set_modified_at = r#"DROP FUNCTION "set_modified_at";"#; + + crate::from_statements(manager, &[set_modified_at]).await + } +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170508_create_language_enum.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170508_create_language_enum.rs new file mode 100644 index 000000000..e961015c9 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170508_create_language_enum.rs @@ -0,0 +1,25 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#" + CREATE TYPE "language" AS ENUM ( + 'solidity', + 'yul', + 'vyper' + ); + "#; + + crate::from_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#"DROP TYPE "language";"#; + + crate::from_sql(manager, sql).await + } +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs new file mode 100644 index 000000000..d847cc020 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs @@ -0,0 +1,26 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#" + CREATE TYPE "status" AS ENUM ( + 'waiting', + 'in_process', + 'success', + 'error' + ); + "#; + + crate::from_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#"DROP TYPE "status";"#; + + crate::from_sql(manager, sql).await + } +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs new file mode 100644 index 000000000..c8132fc00 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs @@ -0,0 +1,44 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#" + CREATE TABLE "contract_addresses" ( + "contract_address" bytea NOT NULL, + "chain_id" numeric NOT NULL, + + "created_at" timestamp NOT NULL DEFAULT (now()), + "modified_at" timestamp NOT NULL DEFAULT (now()), + + "verified_at" timestamptz NOT NULL, + "language" language NOT NULL, + "compiler_version" varchar NOT NULL, + + "status" status NOT NULL DEFAULT 'waiting', + "log" varchar, + + PRIMARY KEY ("contract_address", "chain_id") + ); + + CREATE TRIGGER trigger_set_modified_at + BEFORE INSERT ON contract_addresses + FOR EACH ROW + EXECUTE FUNCTION set_modified_at(); + "#; + + crate::from_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#" + DROP TRIGGER trigger_set_modified_at ON contract_addresses; + DROP TABLE contract_addresses; + "#; + + crate::from_sql(manager, sql).await + } +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs new file mode 100644 index 000000000..7f5d0dc58 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs @@ -0,0 +1,46 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#" + CREATE TABLE "contract_details" ( + "created_at" timestamp NOT NULL DEFAULT (now()), + "modified_at" timestamp NOT NULL DEFAULT (now()), + + "contract_address" bytea NOT NULL, + "chain_id" numeric NOT NULL, + + "sources" jsonb NOT NULL, + "settings" jsonb NOT NULL, + + "creation_code" bytea, + "runtime_code" bytea NOT NULL, + "transaction_hash" bytea, + "block_number" numeric NOT NULL, + "transaction_index" numeric, + "deployer" bytea, + + PRIMARY KEY ("contract_address", "chain_id") + ); + + ALTER TABLE "contract_details" ADD FOREIGN KEY ("contract_address", "chain_id") REFERENCES "contract_addresses" ("contract_address", "chain_id"); + + CREATE TRIGGER trigger_set_modified_at + BEFORE INSERT ON contract_details + FOR EACH ROW + EXECUTE FUNCTION set_modified_at(); + "#; + + crate::from_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = r#"DROP TABLE contract_details;"#; + + crate::from_sql(manager, sql).await + } +} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/main.rs b/eth-bytecode-db-extractors/blockscout-migration/src/main.rs new file mode 100644 index 000000000..c6b6e48db --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-migration/src/main.rs @@ -0,0 +1,6 @@ +use sea_orm_migration::prelude::*; + +#[async_std::main] +async fn main() { + cli::run_cli(migration::Migrator).await; +} diff --git a/eth-bytecode-db-extractors/blockscout/.gitignore b/eth-bytecode-db-extractors/blockscout/.gitignore new file mode 100644 index 000000000..a9d80f558 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/.gitignore @@ -0,0 +1 @@ +dataset/* diff --git a/eth-bytecode-db-extractors/blockscout/Cargo.toml b/eth-bytecode-db-extractors/blockscout/Cargo.toml new file mode 100644 index 000000000..30a35c0bc --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "blockscout" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +blockscout-entity = { path = "../blockscout-entity" } +blockscout-migration = { path = "../blockscout-migration" } + +anyhow = "1.0.70" +async-trait = "0.1" +blockscout-display-bytes = "1.0.0" +blockscout-service-launcher = { version = "0.8.0", features = ["database-0_12"] } +chrono = "0.4.31" +config = "0.13.3" +eth-bytecode-db-proto = { git = "https://github.com/blockscout/blockscout-rs", branch = "rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion" } +futures = "0.3" +governor = "0.5.1" +reqwest-rate-limiter = { git = "https://github.com/blockscout/blockscout-rs", rev = "edb610b" } +reqwest = "0.11" +reqwest-middleware = "0.2.3" +reqwest-retry = "0.2.3" +scraper = "0.17.1" +sea-orm = "0.12.3" +sea-orm-migration = "0.12.3" +serde = "1.0.160" +serde_json = "1.0.96" +serde_path_to_error = "0.1.14" +task-local-extensions = "0.1.4" +tokio = { version = "1", features = [ "rt-multi-thread", "macros", "fs" ] } +tokio-stream = "0.1.14" +tracing = "0.1.37" +url = "2.3.1" \ No newline at end of file diff --git a/eth-bytecode-db-extractors/blockscout/src/blockscout.rs b/eth-bytecode-db-extractors/blockscout/src/blockscout.rs new file mode 100644 index 000000000..c78108e77 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/src/blockscout.rs @@ -0,0 +1,374 @@ +use anyhow::Context; +use blockscout_display_bytes::Bytes; +use governor::{Quota, RateLimiter}; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_rate_limiter::RateLimiterMiddleware; +use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; +use serde::{de::DeserializeOwned, Deserialize}; +use std::{collections::BTreeMap, num::NonZeroU32, str::FromStr}; +use url::Url; + +#[derive(Clone)] +pub struct Client { + blockscout_base_url: Url, + request_client: ClientWithMiddleware, +} + +pub struct ContractDetails { + pub creation_code: Option>, + pub runtime_code: Vec, + + pub transaction_hash: Vec, + pub block_number: u64, + pub transaction_index: Option, + pub deployer: Vec, + + pub sources: serde_json::Value, + pub settings: serde_json::Value, +} + +impl Client { + pub fn try_new( + blockscout_base_url: String, + limit_requests_per_second: u32, + ) -> anyhow::Result { + let blockscout_base_url = + Url::from_str(&blockscout_base_url).context("invalid blockscout base url")?; + let max_burst = NonZeroU32::new(limit_requests_per_second) + .ok_or_else(|| anyhow::anyhow!("invalid limit requests per second"))?; + + let rate_limiter = RateLimiter::direct(Quota::per_second(max_burst)); + + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = ClientBuilder::new(reqwest::Client::new()) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .with(RateLimiterMiddleware::new(rate_limiter)) + .build(); + + Ok(Self { + blockscout_base_url, + request_client: client, + }) + } + + pub async fn get_verified_contracts( + &self, + ) -> anyhow::Result { + verified_contracts::VerifiedContractsIterator::new(self.clone()).await + } + + pub async fn get_contract_details( + &self, + contract_address: Bytes, + ) -> anyhow::Result { + // creation_code, runtime_code, transaction_hash, block_number, transaction_index, deployer + + let smart_contract_details = + smart_contracts::retrieve_smart_contract_details(self, contract_address.clone()) + .await + .context("get smart contract details failed")?; + let address_details = addresses::retrieve_address_details(self, contract_address) + .await + .context("get address details failed")?; + let transaction_details = transactions::retrieve_transaction_details( + self, + address_details.creation_tx_hash.clone(), + ) + .await + .context("get transaction details failed")?; + + let sources = + serde_json::to_value(smart_contracts::retrieve_sources(&smart_contract_details)) + .unwrap(); + + Ok(ContractDetails { + creation_code: smart_contract_details.creation_bytecode.map(|v| v.to_vec()), + runtime_code: smart_contract_details.deployed_bytecode.to_vec(), + + transaction_hash: address_details.creation_tx_hash.to_vec(), + block_number: transaction_details.block, + transaction_index: None, + deployer: address_details.creator_address_hash.to_vec(), + + sources, + settings: smart_contract_details.compiler_settings, + }) + } + + async fn send_request(&self, url: Url) -> anyhow::Result { + let response = self + .request_client + .get(url) + .send() + .await + .context("sending request failed")?; + + // Continue only in case if request results is success + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "Invalid status code get as a result: {}", + response.status() + )); + } + + let result = response + .text() + .await + .context("deserializing response into string failed")?; + let jd = &mut serde_json::Deserializer::from_str(&result); + serde_path_to_error::deserialize(jd).context("deserializing response failed") + } +} + +mod verified_contracts { + use super::*; + + pub struct VerifiedContract { + pub address: Vec, + pub verified_at: chrono::DateTime, + pub language: String, + pub compiler_version: String, + } + + /// Used as a return type from [`Client::get_verified_contracts`]. + /// Does not implement an `Iterator` trait due to internal async calls, + /// but provides a `next` function which could be used by the caller. + /// On each `next` call the request to blockscout server is made, + /// which returns the next 50 elements. + pub struct VerifiedContractsIterator { + client: Client, + url: Url, + next_page_params: Option, + items: Vec, + } + + impl VerifiedContractsIterator { + pub async fn new(client: Client) -> anyhow::Result { + let url = { + let path = "/api/v2/smart-contracts"; + let mut url = client.blockscout_base_url.clone(); + url.set_path(path); + url + }; + + let response = Self::load_next_page(&client, url.clone(), 0, None).await?; + + Ok(Self { + client, + url, + next_page_params: response.next_page_params, + items: Self::process_response_items(response.items), + }) + } + + /// Returns the next 50 verified contracts. The number of contracts may be less + /// if that is the last page. The next iteration will return `None` in that case. + pub async fn next_page(&mut self) -> anyhow::Result>> { + if self.items.is_empty() { + return Ok(None); + } + + let items = self.items.drain(..).collect::>(); + + if let Some(next_page_params) = &self.next_page_params { + let response = Self::load_next_page( + &self.client, + self.url.clone(), + next_page_params.items_count, + Some(next_page_params.smart_contract_id), + ) + .await?; + self.next_page_params = response.next_page_params; + self.items = Self::process_response_items(response.items); + } + + Ok(Some(items)) + } + + pub fn smart_contract_id(&self) -> Option { + self.next_page_params + .as_ref() + .map(|params| params.smart_contract_id) + } + + pub fn items_count(&self) -> Option { + self.next_page_params + .as_ref() + .map(|params| params.items_count) + } + + async fn load_next_page( + client: &Client, + mut url: Url, + items_count: usize, + smart_contract_id: Option, + ) -> anyhow::Result { + let query = { + let mut query = format!("items_count={items_count}"); + if let Some(smart_contract_id) = smart_contract_id { + query.push_str(&format!("&smart_contract_id={smart_contract_id}")); + } + query + }; + url.set_query(Some(&query)); + + client.send_request(url).await + } + + fn process_response_items(items: Vec) -> Vec { + items + .into_iter() + .map(|item| VerifiedContract { + address: item.address.hash.to_vec(), + verified_at: item.verified_at, + language: item.language, + compiler_version: item.compiler_version, + }) + .collect() + } + } + + #[derive(Debug, Deserialize)] + struct Response { + items: Vec, + next_page_params: Option, + } + + #[derive(Debug, Deserialize)] + struct NextPageParams { + items_count: usize, + smart_contract_id: usize, + } + + #[derive(Debug, Deserialize)] + struct Item { + address: Address, + language: String, + verified_at: chrono::DateTime, + compiler_version: String, + } + + #[derive(Debug, Deserialize)] + struct Address { + hash: Bytes, + } +} + +mod smart_contracts { + use super::*; + use serde::Serialize; + + pub async fn retrieve_smart_contract_details( + client: &Client, + address: Bytes, + ) -> anyhow::Result { + let url = { + let path = format!("/api/v2/smart-contracts/{address}"); + let mut url = client.blockscout_base_url.clone(); + url.set_path(&path); + url + }; + + client.send_request(url).await + } + + pub fn retrieve_sources(response: &Response) -> serde_json::Value { + #[derive(Debug, Serialize)] + struct Source<'a> { + content: &'a str, + } + + let mut sources = BTreeMap::new(); + sources.insert( + response.file_path.as_deref().unwrap_or(".sol"), + Source { + content: response.source_code.as_str(), + }, + ); + for additional_source in &response.additional_sources { + sources.insert( + additional_source.file_path.as_str(), + Source { + content: additional_source.source_code.as_str(), + }, + ); + } + serde_json::to_value(sources).unwrap() + } + + #[derive(Debug, Deserialize)] + pub struct Response { + pub verified_at: chrono::DateTime, + pub is_vyper_contract: bool, + pub optimization_enabled: Option, + pub optimization_runs: Option, + pub compiler_version: String, + // pub evm_version: Option, + pub source_code: String, + pub file_path: Option, + pub compiler_settings: serde_json::Value, + pub additional_sources: Vec, + pub deployed_bytecode: Bytes, + pub creation_bytecode: Option, + // pub external_libraries: Vec, + } + + #[derive(Debug, Deserialize)] + pub struct AdditionalSource { + file_path: String, + source_code: String, + } + + // #[derive(Debug, Deserialize)] + // pub struct ExternalLibrary { + // name: String, + // address_hash: Bytes, + // } +} + +mod addresses { + use super::*; + + pub async fn retrieve_address_details( + client: &Client, + address: Bytes, + ) -> anyhow::Result { + let url = { + let path = format!("/api/v2/addresses/{address}"); + let mut url = client.blockscout_base_url.clone(); + url.set_path(&path); + url + }; + + client.send_request(url).await + } + + #[derive(Debug, Deserialize)] + pub struct Response { + pub creator_address_hash: Bytes, + pub creation_tx_hash: Bytes, + } +} + +mod transactions { + use super::*; + + pub async fn retrieve_transaction_details( + client: &Client, + transaction_hash: Bytes, + ) -> anyhow::Result { + let url = { + let path = format!("/api/v2/transactions/{transaction_hash}"); + let mut url = client.blockscout_base_url.clone(); + url.set_path(&path); + url + }; + + client.send_request(url).await + } + + #[derive(Debug, Deserialize)] + pub struct Response { + pub block: u64, + } +} diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs new file mode 100644 index 000000000..898546ccf --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -0,0 +1,374 @@ +use crate::{blockscout, eth_bytecode_db}; +use anyhow::Context; +use blockscout_display_bytes::Bytes; +use entity::{contract_addresses, contract_details, sea_orm_active_enums}; +use eth_bytecode_db_proto::blockscout::eth_bytecode_db::v2::{ + BytecodeType, Source, VerificationMetadata, VerifySolidityStandardJsonRequest, + VerifyVyperStandardJsonRequest, +}; +use sea_orm::{ + sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ConnectionTrait, DatabaseBackend, + DatabaseConnection, DbErr, EntityTrait, Statement, +}; +use serde::Serialize; +use std::sync::Arc; + +macro_rules! process_result { + ( $result:expr, $self:expr, $contract_address:expr) => { + match $result { + Ok(res) => res, + Err(err) => { + tracing::warn!( + contract_address = $contract_address.to_string(), + error = format!("{err:#}"), + "Error processing contract" + ); + + contract_addresses::ActiveModel { + contract_address: Set($contract_address.to_vec()), + chain_id: Set($self.chain_id.into()), + status: Set(sea_orm_active_enums::Status::Error), + log: Set(Some(format!("{:#?}", err))), + ..Default::default() + } + .update($self.db_client.as_ref()) + .await + .context(format!( + "saving error details failed; contract={}, chain_id={}", + $contract_address, $self.chain_id, + ))?; + + continue; + } + } + }; +} + +#[derive(Clone)] +pub struct Client { + pub db_client: Arc, + pub chain_id: u64, + pub blockscout_client: blockscout::Client, + pub eth_bytecode_db_client: eth_bytecode_db::Client, +} + +impl Client { + pub fn try_new_arc( + db_client: Arc, + chain_id: u64, + blockscout_url: String, + eth_bytecode_db_url: String, + eth_bytecode_db_api_key: String, + limit_requests_per_second: u32, + ) -> anyhow::Result { + let blockscout_client = + blockscout::Client::try_new(blockscout_url, limit_requests_per_second)?; + + let eth_bytecode_db_client = + eth_bytecode_db::Client::try_new(eth_bytecode_db_url, eth_bytecode_db_api_key)?; + + let client = Self { + db_client, + chain_id, + blockscout_client, + eth_bytecode_db_client, + }; + + Ok(client) + } +} + +impl Client { + pub async fn import_contract_addresses(&self) -> anyhow::Result { + let mut verified_contracts = self + .blockscout_client + .get_verified_contracts() + .await + .context("get list of verified contracts")?; + + let mut processed = 0; + while let Some(items) = verified_contracts.next_page().await.context(format!( + "extracting contract addresses failed: items_count={:?}, smart_contract_id={:?}", + verified_contracts.items_count(), + verified_contracts.smart_contract_id() + ))? { + processed += items.len(); + if processed % 200 == 0 { + tracing::info!( + "Processed={processed}, next_page_smart_contract_id={:?}", + verified_contracts.smart_contract_id() + ); + } + + let address_models = items + .iter() + .map(|item| { + let language = match item.language.as_ref() { + "solidity" => sea_orm_active_enums::Language::Solidity, + "yul" => sea_orm_active_enums::Language::Yul, + "vyper" => sea_orm_active_enums::Language::Vyper, + language => return Err(anyhow::anyhow!("Invalid language: {language}")), + }; + Ok(contract_addresses::ActiveModel { + contract_address: Set(item.address.to_vec()), + chain_id: Set(self.chain_id.into()), + verified_at: Set(item.verified_at), + language: Set(language), + compiler_version: Set(item.compiler_version.clone()), + ..Default::default() + }) + }) + .collect::, _>>()?; + + match contract_addresses::Entity::insert_many(address_models) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .exec(self.db_client.as_ref()) + .await + { + Ok(_) => {} + Err(DbErr::RecordNotInserted) => { + tracing::info!("No records have been inserted. Stop dataset import"); + break; + } + Err(err) => { + return Err(err).context(format!( + "inserting contract addresses failed: items_count={:?}, smart_contract_id={:?}", + verified_contracts.items_count(), + verified_contracts.smart_contract_id() + )) + } + } + } + + Ok(processed) + } + + pub async fn verify_contracts(self) -> anyhow::Result { + let mut processed = 0; + while let Some(contract_model) = self.next_contract().await? { + processed += 1; + let contract_address = Bytes::from(contract_model.contract_address.clone()); + + tracing::info!( + contract_address = contract_address.to_string(), + "contract processed" + ); + + let contract_details_model = process_result!( + self.import_contract_details(contract_address.clone()).await, + &self, + contract_address + ); + + let source = process_result!( + self.verify_contract(contract_model, contract_details_model) + .await, + &self, + contract_address + ); + self.mark_as_success(contract_address, source).await?; + } + + Ok(processed) + } + + async fn import_contract_details( + &self, + contract_address: Bytes, + ) -> anyhow::Result { + let contract_details = self + .blockscout_client + .get_contract_details(contract_address.clone()) + .await + .context("getting contract details failed")?; + + let contract_details_model = contract_details::ActiveModel { + contract_address: Set(contract_address.to_vec()), + chain_id: Set(self.chain_id.into()), + sources: Set(contract_details.sources), + settings: Set(contract_details.settings), + creation_code: Set(contract_details.creation_code), + runtime_code: Set(contract_details.runtime_code), + transaction_hash: Set(Some(contract_details.transaction_hash)), + block_number: Set(contract_details.block_number.into()), + transaction_index: Set(contract_details.transaction_index.map(|index| index.into())), + deployer: Set(Some(contract_details.deployer)), + ..Default::default() + } + .insert(self.db_client.as_ref()) + .await + .context("updating contract_details model to insert contract details")?; + + Ok(contract_details_model) + } + + async fn verify_contract( + &self, + contract: contract_addresses::Model, + contract_details: contract_details::Model, + ) -> anyhow::Result { + let input = Self::generate_input(contract.language.clone(), &contract_details)?; + + let (bytecode, bytecode_type) = + if let Some(creation_code) = contract_details.creation_code.as_ref() { + (creation_code.clone(), BytecodeType::CreationInput) + } else { + ( + contract_details.runtime_code.clone(), + BytecodeType::DeployedBytecode, + ) + }; + + let vec_to_string = |vec: Vec| Bytes::from(vec).to_string(); + + let metadata = VerificationMetadata { + chain_id: Some(format!("{}", self.chain_id)), + contract_address: Some(vec_to_string(contract.contract_address)), + transaction_hash: contract_details.transaction_hash.map(vec_to_string), + block_number: Some(contract_details.block_number.try_into().unwrap()), + transaction_index: contract_details + .transaction_index + .map(|v| v.try_into().unwrap()), + deployer: contract_details.deployer.map(vec_to_string), + creation_code: contract_details.creation_code.map(vec_to_string), + runtime_code: Some(vec_to_string(contract_details.runtime_code)), + }; + + macro_rules! send_eth_bytecode_db_request { + ($request_type:tt, $verify:tt) => {{ + let request = $request_type { + bytecode: Bytes::from(bytecode).to_string(), + bytecode_type: bytecode_type.into(), + compiler_version: contract.compiler_version, + input: input.to_string(), + metadata: Some(metadata), + }; + + self.eth_bytecode_db_client.$verify(request).await + }}; + } + + let source = match contract.language { + sea_orm_active_enums::Language::Solidity | sea_orm_active_enums::Language::Yul => { + send_eth_bytecode_db_request!( + VerifySolidityStandardJsonRequest, + verify_solidity_standard_json + ) + } + sea_orm_active_enums::Language::Vyper => { + send_eth_bytecode_db_request!( + VerifyVyperStandardJsonRequest, + verify_vyper_standard_json + ) + } + } + .context("verify through eth_bytecode_db failed")?; + + Ok(source) + } + + fn generate_input( + language: sea_orm_active_enums::Language, + contract_details: &contract_details::Model, + ) -> anyhow::Result { + #[derive(Debug, Serialize)] + struct StandardJson { + language: String, + interfaces: Option, + sources: serde_json::Value, + settings: serde_json::Value, + } + + let (language, interfaces) = match language { + sea_orm_active_enums::Language::Solidity => ("Solidity", None), + sea_orm_active_enums::Language::Yul => ("Yul", None), + sea_orm_active_enums::Language::Vyper => ("Vyper", Some(serde_json::json!({}))), + }; + serde_json::to_value(StandardJson { + language: language.to_string(), + sources: contract_details.sources.clone(), + interfaces, + settings: contract_details.settings.clone(), + }) + .context("converting standard json to serde_json::value failed") + } + + async fn mark_as_success( + &self, + contract_address: Bytes, + source: eth_bytecode_db::Source, + ) -> anyhow::Result<()> { + contract_addresses::ActiveModel { + contract_address: Set(contract_address.to_vec()), + chain_id: Set(self.chain_id.into()), + status: Set(sea_orm_active_enums::Status::Success), + log: Set(Some( + serde_json::to_string(&source) + .context("serializing success result (source) failed")?, + )), + ..Default::default() + } + .update(self.db_client.as_ref()) + .await + .context(format!( + "saving success details failed for the contract {}", + contract_address, + ))?; + + Ok(()) + } + + async fn next_contract(&self) -> anyhow::Result> { + // Notice that we are looking only for contracts with given `chain_id` + let next_contract_address_sql = format!( + r#" + UPDATE contract_addresses + SET status = 'in_process' + WHERE contract_address = (SELECT contract_address + FROM contract_addresses + WHERE status = 'waiting' + AND chain_id = {} + LIMIT 1 FOR UPDATE SKIP LOCKED) + RETURNING contract_address; + "#, + self.chain_id + ); + + let next_contract_address_stmt = Statement::from_string( + DatabaseBackend::Postgres, + next_contract_address_sql.to_string(), + ); + + let next_contract_address = self + .db_client + .as_ref() + .query_one(next_contract_address_stmt) + .await + .context("querying for the next contract address")? + .map(|query_result| { + query_result + .try_get_by::, _>("contract_address") + .expect("error while try_get_by contract_address") + }); + + if let Some(contract_address) = next_contract_address { + let model = contract_addresses::Entity::find_by_id(( + contract_address.clone(), + self.chain_id.into(), + )) + .one(self.db_client.as_ref()) + .await + .expect("querying contract_address model failed") + .ok_or_else(|| { + anyhow::anyhow!( + "contract_address model does not exist for the contract: {}", + Bytes::from(contract_address), + ) + })?; + + return Ok(Some(model)); + } + + Ok(None) + } +} diff --git a/eth-bytecode-db-extractors/blockscout/src/eth_bytecode_db.rs b/eth-bytecode-db-extractors/blockscout/src/eth_bytecode_db.rs new file mode 100644 index 000000000..720765360 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/src/eth_bytecode_db.rs @@ -0,0 +1,120 @@ +pub use eth_bytecode_db_proto::blockscout::eth_bytecode_db::v2::{ + solidity_verifier_client::SolidityVerifierClient, verify_response, + vyper_verifier_client::VyperVerifierClient, BytecodeType, SearchSourcesRequest, + SearchSourcesResponse, Source, VerificationMetadata, VerifyResponse, + VerifySolidityMultiPartRequest, VerifySolidityStandardJsonRequest, VerifyVyperMultiPartRequest, +}; + +use anyhow::Context; +use eth_bytecode_db_proto::blockscout::eth_bytecode_db::v2::VerifyVyperStandardJsonRequest; +use serde::{de::DeserializeOwned, Serialize}; +use std::{str::FromStr, time::Duration}; +use url::Url; + +#[derive(Clone)] +pub struct Client { + url: Url, + api_key: String, + request_client: reqwest::Client, +} + +impl Client { + pub fn try_new(service_url: String, api_key: String) -> anyhow::Result { + let service_url = + Url::from_str(&service_url).context("invalid eth_bytecode_db service url")?; + + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(300)) + .build() + .unwrap(); + + Ok(Self { + url: service_url, + api_key, + request_client: client, + }) + } + + pub async fn verify_solidity_multi_part( + &self, + request: VerifySolidityMultiPartRequest, + ) -> anyhow::Result { + let path = "/api/v2/verifier/solidity/sources:verify-multi-part"; + Self::process_verification_response(self.send_request(path, request).await?) + } + + pub async fn verify_solidity_standard_json( + &self, + request: VerifySolidityStandardJsonRequest, + ) -> anyhow::Result { + let path = "/api/v2/verifier/solidity/sources:verify-standard-json"; + Self::process_verification_response(self.send_request(path, request).await?) + } + + pub async fn verify_vyper_multi_part( + &self, + request: VerifyVyperMultiPartRequest, + ) -> anyhow::Result { + let path = "/api/v2/verifier/vyper/sources:verify-multi-part"; + Self::process_verification_response(self.send_request(path, request).await?) + } + + pub async fn verify_vyper_standard_json( + &self, + request: VerifyVyperStandardJsonRequest, + ) -> anyhow::Result { + let path = "/api/v2/verifier/vyper/sources:verify-standard-json"; + Self::process_verification_response(self.send_request(path, request).await?) + } + + pub async fn search_sources( + &self, + request: SearchSourcesRequest, + ) -> anyhow::Result { + let path = "/api/v2/bytecodes/sources:search"; + self.send_request(path, request).await + } + + async fn send_request( + &self, + path: &str, + request: Request, + ) -> anyhow::Result { + let url = { + let mut url = self.url.clone(); + url.set_path(path); + url + }; + let response = self + .request_client + .post(url) + .json(&request) + .header("x-api-key", &self.api_key) + .send() + .await + .context("error sending request")?; + + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "verification http request failed with the following status: {}, message: {:?}", + response.status(), + response.text().await + )); + } + + response + .json::() + .await + .context("verify response deserialization failed") + } + + fn process_verification_response(response: VerifyResponse) -> anyhow::Result { + if let verify_response::Status::Success = + verify_response::Status::from_i32(response.status).unwrap() + { + Ok(response.source.unwrap()) + } else { + Err(anyhow::anyhow!("verification failed: {}", response.message)) + } + } +} diff --git a/eth-bytecode-db-extractors/blockscout/src/lib.rs b/eth-bytecode-db-extractors/blockscout/src/lib.rs new file mode 100644 index 000000000..55f08af89 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/src/lib.rs @@ -0,0 +1,7 @@ +mod blockscout; +mod client; +mod eth_bytecode_db; +mod settings; + +pub use client::Client; +pub use settings::Settings; diff --git a/eth-bytecode-db-extractors/blockscout/src/main.rs b/eth-bytecode-db-extractors/blockscout/src/main.rs new file mode 100644 index 000000000..9e4cd32d4 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/src/main.rs @@ -0,0 +1,51 @@ +use anyhow::Context; +use blockscout::{Client, Settings}; +use blockscout_service_launcher as launcher; +use migration::Migrator; +// use smart_contract_fiesta::{database, dataset, Settings, VerificationClient}; +use std::sync::Arc; + +const SERVICE_NAME: &str = "blockscout-extractor"; + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + launcher::tracing::init_logs(SERVICE_NAME, &Default::default(), &Default::default()) + .context("tracing initialization")?; + + let settings = Settings::new().context("failed to read config")?; + + let mut connect_options = sea_orm::ConnectOptions::new(&settings.database_url); + connect_options.sqlx_logging_level(tracing::log::LevelFilter::Debug); + launcher::database::initialize_postgres::( + connect_options.clone(), + settings.create_database, + settings.run_migrations, + ) + .await?; + let db_connection = Arc::new(sea_orm::Database::connect(connect_options).await?); + + let client = Client::try_new_arc( + db_connection, + settings.chain_id, + settings.blockscout_url, + settings.eth_bytecode_db_url, + settings.eth_bytecode_db_api_key, + settings.limit_requests_per_second, + )?; + + tracing::info!("importing contract addresses started"); + let processed = client.import_contract_addresses().await?; + tracing::info!("importing contract addresses finished. Total processed contracts={processed}"); + + let mut handles = Vec::with_capacity(settings.n_threads); + for _ in 0..settings.n_threads { + let client = client.clone(); + let handle = tokio::spawn(client.verify_contracts()); + handles.push(handle); + } + for result in futures::future::join_all(handles).await { + result.context("join handle")?.context("verify contracts")?; + } + + Ok(()) +} diff --git a/eth-bytecode-db-extractors/blockscout/src/settings.rs b/eth-bytecode-db-extractors/blockscout/src/settings.rs new file mode 100644 index 000000000..5fb6ac1d4 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout/src/settings.rs @@ -0,0 +1,104 @@ +use config::{Config, File}; +use serde::{de, Deserialize}; + +/// Wrapper under [`serde::de::IgnoredAny`] which implements +/// [`PartialEq`] and [`Eq`] for fields to be ignored. +#[derive(Copy, Clone, Debug, Default, Deserialize)] +struct IgnoredAny(de::IgnoredAny); + +impl PartialEq for IgnoredAny { + fn eq(&self, _other: &Self) -> bool { + // We ignore that values, so they should not impact the equality + true + } +} + +impl Eq for IgnoredAny {} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct Settings { + pub database_url: String, + #[serde(default)] + pub create_database: bool, + #[serde(default)] + pub run_migrations: bool, + + #[serde(default = "default_chain_id")] + pub chain_id: u64, + + #[serde(default = "default_blockscout_url")] + pub blockscout_url: String, + #[serde(default = "default_limit_requests_per_second")] + pub limit_requests_per_second: u32, + + pub eth_bytecode_db_url: String, + pub eth_bytecode_db_api_key: String, + + #[serde(default = "default_n_threads")] + pub n_threads: usize, + + // Is required as we deny unknown fields, but allow users provide + // path to config through PREFIX__CONFIG env variable. If removed, + // the setup would fail with `unknown field `config`, expected one of...` + #[serde(default, rename = "config")] + config_path: IgnoredAny, +} + +// fn default_chain_id() -> u64 { +// 1 +// } + +// fn default_blockscout_url() -> String { +// "https://eth.blockscout.com".to_string() +// } + +fn default_chain_id() -> u64 { + 5 +} + +fn default_blockscout_url() -> String { + "https://eth-goerli.blockscout.com".to_string() +} + +fn default_limit_requests_per_second() -> u32 { + u32::MAX +} + +fn default_n_threads() -> usize { + 1 +} + +// fn default_search_n_threads() -> usize { +// 1 +// } + +impl Settings { + pub fn new() -> anyhow::Result { + let config_path = std::env::var("BLOCKSCOUT_EXTRACTOR__CONFIG"); + + let mut builder = Config::builder(); + if let Ok(config_path) = config_path { + builder = builder.add_source(File::with_name(&config_path)); + }; + // Use `__` so that it would be possible to address keys with underscores in names (e.g. `access_key`) + builder = builder + .add_source(config::Environment::with_prefix("BLOCKSCOUT_EXTRACTOR").separator("__")); + + let settings: Settings = builder.build()?.try_deserialize()?; + + settings.validate()?; + + Ok(settings) + } + + fn validate(&self) -> anyhow::Result<()> { + // if self.import_dataset && self.dataset.is_none() { + // return Err(anyhow::anyhow!( + // "`dataset` path should be specified if `import_dataset` is enabled" + // )); + // } + + Ok(()) + } +} From 73617b45fd1fe92a93e654c291068c68acb7aaeb Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Mon, 16 Oct 2023 21:15:47 +0300 Subject: [PATCH 02/13] Process inputs without settings --- .../blockscout-entity/src/contract_details.rs | 10 ++- ...26_170553_create_contract_details_table.rs | 8 +- .../blockscout/src/blockscout.rs | 75 ++++++++++++----- .../blockscout/src/client.rs | 84 +++++++++++++++---- 4 files changed, 137 insertions(+), 40 deletions(-) diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs index c6b81d9f7..3b8abbd73 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs @@ -17,8 +17,14 @@ pub struct Model { pub chain_id: Decimal, #[sea_orm(column_type = "JsonBinary")] pub sources: Json, - #[sea_orm(column_type = "JsonBinary")] - pub settings: Json, + #[sea_orm(column_type = "JsonBinary", nullable)] + pub settings: Option, + pub verified_via_sourcify: bool, + pub optimization_enabled: Option, + pub optimization_runs: Option, + pub evm_version: Option, + #[sea_orm(column_type = "JsonBinary", nullable)] + pub libraries: Option, #[sea_orm(column_type = "Binary(BlobSize::Blob(None))", nullable)] pub creation_code: Option>, #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")] diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs index 7f5d0dc58..d15fca70a 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170553_create_contract_details_table.rs @@ -15,7 +15,13 @@ impl MigrationTrait for Migration { "chain_id" numeric NOT NULL, "sources" jsonb NOT NULL, - "settings" jsonb NOT NULL, + "settings" jsonb, + + "verified_via_sourcify" bool NOT NULL DEFAULT false, + "optimization_enabled" bool, + "optimization_runs" bigint, + "evm_version" varchar, + "libraries" jsonb, "creation_code" bytea, "runtime_code" bytea NOT NULL, diff --git a/eth-bytecode-db-extractors/blockscout/src/blockscout.rs b/eth-bytecode-db-extractors/blockscout/src/blockscout.rs index c78108e77..c40d5dc5e 100644 --- a/eth-bytecode-db-extractors/blockscout/src/blockscout.rs +++ b/eth-bytecode-db-extractors/blockscout/src/blockscout.rs @@ -24,7 +24,13 @@ pub struct ContractDetails { pub deployer: Vec, pub sources: serde_json::Value, - pub settings: serde_json::Value, + pub settings: Option, + + pub verified_via_sourcify: bool, + pub optimization_enabled: Option, + pub optimization_runs: Option, + pub evm_version: Option, + pub libraries: Option, } impl Client { @@ -77,9 +83,11 @@ impl Client { .await .context("get transaction details failed")?; - let sources = - serde_json::to_value(smart_contracts::retrieve_sources(&smart_contract_details)) - .unwrap(); + let sources = smart_contracts::retrieve_sources(&smart_contract_details); + + let libraries = smart_contracts::parse_external_libraries( + smart_contract_details.external_libraries.clone(), + ); Ok(ContractDetails { creation_code: smart_contract_details.creation_bytecode.map(|v| v.to_vec()), @@ -92,6 +100,14 @@ impl Client { sources, settings: smart_contract_details.compiler_settings, + + verified_via_sourcify: smart_contract_details + .is_verified_via_sourcify + .unwrap_or_default(), + optimization_enabled: smart_contract_details.optimization_enabled, + optimization_runs: smart_contract_details.optimization_runs, + evm_version: smart_contract_details.evm_version, + libraries, }) } @@ -285,32 +301,45 @@ mod smart_contracts { content: response.source_code.as_str(), }, ); - for additional_source in &response.additional_sources { - sources.insert( - additional_source.file_path.as_str(), - Source { - content: additional_source.source_code.as_str(), - }, - ); + if let Some(additional_sources) = response.additional_sources.as_ref() { + for additional_source in additional_sources { + sources.insert( + additional_source.file_path.as_str(), + Source { + content: additional_source.source_code.as_str(), + }, + ); + } } serde_json::to_value(sources).unwrap() } + pub fn parse_external_libraries(libraries: Vec) -> Option { + if libraries.is_empty() { + return None; + } + + Some(serde_json::to_value(libraries).unwrap()) + } + #[derive(Debug, Deserialize)] pub struct Response { pub verified_at: chrono::DateTime, - pub is_vyper_contract: bool, - pub optimization_enabled: Option, - pub optimization_runs: Option, + pub compiler_version: String, - // pub evm_version: Option, pub source_code: String, pub file_path: Option, - pub compiler_settings: serde_json::Value, - pub additional_sources: Vec, + pub compiler_settings: Option, + pub additional_sources: Option>, pub deployed_bytecode: Bytes, pub creation_bytecode: Option, - // pub external_libraries: Vec, + + pub is_vyper_contract: bool, + pub is_verified_via_sourcify: Option, + pub optimization_enabled: Option, + pub optimization_runs: Option, + pub evm_version: Option, + pub external_libraries: Vec, } #[derive(Debug, Deserialize)] @@ -319,11 +348,11 @@ mod smart_contracts { source_code: String, } - // #[derive(Debug, Deserialize)] - // pub struct ExternalLibrary { - // name: String, - // address_hash: Bytes, - // } + #[derive(Debug, Clone, Deserialize, Serialize)] + pub struct ExternalLibrary { + name: String, + address_hash: Bytes, + } } mod addresses { diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index 898546ccf..326f1d287 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -44,6 +44,15 @@ macro_rules! process_result { }; } +#[derive(Debug, Serialize)] +struct StandardJson { + language: String, + #[serde(skip_serializing_if = "Option::is_none")] + interfaces: Option, + sources: serde_json::Value, + settings: serde_json::Value, +} + #[derive(Clone)] pub struct Client { pub db_client: Arc, @@ -187,6 +196,11 @@ impl Client { chain_id: Set(self.chain_id.into()), sources: Set(contract_details.sources), settings: Set(contract_details.settings), + verified_via_sourcify: Set(contract_details.verified_via_sourcify), + optimization_enabled: Set(contract_details.optimization_enabled), + optimization_runs: Set(contract_details.optimization_runs), + evm_version: Set(contract_details.evm_version), + libraries: Set(contract_details.libraries), creation_code: Set(contract_details.creation_code), runtime_code: Set(contract_details.runtime_code), transaction_hash: Set(Some(contract_details.transaction_hash)), @@ -207,7 +221,13 @@ impl Client { contract: contract_addresses::Model, contract_details: contract_details::Model, ) -> anyhow::Result { - let input = Self::generate_input(contract.language.clone(), &contract_details)?; + let input = if contract_details.verified_via_sourcify { + self.generate_input_from_sourcify().await? + } else if let Some(_libraries) = contract_details.libraries { + Self::generate_input_with_libraries()? + } else { + Self::generate_input(contract.language.clone(), &contract_details)? + }; let (bytecode, bytecode_type) = if let Some(creation_code) = contract_details.creation_code.as_ref() { @@ -240,7 +260,8 @@ impl Client { bytecode: Bytes::from(bytecode).to_string(), bytecode_type: bytecode_type.into(), compiler_version: contract.compiler_version, - input: input.to_string(), + input: serde_json::to_string(&input) + .context("serializing standard json input failed")?, metadata: Some(metadata), }; @@ -270,27 +291,62 @@ impl Client { fn generate_input( language: sea_orm_active_enums::Language, contract_details: &contract_details::Model, - ) -> anyhow::Result { - #[derive(Debug, Serialize)] - struct StandardJson { - language: String, - interfaces: Option, - sources: serde_json::Value, - settings: serde_json::Value, - } - + ) -> anyhow::Result { let (language, interfaces) = match language { sea_orm_active_enums::Language::Solidity => ("Solidity", None), sea_orm_active_enums::Language::Yul => ("Yul", None), sea_orm_active_enums::Language::Vyper => ("Vyper", Some(serde_json::json!({}))), }; - serde_json::to_value(StandardJson { + + let settings = if let Some(settings) = &contract_details.settings { + settings.clone() + } else { + #[derive(Debug, Serialize)] + #[serde(rename_all = "camelCase")] + struct Settings { + pub optimizer: Optimizer, + pub evm_version: Option, + } + + #[derive(Debug, Serialize)] + #[serde(rename_all = "camelCase")] + struct Optimizer { + pub enabled: Option, + pub runs: Option, + } + + let settings = Settings { + optimizer: Optimizer { + enabled: contract_details.optimization_enabled, + runs: contract_details.optimization_runs, + }, + evm_version: contract_details + .evm_version + .clone() + .filter(|v| v != "default"), + }; + + serde_json::to_value(settings).unwrap() + }; + + Ok(StandardJson { language: language.to_string(), sources: contract_details.sources.clone(), interfaces, - settings: contract_details.settings.clone(), + settings, }) - .context("converting standard json to serde_json::value failed") + } + + fn generate_input_with_libraries() -> anyhow::Result { + Err(anyhow::anyhow!( + "Input generation for sources with libraries is not implemented yet" + )) + } + + async fn generate_input_from_sourcify(&self) -> anyhow::Result { + Err(anyhow::anyhow!( + "Input generation from sourcify is not implemented yet" + )) } async fn mark_as_success( From e133daee7a20573f638b7ef9bb03d091eee64712 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Mon, 16 Oct 2023 23:10:37 +0300 Subject: [PATCH 03/13] Update launcher to v0.9.0 --- .../blockscout/Cargo.toml | 2 +- .../blockscout/src/client.rs | 6 +- .../blockscout/src/main.rs | 13 ++--- .../blockscout/src/settings.rs | 56 ++----------------- 4 files changed, 14 insertions(+), 63 deletions(-) diff --git a/eth-bytecode-db-extractors/blockscout/Cargo.toml b/eth-bytecode-db-extractors/blockscout/Cargo.toml index 30a35c0bc..61c352e25 100644 --- a/eth-bytecode-db-extractors/blockscout/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout/Cargo.toml @@ -12,7 +12,7 @@ blockscout-migration = { path = "../blockscout-migration" } anyhow = "1.0.70" async-trait = "0.1" blockscout-display-bytes = "1.0.0" -blockscout-service-launcher = { version = "0.8.0", features = ["database-0_12"] } +blockscout-service-launcher = { version = "0.9.0", features = ["database-0_12"] } chrono = "0.4.31" config = "0.13.3" eth-bytecode-db-proto = { git = "https://github.com/blockscout/blockscout-rs", branch = "rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion" } diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index 326f1d287..ff246e647 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -62,8 +62,8 @@ pub struct Client { } impl Client { - pub fn try_new_arc( - db_client: Arc, + pub fn try_new( + db_client: DatabaseConnection, chain_id: u64, blockscout_url: String, eth_bytecode_db_url: String, @@ -77,7 +77,7 @@ impl Client { eth_bytecode_db::Client::try_new(eth_bytecode_db_url, eth_bytecode_db_api_key)?; let client = Self { - db_client, + db_client: Arc::new(db_client), chain_id, blockscout_client, eth_bytecode_db_client, diff --git a/eth-bytecode-db-extractors/blockscout/src/main.rs b/eth-bytecode-db-extractors/blockscout/src/main.rs index 9e4cd32d4..fc31146c8 100644 --- a/eth-bytecode-db-extractors/blockscout/src/main.rs +++ b/eth-bytecode-db-extractors/blockscout/src/main.rs @@ -1,9 +1,7 @@ use anyhow::Context; use blockscout::{Client, Settings}; -use blockscout_service_launcher as launcher; +use blockscout_service_launcher::{self as launcher, launcher::ConfigSettings}; use migration::Migrator; -// use smart_contract_fiesta::{database, dataset, Settings, VerificationClient}; -use std::sync::Arc; const SERVICE_NAME: &str = "blockscout-extractor"; @@ -12,19 +10,18 @@ async fn main() -> Result<(), anyhow::Error> { launcher::tracing::init_logs(SERVICE_NAME, &Default::default(), &Default::default()) .context("tracing initialization")?; - let settings = Settings::new().context("failed to read config")?; + let settings = Settings::build().context("failed to read config")?; let mut connect_options = sea_orm::ConnectOptions::new(&settings.database_url); connect_options.sqlx_logging_level(tracing::log::LevelFilter::Debug); - launcher::database::initialize_postgres::( - connect_options.clone(), + let db_connection = launcher::database::initialize_postgres::( + connect_options, settings.create_database, settings.run_migrations, ) .await?; - let db_connection = Arc::new(sea_orm::Database::connect(connect_options).await?); - let client = Client::try_new_arc( + let client = Client::try_new( db_connection, settings.chain_id, settings.blockscout_url, diff --git a/eth-bytecode-db-extractors/blockscout/src/settings.rs b/eth-bytecode-db-extractors/blockscout/src/settings.rs index 5fb6ac1d4..9c94397a6 100644 --- a/eth-bytecode-db-extractors/blockscout/src/settings.rs +++ b/eth-bytecode-db-extractors/blockscout/src/settings.rs @@ -1,19 +1,5 @@ -use config::{Config, File}; -use serde::{de, Deserialize}; - -/// Wrapper under [`serde::de::IgnoredAny`] which implements -/// [`PartialEq`] and [`Eq`] for fields to be ignored. -#[derive(Copy, Clone, Debug, Default, Deserialize)] -struct IgnoredAny(de::IgnoredAny); - -impl PartialEq for IgnoredAny { - fn eq(&self, _other: &Self) -> bool { - // We ignore that values, so they should not impact the equality - true - } -} - -impl Eq for IgnoredAny {} +use blockscout_service_launcher::launcher; +use serde::Deserialize; #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] #[serde(deny_unknown_fields)] @@ -37,12 +23,10 @@ pub struct Settings { #[serde(default = "default_n_threads")] pub n_threads: usize, +} - // Is required as we deny unknown fields, but allow users provide - // path to config through PREFIX__CONFIG env variable. If removed, - // the setup would fail with `unknown field `config`, expected one of...` - #[serde(default, rename = "config")] - config_path: IgnoredAny, +impl launcher::ConfigSettings for Settings { + const SERVICE_NAME: &'static str = "BLOCKSCOUT_EXTRACTOR"; } // fn default_chain_id() -> u64 { @@ -72,33 +56,3 @@ fn default_n_threads() -> usize { // fn default_search_n_threads() -> usize { // 1 // } - -impl Settings { - pub fn new() -> anyhow::Result { - let config_path = std::env::var("BLOCKSCOUT_EXTRACTOR__CONFIG"); - - let mut builder = Config::builder(); - if let Ok(config_path) = config_path { - builder = builder.add_source(File::with_name(&config_path)); - }; - // Use `__` so that it would be possible to address keys with underscores in names (e.g. `access_key`) - builder = builder - .add_source(config::Environment::with_prefix("BLOCKSCOUT_EXTRACTOR").separator("__")); - - let settings: Settings = builder.build()?.try_deserialize()?; - - settings.validate()?; - - Ok(settings) - } - - fn validate(&self) -> anyhow::Result<()> { - // if self.import_dataset && self.dataset.is_none() { - // return Err(anyhow::anyhow!( - // "`dataset` path should be specified if `import_dataset` is enabled" - // )); - // } - - Ok(()) - } -} From 90fabb07bcc67a5be91e158bb500c16dbabf33b8 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Tue, 17 Oct 2023 10:36:52 +0300 Subject: [PATCH 04/13] Remove unused parsing --- eth-bytecode-db-extractors/blockscout/src/blockscout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth-bytecode-db-extractors/blockscout/src/blockscout.rs b/eth-bytecode-db-extractors/blockscout/src/blockscout.rs index c40d5dc5e..fcc99095d 100644 --- a/eth-bytecode-db-extractors/blockscout/src/blockscout.rs +++ b/eth-bytecode-db-extractors/blockscout/src/blockscout.rs @@ -334,7 +334,7 @@ mod smart_contracts { pub deployed_bytecode: Bytes, pub creation_bytecode: Option, - pub is_vyper_contract: bool, + // pub is_vyper_contract: Option, pub is_verified_via_sourcify: Option, pub optimization_enabled: Option, pub optimization_runs: Option, From 250e72f6e3c11c8661af9b7381f8f939ffc241e4 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Tue, 17 Oct 2023 11:55:05 +0300 Subject: [PATCH 05/13] Add 'force-import' configuration --- eth-bytecode-db-extractors/blockscout/src/client.rs | 9 ++++++--- eth-bytecode-db-extractors/blockscout/src/main.rs | 4 +++- eth-bytecode-db-extractors/blockscout/src/settings.rs | 3 +++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index ff246e647..748c257ca 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -88,7 +88,7 @@ impl Client { } impl Client { - pub async fn import_contract_addresses(&self) -> anyhow::Result { + pub async fn import_contract_addresses(&self, force_import: bool) -> anyhow::Result { let mut verified_contracts = self .blockscout_client .get_verified_contracts() @@ -136,8 +136,11 @@ impl Client { { Ok(_) => {} Err(DbErr::RecordNotInserted) => { - tracing::info!("No records have been inserted. Stop dataset import"); - break; + // Do not stop if re-import of all contracts have been setup. + if !force_import { + tracing::info!("No records have been inserted. Stop dataset import"); + break; + } } Err(err) => { return Err(err).context(format!( diff --git a/eth-bytecode-db-extractors/blockscout/src/main.rs b/eth-bytecode-db-extractors/blockscout/src/main.rs index fc31146c8..52dc5bcfb 100644 --- a/eth-bytecode-db-extractors/blockscout/src/main.rs +++ b/eth-bytecode-db-extractors/blockscout/src/main.rs @@ -31,7 +31,9 @@ async fn main() -> Result<(), anyhow::Error> { )?; tracing::info!("importing contract addresses started"); - let processed = client.import_contract_addresses().await?; + let processed = client + .import_contract_addresses(settings.force_import) + .await?; tracing::info!("importing contract addresses finished. Total processed contracts={processed}"); let mut handles = Vec::with_capacity(settings.n_threads); diff --git a/eth-bytecode-db-extractors/blockscout/src/settings.rs b/eth-bytecode-db-extractors/blockscout/src/settings.rs index 9c94397a6..338486c8e 100644 --- a/eth-bytecode-db-extractors/blockscout/src/settings.rs +++ b/eth-bytecode-db-extractors/blockscout/src/settings.rs @@ -10,6 +10,9 @@ pub struct Settings { #[serde(default)] pub run_migrations: bool, + #[serde(default)] + pub force_import: bool, + #[serde(default = "default_chain_id")] pub chain_id: u64, From 09d0201e198b370d79512620cc32a4b7f01a6b3c Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Tue, 17 Oct 2023 22:17:52 +0300 Subject: [PATCH 06/13] Update Cargo.lock --- eth-bytecode-db-extractors/Cargo.lock | 206 ++++++++++++++++++++++++-- 1 file changed, 191 insertions(+), 15 deletions(-) diff --git a/eth-bytecode-db-extractors/Cargo.lock b/eth-bytecode-db-extractors/Cargo.lock index d7ba09bb3..b13e3aa3b 100644 --- a/eth-bytecode-db-extractors/Cargo.lock +++ b/eth-bytecode-db-extractors/Cargo.lock @@ -405,9 +405,9 @@ dependencies = [ [[package]] name = "async-executor" -version = "1.5.4" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1da3ae8dabd9c00f453a329dfe1fb28da3c0a72e2478cdcd93171740c20499" +checksum = "4b0c4a4f319e45986f347ee47fef8bf5e81c9abc3f6f58dc2391439f30df65f0" dependencies = [ "async-lock", "async-task", @@ -690,6 +690,38 @@ dependencies = [ "tracing", ] +[[package]] +name = "blockscout" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "blockscout-display-bytes", + "blockscout-entity", + "blockscout-migration", + "blockscout-service-launcher", + "chrono", + "config", + "eth-bytecode-db-proto 0.1.0 (git+https://github.com/blockscout/blockscout-rs?branch=rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion)", + "futures", + "governor", + "reqwest", + "reqwest-middleware", + "reqwest-rate-limiter", + "reqwest-retry", + "scraper 0.17.1", + "sea-orm", + "sea-orm-migration", + "serde", + "serde_json", + "serde_path_to_error", + "task-local-extensions", + "tokio", + "tokio-stream", + "tracing", + "url", +] + [[package]] name = "blockscout-display-bytes" version = "1.0.0" @@ -702,6 +734,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "blockscout-entity" +version = "0.1.0" +dependencies = [ + "sea-orm", +] + +[[package]] +name = "blockscout-migration" +version = "0.1.0" +dependencies = [ + "async-std", + "sea-orm-migration", +] + [[package]] name = "blockscout-service-launcher" version = "0.9.0" @@ -867,8 +914,10 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets", ] @@ -1074,6 +1123,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "cssparser" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b3df4f93e5fbbe73ec01ec8d3f68bba73107993a5b1e7519273c32db9b0d5be" +dependencies = [ + "cssparser-macros", + "dtoa-short", + "itoa", + "phf 0.11.2", + "smallvec", +] + [[package]] name = "cssparser-macros" version = "0.6.1" @@ -1267,6 +1329,23 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "eth-bytecode-db-proto" +version = "0.1.0" +source = "git+https://github.com/blockscout/blockscout-rs?branch=rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion#154536951d19283b306a60347ce56b06d032bc43" +dependencies = [ + "actix-prost", + "actix-prost-build", + "actix-prost-macros", + "actix-web", + "prost 0.11.9", + "prost-build", + "serde", + "serde_with", + "tonic 0.8.3", + "tonic-build", +] + [[package]] name = "eth-bytecode-db-proto" version = "0.1.0" @@ -1802,16 +1881,16 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows", + "windows-core", ] [[package]] @@ -2648,11 +2727,21 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" dependencies = [ - "phf_macros", + "phf_macros 0.10.0", "phf_shared 0.10.0", "proc-macro-hack", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_macros 0.11.2", + "phf_shared 0.11.2", +] + [[package]] name = "phf_codegen" version = "0.8.0" @@ -2693,6 +2782,16 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared 0.11.2", + "rand 0.8.5", +] + [[package]] name = "phf_macros" version = "0.10.0" @@ -2707,6 +2806,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "phf_macros" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b" +dependencies = [ + "phf_generator 0.11.2", + "phf_shared 0.11.2", + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "phf_shared" version = "0.8.0" @@ -2725,6 +2837,15 @@ dependencies = [ "siphasher", ] +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -3491,12 +3612,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59e25654b5e9fd557a67dbaab5a5d36b8c448d0561beb4c041b6dbb902eddfa6" dependencies = [ "ahash 0.8.3", - "cssparser", + "cssparser 0.29.6", "ego-tree", "getopts", "html5ever", "once_cell", - "selectors", + "selectors 0.24.0", + "smallvec", + "tendril", +] + +[[package]] +name = "scraper" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c95a930e03325234c18c7071fd2b60118307e025d6fff3e12745ffbf63a3d29c" +dependencies = [ + "ahash 0.8.3", + "cssparser 0.31.2", + "ego-tree", + "getopts", + "html5ever", + "once_cell", + "selectors 0.25.0", "smallvec", "tendril", ] @@ -3706,14 +3844,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c37578180969d00692904465fb7f6b3d50b9a2b952b87c23d0e2e5cb5013416" dependencies = [ "bitflags 1.3.2", - "cssparser", + "cssparser 0.29.6", "derive_more", "fxhash", "log", "phf 0.8.0", "phf_codegen 0.8.0", "precomputed-hash", - "servo_arc", + "servo_arc 0.2.0", + "smallvec", +] + +[[package]] +name = "selectors" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb30575f3638fc8f6815f448d50cb1a2e255b0897985c8c59f4d37b72a07b06" +dependencies = [ + "bitflags 2.4.1", + "cssparser 0.31.2", + "derive_more", + "fxhash", + "log", + "new_debug_unreachable", + "phf 0.10.1", + "phf_codegen 0.10.0", + "precomputed-hash", + "servo_arc 0.3.0", "smallvec", ] @@ -3754,6 +3911,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3817,6 +3984,15 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "servo_arc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d036d71a959e00c77a63538b90a6c2390969f9772b096ea837205c6bd0491a44" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3903,14 +4079,14 @@ dependencies = [ "blockscout-display-bytes", "blockscout-service-launcher", "config", - "eth-bytecode-db-proto", + "eth-bytecode-db-proto 0.1.0 (git+https://github.com/blockscout/blockscout-rs?rev=6977d09)", "futures", "governor", "reqwest", "reqwest-middleware", "reqwest-rate-limiter", "reqwest-retry", - "scraper", + "scraper 0.16.0", "sea-orm", "sea-orm-migration", "serde", @@ -5063,10 +5239,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows" -version = "0.48.0" +name = "windows-core" +version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ "windows-targets", ] From 77b945cd76a54a446a7e1530394009cae38c5c96 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Wed, 18 Oct 2023 00:05:38 +0300 Subject: [PATCH 07/13] Update eth-bytecode-db proto definition to use specific commit --- eth-bytecode-db-extractors/Cargo.lock | 4 ++-- eth-bytecode-db-extractors/blockscout/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/eth-bytecode-db-extractors/Cargo.lock b/eth-bytecode-db-extractors/Cargo.lock index b13e3aa3b..d3a84a9be 100644 --- a/eth-bytecode-db-extractors/Cargo.lock +++ b/eth-bytecode-db-extractors/Cargo.lock @@ -702,7 +702,7 @@ dependencies = [ "blockscout-service-launcher", "chrono", "config", - "eth-bytecode-db-proto 0.1.0 (git+https://github.com/blockscout/blockscout-rs?branch=rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion)", + "eth-bytecode-db-proto 0.1.0 (git+https://github.com/blockscout/blockscout-rs?rev=41fc491)", "futures", "governor", "reqwest", @@ -1332,7 +1332,7 @@ dependencies = [ [[package]] name = "eth-bytecode-db-proto" version = "0.1.0" -source = "git+https://github.com/blockscout/blockscout-rs?branch=rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion#154536951d19283b306a60347ce56b06d032bc43" +source = "git+https://github.com/blockscout/blockscout-rs?rev=41fc491#41fc49174b022503c336f4349a84868e6ecda4d5" dependencies = [ "actix-prost", "actix-prost-build", diff --git a/eth-bytecode-db-extractors/blockscout/Cargo.toml b/eth-bytecode-db-extractors/blockscout/Cargo.toml index 61c352e25..2e32af914 100644 --- a/eth-bytecode-db-extractors/blockscout/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout/Cargo.toml @@ -15,7 +15,7 @@ blockscout-display-bytes = "1.0.0" blockscout-service-launcher = { version = "0.9.0", features = ["database-0_12"] } chrono = "0.4.31" config = "0.13.3" -eth-bytecode-db-proto = { git = "https://github.com/blockscout/blockscout-rs", branch = "rimrakhimov/eth-bytecode-db/verifier-alliance-deployment-insertion" } +eth-bytecode-db-proto = { git = "https://github.com/blockscout/blockscout-rs", rev = "41fc491" } futures = "0.3" governor = "0.5.1" reqwest-rate-limiter = { git = "https://github.com/blockscout/blockscout-rs", rev = "edb610b" } From bb3ab643815a19008787e7f75d11555ac4dfbc65 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Mon, 30 Oct 2023 00:27:36 +0300 Subject: [PATCH 08/13] Extract job-queue into a separate crate. Connect blockscout-extractor with extracted job-queue --- eth-bytecode-db-extractors/Cargo.lock | 11 ++ eth-bytecode-db-extractors/Cargo.toml | 1 + .../blockscout-entity/Cargo.toml | 3 +- .../src/contract_addresses.rs | 20 ++- .../blockscout-entity/src/contract_details.rs | 13 ++ .../blockscout-entity/src/lib.rs | 2 + .../blockscout-entity/src/prelude.rs | 1 + .../src/sea_orm_active_enums.rs | 12 -- .../blockscout-entity/src/seaql_migrations.rs | 16 +++ .../blockscout-migration/Cargo.toml | 3 +- .../blockscout-migration/src/lib.rs | 3 +- .../m20230426_170520_create_status_enum.rs | 26 ---- ..._170541_create_contract_addresses_table.rs | 45 ++++-- .../blockscout/.gitignore | 1 - .../blockscout/Cargo.toml | 1 + .../blockscout/src/client.rs | 106 +++++++-------- .../job-queue/Cargo.toml | 22 +++ .../job-queue/src/entity.rs | 32 +++++ .../job-queue/src/functions.rs | 91 +++++++++++++ .../job-queue/src/lib.rs | 8 ++ .../job-queue/src/migration.rs | 128 ++++++++++++++++++ 21 files changed, 433 insertions(+), 112 deletions(-) create mode 100644 eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs delete mode 100644 eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs delete mode 100644 eth-bytecode-db-extractors/blockscout/.gitignore create mode 100644 eth-bytecode-db-extractors/job-queue/Cargo.toml create mode 100644 eth-bytecode-db-extractors/job-queue/src/entity.rs create mode 100644 eth-bytecode-db-extractors/job-queue/src/functions.rs create mode 100644 eth-bytecode-db-extractors/job-queue/src/lib.rs create mode 100644 eth-bytecode-db-extractors/job-queue/src/migration.rs diff --git a/eth-bytecode-db-extractors/Cargo.lock b/eth-bytecode-db-extractors/Cargo.lock index d3a84a9be..83c43a8ac 100644 --- a/eth-bytecode-db-extractors/Cargo.lock +++ b/eth-bytecode-db-extractors/Cargo.lock @@ -705,6 +705,7 @@ dependencies = [ "eth-bytecode-db-proto 0.1.0 (git+https://github.com/blockscout/blockscout-rs?rev=41fc491)", "futures", "governor", + "job-queue", "reqwest", "reqwest-middleware", "reqwest-rate-limiter", @@ -738,6 +739,7 @@ dependencies = [ name = "blockscout-entity" version = "0.1.0" dependencies = [ + "job-queue", "sea-orm", ] @@ -746,6 +748,7 @@ name = "blockscout-migration" version = "0.1.0" dependencies = [ "async-std", + "job-queue", "sea-orm-migration", ] @@ -2009,6 +2012,14 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "job-queue" +version = "0.1.0" +dependencies = [ + "sea-orm", + "sea-orm-migration", +] + [[package]] name = "jobserver" version = "0.1.27" diff --git a/eth-bytecode-db-extractors/Cargo.toml b/eth-bytecode-db-extractors/Cargo.toml index 841ffa829..f9ae4b5ca 100644 --- a/eth-bytecode-db-extractors/Cargo.toml +++ b/eth-bytecode-db-extractors/Cargo.toml @@ -7,4 +7,5 @@ members = [ "smart-contract-fiesta", "smart-contract-fiesta-entity", "smart-contract-fiesta-migration", + "job-queue", ] diff --git a/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml b/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml index 9d95a9e98..58479571e 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml @@ -8,4 +8,5 @@ name = "entity" path = "src/lib.rs" [dependencies] -sea-orm = { version = "^0" } \ No newline at end of file +sea-orm = { version = "^0" } +job-queue = { path = "../job-queue", features = ["entity"] } \ No newline at end of file diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs index 6865b818c..7c90dd172 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs @@ -1,6 +1,6 @@ //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 -use super::sea_orm_active_enums::{Language, Status}; +use super::sea_orm_active_enums::Language; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] @@ -19,16 +19,30 @@ pub struct Model { pub verified_at: DateTimeWithTimeZone, pub language: Language, pub compiler_version: String, - pub status: Status, - pub log: Option, + #[sea_orm(column_name = "_job_id")] + pub job_id: Uuid, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm( + belongs_to = "super::job_queue::Entity", + from = "Column::JobId", + to = "super::job_queue::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + JobQueue, #[sea_orm(has_many = "super::contract_details::Entity")] ContractDetails, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::JobQueue.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::ContractDetails.def() diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs index 3b8abbd73..e0db76a43 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs @@ -55,4 +55,17 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + super::contract_addresses::Relation::JobQueue.def() + } + fn via() -> Option { + Some( + super::contract_addresses::Relation::ContractDetails + .def() + .rev(), + ) + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs b/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs index bb13562fa..385b06313 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs @@ -5,3 +5,5 @@ pub mod prelude; pub mod contract_addresses; pub mod contract_details; pub mod sea_orm_active_enums; + +pub use job_queue::entity as job_queue; diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs b/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs index c30e3bab7..1531687c4 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs @@ -3,3 +3,4 @@ pub use super::{ contract_addresses::Entity as ContractAddresses, contract_details::Entity as ContractDetails, }; +pub use job_queue::entity::Entity as JobQueue; diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs b/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs index bf8f359fa..48a46b25f 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs @@ -12,15 +12,3 @@ pub enum Language { #[sea_orm(string_value = "yul")] Yul, } -#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "status")] -pub enum Status { - #[sea_orm(string_value = "error")] - Error, - #[sea_orm(string_value = "in_process")] - InProcess, - #[sea_orm(string_value = "success")] - Success, - #[sea_orm(string_value = "waiting")] - Waiting, -} diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs b/eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs new file mode 100644 index 000000000..8f5d22a64 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs @@ -0,0 +1,16 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "seaql_migrations")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version: String, + pub applied_at: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml b/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml index 9d467b9e1..43261d6d2 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml @@ -10,6 +10,7 @@ path = "src/lib.rs" [dependencies] async-std = { version = "1", features = ["attributes", "tokio1"] } +job-queue = { path = "../job-queue", features = ["migration"] } [dependencies.sea-orm-migration] version = "^0" @@ -19,4 +20,4 @@ features = [ # e.g. "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature "sqlx-postgres", # `DATABASE_DRIVER` feature -] +] \ No newline at end of file diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs b/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs index 6dfc6d7bd..b33265fbe 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs +++ b/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs @@ -3,7 +3,6 @@ use sea_orm_migration::sea_orm::{Statement, TransactionTrait}; mod m20230426_170496_create_functions; mod m20230426_170508_create_language_enum; -mod m20230426_170520_create_status_enum; mod m20230426_170541_create_contract_addresses_table; mod m20230426_170553_create_contract_details_table; @@ -13,9 +12,9 @@ pub struct Migrator; impl MigratorTrait for Migrator { fn migrations() -> Vec> { vec![ + Box::new(job_queue::migration::Migration), Box::new(m20230426_170496_create_functions::Migration), Box::new(m20230426_170508_create_language_enum::Migration), - Box::new(m20230426_170520_create_status_enum::Migration), Box::new(m20230426_170541_create_contract_addresses_table::Migration), Box::new(m20230426_170553_create_contract_details_table::Migration), ] diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs deleted file mode 100644 index d847cc020..000000000 --- a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170520_create_status_enum.rs +++ /dev/null @@ -1,26 +0,0 @@ -use sea_orm_migration::prelude::*; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let sql = r#" - CREATE TYPE "status" AS ENUM ( - 'waiting', - 'in_process', - 'success', - 'error' - ); - "#; - - crate::from_sql(manager, sql).await - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let sql = r#"DROP TYPE "status";"#; - - crate::from_sql(manager, sql).await - } -} diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs index c8132fc00..460b9d354 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs @@ -6,7 +6,7 @@ pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let sql = r#" + let create_contract_addresses_table = r#" CREATE TABLE "contract_addresses" ( "contract_address" bytea NOT NULL, "chain_id" numeric NOT NULL, @@ -18,27 +18,46 @@ impl MigrationTrait for Migration { "language" language NOT NULL, "compiler_version" varchar NOT NULL, - "status" status NOT NULL DEFAULT 'waiting', - "log" varchar, + "_job_id" uuid NOT NULL REFERENCES "_job_queue" ("id"), PRIMARY KEY ("contract_address", "chain_id") ); - + "#; + let create_trigger_set_modified_at = r#" CREATE TRIGGER trigger_set_modified_at - BEFORE INSERT ON contract_addresses + BEFORE UPDATE ON contract_addresses FOR EACH ROW EXECUTE FUNCTION set_modified_at(); "#; - - crate::from_sql(manager, sql).await + let create_trigger_insert_into_job_queue = + job_queue::migration::create_trigger_insert_job_statement("contract_addresses"); + + crate::from_statements( + manager, + &[ + create_contract_addresses_table, + create_trigger_set_modified_at, + &create_trigger_insert_into_job_queue, + ], + ) + .await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let sql = r#" - DROP TRIGGER trigger_set_modified_at ON contract_addresses; - DROP TABLE contract_addresses; - "#; - - crate::from_sql(manager, sql).await + let drop_trigger_insert_into_job_queue = + job_queue::migration::drop_trigger_insert_job_statement("contract_addresses"); + let drop_trigger_set_modified_at = + "DROP TRIGGER trigger_set_modified_at ON contract_addresses;"; + let drop_table_contract_addresses = "DROP TABLE contract_addresses;"; + + crate::from_statements( + manager, + &[ + &drop_trigger_insert_into_job_queue, + drop_trigger_set_modified_at, + drop_table_contract_addresses, + ], + ) + .await } } diff --git a/eth-bytecode-db-extractors/blockscout/.gitignore b/eth-bytecode-db-extractors/blockscout/.gitignore deleted file mode 100644 index a9d80f558..000000000 --- a/eth-bytecode-db-extractors/blockscout/.gitignore +++ /dev/null @@ -1 +0,0 @@ -dataset/* diff --git a/eth-bytecode-db-extractors/blockscout/Cargo.toml b/eth-bytecode-db-extractors/blockscout/Cargo.toml index 2e32af914..91f0bef5d 100644 --- a/eth-bytecode-db-extractors/blockscout/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] blockscout-entity = { path = "../blockscout-entity" } blockscout-migration = { path = "../blockscout-migration" } +job-queue = { path = "../job-queue", features = ["functions"] } anyhow = "1.0.70" async-trait = "0.1" diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index 748c257ca..639073cff 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -7,31 +7,32 @@ use eth_bytecode_db_proto::blockscout::eth_bytecode_db::v2::{ VerifyVyperStandardJsonRequest, }; use sea_orm::{ - sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ConnectionTrait, DatabaseBackend, - DatabaseConnection, DbErr, EntityTrait, Statement, + prelude::Uuid, sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ColumnTrait, + ConnectionTrait, DatabaseBackend, DatabaseConnection, DbErr, EntityTrait, QueryFilter, + Statement, }; use serde::Serialize; use std::sync::Arc; macro_rules! process_result { - ( $result:expr, $self:expr, $contract_address:expr) => { + ( $result:expr, $self:expr, $job_id:expr, $contract_address:expr) => { match $result { Ok(res) => res, Err(err) => { + let formatted_error = format!("{err:#}"); + tracing::warn!( contract_address = $contract_address.to_string(), - error = format!("{err:#}"), + chain_id = format!("{}", $self.chain_id), + error = formatted_error, "Error processing contract" ); - contract_addresses::ActiveModel { - contract_address: Set($contract_address.to_vec()), - chain_id: Set($self.chain_id.into()), - status: Set(sea_orm_active_enums::Status::Error), - log: Set(Some(format!("{:#?}", err))), - ..Default::default() - } - .update($self.db_client.as_ref()) + job_queue::functions::mark_as_error( + $self.db_client.as_ref(), + $job_id, + Some(formatted_error), + ) .await .context(format!( "saving error details failed; contract={}, chain_id={}", @@ -160,6 +161,7 @@ impl Client { while let Some(contract_model) = self.next_contract().await? { processed += 1; let contract_address = Bytes::from(contract_model.contract_address.clone()); + let job_id = contract_model.job_id; tracing::info!( contract_address = contract_address.to_string(), @@ -169,6 +171,7 @@ impl Client { let contract_details_model = process_result!( self.import_contract_details(contract_address.clone()).await, &self, + job_id.clone(), contract_address ); @@ -176,9 +179,11 @@ impl Client { self.verify_contract(contract_model, contract_details_model) .await, &self, + job_id.clone(), contract_address ); - self.mark_as_success(contract_address, source).await?; + self.mark_as_success(job_id.clone(), contract_address, source) + .await?; } Ok(processed) @@ -354,20 +359,18 @@ impl Client { async fn mark_as_success( &self, + job_id: Uuid, contract_address: Bytes, - source: eth_bytecode_db::Source, + source: Source, ) -> anyhow::Result<()> { - contract_addresses::ActiveModel { - contract_address: Set(contract_address.to_vec()), - chain_id: Set(self.chain_id.into()), - status: Set(sea_orm_active_enums::Status::Success), - log: Set(Some( + job_queue::functions::mark_as_success( + self.db_client.as_ref(), + job_id, + Some( serde_json::to_string(&source) .context("serializing success result (source) failed")?, - )), - ..Default::default() - } - .update(self.db_client.as_ref()) + ), + ) .await .context(format!( "saving success details failed for the contract {}", @@ -379,51 +382,48 @@ impl Client { async fn next_contract(&self) -> anyhow::Result> { // Notice that we are looking only for contracts with given `chain_id` - let next_contract_address_sql = format!( + let next_job_id_sql = format!( r#" - UPDATE contract_addresses + UPDATE _job_queue SET status = 'in_process' - WHERE contract_address = (SELECT contract_address - FROM contract_addresses - WHERE status = 'waiting' - AND chain_id = {} + WHERE id = (SELECT id + FROM _job_queue JOIN contract_addresses + ON _job_queue.id = contract_addresses._job_id + WHERE _job_queue.status = 'waiting' + AND contract_addresses.chain_id = {} LIMIT 1 FOR UPDATE SKIP LOCKED) - RETURNING contract_address; + RETURNING _job_queue.id; "#, self.chain_id ); - let next_contract_address_stmt = Statement::from_string( - DatabaseBackend::Postgres, - next_contract_address_sql.to_string(), - ); + let next_job_id_statement = + Statement::from_string(DatabaseBackend::Postgres, next_job_id_sql.to_string()); - let next_contract_address = self + let next_job_id = self .db_client .as_ref() - .query_one(next_contract_address_stmt) + .query_one(next_job_id_statement) .await - .context("querying for the next contract address")? + .context("querying for the next job id")? .map(|query_result| { query_result - .try_get_by::, _>("contract_address") - .expect("error while try_get_by contract_address") + .try_get_by::("id") + .expect("error while try_get_by id") }); - if let Some(contract_address) = next_contract_address { - let model = contract_addresses::Entity::find_by_id(( - contract_address.clone(), - self.chain_id.into(), - )) - .one(self.db_client.as_ref()) - .await - .expect("querying contract_address model failed") - .ok_or_else(|| { - anyhow::anyhow!( - "contract_address model does not exist for the contract: {}", - Bytes::from(contract_address), - ) - })?; + if let Some(job_id) = next_job_id { + let model = contract_addresses::Entity::find() + .filter(contract_addresses::Column::JobId.eq(job_id)) + .one(self.db_client.as_ref()) + .await + .expect("querying contract_address model failed") + .ok_or_else(|| { + anyhow::anyhow!( + "contract_address model does not exist for the job_id: {}", + Bytes::from(job_id.as_bytes().to_vec()), + ) + })?; return Ok(Some(model)); } diff --git a/eth-bytecode-db-extractors/job-queue/Cargo.toml b/eth-bytecode-db-extractors/job-queue/Cargo.toml new file mode 100644 index 000000000..7ac364d7b --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "job-queue" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +sea-orm = { version = "^0", optional = true } +sea-orm-migration = { version = "^0", optional = true } + +[features] +entity = [ + "dep:sea-orm", +] +functions = [ + "entity", +] +migration = [ + "dep:sea-orm-migration", +] + diff --git a/eth-bytecode-db-extractors/job-queue/src/entity.rs b/eth-bytecode-db-extractors/job-queue/src/entity.rs new file mode 100644 index 000000000..df709de2c --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/entity.rs @@ -0,0 +1,32 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use sea_orm::entity::prelude::*; + +#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "_job_status")] +pub enum JobStatus { + #[sea_orm(string_value = "error")] + Error, + #[sea_orm(string_value = "in_process")] + InProcess, + #[sea_orm(string_value = "success")] + Success, + #[sea_orm(string_value = "waiting")] + Waiting, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "_job_queue")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub created_at: DateTime, + pub modified_at: DateTime, + pub status: JobStatus, + pub log: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/job-queue/src/functions.rs b/eth-bytecode-db-extractors/job-queue/src/functions.rs new file mode 100644 index 000000000..ba32ee465 --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/functions.rs @@ -0,0 +1,91 @@ +use super::entity as job_queue; +use crate::entity::JobStatus; +use sea_orm::{prelude::Uuid, ActiveModelTrait, ActiveValue::Set, ConnectionTrait, DbErr}; + +pub async fn mark_as_success( + db: &C, + id: Uuid, + message: Option>, +) -> Result<(), DbErr> { + update_status(db, JobStatus::Success, id, message).await +} + +pub async fn mark_as_error( + db: &C, + id: Uuid, + message: Option>, +) -> Result<(), DbErr> { + update_status(db, JobStatus::Error, id, message).await +} + +async fn update_status( + db: &C, + new_status: JobStatus, + id: Uuid, + message: Option>, +) -> Result<(), DbErr> { + job_queue::ActiveModel { + id: Set(id), + status: Set(new_status), + log: Set(message.map(|msg| msg.into())), + ..Default::default() + } + .update(db) + .await + .map(|_| ()) +} + +// pub async fn next(db: &C) -> Result, DbErr> { +// // Notice that we are looking only for contracts with given `chain_id` +// let next_contract_address_sql = format!( +// r#" +// UPDATE workload_queue +// SET status = 'in_process' +// WHERE id = (SELECT id +// FROM workload_queue JOIN contract_addresses +// ON workload_queue.id = contract_addresses.workload_queue_id +// WHERE workload_queue.status = 'waiting' +// AND contract_addresses.chain_id = {} +// LIMIT 1 FOR UPDATE SKIP LOCKED) +// RETURNING contract_address; +// "#, +// self.chain_id +// ); +// +// let next_contract_address_stmt = Statement::from_string( +// DatabaseBackend::Postgres, +// next_contract_address_sql.to_string(), +// ); +// +// let next_contract_address = self +// .db_client +// .as_ref() +// .query_one(next_contract_address_stmt) +// .await +// .context("querying for the next contract address")? +// .map(|query_result| { +// query_result +// .try_get_by::, _>("contract_address") +// .expect("error while try_get_by contract_address") +// }); +// +// if let Some(contract_address) = next_contract_address { +// let model = contract_addresses::Entity::find_by_id(( +// contract_address.clone(), +// self.chain_id.into(), +// )) +// .one(self.db_client.as_ref()) +// .await +// .expect("querying contract_address model failed") +// .ok_or_else(|| { +// anyhow::anyhow!( +// "contract_address model does not exist for the contract: {}", +// Bytes::from(contract_address), +// ) +// })?; +// +// return Ok(Some(model)); +// } +// +// Ok(None) +// } diff --git a/eth-bytecode-db-extractors/job-queue/src/lib.rs b/eth-bytecode-db-extractors/job-queue/src/lib.rs new file mode 100644 index 000000000..f2606a90e --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/lib.rs @@ -0,0 +1,8 @@ +#[cfg(feature = "entity")] +pub mod entity; + +#[cfg(feature = "functions")] +pub mod functions; + +#[cfg(feature = "migration")] +pub mod migration; diff --git a/eth-bytecode-db-extractors/job-queue/src/migration.rs b/eth-bytecode-db-extractors/job-queue/src/migration.rs new file mode 100644 index 000000000..857d1eb50 --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/migration.rs @@ -0,0 +1,128 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{Statement, TransactionTrait}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let create_extension_pgcrypto = r#" + -- Needed for gen_random_uuid() + CREATE EXTENSION IF NOT EXISTS "pgcrypto"; + "#; + let create_function_job_queue_set_modified_at = r#" + CREATE OR REPLACE FUNCTION _job_queue_set_modified_at() + RETURNS TRIGGER AS + $$ + BEGIN + NEW.modified_at = now(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + "#; + let create_enum_job_status = r#" + CREATE TYPE "_job_status" AS ENUM ( + 'waiting', + 'in_process', + 'success', + 'error' + ); + "#; + let create_table_job_queue = r#" + CREATE TABLE "_job_queue" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + + "created_at" timestamp NOT NULL DEFAULT (now()), + "modified_at" timestamp NOT NULL DEFAULT (now()), + + "status" _job_status NOT NULL DEFAULT 'waiting', + "log" varchar + ); + "#; + let create_trigger_job_queue_set_modified_at = r#" + CREATE TRIGGER "set_modified_at" + BEFORE UPDATE ON "_job_queue" + FOR EACH ROW + EXECUTE FUNCTION _job_queue_set_modified_at(); + "#; + let create_function_insert_job = r#" + CREATE OR REPLACE FUNCTION _insert_job() + RETURNS TRIGGER AS $$ + BEGIN + -- Insert a new row into the jobs table and get the ID + INSERT INTO _job_queue DEFAULT VALUES + RETURNING id INTO NEW._job_id; + + -- Update the jobs_id in the contract_addresses table + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + "#; + + from_statements( + manager, + &[ + create_extension_pgcrypto, + create_function_job_queue_set_modified_at, + create_enum_job_status, + create_table_job_queue, + create_trigger_job_queue_set_modified_at, + create_function_insert_job, + ], + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let drop_function_insert_job = "DROP FUNCTION _insert_job;"; + let drop_trigger_job_queue_set_modified_at = "DROP TRIGGER set_modified_at ON _job_queue;"; + let drop_table_job_queue = "DROP TABLE _job_queue;"; + let drop_enum_job_status = "DROP TYPE _job_status;"; + let drop_function_job_queue_set_modified_at = "DROP FUNCTION _job_queue_set_modified_at;"; + let drop_extension_pgcrypto = "DROP EXTENSION pgcrypto;"; + + from_statements( + manager, + &[ + drop_function_insert_job, + drop_trigger_job_queue_set_modified_at, + drop_table_job_queue, + drop_enum_job_status, + drop_function_job_queue_set_modified_at, + drop_extension_pgcrypto, + ], + ) + .await + } +} + +pub fn create_trigger_insert_job_statement(relation: &str) -> String { + return format!( + r#" + CREATE TRIGGER insert_job + BEFORE INSERT ON {relation} + FOR EACH ROW + EXECUTE FUNCTION _insert_job(); + "# + ); +} + +pub fn drop_trigger_insert_job_statement(relation: &str) -> String { + return format!("DROP TRIGGER insert_job ON {relation};"); +} + +async fn from_statements(manager: &SchemaManager<'_>, statements: &[&str]) -> Result<(), DbErr> { + let txn = manager.get_connection().begin().await?; + for statement in statements { + txn.execute(Statement::from_string( + manager.get_database_backend(), + statement.to_string(), + )) + .await + .map_err(|err| DbErr::Migration(format!("{err}\nQuery: {statement}")))?; + } + txn.commit().await +} From cfbcd858adad1b918607ad824f2084242c48a291 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Mon, 30 Oct 2023 10:33:17 +0300 Subject: [PATCH 09/13] Extract looking for the next_job_id into a job-queue crate --- .../blockscout/src/client.rs | 58 +++----- .../job-queue/src/functions.rs | 125 ++++++++++-------- .../job-queue/src/lib.rs | 2 + .../job-queue/src/migration.rs | 6 +- .../job-queue/src/processor.rs | 1 + 5 files changed, 93 insertions(+), 99 deletions(-) create mode 100644 eth-bytecode-db-extractors/job-queue/src/processor.rs diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index 639073cff..ab324f684 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -8,8 +8,8 @@ use eth_bytecode_db_proto::blockscout::eth_bytecode_db::v2::{ }; use sea_orm::{ prelude::Uuid, sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ColumnTrait, - ConnectionTrait, DatabaseBackend, DatabaseConnection, DbErr, EntityTrait, QueryFilter, - Statement, + DatabaseConnection, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait, + Select, }; use serde::Serialize; use std::sync::Arc; @@ -171,7 +171,7 @@ impl Client { let contract_details_model = process_result!( self.import_contract_details(contract_address.clone()).await, &self, - job_id.clone(), + job_id, contract_address ); @@ -179,10 +179,10 @@ impl Client { self.verify_contract(contract_model, contract_details_model) .await, &self, - job_id.clone(), + job_id, contract_address ); - self.mark_as_success(job_id.clone(), contract_address, source) + self.mark_as_success(job_id, contract_address, source) .await?; } @@ -382,48 +382,24 @@ impl Client { async fn next_contract(&self) -> anyhow::Result> { // Notice that we are looking only for contracts with given `chain_id` - let next_job_id_sql = format!( - r#" - UPDATE _job_queue - SET status = 'in_process' - WHERE id = (SELECT id - FROM _job_queue JOIN contract_addresses - ON _job_queue.id = contract_addresses._job_id - WHERE _job_queue.status = 'waiting' - AND contract_addresses.chain_id = {} - LIMIT 1 FOR UPDATE SKIP LOCKED) - RETURNING _job_queue.id; - "#, - self.chain_id - ); - - let next_job_id_statement = - Statement::from_string(DatabaseBackend::Postgres, next_job_id_sql.to_string()); - - let next_job_id = self - .db_client - .as_ref() - .query_one(next_job_id_statement) - .await - .context("querying for the next job id")? - .map(|query_result| { - query_result - .try_get_by::("id") - .expect("error while try_get_by id") - }); + let chain_id_filter = |select: Select| { + select + .join_rev(JoinType::Join, contract_addresses::Relation::JobQueue.def()) + .filter(contract_addresses::Column::ChainId.eq(self.chain_id)) + }; + + let next_job_id = + job_queue::functions::next_job_id_with_filter(self.db_client.as_ref(), chain_id_filter) + .await + .context("querying the next_job_id")?; if let Some(job_id) = next_job_id { let model = contract_addresses::Entity::find() .filter(contract_addresses::Column::JobId.eq(job_id)) .one(self.db_client.as_ref()) .await - .expect("querying contract_address model failed") - .ok_or_else(|| { - anyhow::anyhow!( - "contract_address model does not exist for the job_id: {}", - Bytes::from(job_id.as_bytes().to_vec()), - ) - })?; + .context("querying contract_address model")? + .expect("contract_address model does not exist"); return Ok(Some(model)); } diff --git a/eth-bytecode-db-extractors/job-queue/src/functions.rs b/eth-bytecode-db-extractors/job-queue/src/functions.rs index ba32ee465..604afd57c 100644 --- a/eth-bytecode-db-extractors/job-queue/src/functions.rs +++ b/eth-bytecode-db-extractors/job-queue/src/functions.rs @@ -1,6 +1,13 @@ use super::entity as job_queue; use crate::entity::JobStatus; -use sea_orm::{prelude::Uuid, ActiveModelTrait, ActiveValue::Set, ConnectionTrait, DbErr}; +use sea_orm::{ + prelude::Uuid, + sea_query::{LockBehavior, LockType, SelectStatement}, + ActiveModelTrait, + ActiveValue::Set, + ColumnTrait, ConnectionTrait, DatabaseBackend, DbErr, EntityTrait, QueryFilter, QuerySelect, + QueryTrait, Select, Statement, +}; pub async fn mark_as_success( db: &C, @@ -35,57 +42,65 @@ async fn update_status( .map(|_| ()) } -// pub async fn next(db: &C) -> Result, DbErr> { -// // Notice that we are looking only for contracts with given `chain_id` -// let next_contract_address_sql = format!( -// r#" -// UPDATE workload_queue -// SET status = 'in_process' -// WHERE id = (SELECT id -// FROM workload_queue JOIN contract_addresses -// ON workload_queue.id = contract_addresses.workload_queue_id -// WHERE workload_queue.status = 'waiting' -// AND contract_addresses.chain_id = {} -// LIMIT 1 FOR UPDATE SKIP LOCKED) -// RETURNING contract_address; -// "#, -// self.chain_id -// ); -// -// let next_contract_address_stmt = Statement::from_string( -// DatabaseBackend::Postgres, -// next_contract_address_sql.to_string(), -// ); -// -// let next_contract_address = self -// .db_client -// .as_ref() -// .query_one(next_contract_address_stmt) -// .await -// .context("querying for the next contract address")? -// .map(|query_result| { -// query_result -// .try_get_by::, _>("contract_address") -// .expect("error while try_get_by contract_address") -// }); -// -// if let Some(contract_address) = next_contract_address { -// let model = contract_addresses::Entity::find_by_id(( -// contract_address.clone(), -// self.chain_id.into(), -// )) -// .one(self.db_client.as_ref()) -// .await -// .expect("querying contract_address model failed") -// .ok_or_else(|| { -// anyhow::anyhow!( -// "contract_address model does not exist for the contract: {}", -// Bytes::from(contract_address), -// ) -// })?; -// -// return Ok(Some(model)); -// } -// -// Ok(None) -// } +pub async fn next_job_id(db: &C) -> Result, DbErr> { + let noop_filter = |select| select; + next_job_id_with_filter(db, noop_filter).await +} + +pub async fn next_job_id_with_filter( + db: &C, + related_tables_filter: F, +) -> Result, DbErr> +where + F: Fn(Select) -> Select, +{ + let statement = next_job_id_statement(related_tables_filter); + Ok(db.query_one(statement).await?.map(|query_result| { + query_result + .try_get_by::("id") + .expect("error while try_get_by id") + })) +} + +fn next_job_id_statement(related_tables_filter: F) -> Statement +where + F: Fn(Select) -> Select, +{ + let subexpression = next_job_id_where_subexpression(related_tables_filter); + let sql = format!( + r#" + UPDATE _job_queue + SET status = 'in_process' + WHERE id = ({}) + RETURNING _job_queue.id; + "#, + subexpression.to_string(sea_orm::sea_query::PostgresQueryBuilder) + ); + + Statement::from_string(DatabaseBackend::Postgres, sql.to_string()) +} + +/// Unwraps into (example): +/// ` +/// SELECT +/// FROM _job_queue JOIN contract_address +/// ON _job_queue.id = contract_addresses._job_ +/// WHERE _job_queue.status = 'waiting' +/// AND contract_addresses.chain_id = {} +/// LIMIT 1 FOR UPDATE SKIP LOCKED +/// ` +fn next_job_id_where_subexpression(related_tables_filter: F) -> SelectStatement +where + F: Fn(Select) -> Select, +{ + let mut where_id_subexpression = job_queue::Entity::find() + .select_only() + .column(job_queue::Column::Id) + .filter(job_queue::Column::Status.eq(job_queue::JobStatus::Waiting)) + .limit(1); + where_id_subexpression = related_tables_filter(where_id_subexpression); + where_id_subexpression + .into_query() + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) + .to_owned() +} diff --git a/eth-bytecode-db-extractors/job-queue/src/lib.rs b/eth-bytecode-db-extractors/job-queue/src/lib.rs index f2606a90e..cd040f490 100644 --- a/eth-bytecode-db-extractors/job-queue/src/lib.rs +++ b/eth-bytecode-db-extractors/job-queue/src/lib.rs @@ -3,6 +3,8 @@ pub mod entity; #[cfg(feature = "functions")] pub mod functions; +#[cfg(feature = "functions")] +pub mod processor; #[cfg(feature = "migration")] pub mod migration; diff --git a/eth-bytecode-db-extractors/job-queue/src/migration.rs b/eth-bytecode-db-extractors/job-queue/src/migration.rs index 857d1eb50..cc8ab5887 100644 --- a/eth-bytecode-db-extractors/job-queue/src/migration.rs +++ b/eth-bytecode-db-extractors/job-queue/src/migration.rs @@ -100,18 +100,18 @@ impl MigrationTrait for Migration { } pub fn create_trigger_insert_job_statement(relation: &str) -> String { - return format!( + format!( r#" CREATE TRIGGER insert_job BEFORE INSERT ON {relation} FOR EACH ROW EXECUTE FUNCTION _insert_job(); "# - ); + ) } pub fn drop_trigger_insert_job_statement(relation: &str) -> String { - return format!("DROP TRIGGER insert_job ON {relation};"); + format!("DROP TRIGGER insert_job ON {relation};") } async fn from_statements(manager: &SchemaManager<'_>, statements: &[&str]) -> Result<(), DbErr> { diff --git a/eth-bytecode-db-extractors/job-queue/src/processor.rs b/eth-bytecode-db-extractors/job-queue/src/processor.rs new file mode 100644 index 000000000..322a4607f --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/processor.rs @@ -0,0 +1 @@ +pub trait JobProcessor {} From 4ff0fcef74bee14b33db363ed81a124f13538270 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Mon, 30 Oct 2023 14:46:17 +0300 Subject: [PATCH 10/13] Extract process_result into job-queue crate --- .../blockscout/Cargo.toml | 2 +- .../blockscout/src/client.rs | 49 ++++--------------- .../job-queue/Cargo.toml | 2 +- .../job-queue/src/lib.rs | 15 ++++-- .../job-queue/src/macros.rs | 35 +++++++++++++ .../job-queue/src/processor.rs | 1 - 6 files changed, 57 insertions(+), 47 deletions(-) create mode 100644 eth-bytecode-db-extractors/job-queue/src/macros.rs delete mode 100644 eth-bytecode-db-extractors/job-queue/src/processor.rs diff --git a/eth-bytecode-db-extractors/blockscout/Cargo.toml b/eth-bytecode-db-extractors/blockscout/Cargo.toml index 91f0bef5d..6a6ee7b6c 100644 --- a/eth-bytecode-db-extractors/blockscout/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] blockscout-entity = { path = "../blockscout-entity" } blockscout-migration = { path = "../blockscout-migration" } -job-queue = { path = "../job-queue", features = ["functions"] } +job-queue = { path = "../job-queue", features = ["logic"] } anyhow = "1.0.70" async-trait = "0.1" diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index ab324f684..61836b33e 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -14,37 +14,6 @@ use sea_orm::{ use serde::Serialize; use std::sync::Arc; -macro_rules! process_result { - ( $result:expr, $self:expr, $job_id:expr, $contract_address:expr) => { - match $result { - Ok(res) => res, - Err(err) => { - let formatted_error = format!("{err:#}"); - - tracing::warn!( - contract_address = $contract_address.to_string(), - chain_id = format!("{}", $self.chain_id), - error = formatted_error, - "Error processing contract" - ); - - job_queue::functions::mark_as_error( - $self.db_client.as_ref(), - $job_id, - Some(formatted_error), - ) - .await - .context(format!( - "saving error details failed; contract={}, chain_id={}", - $contract_address, $self.chain_id, - ))?; - - continue; - } - } - }; -} - #[derive(Debug, Serialize)] struct StandardJson { language: String, @@ -168,20 +137,22 @@ impl Client { "contract processed" ); - let contract_details_model = process_result!( + let contract_details_model = job_queue::process_result!( + self.db_client.as_ref(), self.import_contract_details(contract_address.clone()).await, - &self, job_id, - contract_address + contract_address = contract_address, + chain_id = self.chain_id ); - let source = process_result!( + let source = job_queue::process_result!( + self.db_client.as_ref(), self.verify_contract(contract_model, contract_details_model) .await, - &self, job_id, - contract_address + contract_address = contract_address ); + self.mark_as_success(job_id, contract_address, source) .await?; } @@ -363,7 +334,7 @@ impl Client { contract_address: Bytes, source: Source, ) -> anyhow::Result<()> { - job_queue::functions::mark_as_success( + job_queue::mark_as_success( self.db_client.as_ref(), job_id, Some( @@ -389,7 +360,7 @@ impl Client { }; let next_job_id = - job_queue::functions::next_job_id_with_filter(self.db_client.as_ref(), chain_id_filter) + job_queue::next_job_id_with_filter(self.db_client.as_ref(), chain_id_filter) .await .context("querying the next_job_id")?; diff --git a/eth-bytecode-db-extractors/job-queue/Cargo.toml b/eth-bytecode-db-extractors/job-queue/Cargo.toml index 7ac364d7b..317b3482b 100644 --- a/eth-bytecode-db-extractors/job-queue/Cargo.toml +++ b/eth-bytecode-db-extractors/job-queue/Cargo.toml @@ -13,7 +13,7 @@ sea-orm-migration = { version = "^0", optional = true } entity = [ "dep:sea-orm", ] -functions = [ +logic = [ "entity", ] migration = [ diff --git a/eth-bytecode-db-extractors/job-queue/src/lib.rs b/eth-bytecode-db-extractors/job-queue/src/lib.rs index cd040f490..373fd8b3d 100644 --- a/eth-bytecode-db-extractors/job-queue/src/lib.rs +++ b/eth-bytecode-db-extractors/job-queue/src/lib.rs @@ -1,10 +1,15 @@ #[cfg(feature = "entity")] pub mod entity; -#[cfg(feature = "functions")] -pub mod functions; -#[cfg(feature = "functions")] -pub mod processor; - #[cfg(feature = "migration")] pub mod migration; + +#[cfg(feature = "logic")] +mod functions; +#[cfg(feature = "logic")] +mod macros; + +#[cfg(feature = "logic")] +pub use functions::*; +#[cfg(feature = "logic")] +pub use macros::*; diff --git a/eth-bytecode-db-extractors/job-queue/src/macros.rs b/eth-bytecode-db-extractors/job-queue/src/macros.rs new file mode 100644 index 000000000..b803fe0de --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/macros.rs @@ -0,0 +1,35 @@ +#[macro_export] +macro_rules! process_result { + ( $db:expr, $result:expr, $job_id:expr $(, $arg_ident:ident = $arg_value:expr)*) => { + match $result { + Ok(res) => res, + Err(err) => { + let formatted_error = format!("{err:#}"); + + tracing::warn!( + $($arg_ident = %$arg_value, )* + error = formatted_error, + "error while processing the job" + ); + + $crate::mark_as_error( + $db, + $job_id, + Some(formatted_error), + ) + .await + .or_else(|err| { + let args = vec![ + $( format!("{}={}, ", stringify!($arg_ident), $arg_value), )* + ].join(", "); + let message = format!("saving job error details failed; {args}"); + + Err(err).context(message) + })?; + + continue; + } + } + }; +} +pub use process_result; diff --git a/eth-bytecode-db-extractors/job-queue/src/processor.rs b/eth-bytecode-db-extractors/job-queue/src/processor.rs deleted file mode 100644 index 322a4607f..000000000 --- a/eth-bytecode-db-extractors/job-queue/src/processor.rs +++ /dev/null @@ -1 +0,0 @@ -pub trait JobProcessor {} From 12672b2f221ac9352ebd420686ca31dc2b7eb687 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Mon, 30 Oct 2023 14:51:17 +0300 Subject: [PATCH 11/13] Add index on _job_queue.status --- eth-bytecode-db-extractors/job-queue/src/migration.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eth-bytecode-db-extractors/job-queue/src/migration.rs b/eth-bytecode-db-extractors/job-queue/src/migration.rs index cc8ab5887..cd24fe7bd 100644 --- a/eth-bytecode-db-extractors/job-queue/src/migration.rs +++ b/eth-bytecode-db-extractors/job-queue/src/migration.rs @@ -42,6 +42,8 @@ impl MigrationTrait for Migration { "log" varchar ); "#; + let create_index_job_queue_status = + "CREATE INDEX _job_queue_status_index ON _job_queue (status);"; let create_trigger_job_queue_set_modified_at = r#" CREATE TRIGGER "set_modified_at" BEFORE UPDATE ON "_job_queue" @@ -69,6 +71,7 @@ impl MigrationTrait for Migration { create_function_job_queue_set_modified_at, create_enum_job_status, create_table_job_queue, + create_index_job_queue_status, create_trigger_job_queue_set_modified_at, create_function_insert_job, ], @@ -79,6 +82,7 @@ impl MigrationTrait for Migration { async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { let drop_function_insert_job = "DROP FUNCTION _insert_job;"; let drop_trigger_job_queue_set_modified_at = "DROP TRIGGER set_modified_at ON _job_queue;"; + let drop_index_job_queue_status = "DROP INDEX _job_queue_status_index;"; let drop_table_job_queue = "DROP TABLE _job_queue;"; let drop_enum_job_status = "DROP TYPE _job_status;"; let drop_function_job_queue_set_modified_at = "DROP FUNCTION _job_queue_set_modified_at;"; @@ -89,6 +93,7 @@ impl MigrationTrait for Migration { &[ drop_function_insert_job, drop_trigger_job_queue_set_modified_at, + drop_index_job_queue_status, drop_table_job_queue, drop_enum_job_status, drop_function_job_queue_set_modified_at, From ff10a28cb6d901afb6e7035e1bafeee516991911 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Wed, 1 Nov 2023 11:36:58 +0300 Subject: [PATCH 12/13] Make message a String type for mark functions --- eth-bytecode-db-extractors/job-queue/src/functions.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/eth-bytecode-db-extractors/job-queue/src/functions.rs b/eth-bytecode-db-extractors/job-queue/src/functions.rs index 604afd57c..0b9c53b31 100644 --- a/eth-bytecode-db-extractors/job-queue/src/functions.rs +++ b/eth-bytecode-db-extractors/job-queue/src/functions.rs @@ -12,7 +12,7 @@ use sea_orm::{ pub async fn mark_as_success( db: &C, id: Uuid, - message: Option>, + message: Option, ) -> Result<(), DbErr> { update_status(db, JobStatus::Success, id, message).await } @@ -20,7 +20,7 @@ pub async fn mark_as_success( pub async fn mark_as_error( db: &C, id: Uuid, - message: Option>, + message: Option, ) -> Result<(), DbErr> { update_status(db, JobStatus::Error, id, message).await } @@ -29,12 +29,12 @@ async fn update_status( db: &C, new_status: JobStatus, id: Uuid, - message: Option>, + message: Option, ) -> Result<(), DbErr> { job_queue::ActiveModel { id: Set(id), status: Set(new_status), - log: Set(message.map(|msg| msg.into())), + log: Set(message), ..Default::default() } .update(db) From 9deb32eb05b055c59d4afd18e3a460b758009b14 Mon Sep 17 00:00:00 2001 From: Rim Rakhimov Date: Wed, 1 Nov 2023 13:13:51 +0300 Subject: [PATCH 13/13] Add index on _job_id for the connected relation --- ..._170541_create_contract_addresses_table.rs | 46 +++++++++---------- .../job-queue/src/migration.rs | 27 +++++++---- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs index 460b9d354..10597493c 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs @@ -29,35 +29,35 @@ impl MigrationTrait for Migration { FOR EACH ROW EXECUTE FUNCTION set_modified_at(); "#; - let create_trigger_insert_into_job_queue = - job_queue::migration::create_trigger_insert_job_statement("contract_addresses"); - - crate::from_statements( - manager, - &[ - create_contract_addresses_table, - create_trigger_set_modified_at, - &create_trigger_insert_into_job_queue, - ], - ) - .await + let create_job_queue_connection_statements = + job_queue::migration::create_job_queue_connection_statements("contract_addresses"); + + let mut statements = vec![ + create_contract_addresses_table, + create_trigger_set_modified_at, + ]; + statements.extend( + create_job_queue_connection_statements + .iter() + .map(|v| v.as_str()), + ); + + crate::from_statements(manager, &statements).await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let drop_trigger_insert_into_job_queue = - job_queue::migration::drop_trigger_insert_job_statement("contract_addresses"); + let drop_job_queue_connection_statements = + job_queue::migration::drop_job_queue_connection_statements("contract_addresses"); let drop_trigger_set_modified_at = "DROP TRIGGER trigger_set_modified_at ON contract_addresses;"; let drop_table_contract_addresses = "DROP TABLE contract_addresses;"; - crate::from_statements( - manager, - &[ - &drop_trigger_insert_into_job_queue, - drop_trigger_set_modified_at, - drop_table_contract_addresses, - ], - ) - .await + let mut statements = drop_job_queue_connection_statements + .iter() + .map(|v| v.as_str()) + .collect::>(); + statements.extend([drop_trigger_set_modified_at, drop_table_contract_addresses]); + + crate::from_statements(manager, &statements).await } } diff --git a/eth-bytecode-db-extractors/job-queue/src/migration.rs b/eth-bytecode-db-extractors/job-queue/src/migration.rs index cd24fe7bd..f64275c13 100644 --- a/eth-bytecode-db-extractors/job-queue/src/migration.rs +++ b/eth-bytecode-db-extractors/job-queue/src/migration.rs @@ -104,19 +104,26 @@ impl MigrationTrait for Migration { } } -pub fn create_trigger_insert_job_statement(relation: &str) -> String { - format!( +pub fn create_job_queue_connection_statements(relation: &str) -> Vec { + let create_trigger_insert_job = format!( r#" - CREATE TRIGGER insert_job - BEFORE INSERT ON {relation} - FOR EACH ROW - EXECUTE FUNCTION _insert_job(); - "# - ) + CREATE TRIGGER insert_job + BEFORE INSERT ON {relation} + FOR EACH ROW + EXECUTE FUNCTION _insert_job(); + "# + ); + let create_index_on_job_id = + format!("CREATE INDEX _{relation}_job_id_index ON {relation} (_job_id);"); + + vec![create_trigger_insert_job, create_index_on_job_id] } -pub fn drop_trigger_insert_job_statement(relation: &str) -> String { - format!("DROP TRIGGER insert_job ON {relation};") +pub fn drop_job_queue_connection_statements(relation: &str) -> Vec { + let drop_trigger_insert_job = format!("DROP TRIGGER insert_job ON {relation};"); + let drop_index_on_job_id = format!("DROP INDEX _{relation}_job_id_index;"); + + vec![drop_index_on_job_id, drop_trigger_insert_job] } async fn from_statements(manager: &SchemaManager<'_>, statements: &[&str]) -> Result<(), DbErr> {