diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index f37b1ffa235..83f2e311e82 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -92,25 +92,30 @@ impl Default for EthereumNetworkAdapters { impl EthereumNetworkAdapters { #[cfg(debug_assertions)] - pub fn for_testing( - adapters: Vec, + pub async fn for_testing( + mut adapters: Vec, call_only: Vec, ) -> Self { + use std::cmp::Ordering; + use graph::slog::{o, Discard, Logger}; use graph::components::adapter::MockIdentValidator; let chain_id: ChainId = "testing".into(); + adapters.sort_by(|a, b| { + a.capabilities + .partial_cmp(&b.capabilities) + .unwrap_or(Ordering::Equal) + }); - Self::new( - chain_id.clone(), - ProviderManager::new( - Logger::root(Discard, o!()), - vec![(chain_id, adapters)].into_iter(), - Arc::new(MockIdentValidator), - ), - call_only, - None, - ) + let provider = ProviderManager::new( + Logger::root(Discard, o!()), + vec![(chain_id.clone(), adapters)].into_iter(), + Arc::new(MockIdentValidator), + ); + provider.mark_all_valid().await; + + Self::new(chain_id, provider, call_only, None) } pub fn new( @@ -515,7 +520,8 @@ mod tests { eth_call_adapter.clone(), SubgraphLimit::Limit(3), )], - ); + ) + .await; // one reference above and one inside adapters struct assert_eq!(Arc::strong_count(ð_call_adapter), 2); assert_eq!(Arc::strong_count(ð_adapter), 2); @@ -608,7 +614,7 @@ mod tests { archive: true, traces: false, }, - eth_adapter.clone(), + eth_call_adapter.clone(), SubgraphLimit::Unlimited, )], vec![EthereumNetworkAdapter::new( @@ -617,14 +623,17 @@ mod tests { archive: true, traces: false, }, - eth_call_adapter.clone(), - SubgraphLimit::Limit(3), + eth_adapter.clone(), + SubgraphLimit::Limit(2), )], - ); + ) + .await; // one reference above and one inside adapters struct assert_eq!(Arc::strong_count(ð_call_adapter), 2); assert_eq!(Arc::strong_count(ð_adapter), 2); + // verify that after all call_only were exhausted, we can still + // get normal adapters let keep: Vec> = vec![0; 10] .iter() .map(|_| adapters.call_or_cheapest(None).unwrap()) @@ -688,7 +697,8 @@ mod tests { eth_adapter.clone(), SubgraphLimit::Limit(3), )], - ); + ) + .await; // one reference above and one inside adapters struct assert_eq!(Arc::strong_count(ð_call_adapter), 2); assert_eq!(Arc::strong_count(ð_adapter), 2); @@ -734,7 +744,8 @@ mod tests { SubgraphLimit::Limit(3), )], vec![], - ); + ) + .await; // one reference above and one inside adapters struct assert_eq!(Arc::strong_count(ð_adapter), 2); assert_eq!( @@ -829,6 +840,8 @@ mod tests { .into_iter(), Arc::new(MockIdentValidator), ); + manager.mark_all_valid().await; + let no_retest_adapters = EthereumNetworkAdapters::new( chain_id.clone(), manager.cheap_clone(), @@ -917,17 +930,13 @@ mod tests { }); let manager = ProviderManager::::new( logger.clone(), - vec![( - chain_id.clone(), - no_retest_adapters - .iter() - .cloned() - .chain(always_retest_adapters.iter().cloned()) - .collect(), - )] - .into_iter(), + always_retest_adapters + .iter() + .cloned() + .map(|a| (chain_id.clone(), vec![a])), Arc::new(MockIdentValidator), ); + manager.mark_all_valid().await; let always_retest_adapters = EthereumNetworkAdapters::new( chain_id.clone(), @@ -946,6 +955,17 @@ mod tests { .provider(), no_error_provider ); + + let manager = ProviderManager::::new( + logger.clone(), + no_retest_adapters + .iter() + .cloned() + .map(|a| (chain_id.clone(), vec![a])), + Arc::new(MockIdentValidator), + ); + manager.mark_all_valid().await; + let no_retest_adapters = EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64)); assert_eq!( @@ -986,6 +1006,8 @@ mod tests { .into_iter(), Arc::new(MockIdentValidator), ); + manager.mark_all_valid().await; + let no_available_adapter = EthereumNetworkAdapters::new(chain_id, manager, vec![], None); let res = no_available_adapter .cheapest_with(&NodeCapabilities { diff --git a/graph/src/components/adapter.rs b/graph/src/components/adapter.rs index c72d54c932f..714643fe15f 100644 --- a/graph/src/components/adapter.rs +++ b/graph/src/components/adapter.rs @@ -64,7 +64,7 @@ struct Ident { chain_id: ChainId, } -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum IdentValidatorError { #[error("Store ident wasn't set")] UnsetIdent, @@ -235,7 +235,7 @@ impl ProviderManager { .unwrap_or_default() } - #[cfg(test)] + #[cfg(debug_assertions)] pub async fn mark_all_valid(&self) { for (_, status) in self.inner.status.iter() { let mut s = status.write().await; @@ -533,89 +533,64 @@ mod test { use crate::{blockchain::ChainIdentifier, components::adapter::ProviderManagerError}; - use super::{NetIdentifiable, ProviderManager, ProviderName, VALIDATION_ATTEMPT_TTL_SECONDS}; + use super::{ + IdentValidator, IdentValidatorError, NetIdentifiable, ProviderManager, ProviderName, + VALIDATION_ATTEMPT_TTL_SECONDS, + }; - const FAILED_CHAIN: &str = "failed"; - const VALID_CHAIN: &str = "valid"; - const TESTABLE_CHAIN: &str = "testable"; - const UNTESTABLE_CHAIN: &str = "untestable"; - const EMPTY_CHAIN: &str = "empty"; + const TEST_CHAIN_ID: &str = "valid"; lazy_static! { - static ref VALID_IDENT: ChainIdentifier = ChainIdentifier { - net_version: VALID_CHAIN.into(), - genesis_block_hash: BlockHash::default(), - }; - static ref FAILED_IDENT: ChainIdentifier = ChainIdentifier { - net_version: FAILED_CHAIN.into(), - genesis_block_hash: BlockHash::default(), - }; static ref UNTESTABLE_ADAPTER: MockAdapter = MockAdapter{ - provider: UNTESTABLE_CHAIN.into(), - ident: FAILED_IDENT.clone(), - checked_at: Some(Utc::now()), + provider: "untestable".into(), + status: GenesisCheckStatus::TemporaryFailure { checked_at: Utc::now()}, }; // way past TTL, ready to check again static ref TESTABLE_ADAPTER: MockAdapter = MockAdapter{ - provider: TESTABLE_CHAIN.into(), - ident: VALID_IDENT.clone(), - checked_at: Some(Utc::now().sub(Duration::seconds(10000000))), + provider: "testable".into(), + status: GenesisCheckStatus::TemporaryFailure { checked_at: Utc::now().sub(Duration::seconds(10000000)) }, }; - static ref VALID_ADAPTER: MockAdapter = MockAdapter::valid(); - static ref FAILED_ADAPTER: MockAdapter = MockAdapter::failed(); + static ref VALID_ADAPTER: MockAdapter = MockAdapter {provider: "valid".into(), status: GenesisCheckStatus::Valid,}; + static ref FAILED_ADAPTER: MockAdapter = MockAdapter {provider: "FAILED".into(), status: GenesisCheckStatus::Failed,}; } - #[derive(Clone, PartialEq, Eq, Debug)] - struct MockAdapter { - provider: Word, - ident: ChainIdentifier, - checked_at: Option>, + struct TestValidator { + result: Result<(), IdentValidatorError>, } - impl MockAdapter { - fn failed() -> Self { - Self { - provider: FAILED_CHAIN.into(), - ident: FAILED_IDENT.clone(), - checked_at: None, - } - } - - fn valid() -> Self { - Self { - provider: VALID_CHAIN.into(), - ident: VALID_IDENT.clone(), - checked_at: None, - } + impl IdentValidator for TestValidator { + fn check_ident( + &self, + _chain_id: &ChainId, + _ident: &ChainIdentifier, + ) -> Result<(), IdentValidatorError> { + self.result.clone() } + } - fn testable(ttl: DateTime) -> Self { - Self { - provider: TESTABLE_CHAIN.into(), - ident: VALID_IDENT.clone(), - checked_at: Some(ttl), - } - } + #[derive(Clone, PartialEq, Eq, Debug)] + struct MockAdapter { + provider: Word, + status: GenesisCheckStatus, } #[async_trait] impl NetIdentifiable for MockAdapter { async fn net_identifiers(&self) -> Result { - match self.checked_at { - Some(checked_at) + match self.status { + GenesisCheckStatus::TemporaryFailure { checked_at } if checked_at > Utc::now().sub(Duration::seconds(VALIDATION_ATTEMPT_TTL_SECONDS)) => { - unreachable!("should never check if ttl has not elapsed") + unreachable!("should never check if ttl has not elapsed"); } - _ => {} + _ => Ok(ChainIdentifier::default()), } - - Ok(self.ident.clone()) } + fn provider_name(&self) -> ProviderName { self.provider.clone() } @@ -626,100 +601,78 @@ mod test { struct Case<'a> { name: &'a str, chain_id: &'a str, - status: Vec<(ProviderName, GenesisCheckStatus)>, adapters: Vec<(ChainId, Vec)>, - idents: Vec<(ChainId, Option)>, + validator: Option, expected: Result, ProviderManagerError>, } let cases = vec![ Case { name: "no adapters", - chain_id: EMPTY_CHAIN, - status: vec![], + chain_id: TEST_CHAIN_ID, adapters: vec![], - idents: vec![(VALID_CHAIN.into(), None)], + validator: None, expected: Ok(vec![]), }, Case { name: "adapter temporary failure with Ident None", - chain_id: VALID_CHAIN, - status: vec![( - UNTESTABLE_CHAIN.into(), - GenesisCheckStatus::TemporaryFailure { - checked_at: UNTESTABLE_ADAPTER.checked_at.unwrap(), - }, - )], + chain_id: TEST_CHAIN_ID, // UNTESTABLE_ADAPTER has failed ident, will be valid cause idents has None value - adapters: vec![(VALID_CHAIN.into(), vec![UNTESTABLE_ADAPTER.clone()])], - idents: vec![(VALID_CHAIN.into(), None)], + adapters: vec![(TEST_CHAIN_ID.into(), vec![UNTESTABLE_ADAPTER.clone()])], + validator: None, expected: Err(ProviderManagerError::NoProvidersAvailable( - VALID_CHAIN.into(), + TEST_CHAIN_ID.into(), )), }, Case { name: "adapter temporary failure", - chain_id: VALID_CHAIN, - status: vec![( - UNTESTABLE_CHAIN.into(), - GenesisCheckStatus::TemporaryFailure { - checked_at: Utc::now(), - }, - )], - adapters: vec![(VALID_CHAIN.into(), vec![UNTESTABLE_ADAPTER.clone()])], - idents: vec![(VALID_CHAIN.into(), Some(FAILED_IDENT.clone()))], + chain_id: TEST_CHAIN_ID, + adapters: vec![(TEST_CHAIN_ID.into(), vec![UNTESTABLE_ADAPTER.clone()])], + validator: None, expected: Err(ProviderManagerError::NoProvidersAvailable( - VALID_CHAIN.into(), + TEST_CHAIN_ID.into(), )), }, - Case { - name: "chain ident None", - chain_id: VALID_CHAIN, - // Failed adapter has VALID_CHAIN as the ident, which is not validated if - // the expected ident is None - status: vec![], - adapters: vec![(VALID_CHAIN.into(), vec![FAILED_ADAPTER.clone()])], - idents: vec![(VALID_CHAIN.into(), None)], - expected: Ok(vec![&FAILED_ADAPTER]), - }, Case { name: "wrong chain ident", - chain_id: VALID_CHAIN, - status: vec![], - adapters: vec![(VALID_CHAIN.into(), vec![MockAdapter::failed()])], - idents: vec![(VALID_CHAIN.into(), Some(VALID_IDENT.clone()))], - expected: Err(ProviderManagerError::AllProvidersFailed(VALID_CHAIN.into())), + chain_id: TEST_CHAIN_ID, + adapters: vec![(TEST_CHAIN_ID.into(), vec![FAILED_ADAPTER.clone()])], + validator: Some(TestValidator { + result: Err(IdentValidatorError::ChangedNetVersion { + chain_id: TEST_CHAIN_ID.into(), + store_net_version: "".to_string(), + chain_net_version: "".to_string(), + }), + }), + expected: Err(ProviderManagerError::AllProvidersFailed( + TEST_CHAIN_ID.into(), + )), }, Case { name: "all adapters ok or not checkable yet", - chain_id: VALID_CHAIN, - status: vec![( - FAILED_CHAIN.into(), - GenesisCheckStatus::TemporaryFailure { - checked_at: Utc::now(), - }, - )], + chain_id: TEST_CHAIN_ID, adapters: vec![( - VALID_CHAIN.into(), + TEST_CHAIN_ID.into(), vec![VALID_ADAPTER.clone(), FAILED_ADAPTER.clone()], )], - idents: vec![(VALID_CHAIN.into(), Some(VALID_IDENT.clone()))], + // if a check is performed (which it shouldn't) the test will fail + validator: Some(TestValidator { + result: Err(IdentValidatorError::ChangedNetVersion { + chain_id: TEST_CHAIN_ID.into(), + store_net_version: "".to_string(), + chain_net_version: "".to_string(), + }), + }), expected: Ok(vec![&VALID_ADAPTER]), }, Case { name: "all adapters ok or checkable", - chain_id: VALID_CHAIN, - status: vec![( - TESTABLE_CHAIN.into(), - GenesisCheckStatus::TemporaryFailure { - checked_at: TESTABLE_ADAPTER.checked_at.unwrap(), - }, - )], + chain_id: TEST_CHAIN_ID, adapters: vec![( - VALID_CHAIN.into(), + TEST_CHAIN_ID.into(), vec![VALID_ADAPTER.clone(), TESTABLE_ADAPTER.clone()], )], - idents: vec![(VALID_CHAIN.into(), Some(VALID_IDENT.clone()))], + validator: None, expected: Ok(vec![&VALID_ADAPTER, &TESTABLE_ADAPTER]), }, ]; @@ -728,30 +681,36 @@ mod test { let Case { name, chain_id, - status, adapters, - idents, + validator, expected, } = case; let logger = Logger::root(Discard, o!()); let chain_id = chain_id.into(); - let validator = Arc::new(MockIdentValidator); - let manager = ProviderManager::new(logger, adapters.into_iter(), validator); - - for (provider, status) in status.iter() { - let slot = manager - .inner - .status - .iter() - .find(|(ident, _)| ident.provider.eq(provider)) - .expect(&format!( - "case: {} - there should be a status for provider \"{}\"", - name, provider - )); - let mut s = slot.1.write().await; - *s = status.clone(); + let validator: Arc = match validator { + None => Arc::new(MockIdentValidator {}), + Some(validator) => Arc::new(validator), + }; + + let manager = ProviderManager::new(logger, adapters.clone().into_iter(), validator); + + for (_, adapters) in adapters.iter() { + for adapter in adapters.iter() { + let provider = adapter.provider.clone(); + let slot = manager + .inner + .status + .iter() + .find(|(ident, _)| ident.provider.eq(&provider)) + .expect(&format!( + "case: {} - there should be a status for provider \"{}\"", + name, provider + )); + let mut s = slot.1.write().await; + *s = adapter.status.clone(); + } } let result = manager.get_all(&chain_id).await; diff --git a/node/src/config.rs b/node/src/config.rs index 55cde3d9f0e..3686038879e 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -35,6 +35,7 @@ use graph_store_postgres::{ use graph::http::{HeaderMap, Uri}; use std::{ any::Any, + cmp::Ordering, collections::{BTreeMap, BTreeSet}, fmt, sync::Arc, @@ -221,11 +222,20 @@ impl Networks { let eth_adapters = adapters.iter().flat_map(|a| a.as_rpc()).cloned().map( |EthAdapterConfig { chain_id, - adapters, + mut adapters, call_only: _, polling_interval: _, - }| { (chain_id, adapters) }, + }| { + adapters.sort_by(|a, b| { + a.capabilities + .partial_cmp(&b.capabilities) + .unwrap_or(Ordering::Equal) + }); + + (chain_id, adapters) + }, ); + let firehose_adapters = adapters .iter() .flat_map(|a| a.as_firehose())