Skip to content

Commit

Permalink
add logging && fix substreams
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed May 17, 2024
1 parent f6f5b6d commit ec0306a
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 12 deletions.
5 changes: 5 additions & 0 deletions chain/ethereum/src/ingestor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{chain::BlockFinality, ENV_VARS};
use crate::{EthereumAdapter, EthereumAdapterTrait as _};
use graph::blockchain::client::ChainClient;
use graph::blockchain::BlockchainKind;
use graph::components::adapter::ChainId;
use graph::futures03::compat::Future01CompatExt as _;
use graph::slog::o;
Expand Down Expand Up @@ -268,4 +269,8 @@ impl BlockIngestor for PollingBlockIngestor {
fn network_name(&self) -> ChainId {
self.network_name.clone()
}

fn kind(&self) -> BlockchainKind {
BlockchainKind::Ethereum
}
}
4 changes: 4 additions & 0 deletions chain/substreams/src/block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration};
use crate::mapper::Mapper;
use anyhow::{Context, Error};
use graph::blockchain::block_stream::{BlockStreamError, FirehoseCursor};
use graph::blockchain::BlockchainKind;
use graph::blockchain::{
client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor,
};
Expand Down Expand Up @@ -196,4 +197,7 @@ impl BlockIngestor for SubstreamsBlockIngestor {
fn network_name(&self) -> ChainId {
self.chain_name.clone()
}
fn kind(&self) -> BlockchainKind {
BlockchainKind::Substreams
}
}
6 changes: 5 additions & 1 deletion graph/src/blockchain/firehose_block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use prost_types::Any;
use slog::{o, trace};
use tonic::Streaming;

use super::{client::ChainClient, BlockIngestor, Blockchain};
use super::{client::ChainClient, BlockIngestor, Blockchain, BlockchainKind};

const TRANSFORM_ETHEREUM_HEADER_ONLY: &str =
"type.googleapis.com/sf.ethereum.transform.v1.HeaderOnly";
Expand Down Expand Up @@ -229,4 +229,8 @@ where
fn network_name(&self) -> ChainId {
self.chain_name.clone()
}

fn kind(&self) -> BlockchainKind {
C::KIND
}
}
1 change: 1 addition & 0 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use self::{
pub trait BlockIngestor: 'static + Send + Sync {
async fn run(self: Box<Self>);
fn network_name(&self) -> ChainId;
fn kind(&self) -> BlockchainKind;
}

pub trait TriggersAdapterSelector<C: Blockchain>: Sync + Send {
Expand Down
5 changes: 4 additions & 1 deletion graph/src/components/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ const VALIDATION_ATTEMPT_TTL_SECONDS: i64 = 60 * 5;
pub enum ProviderManagerError {
#[error("unknown error {0}")]
Unknown(#[from] anyhow::Error),
#[error("provider {provider} failed verification, expected ident {expected}, got {actual}")]
#[error("provider {provider} on chain {chain_id} failed verification, expected ident {expected}, got {actual}")]
ProviderFailedValidation {
chain_id: ChainId,
provider: ProviderName,
expected: ChainIdentifier,
actual: ChainIdentifier,
Expand Down Expand Up @@ -514,6 +515,7 @@ impl<T: NetIdentifiable + 'static> Inner<T> {
net_version: chain_net_version,
genesis_block_hash: chain_ident.genesis_block_hash,
},
chain_id: ident.chain_id.clone(),
});
}
IdentValidatorError::ChangedHash {
Expand All @@ -532,6 +534,7 @@ impl<T: NetIdentifiable + 'static> Inner<T> {
net_version: chain_ident.net_version,
genesis_block_hash: chain_hash,
},
chain_id: ident.chain_id.clone(),
});
}
e @ IdentValidatorError::UnavailableStore(_)
Expand Down
8 changes: 5 additions & 3 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ pub fn create_substreams_networks(
if let ProviderDetails::Substreams(ref firehose) = provider.details {
info!(
logger,
"Configuring firehose endpoint";
"Configuring substreams endpoint";
"provider" => &provider.label,
"network" => &name.to_string(),
);

let parsed_networks = networks_by_kind
Expand Down Expand Up @@ -204,6 +205,7 @@ pub fn create_firehose_networks(
&logger,
"Configuring firehose endpoint";
"provider" => &provider.label,
"network" => &name.to_string(),
);

let parsed_networks = networks_by_kind
Expand Down Expand Up @@ -540,7 +542,6 @@ pub async fn networks_as_chains(
),
);
}
BlockchainKind::Substreams => {}
BlockchainKind::Starknet => {
let firehose_endpoints = networks.firehose_endpoints(chain_id.clone());
blockchain_map.insert::<graph_chain_starknet::Chain>(
Expand All @@ -558,6 +559,7 @@ pub async fn networks_as_chains(
),
);
}
BlockchainKind::Substreams => {}
}
}

Expand Down Expand Up @@ -602,7 +604,7 @@ pub async fn networks_as_chains(
));
let substreams_endpoints = networks.substreams_endpoints(chain_id.clone());

blockchain_map.insert::<graph_chain_starknet::Chain>(
blockchain_map.insert::<graph_chain_substreams::Chain>(
chain_id.clone(),
Arc::new(
BasicBlockchainBuilder {
Expand Down
20 changes: 15 additions & 5 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,12 @@ impl Networks {

match chain.block_ingestor().await {
Ok(ingestor) => {
info!(&logger, "Started block ingestor");
info!(&logger, "Creating block ingestor");
ingestors.push(ingestor)
}
Err(err) => graph::slog::error!(
&logger,
"unable to start block_ingestor for {}: {}",
"unable to create block_ingestor for {}: {}",
chain_id,
err.to_string()
),
Expand All @@ -336,8 +336,7 @@ impl Networks {
block_ingestor::<graph_chain_cosmos::Chain>(logger, id, chain, &mut res).await?
}
BlockchainKind::Substreams => {
block_ingestor::<graph_chain_substreams::Chain>(logger, id, chain, &mut res)
.await?
// handle substreams later
}
BlockchainKind::Starknet => {
block_ingestor::<graph_chain_starknet::Chain>(logger, id, chain, &mut res)
Expand All @@ -346,6 +345,17 @@ impl Networks {
}
}

// substreams networks that also have other types of chain(rpc or firehose), will have
// block ingestors already running.
let visited: Vec<_> = res.iter().map(|b| b.network_name()).collect();
for ((_, id), chain) in blockchain_map
.0
.iter()
.filter(|((kind, id), _)| BlockchainKind::Substreams.eq(&kind) && !visited.contains(id))
{
block_ingestor::<graph_chain_substreams::Chain>(logger, id, chain, &mut res).await?
}

Ok(res)
}

Expand Down Expand Up @@ -425,7 +435,7 @@ impl Config {
let firehose =
create_firehose_networks(logger.cheap_clone(), &self, endpoint_metrics.cheap_clone());
let substreams = create_substreams_networks(logger.cheap_clone(), &self, endpoint_metrics);
let adapters = eth
let adapters: Vec<_> = eth
.into_iter()
.chain(firehose.into_iter())
.chain(substreams.into_iter())
Expand Down
2 changes: 1 addition & 1 deletion node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async fn main() {

ingestors.into_iter().for_each(|ingestor| {
let logger = logger.clone();
info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str());
info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string());

graph::spawn(ingestor.run());
});
Expand Down
2 changes: 1 addition & 1 deletion server/index-node/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl<S: Store> IndexNodeResolver<S> {
}
BlockchainKind::Starknet => {
let unvalidated_subgraph_manifest =
UnvalidatedSubgraphManifest::<graph_chain_substreams::Chain>::resolve(
UnvalidatedSubgraphManifest::<graph_chain_starknet::Chain>::resolve(
deployment_hash.clone(),
raw_yaml,
&self.link_resolver,
Expand Down

0 comments on commit ec0306a

Please sign in to comment.