diff --git a/chain/ethereum/src/ingestor.rs b/chain/ethereum/src/ingestor.rs index df8bc178e6c..1bedf26876a 100644 --- a/chain/ethereum/src/ingestor.rs +++ b/chain/ethereum/src/ingestor.rs @@ -1,6 +1,7 @@ use crate::{chain::BlockFinality, ENV_VARS}; use crate::{EthereumAdapter, EthereumAdapterTrait as _}; use graph::blockchain::client::ChainClient; +use graph::blockchain::BlockchainKind; use graph::components::adapter::ChainId; use graph::futures03::compat::Future01CompatExt as _; use graph::slog::o; @@ -268,4 +269,8 @@ impl BlockIngestor for PollingBlockIngestor { fn network_name(&self) -> ChainId { self.network_name.clone() } + + fn kind(&self) -> BlockchainKind { + BlockchainKind::Ethereum + } } diff --git a/chain/substreams/src/block_ingestor.rs b/chain/substreams/src/block_ingestor.rs index 2dc12d56dcb..c2a52ef06a7 100644 --- a/chain/substreams/src/block_ingestor.rs +++ b/chain/substreams/src/block_ingestor.rs @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration}; use crate::mapper::Mapper; use anyhow::{Context, Error}; use graph::blockchain::block_stream::{BlockStreamError, FirehoseCursor}; +use graph::blockchain::BlockchainKind; use graph::blockchain::{ client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor, }; @@ -196,4 +197,7 @@ impl BlockIngestor for SubstreamsBlockIngestor { fn network_name(&self) -> ChainId { self.chain_name.clone() } + fn kind(&self) -> BlockchainKind { + BlockchainKind::Substreams + } } diff --git a/graph/src/blockchain/firehose_block_ingestor.rs b/graph/src/blockchain/firehose_block_ingestor.rs index f3479f74ba2..10a19df44b2 100644 --- a/graph/src/blockchain/firehose_block_ingestor.rs +++ b/graph/src/blockchain/firehose_block_ingestor.rs @@ -15,7 +15,7 @@ use prost_types::Any; use slog::{o, trace}; use tonic::Streaming; -use super::{client::ChainClient, BlockIngestor, Blockchain}; +use super::{client::ChainClient, BlockIngestor, Blockchain, BlockchainKind}; const TRANSFORM_ETHEREUM_HEADER_ONLY: &str = "type.googleapis.com/sf.ethereum.transform.v1.HeaderOnly"; @@ -229,4 +229,8 @@ where fn network_name(&self) -> ChainId { self.chain_name.clone() } + + fn kind(&self) -> BlockchainKind { + C::KIND + } } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index bc7e83a62d6..83b8f250429 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -63,6 +63,7 @@ use self::{ pub trait BlockIngestor: 'static + Send + Sync { async fn run(self: Box); fn network_name(&self) -> ChainId; + fn kind(&self) -> BlockchainKind; } pub trait TriggersAdapterSelector: Sync + Send { diff --git a/graph/src/components/adapter.rs b/graph/src/components/adapter.rs index 06830c887eb..fa10e1b6c7b 100644 --- a/graph/src/components/adapter.rs +++ b/graph/src/components/adapter.rs @@ -27,8 +27,9 @@ const VALIDATION_ATTEMPT_TTL_SECONDS: i64 = 60 * 5; pub enum ProviderManagerError { #[error("unknown error {0}")] Unknown(#[from] anyhow::Error), - #[error("provider {provider} failed verification, expected ident {expected}, got {actual}")] + #[error("provider {provider} on chain {chain_id} failed verification, expected ident {expected}, got {actual}")] ProviderFailedValidation { + chain_id: ChainId, provider: ProviderName, expected: ChainIdentifier, actual: ChainIdentifier, @@ -514,6 +515,7 @@ impl Inner { net_version: chain_net_version, genesis_block_hash: chain_ident.genesis_block_hash, }, + chain_id: ident.chain_id.clone(), }); } IdentValidatorError::ChangedHash { @@ -532,6 +534,7 @@ impl Inner { net_version: chain_ident.net_version, genesis_block_hash: chain_hash, }, + chain_id: ident.chain_id.clone(), }); } e @ IdentValidatorError::UnavailableStore(_) diff --git a/node/src/chain.rs b/node/src/chain.rs index 15d412fd0af..540b0afb31c 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -141,8 +141,9 @@ pub fn create_substreams_networks( if let ProviderDetails::Substreams(ref firehose) = provider.details { info!( logger, - "Configuring firehose endpoint"; + "Configuring substreams endpoint"; "provider" => &provider.label, + "network" => &name.to_string(), ); let parsed_networks = networks_by_kind @@ -204,6 +205,7 @@ pub fn create_firehose_networks( &logger, "Configuring firehose endpoint"; "provider" => &provider.label, + "network" => &name.to_string(), ); let parsed_networks = networks_by_kind @@ -540,7 +542,6 @@ pub async fn networks_as_chains( ), ); } - BlockchainKind::Substreams => {} BlockchainKind::Starknet => { let firehose_endpoints = networks.firehose_endpoints(chain_id.clone()); blockchain_map.insert::( @@ -558,6 +559,7 @@ pub async fn networks_as_chains( ), ); } + BlockchainKind::Substreams => {} } } @@ -602,7 +604,7 @@ pub async fn networks_as_chains( )); let substreams_endpoints = networks.substreams_endpoints(chain_id.clone()); - blockchain_map.insert::( + blockchain_map.insert::( chain_id.clone(), Arc::new( BasicBlockchainBuilder { diff --git a/node/src/config.rs b/node/src/config.rs index 3686038879e..fe72e170a88 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -304,12 +304,12 @@ impl Networks { match chain.block_ingestor().await { Ok(ingestor) => { - info!(&logger, "Started block ingestor"); + info!(&logger, "Creating block ingestor"); ingestors.push(ingestor) } Err(err) => graph::slog::error!( &logger, - "unable to start block_ingestor for {}: {}", + "unable to create block_ingestor for {}: {}", chain_id, err.to_string() ), @@ -336,8 +336,7 @@ impl Networks { block_ingestor::(logger, id, chain, &mut res).await? } BlockchainKind::Substreams => { - block_ingestor::(logger, id, chain, &mut res) - .await? + // handle substreams later } BlockchainKind::Starknet => { block_ingestor::(logger, id, chain, &mut res) @@ -346,6 +345,17 @@ impl Networks { } } + // substreams networks that also have other types of chain(rpc or firehose), will have + // block ingestors already running. + let visited: Vec<_> = res.iter().map(|b| b.network_name()).collect(); + for ((_, id), chain) in blockchain_map + .0 + .iter() + .filter(|((kind, id), _)| BlockchainKind::Substreams.eq(&kind) && !visited.contains(id)) + { + block_ingestor::(logger, id, chain, &mut res).await? + } + Ok(res) } @@ -425,7 +435,7 @@ impl Config { let firehose = create_firehose_networks(logger.cheap_clone(), &self, endpoint_metrics.cheap_clone()); let substreams = create_substreams_networks(logger.cheap_clone(), &self, endpoint_metrics); - let adapters = eth + let adapters: Vec<_> = eth .into_iter() .chain(firehose.into_iter()) .chain(substreams.into_iter()) diff --git a/node/src/main.rs b/node/src/main.rs index 4517baf27d3..fd2b61ca333 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -341,7 +341,7 @@ async fn main() { ingestors.into_iter().for_each(|ingestor| { let logger = logger.clone(); - info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str()); + info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string()); graph::spawn(ingestor.run()); }); diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index ddbd6ed91da..6ba26a5457e 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -593,7 +593,7 @@ impl IndexNodeResolver { } BlockchainKind::Starknet => { let unvalidated_subgraph_manifest = - UnvalidatedSubgraphManifest::::resolve( + UnvalidatedSubgraphManifest::::resolve( deployment_hash.clone(), raw_yaml, &self.link_resolver,