diff --git a/eth-bytecode-db-extractors/Cargo.lock b/eth-bytecode-db-extractors/Cargo.lock index d3a84a9be..83c43a8ac 100644 --- a/eth-bytecode-db-extractors/Cargo.lock +++ b/eth-bytecode-db-extractors/Cargo.lock @@ -705,6 +705,7 @@ dependencies = [ "eth-bytecode-db-proto 0.1.0 (git+https://github.com/blockscout/blockscout-rs?rev=41fc491)", "futures", "governor", + "job-queue", "reqwest", "reqwest-middleware", "reqwest-rate-limiter", @@ -738,6 +739,7 @@ dependencies = [ name = "blockscout-entity" version = "0.1.0" dependencies = [ + "job-queue", "sea-orm", ] @@ -746,6 +748,7 @@ name = "blockscout-migration" version = "0.1.0" dependencies = [ "async-std", + "job-queue", "sea-orm-migration", ] @@ -2009,6 +2012,14 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "job-queue" +version = "0.1.0" +dependencies = [ + "sea-orm", + "sea-orm-migration", +] + [[package]] name = "jobserver" version = "0.1.27" diff --git a/eth-bytecode-db-extractors/Cargo.toml b/eth-bytecode-db-extractors/Cargo.toml index 841ffa829..f9ae4b5ca 100644 --- a/eth-bytecode-db-extractors/Cargo.toml +++ b/eth-bytecode-db-extractors/Cargo.toml @@ -7,4 +7,5 @@ members = [ "smart-contract-fiesta", "smart-contract-fiesta-entity", "smart-contract-fiesta-migration", + "job-queue", ] diff --git a/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml b/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml index 9d95a9e98..58479571e 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout-entity/Cargo.toml @@ -8,4 +8,5 @@ name = "entity" path = "src/lib.rs" [dependencies] -sea-orm = { version = "^0" } \ No newline at end of file +sea-orm = { version = "^0" } +job-queue = { path = "../job-queue", features = ["entity"] } \ No newline at end of file diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs index 6865b818c..7c90dd172 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_addresses.rs @@ -1,6 +1,6 @@ //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 -use super::sea_orm_active_enums::{Language, Status}; +use super::sea_orm_active_enums::Language; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] @@ -19,16 +19,30 @@ pub struct Model { pub verified_at: DateTimeWithTimeZone, pub language: Language, pub compiler_version: String, - pub status: Status, - pub log: Option, + #[sea_orm(column_name = "_job_id")] + pub job_id: Uuid, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm( + belongs_to = "super::job_queue::Entity", + from = "Column::JobId", + to = "super::job_queue::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + JobQueue, #[sea_orm(has_many = "super::contract_details::Entity")] ContractDetails, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::JobQueue.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::ContractDetails.def() diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs index 3b8abbd73..e0db76a43 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/contract_details.rs @@ -55,4 +55,17 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + super::contract_addresses::Relation::JobQueue.def() + } + fn via() -> Option { + Some( + super::contract_addresses::Relation::ContractDetails + .def() + .rev(), + ) + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs b/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs index bb13562fa..385b06313 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/lib.rs @@ -5,3 +5,5 @@ pub mod prelude; pub mod contract_addresses; pub mod contract_details; pub mod sea_orm_active_enums; + +pub use job_queue::entity as job_queue; diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs b/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs index c30e3bab7..1531687c4 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/prelude.rs @@ -3,3 +3,4 @@ pub use super::{ contract_addresses::Entity as ContractAddresses, contract_details::Entity as ContractDetails, }; +pub use job_queue::entity::Entity as JobQueue; diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs b/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs index bf8f359fa..48a46b25f 100644 --- a/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs +++ b/eth-bytecode-db-extractors/blockscout-entity/src/sea_orm_active_enums.rs @@ -12,15 +12,3 @@ pub enum Language { #[sea_orm(string_value = "yul")] Yul, } -#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "status")] -pub enum Status { - #[sea_orm(string_value = "error")] - Error, - #[sea_orm(string_value = "in_process")] - InProcess, - #[sea_orm(string_value = "success")] - Success, - #[sea_orm(string_value = "waiting")] - Waiting, -} diff --git a/eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs b/eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs new file mode 100644 index 000000000..8f5d22a64 --- /dev/null +++ b/eth-bytecode-db-extractors/blockscout-entity/src/seaql_migrations.rs @@ -0,0 +1,16 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "seaql_migrations")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version: String, + pub applied_at: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml b/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml index 9d467b9e1..3d6147db2 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout-migration/Cargo.toml @@ -10,6 +10,7 @@ path = "src/lib.rs" [dependencies] async-std = { version = "1", features = ["attributes", "tokio1"] } +job-queue = { path = "../job-queue", features = ["migration"] } [dependencies.sea-orm-migration] version = "^0" diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs b/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs index 6dfc6d7bd..b33265fbe 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs +++ b/eth-bytecode-db-extractors/blockscout-migration/src/lib.rs @@ -3,7 +3,6 @@ use sea_orm_migration::sea_orm::{Statement, TransactionTrait}; mod m20230426_170496_create_functions; mod m20230426_170508_create_language_enum; -mod m20230426_170520_create_status_enum; mod m20230426_170541_create_contract_addresses_table; mod m20230426_170553_create_contract_details_table; @@ -13,9 +12,9 @@ pub struct Migrator; impl MigratorTrait for Migrator { fn migrations() -> Vec> { vec![ + Box::new(job_queue::migration::Migration), Box::new(m20230426_170496_create_functions::Migration), Box::new(m20230426_170508_create_language_enum::Migration), - Box::new(m20230426_170520_create_status_enum::Migration), Box::new(m20230426_170541_create_contract_addresses_table::Migration), Box::new(m20230426_170553_create_contract_details_table::Migration), ] diff --git a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs index c8132fc00..10597493c 100644 --- a/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs +++ b/eth-bytecode-db-extractors/blockscout-migration/src/m20230426_170541_create_contract_addresses_table.rs @@ -6,7 +6,7 @@ pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let sql = r#" + let create_contract_addresses_table = r#" CREATE TABLE "contract_addresses" ( "contract_address" bytea NOT NULL, "chain_id" numeric NOT NULL, @@ -18,27 +18,46 @@ impl MigrationTrait for Migration { "language" language NOT NULL, "compiler_version" varchar NOT NULL, - "status" status NOT NULL DEFAULT 'waiting', - "log" varchar, + "_job_id" uuid NOT NULL REFERENCES "_job_queue" ("id"), PRIMARY KEY ("contract_address", "chain_id") ); - + "#; + let create_trigger_set_modified_at = r#" CREATE TRIGGER trigger_set_modified_at - BEFORE INSERT ON contract_addresses + BEFORE UPDATE ON contract_addresses FOR EACH ROW EXECUTE FUNCTION set_modified_at(); "#; - - crate::from_sql(manager, sql).await + let create_job_queue_connection_statements = + job_queue::migration::create_job_queue_connection_statements("contract_addresses"); + + let mut statements = vec![ + create_contract_addresses_table, + create_trigger_set_modified_at, + ]; + statements.extend( + create_job_queue_connection_statements + .iter() + .map(|v| v.as_str()), + ); + + crate::from_statements(manager, &statements).await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let sql = r#" - DROP TRIGGER trigger_set_modified_at ON contract_addresses; - DROP TABLE contract_addresses; - "#; - - crate::from_sql(manager, sql).await + let drop_job_queue_connection_statements = + job_queue::migration::drop_job_queue_connection_statements("contract_addresses"); + let drop_trigger_set_modified_at = + "DROP TRIGGER trigger_set_modified_at ON contract_addresses;"; + let drop_table_contract_addresses = "DROP TABLE contract_addresses;"; + + let mut statements = drop_job_queue_connection_statements + .iter() + .map(|v| v.as_str()) + .collect::>(); + statements.extend([drop_trigger_set_modified_at, drop_table_contract_addresses]); + + crate::from_statements(manager, &statements).await } } diff --git a/eth-bytecode-db-extractors/blockscout/Cargo.toml b/eth-bytecode-db-extractors/blockscout/Cargo.toml index 2e32af914..6a6ee7b6c 100644 --- a/eth-bytecode-db-extractors/blockscout/Cargo.toml +++ b/eth-bytecode-db-extractors/blockscout/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] blockscout-entity = { path = "../blockscout-entity" } blockscout-migration = { path = "../blockscout-migration" } +job-queue = { path = "../job-queue", features = ["logic"] } anyhow = "1.0.70" async-trait = "0.1" diff --git a/eth-bytecode-db-extractors/blockscout/src/client.rs b/eth-bytecode-db-extractors/blockscout/src/client.rs index 748c257ca..61836b33e 100644 --- a/eth-bytecode-db-extractors/blockscout/src/client.rs +++ b/eth-bytecode-db-extractors/blockscout/src/client.rs @@ -7,43 +7,13 @@ use eth_bytecode_db_proto::blockscout::eth_bytecode_db::v2::{ VerifyVyperStandardJsonRequest, }; use sea_orm::{ - sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ConnectionTrait, DatabaseBackend, - DatabaseConnection, DbErr, EntityTrait, Statement, + prelude::Uuid, sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ColumnTrait, + DatabaseConnection, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait, + Select, }; use serde::Serialize; use std::sync::Arc; -macro_rules! process_result { - ( $result:expr, $self:expr, $contract_address:expr) => { - match $result { - Ok(res) => res, - Err(err) => { - tracing::warn!( - contract_address = $contract_address.to_string(), - error = format!("{err:#}"), - "Error processing contract" - ); - - contract_addresses::ActiveModel { - contract_address: Set($contract_address.to_vec()), - chain_id: Set($self.chain_id.into()), - status: Set(sea_orm_active_enums::Status::Error), - log: Set(Some(format!("{:#?}", err))), - ..Default::default() - } - .update($self.db_client.as_ref()) - .await - .context(format!( - "saving error details failed; contract={}, chain_id={}", - $contract_address, $self.chain_id, - ))?; - - continue; - } - } - }; -} - #[derive(Debug, Serialize)] struct StandardJson { language: String, @@ -160,25 +130,31 @@ impl Client { while let Some(contract_model) = self.next_contract().await? { processed += 1; let contract_address = Bytes::from(contract_model.contract_address.clone()); + let job_id = contract_model.job_id; tracing::info!( contract_address = contract_address.to_string(), "contract processed" ); - let contract_details_model = process_result!( + let contract_details_model = job_queue::process_result!( + self.db_client.as_ref(), self.import_contract_details(contract_address.clone()).await, - &self, - contract_address + job_id, + contract_address = contract_address, + chain_id = self.chain_id ); - let source = process_result!( + let source = job_queue::process_result!( + self.db_client.as_ref(), self.verify_contract(contract_model, contract_details_model) .await, - &self, - contract_address + job_id, + contract_address = contract_address ); - self.mark_as_success(contract_address, source).await?; + + self.mark_as_success(job_id, contract_address, source) + .await?; } Ok(processed) @@ -354,20 +330,18 @@ impl Client { async fn mark_as_success( &self, + job_id: Uuid, contract_address: Bytes, - source: eth_bytecode_db::Source, + source: Source, ) -> anyhow::Result<()> { - contract_addresses::ActiveModel { - contract_address: Set(contract_address.to_vec()), - chain_id: Set(self.chain_id.into()), - status: Set(sea_orm_active_enums::Status::Success), - log: Set(Some( + job_queue::mark_as_success( + self.db_client.as_ref(), + job_id, + Some( serde_json::to_string(&source) .context("serializing success result (source) failed")?, - )), - ..Default::default() - } - .update(self.db_client.as_ref()) + ), + ) .await .context(format!( "saving success details failed for the contract {}", @@ -379,51 +353,24 @@ impl Client { async fn next_contract(&self) -> anyhow::Result> { // Notice that we are looking only for contracts with given `chain_id` - let next_contract_address_sql = format!( - r#" - UPDATE contract_addresses - SET status = 'in_process' - WHERE contract_address = (SELECT contract_address - FROM contract_addresses - WHERE status = 'waiting' - AND chain_id = {} - LIMIT 1 FOR UPDATE SKIP LOCKED) - RETURNING contract_address; - "#, - self.chain_id - ); - - let next_contract_address_stmt = Statement::from_string( - DatabaseBackend::Postgres, - next_contract_address_sql.to_string(), - ); - - let next_contract_address = self - .db_client - .as_ref() - .query_one(next_contract_address_stmt) - .await - .context("querying for the next contract address")? - .map(|query_result| { - query_result - .try_get_by::, _>("contract_address") - .expect("error while try_get_by contract_address") - }); - - if let Some(contract_address) = next_contract_address { - let model = contract_addresses::Entity::find_by_id(( - contract_address.clone(), - self.chain_id.into(), - )) - .one(self.db_client.as_ref()) - .await - .expect("querying contract_address model failed") - .ok_or_else(|| { - anyhow::anyhow!( - "contract_address model does not exist for the contract: {}", - Bytes::from(contract_address), - ) - })?; + let chain_id_filter = |select: Select| { + select + .join_rev(JoinType::Join, contract_addresses::Relation::JobQueue.def()) + .filter(contract_addresses::Column::ChainId.eq(self.chain_id)) + }; + + let next_job_id = + job_queue::next_job_id_with_filter(self.db_client.as_ref(), chain_id_filter) + .await + .context("querying the next_job_id")?; + + if let Some(job_id) = next_job_id { + let model = contract_addresses::Entity::find() + .filter(contract_addresses::Column::JobId.eq(job_id)) + .one(self.db_client.as_ref()) + .await + .context("querying contract_address model")? + .expect("contract_address model does not exist"); return Ok(Some(model)); } diff --git a/eth-bytecode-db-extractors/job-queue/Cargo.toml b/eth-bytecode-db-extractors/job-queue/Cargo.toml new file mode 100644 index 000000000..317b3482b --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "job-queue" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +sea-orm = { version = "^0", optional = true } +sea-orm-migration = { version = "^0", optional = true } + +[features] +entity = [ + "dep:sea-orm", +] +logic = [ + "entity", +] +migration = [ + "dep:sea-orm-migration", +] + diff --git a/eth-bytecode-db-extractors/job-queue/src/entity.rs b/eth-bytecode-db-extractors/job-queue/src/entity.rs new file mode 100644 index 000000000..df709de2c --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/entity.rs @@ -0,0 +1,32 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 + +use sea_orm::entity::prelude::*; + +#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "_job_status")] +pub enum JobStatus { + #[sea_orm(string_value = "error")] + Error, + #[sea_orm(string_value = "in_process")] + InProcess, + #[sea_orm(string_value = "success")] + Success, + #[sea_orm(string_value = "waiting")] + Waiting, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "_job_queue")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub created_at: DateTime, + pub modified_at: DateTime, + pub status: JobStatus, + pub log: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/eth-bytecode-db-extractors/job-queue/src/functions.rs b/eth-bytecode-db-extractors/job-queue/src/functions.rs new file mode 100644 index 000000000..0b9c53b31 --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/functions.rs @@ -0,0 +1,106 @@ +use super::entity as job_queue; +use crate::entity::JobStatus; +use sea_orm::{ + prelude::Uuid, + sea_query::{LockBehavior, LockType, SelectStatement}, + ActiveModelTrait, + ActiveValue::Set, + ColumnTrait, ConnectionTrait, DatabaseBackend, DbErr, EntityTrait, QueryFilter, QuerySelect, + QueryTrait, Select, Statement, +}; + +pub async fn mark_as_success( + db: &C, + id: Uuid, + message: Option, +) -> Result<(), DbErr> { + update_status(db, JobStatus::Success, id, message).await +} + +pub async fn mark_as_error( + db: &C, + id: Uuid, + message: Option, +) -> Result<(), DbErr> { + update_status(db, JobStatus::Error, id, message).await +} + +async fn update_status( + db: &C, + new_status: JobStatus, + id: Uuid, + message: Option, +) -> Result<(), DbErr> { + job_queue::ActiveModel { + id: Set(id), + status: Set(new_status), + log: Set(message), + ..Default::default() + } + .update(db) + .await + .map(|_| ()) +} + +pub async fn next_job_id(db: &C) -> Result, DbErr> { + let noop_filter = |select| select; + next_job_id_with_filter(db, noop_filter).await +} + +pub async fn next_job_id_with_filter( + db: &C, + related_tables_filter: F, +) -> Result, DbErr> +where + F: Fn(Select) -> Select, +{ + let statement = next_job_id_statement(related_tables_filter); + Ok(db.query_one(statement).await?.map(|query_result| { + query_result + .try_get_by::("id") + .expect("error while try_get_by id") + })) +} + +fn next_job_id_statement(related_tables_filter: F) -> Statement +where + F: Fn(Select) -> Select, +{ + let subexpression = next_job_id_where_subexpression(related_tables_filter); + let sql = format!( + r#" + UPDATE _job_queue + SET status = 'in_process' + WHERE id = ({}) + RETURNING _job_queue.id; + "#, + subexpression.to_string(sea_orm::sea_query::PostgresQueryBuilder) + ); + + Statement::from_string(DatabaseBackend::Postgres, sql.to_string()) +} + +/// Unwraps into (example): +/// ` +/// SELECT +/// FROM _job_queue JOIN contract_address +/// ON _job_queue.id = contract_addresses._job_ +/// WHERE _job_queue.status = 'waiting' +/// AND contract_addresses.chain_id = {} +/// LIMIT 1 FOR UPDATE SKIP LOCKED +/// ` +fn next_job_id_where_subexpression(related_tables_filter: F) -> SelectStatement +where + F: Fn(Select) -> Select, +{ + let mut where_id_subexpression = job_queue::Entity::find() + .select_only() + .column(job_queue::Column::Id) + .filter(job_queue::Column::Status.eq(job_queue::JobStatus::Waiting)) + .limit(1); + where_id_subexpression = related_tables_filter(where_id_subexpression); + where_id_subexpression + .into_query() + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) + .to_owned() +} diff --git a/eth-bytecode-db-extractors/job-queue/src/lib.rs b/eth-bytecode-db-extractors/job-queue/src/lib.rs new file mode 100644 index 000000000..373fd8b3d --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/lib.rs @@ -0,0 +1,15 @@ +#[cfg(feature = "entity")] +pub mod entity; + +#[cfg(feature = "migration")] +pub mod migration; + +#[cfg(feature = "logic")] +mod functions; +#[cfg(feature = "logic")] +mod macros; + +#[cfg(feature = "logic")] +pub use functions::*; +#[cfg(feature = "logic")] +pub use macros::*; diff --git a/eth-bytecode-db-extractors/job-queue/src/macros.rs b/eth-bytecode-db-extractors/job-queue/src/macros.rs new file mode 100644 index 000000000..b803fe0de --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/macros.rs @@ -0,0 +1,35 @@ +#[macro_export] +macro_rules! process_result { + ( $db:expr, $result:expr, $job_id:expr $(, $arg_ident:ident = $arg_value:expr)*) => { + match $result { + Ok(res) => res, + Err(err) => { + let formatted_error = format!("{err:#}"); + + tracing::warn!( + $($arg_ident = %$arg_value, )* + error = formatted_error, + "error while processing the job" + ); + + $crate::mark_as_error( + $db, + $job_id, + Some(formatted_error), + ) + .await + .or_else(|err| { + let args = vec![ + $( format!("{}={}, ", stringify!($arg_ident), $arg_value), )* + ].join(", "); + let message = format!("saving job error details failed; {args}"); + + Err(err).context(message) + })?; + + continue; + } + } + }; +} +pub use process_result; diff --git a/eth-bytecode-db-extractors/job-queue/src/migration.rs b/eth-bytecode-db-extractors/job-queue/src/migration.rs new file mode 100644 index 000000000..f64275c13 --- /dev/null +++ b/eth-bytecode-db-extractors/job-queue/src/migration.rs @@ -0,0 +1,140 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{Statement, TransactionTrait}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let create_extension_pgcrypto = r#" + -- Needed for gen_random_uuid() + CREATE EXTENSION IF NOT EXISTS "pgcrypto"; + "#; + let create_function_job_queue_set_modified_at = r#" + CREATE OR REPLACE FUNCTION _job_queue_set_modified_at() + RETURNS TRIGGER AS + $$ + BEGIN + NEW.modified_at = now(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + "#; + let create_enum_job_status = r#" + CREATE TYPE "_job_status" AS ENUM ( + 'waiting', + 'in_process', + 'success', + 'error' + ); + "#; + let create_table_job_queue = r#" + CREATE TABLE "_job_queue" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + + "created_at" timestamp NOT NULL DEFAULT (now()), + "modified_at" timestamp NOT NULL DEFAULT (now()), + + "status" _job_status NOT NULL DEFAULT 'waiting', + "log" varchar + ); + "#; + let create_index_job_queue_status = + "CREATE INDEX _job_queue_status_index ON _job_queue (status);"; + let create_trigger_job_queue_set_modified_at = r#" + CREATE TRIGGER "set_modified_at" + BEFORE UPDATE ON "_job_queue" + FOR EACH ROW + EXECUTE FUNCTION _job_queue_set_modified_at(); + "#; + let create_function_insert_job = r#" + CREATE OR REPLACE FUNCTION _insert_job() + RETURNS TRIGGER AS $$ + BEGIN + -- Insert a new row into the jobs table and get the ID + INSERT INTO _job_queue DEFAULT VALUES + RETURNING id INTO NEW._job_id; + + -- Update the jobs_id in the contract_addresses table + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + "#; + + from_statements( + manager, + &[ + create_extension_pgcrypto, + create_function_job_queue_set_modified_at, + create_enum_job_status, + create_table_job_queue, + create_index_job_queue_status, + create_trigger_job_queue_set_modified_at, + create_function_insert_job, + ], + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let drop_function_insert_job = "DROP FUNCTION _insert_job;"; + let drop_trigger_job_queue_set_modified_at = "DROP TRIGGER set_modified_at ON _job_queue;"; + let drop_index_job_queue_status = "DROP INDEX _job_queue_status_index;"; + let drop_table_job_queue = "DROP TABLE _job_queue;"; + let drop_enum_job_status = "DROP TYPE _job_status;"; + let drop_function_job_queue_set_modified_at = "DROP FUNCTION _job_queue_set_modified_at;"; + let drop_extension_pgcrypto = "DROP EXTENSION pgcrypto;"; + + from_statements( + manager, + &[ + drop_function_insert_job, + drop_trigger_job_queue_set_modified_at, + drop_index_job_queue_status, + drop_table_job_queue, + drop_enum_job_status, + drop_function_job_queue_set_modified_at, + drop_extension_pgcrypto, + ], + ) + .await + } +} + +pub fn create_job_queue_connection_statements(relation: &str) -> Vec { + let create_trigger_insert_job = format!( + r#" + CREATE TRIGGER insert_job + BEFORE INSERT ON {relation} + FOR EACH ROW + EXECUTE FUNCTION _insert_job(); + "# + ); + let create_index_on_job_id = + format!("CREATE INDEX _{relation}_job_id_index ON {relation} (_job_id);"); + + vec![create_trigger_insert_job, create_index_on_job_id] +} + +pub fn drop_job_queue_connection_statements(relation: &str) -> Vec { + let drop_trigger_insert_job = format!("DROP TRIGGER insert_job ON {relation};"); + let drop_index_on_job_id = format!("DROP INDEX _{relation}_job_id_index;"); + + vec![drop_index_on_job_id, drop_trigger_insert_job] +} + +async fn from_statements(manager: &SchemaManager<'_>, statements: &[&str]) -> Result<(), DbErr> { + let txn = manager.get_connection().begin().await?; + for statement in statements { + txn.execute(Statement::from_string( + manager.get_database_backend(), + statement.to_string(), + )) + .await + .map_err(|err| DbErr::Migration(format!("{err}\nQuery: {statement}")))?; + } + txn.commit().await +}