Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove provider checks at startup #5337

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graph::blockchain::{
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -41,7 +42,7 @@ use graph::blockchain::block_stream::{

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
name: ChainId,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand All @@ -53,8 +54,9 @@ impl std::fmt::Debug for Chain {
}
}

#[async_trait]
impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
async fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -157,21 +159,22 @@ impl Blockchain for Chain {
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
self.client
.firehose_endpoint()?
.firehose_endpoint()
.await?
.block_ptr_for_number::<codec::Block>(logger, number)
.await
.map_err(Into::into)
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down
1 change: 1 addition & 0 deletions chain/arweave/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::codec;
// Logging the block is too verbose, so this strips the block from the trigger for Debug.
impl std::fmt::Debug for ArweaveTrigger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[allow(unused)]
#[derive(Debug)]
pub enum MappingTriggerWithoutBlock {
Block,
Expand Down
14 changes: 8 additions & 6 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::{BlockIngestor, NoopDecoderHook};
use graph::components::adapter::ChainId;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
Expand Down Expand Up @@ -36,7 +37,7 @@ use crate::{codec, TriggerFilter};

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
name: ChainId,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand All @@ -48,8 +49,9 @@ impl std::fmt::Debug for Chain {
}
}

#[async_trait]
impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
async fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -150,23 +152,23 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
let firehose_endpoint = self.client.firehose_endpoint()?;
let firehose_endpoint = self.client.firehose_endpoint().await?;

firehose_endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.await
.map_err(Into::into)
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down
1 change: 1 addition & 0 deletions chain/cosmos/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::data_source::EventOrigin;
// Logging the block is too verbose, so this strips the block from the trigger for Debug.
impl std::fmt::Debug for CosmosTrigger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[allow(unused)]
#[derive(Debug)]
pub enum MappingTriggerWithoutBlock<'e> {
Block,
Expand Down
4 changes: 2 additions & 2 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use anyhow::Error;
use graph::{
endpoint::EndpointMetrics,
env::env_var,
firehose::SubgraphLimit,
firehose::{self, FirehoseEndpoint, NoopGenesisDecoder, SubgraphLimit},
log::logger,
prelude::{prost, tokio, tonic, MetricsRegistry},
{firehose, firehose::FirehoseEndpoint},
};
use graph_chain_ethereum::codec;
use hex::ToHex;
Expand Down Expand Up @@ -39,6 +38,7 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
metrics,
NoopGenesisDecoder::boxed(),
));

loop {
Expand Down
56 changes: 32 additions & 24 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transfor
use graph::blockchain::{
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
Expand Down Expand Up @@ -146,7 +147,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
let chain_store = chain.chain_store();
let chain_head_update_stream = chain
.chain_head_update_listener
.subscribe(chain.name.clone(), logger.clone());
.subscribe(chain.name.to_string(), logger.clone());

// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
// This is ok because Celo blocks are always final. And we _need_ to do this because
Expand All @@ -156,6 +157,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
ChainClient::Rpc(adapter) => {
adapter
.cheapest()
.await
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
.chain_id()
.await?
Expand Down Expand Up @@ -199,7 +201,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
logger: &Logger,
cursor: FirehoseCursor,
) -> Result<BlockFinality, Error> {
let endpoint = chain.chain_client().firehose_endpoint()?;
let endpoint = chain.chain_client().firehose_endpoint().await?;
let block = endpoint.get_block::<codec::Block>(cursor, logger).await?;
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
Ok(BlockFinality::NonFinal(ethereum_block))
Expand Down Expand Up @@ -286,9 +288,8 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
pub name: ChainId,
node_id: NodeId,
chain_identifier: Arc<ChainIdentifier>,
registry: Arc<MetricsRegistry>,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
Expand All @@ -314,7 +315,7 @@ impl Chain {
/// Creates a new Ethereum [`Chain`].
pub fn new(
logger_factory: LoggerFactory,
name: String,
name: ChainId,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
Expand All @@ -330,12 +331,10 @@ impl Chain {
polling_ingestor_interval: Duration,
is_ingestible: bool,
) -> Self {
let chain_identifier = Arc::new(chain_store.chain_identifier().clone());
Chain {
logger_factory,
name,
node_id,
chain_identifier,
registry,
client,
chain_store,
Expand All @@ -360,12 +359,12 @@ impl Chain {
// TODO: This is only used to build the block stream which could prolly
// be moved to the chain itself and return a block stream future that the
// caller can spawn.
pub fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
pub async fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
let adapters = match self.client.as_ref() {
ChainClient::Firehose(_) => panic!("no adapter with firehose"),
ChainClient::Rpc(adapter) => adapter,
};
adapters.cheapest().unwrap()
adapters.cheapest().await.unwrap()
}
}

Expand Down Expand Up @@ -454,13 +453,15 @@ impl Blockchain for Chain {
) -> Result<BlockPtr, IngestorError> {
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
.endpoint()?
.endpoint()
.await?
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
.await
.map_err(IngestorError::Unknown),
ChainClient::Rpc(adapters) => {
let adapter = adapters
.cheapest()
.await
.with_context(|| format!("no adapter for chain {}", self.name))?
.clone();

Expand All @@ -484,30 +485,31 @@ impl Blockchain for Chain {
self.block_refetcher.get_block(self, logger, cursor).await
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
let call_cache = Arc::new(BufferedCallCache::new(self.call_cache.cheap_clone()));
let chain_ident = self.chain_store.chain_identifier()?;

let builder = self.runtime_adapter_builder.build(
self.eth_adapters.cheap_clone(),
call_cache.cheap_clone(),
self.chain_identifier.cheap_clone(),
Arc::new(chain_ident.clone()),
);
let eth_call_gas = eth_call_gas(&self.chain_identifier);
let eth_call_gas = eth_call_gas(&chain_ident);

let decoder_hook = crate::data_source::DecoderHook::new(
self.eth_adapters.cheap_clone(),
call_cache,
eth_call_gas,
);

(builder, decoder_hook)
Ok((builder, decoder_hook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
ChainClient::Firehose(_) => {
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
Expand All @@ -521,10 +523,7 @@ impl Blockchain for Chain {

Box::new(ingestor)
}
ChainClient::Rpc(rpc) => {
let eth_adapter = rpc
.cheapest()
.ok_or_else(|| anyhow!("unable to get adapter for ethereum block ingestor"))?;
ChainClient::Rpc(_) => {
let logger = self
.logger_factory
.component_logger(
Expand All @@ -535,7 +534,7 @@ impl Blockchain for Chain {
}),
}),
)
.new(o!("provider" => eth_adapter.provider().to_string()));
.new(o!());

if !self.is_ingestible {
bail!(
Expand All @@ -550,7 +549,7 @@ impl Blockchain for Chain {
Box::new(PollingBlockIngestor::new(
logger,
graph::env::ENV_VARS.reorg_threshold,
eth_adapter,
self.chain_client(),
self.chain_store().cheap_clone(),
self.polling_ingestor_interval,
self.name.clone(),
Expand Down Expand Up @@ -675,7 +674,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
filter: &TriggerFilter,
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?,
self.logger.clone(),
self.chain_store.clone(),
self.ethrpc_metrics.clone(),
Expand Down Expand Up @@ -705,7 +707,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {

match &block {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let adapter = self
.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?;
let block_number = block.number() as BlockNumber;
let (blocks, _) = blocks_with_triggers(
adapter,
Expand Down Expand Up @@ -738,6 +744,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
self.chain_client
.rpc()?
.cheapest()
.await
.ok_or(anyhow!("unable to get adapter for is_on_main_chain"))?
.is_on_main_chain(&self.logger, ptr.clone())
.await
Expand Down Expand Up @@ -775,7 +782,8 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}),
ChainClient::Rpc(adapters) => {
let blocks = adapters
.cheapest_with(&self.capabilities)?
.cheapest_with(&self.capabilities)
.await?
.load_blocks(
self.logger.cheap_clone(),
self.chain_store.cheap_clone(),
Expand Down
3 changes: 2 additions & 1 deletion chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,8 @@ pub(crate) async fn get_calls(
} else {
client
.rpc()?
.cheapest_with(capabilities)?
.cheapest_with(capabilities)
.await?
.calls_in_block(
&logger,
subgraph_metrics.clone(),
Expand Down
Loading
Loading