Skip to content

Commit

Permalink
Remove provider checks at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Jun 18, 2024
1 parent 301773c commit 6428635
Show file tree
Hide file tree
Showing 51 changed files with 3,589 additions and 2,219 deletions.
15 changes: 9 additions & 6 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graph::blockchain::{
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -41,7 +42,7 @@ use graph::blockchain::block_stream::{

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
name: ChainId,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand All @@ -53,8 +54,9 @@ impl std::fmt::Debug for Chain {
}
}

#[async_trait]
impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
async fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -157,21 +159,22 @@ impl Blockchain for Chain {
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
self.client
.firehose_endpoint()?
.firehose_endpoint()
.await?
.block_ptr_for_number::<codec::Block>(logger, number)
.await
.map_err(Into::into)
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down
14 changes: 8 additions & 6 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::{BlockIngestor, NoopDecoderHook};
use graph::components::adapter::ChainId;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
Expand Down Expand Up @@ -36,7 +37,7 @@ use crate::{codec, TriggerFilter};

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
name: ChainId,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand All @@ -48,8 +49,9 @@ impl std::fmt::Debug for Chain {
}
}

#[async_trait]
impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
async fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -150,23 +152,23 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
let firehose_endpoint = self.client.firehose_endpoint()?;
let firehose_endpoint = self.client.firehose_endpoint().await?;

firehose_endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.await
.map_err(Into::into)
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down
4 changes: 2 additions & 2 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use anyhow::Error;
use graph::{
endpoint::EndpointMetrics,
env::env_var,
firehose::SubgraphLimit,
firehose::{self, FirehoseEndpoint, NoopGenesisDecoder, SubgraphLimit},
log::logger,
prelude::{prost, tokio, tonic, MetricsRegistry},
{firehose, firehose::FirehoseEndpoint},
};
use graph_chain_ethereum::codec;
use hex::ToHex;
Expand Down Expand Up @@ -39,6 +38,7 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
metrics,
NoopGenesisDecoder::boxed(),
));

loop {
Expand Down
56 changes: 32 additions & 24 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transfor
use graph::blockchain::{
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
Expand Down Expand Up @@ -146,7 +147,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
let chain_store = chain.chain_store();
let chain_head_update_stream = chain
.chain_head_update_listener
.subscribe(chain.name.clone(), logger.clone());
.subscribe(chain.name.to_string(), logger.clone());

// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
// This is ok because Celo blocks are always final. And we _need_ to do this because
Expand All @@ -156,6 +157,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
ChainClient::Rpc(adapter) => {
adapter
.cheapest()
.await
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
.chain_id()
.await?
Expand Down Expand Up @@ -199,7 +201,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
logger: &Logger,
cursor: FirehoseCursor,
) -> Result<BlockFinality, Error> {
let endpoint = chain.chain_client().firehose_endpoint()?;
let endpoint = chain.chain_client().firehose_endpoint().await?;
let block = endpoint.get_block::<codec::Block>(cursor, logger).await?;
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
Ok(BlockFinality::NonFinal(ethereum_block))
Expand Down Expand Up @@ -286,9 +288,8 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
pub name: ChainId,
node_id: NodeId,
chain_identifier: Arc<ChainIdentifier>,
registry: Arc<MetricsRegistry>,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
Expand All @@ -314,7 +315,7 @@ impl Chain {
/// Creates a new Ethereum [`Chain`].
pub fn new(
logger_factory: LoggerFactory,
name: String,
name: ChainId,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
Expand All @@ -330,12 +331,10 @@ impl Chain {
polling_ingestor_interval: Duration,
is_ingestible: bool,
) -> Self {
let chain_identifier = Arc::new(chain_store.chain_identifier().clone());
Chain {
logger_factory,
name,
node_id,
chain_identifier,
registry,
client,
chain_store,
Expand All @@ -360,12 +359,12 @@ impl Chain {
// TODO: This is only used to build the block stream which could prolly
// be moved to the chain itself and return a block stream future that the
// caller can spawn.
pub fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
pub async fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
let adapters = match self.client.as_ref() {
ChainClient::Firehose(_) => panic!("no adapter with firehose"),
ChainClient::Rpc(adapter) => adapter,
};
adapters.cheapest().unwrap()
adapters.cheapest().await.unwrap()
}
}

Expand Down Expand Up @@ -454,13 +453,15 @@ impl Blockchain for Chain {
) -> Result<BlockPtr, IngestorError> {
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
.endpoint()?
.endpoint()
.await?
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
.await
.map_err(IngestorError::Unknown),
ChainClient::Rpc(adapters) => {
let adapter = adapters
.cheapest()
.await
.with_context(|| format!("no adapter for chain {}", self.name))?
.clone();

Expand All @@ -484,30 +485,31 @@ impl Blockchain for Chain {
self.block_refetcher.get_block(self, logger, cursor).await
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
let call_cache = Arc::new(BufferedCallCache::new(self.call_cache.cheap_clone()));
let chain_ident = self.chain_store.chain_identifier()?;

let builder = self.runtime_adapter_builder.build(
self.eth_adapters.cheap_clone(),
call_cache.cheap_clone(),
self.chain_identifier.cheap_clone(),
Arc::new(chain_ident.clone()),
);
let eth_call_gas = eth_call_gas(&self.chain_identifier);
let eth_call_gas = eth_call_gas(&chain_ident);

let decoder_hook = crate::data_source::DecoderHook::new(
self.eth_adapters.cheap_clone(),
call_cache,
eth_call_gas,
);

(builder, decoder_hook)
Ok((builder, decoder_hook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
ChainClient::Firehose(_) => {
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
Expand All @@ -521,10 +523,7 @@ impl Blockchain for Chain {

Box::new(ingestor)
}
ChainClient::Rpc(rpc) => {
let eth_adapter = rpc
.cheapest()
.ok_or_else(|| anyhow!("unable to get adapter for ethereum block ingestor"))?;
ChainClient::Rpc(_) => {
let logger = self
.logger_factory
.component_logger(
Expand All @@ -535,7 +534,7 @@ impl Blockchain for Chain {
}),
}),
)
.new(o!("provider" => eth_adapter.provider().to_string()));
.new(o!());

if !self.is_ingestible {
bail!(
Expand All @@ -550,7 +549,7 @@ impl Blockchain for Chain {
Box::new(PollingBlockIngestor::new(
logger,
graph::env::ENV_VARS.reorg_threshold,
eth_adapter,
self.chain_client(),
self.chain_store().cheap_clone(),
self.polling_ingestor_interval,
self.name.clone(),
Expand Down Expand Up @@ -675,7 +674,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
filter: &TriggerFilter,
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?,
self.logger.clone(),
self.chain_store.clone(),
self.ethrpc_metrics.clone(),
Expand Down Expand Up @@ -705,7 +707,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {

match &block {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let adapter = self
.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?;
let block_number = block.number() as BlockNumber;
let (blocks, _) = blocks_with_triggers(
adapter,
Expand Down Expand Up @@ -738,6 +744,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
self.chain_client
.rpc()?
.cheapest()
.await
.ok_or(anyhow!("unable to get adapter for is_on_main_chain"))?
.is_on_main_chain(&self.logger, ptr.clone())
.await
Expand Down Expand Up @@ -775,7 +782,8 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}),
ChainClient::Rpc(adapters) => {
let blocks = adapters
.cheapest_with(&self.capabilities)?
.cheapest_with(&self.capabilities)
.await?
.load_blocks(
self.logger.cheap_clone(),
self.chain_store.cheap_clone(),
Expand Down
3 changes: 2 additions & 1 deletion chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,8 @@ pub(crate) async fn get_calls(
} else {
client
.rpc()?
.cheapest_with(capabilities)?
.cheapest_with(capabilities)
.await?
.calls_in_block(
&logger,
subgraph_metrics.clone(),
Expand Down
Loading

0 comments on commit 6428635

Please sign in to comment.