From 64963c51b8190fecb7b117df0206ccd07c9cf413 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Mon, 15 Apr 2024 14:47:06 +0100 Subject: [PATCH] wip --- chain/ethereum/src/network.rs | 12 +- graph/src/components/adapter.rs | 208 ++++++++++++++++++++++++++++++++ graph/src/components/mod.rs | 2 + 3 files changed, 219 insertions(+), 3 deletions(-) create mode 100644 graph/src/components/adapter.rs diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 11b06eddb5a..6758ed2a9b9 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -2,10 +2,14 @@ use anyhow::{anyhow, bail}; use graph::cheap_clone::CheapClone; use graph::endpoint::EndpointMetrics; use graph::firehose::{AvailableCapacity, SubgraphLimit}; +use graph::parking_lot::RwLock; +use graph::prelude::chrono::{DateTime, Utc}; use graph::prelude::rand::seq::IteratorRandom; use graph::prelude::rand::{self, Rng}; +use jsonrpc_core::futures_util::future::join_all; use std::cmp::Ordering; use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU8}; use std::sync::Arc; pub use graph::impl_slog_value; @@ -46,9 +50,10 @@ impl EthereumNetworkAdapter { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct EthereumNetworkAdapters { - pub adapters: Vec, + // provider_status: ProviderGenesisVerification, + adapters: Vec, call_only_adapters: Vec, // Percentage of request that should be used to retest errored adapters. retest_percent: f64, @@ -66,6 +71,7 @@ impl EthereumNetworkAdapters { adapters: vec![], call_only_adapters: vec![], retest_percent: retest_percent.unwrap_or(DEFAULT_ADAPTER_ERROR_RETEST_PERCENT), + // provider_status: ProviderGenesisVerification::default(), } } @@ -76,7 +82,7 @@ impl EthereumNetworkAdapters { self.adapters.push(adapter); } } - pub fn all_cheapest_with( + fn all_cheapest_with( &self, required_capabilities: &NodeCapabilities, ) -> impl Iterator + '_ { diff --git a/graph/src/components/adapter.rs b/graph/src/components/adapter.rs new file mode 100644 index 00000000000..2565e7a5d8f --- /dev/null +++ b/graph/src/components/adapter.rs @@ -0,0 +1,208 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use chrono::{DateTime, Duration, Utc}; +use itertools::Itertools; + +use slog::Logger; +use thiserror::Error; + +use crate::{blockchain::ChainIdentifier, prelude::error, tokio::sync::RwLock}; + +const VALIDATION_ATTEMPT_TTL_SECONDS: Duration = Duration::seconds(60 * 5); + +#[derive(Debug, Error)] +pub enum ProviderManagerError { + #[error("unknown error {0}")] + Unknown(#[from] anyhow::Error), + #[error("all providers for chain_id {0} have failed")] + AllProvidersFailed(ChainId), +} + +#[async_trait] +trait NetIdentifiable: Clone { + async fn net_identifiers(&self) -> Result; + fn provider_name(&self) -> ProviderName; +} + +pub type ProviderName = String; +pub type ChainId = String; + +#[derive(Debug, PartialEq, Eq)] +struct Ident { + provider: ProviderName, + chain_id: ChainId, +} + +/// ProviderCorrectness will maintain a list of providers which have had their +/// ChainIdentifiers checked. The first identifier is considered correct, if a later +/// provider for the same chain offers a different ChainIdentifier, this will be considered a +/// failed validation and it will be disabled. +#[derive(Debug)] +struct ProviderManager { + logger: Logger, + // usize is the status index so we can quickly check + adapters: HashMap>, + status: Vec<(Ident, RwLock)>, + // When an adapter is not available at the start and is a new chain, an identifier will not + // be available. This will be set to None + expected_idents: HashMap>, +} + +impl ProviderManager { + pub fn new(logger: Logger) -> Self { + Self { + logger, + adapters: HashMap::default(), + status: Vec::new(), + expected_idents: HashMap::default(), + } + } + + pub fn add_adapter(&mut self, chain_id: ChainId, adapter: T) { + let provider = adapter.provider_name(); + let ident = Ident { + provider, + chain_id: chain_id.clone(), + }; + + let index = match self.status.iter().find_position(|s| ident.eq(&s.0)) { + Some((index, _)) => index, + None => { + let index = self.status.len(); + self.status + .push((ident, RwLock::new(GenesisCheckStatus::NotChecked))); + + index + } + }; + + match self.adapters.get_mut(&chain_id) { + Some(entry) => entry.push((index, adapter)), + None => { + self.adapters.insert(chain_id, vec![(index, adapter)]); + } + } + } + + async fn is_all_verified(&self, adapters: &Vec<(usize, T)>) -> bool { + for (index, _) in adapters.iter() { + let status = self.status.get(*index).unwrap().1.read().await; + if *status == GenesisCheckStatus::Valid { + return false; + } + } + + true + } + + async fn get_verified_for_chain(&self, chain_id: &ChainId) -> Vec<&T> { + let mut out = vec![]; + let adapters = match self.adapters.get(chain_id) { + Some(adapters) => adapters, + None => return vec![], + }; + + for (index, adapter) in adapters.iter() { + let status = self.status.get(*index).unwrap().1.read().await; + if *status != GenesisCheckStatus::Valid { + continue; + } + out.push(adapter); + } + + out + } + + async fn get_status(&self, index: usize) -> GenesisCheckStatus { + match self.status.get(index) { + Some(status) => status.1.read().await.clone(), + None => GenesisCheckStatus::Failed, + } + } + + fn ttl_has_elapsed(ttl: &DateTime) -> bool { + ttl.checked_add_signed(VALIDATION_ATTEMPT_TTL_SECONDS) + .unwrap() + > Utc::now() + } + + fn should_verify(status: &GenesisCheckStatus) -> bool { + match status { + GenesisCheckStatus::TemporaryFailure { checked_at } + if Self::ttl_has_elapsed(checked_at) => + { + true + } + // Let check the provider + GenesisCheckStatus::NotChecked => true, + _ => false, + } + } + + async fn verify_provider(&self, index: usize, adapter: T) -> Result<(), ProviderManagerError> { + let status = self.get_status(index).await; + if !Self::should_verify(&status) { + return Ok(()); + } + + // unwrap: If index didn't exist it would have failed the previous check so it's safe + // to unwrap. + let status = self.status.get(index).unwrap().1.write().await; + // double check nothing has changed. + if !Self::should_verify(&status) { + return Ok(()); + } + + let chain_ident = adapter.net_identifiers().await?; + + Ok(()) + } + + async fn verify(&self, adapters: &Vec<(usize, T)>) -> Result<(), ProviderManagerError> { + let mut tasks = vec![]; + + for (index, adapter) in adapters.iter() {} + + Ok(()) + } + + pub async fn get_all(&self, chain_id: ChainId) -> Vec<&T> { + let adapters = match self.adapters.get(&chain_id) { + Some(adapters) => adapters, + None => return vec![], + }; + + if self.is_all_verified(&adapters).await { + return adapters.iter().map(|v| &v.1).collect(); + } + + match self.verify(adapters).await { + Ok(_) => {} + Err(error) => error!( + self.logger, + "unable to verify genesis for adapter: {}", + error.to_string() + ), + } + + self.get_verified_for_chain(&chain_id).await + } +} + +#[derive(Debug)] +struct Item { + index: u8, + item: T, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum GenesisCheckStatus { + NotChecked, + TemporaryFailure { checked_at: DateTime }, + Valid, + Failed, +} + +#[cfg(test)] +mod test {} diff --git a/graph/src/components/mod.rs b/graph/src/components/mod.rs index 79d698f8aa7..86ed7b3e0b8 100644 --- a/graph/src/components/mod.rs +++ b/graph/src/components/mod.rs @@ -60,6 +60,8 @@ pub mod metrics; /// Components dealing with versioning pub mod versions; +pub mod adapter; + /// A component that receives events of type `T`. pub trait EventConsumer { /// Get the event sink.