Skip to content

Commit

Permalink
require chain ident to create chain store
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Jun 6, 2024
1 parent 58f76d3 commit 738af4b
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 53 deletions.
5 changes: 5 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ pub trait QueryStoreManager: Send + Sync + 'static {
pub trait BlockStore: Send + Sync + 'static {
type ChainStore: ChainStore;

fn create_chain_store(
&self,
network: &str,
ident: ChainIdentifier,
) -> anyhow::Result<Arc<Self::ChainStore>>;
fn chain_store(&self, network: &str) -> Option<Arc<Self::ChainStore>>;
}

Expand Down
33 changes: 18 additions & 15 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use graph::slog::{debug, error, info, o, Logger};
use graph::url::Url;
use graph::util::security::SafeDisplay;
use graph_chain_ethereum::{self as ethereum, Transport};
use graph_store_postgres::{ChainHeadUpdateListener, Store};
use graph_store_postgres::{BlockStore, ChainHeadUpdateListener};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;
Expand Down Expand Up @@ -397,7 +397,7 @@ pub async fn networks_as_chains(
node_id: &NodeId,
logger: &Logger,
networks: &Networks,
store: &Store,
store: Arc<BlockStore>,
logger_factory: &LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
chain_head_update_listener: Arc<ChainHeadUpdateListener>,
Expand All @@ -416,7 +416,7 @@ pub async fn networks_as_chains(
.flat_map(|a| a.as_substreams())
.collect();

let chains = adapters.into_iter().filter_map(|(chain_id, adapters)| {
let chains = adapters.into_iter().map(|(chain_id, adapters)| {
let adapters: Vec<&AdapterConfiguration> = adapters.into_iter().collect();
let kind = adapters
.iter()
Expand All @@ -427,19 +427,22 @@ pub async fn networks_as_chains(
(k, _) => k,
})
.expect("each chain should have at least 1 adapter");
store
.block_store()
.chain_store(chain_id)
.map(|chain_store| (chain_id, chain_store, adapters, kind))
.or_else(|| {
error!(
logger,
"No store configured for {} chain {}; ignoring this chain", kind, chain_id
);
None
})
(chain_id, adapters, kind)
});
for (chain_id, chain_store, adapters, kind) in chains.into_iter() {
for (chain_id, adapters, kind) in chains.into_iter() {
let chain_store = match store.chain_store(chain_id) {
Some(c) => c,
None => {
let ident = networks
.chain_identifier(&logger, chain_id)
.await
.expect("must be able to get chain identity to create a store");
store
.create_chain_store(chain_id, ident)
.expect("must be able to create store if one is not yet setup for the chain")
}
};

match kind {
BlockchainKind::Arweave => {
let firehose_endpoints = networks.firehose_endpoints(chain_id.clone());
Expand Down
3 changes: 2 additions & 1 deletion node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ async fn main() {
let primary_pool = store_builder.primary_pool();

let network_store = store_builder.network_store(config.chain_ids());
let block_store = network_store.block_store();
let validator: Arc<dyn IdentValidator> = network_store.block_store();
let network_adapters = Networks::from_config(
logger.cheap_clone(),
Expand All @@ -273,7 +274,7 @@ async fn main() {
&env_vars,
&node_id,
&logger,
&network_store,
block_store,
&logger_factory,
metrics_registry.cheap_clone(),
chain_head_update_listener,
Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub fn change_block_cache_shard(


// Create a new chain with the name in the destination shard
let _ = add_chain(conn, &chain_name, &shard)?;
let _ = add_chain(conn, &chain_name, &shard, ident)?;

// Re-add the foreign key constraint
sql_query(
Expand Down
3 changes: 2 additions & 1 deletion node/src/manager/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub async fn run(

let chain_head_update_listener = store_builder.chain_head_update_listener();
let network_store = store_builder.network_store(config.chain_ids());
let block_store = network_store.block_store();
let ident_validator: Arc<dyn IdentValidator> = network_store.block_store();
let networks = Networks::from_config(
logger.cheap_clone(),
Expand All @@ -111,7 +112,7 @@ pub async fn run(
&env_vars,
&node_id,
&logger,
&network_store,
block_store,
&logger_factory,
metrics_registry.cheap_clone(),
chain_head_update_listener,
Expand Down
68 changes: 62 additions & 6 deletions node/src/network_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,27 @@ use ethereum::{
BlockIngestor,
};
use graph::{
anyhow,
blockchain::{Blockchain, BlockchainKind, BlockchainMap},
anyhow::{self, bail},
blockchain::{Blockchain, BlockchainKind, BlockchainMap, ChainIdentifier},
cheap_clone::CheapClone,
components::{
adapter::{ChainId, IdentValidator, MockIdentValidator, ProviderManager},
adapter::{ChainId, IdentValidator, MockIdentValidator, NetIdentifiable, ProviderManager},
metrics::MetricsRegistry,
},
endpoint::EndpointMetrics,
env::EnvVars,
firehose::{FirehoseEndpoint, FirehoseEndpoints},
futures03::future::TryFutureExt,
itertools::Itertools,
log::factory::LoggerFactory,
prelude::{
anyhow::{anyhow, Result},
info, Logger, NodeId,
},
slog::{o, Discard},
slog::{o, warn, Discard},
};
use graph_chain_ethereum as ethereum;
use graph_store_postgres::{ChainHeadUpdateListener, Store};
use graph_store_postgres::{BlockStore, ChainHeadUpdateListener};

use std::{any::Any, cmp::Ordering, sync::Arc, time::Duration};

Expand Down Expand Up @@ -122,6 +123,61 @@ impl Networks {
}
}

pub async fn chain_identifier(
&self,
logger: &Logger,
chain_id: &ChainId,
) -> Result<ChainIdentifier> {
async fn get_identifier<T: NetIdentifiable + Clone>(
pm: ProviderManager<T>,
logger: &Logger,
chain_id: &ChainId,
provider_type: &str,
) -> Result<ChainIdentifier> {
for adapter in pm.get_all_unverified(chain_id).unwrap_or_default() {
match adapter.net_identifiers().await {
Ok(ident) => return Ok(ident),
Err(err) => {
warn!(
logger,
"unable to get chain identification from {} provider {} for chain {}, err: {}",
provider_type,
adapter.provider_name(),
chain_id,
err.to_string(),
);
}
}
}

bail!("no working adapters for chain {}", chain_id);
}

get_identifier(
self.rpc_provider_manager.cheap_clone(),
logger,
chain_id,
"rpc",
)
.or_else(|_| {
get_identifier(
self.firehose_provider_manager.cheap_clone(),
logger,
chain_id,
"firehose",
)
.or_else(|_| {
get_identifier(
self.substreams_provider_manager.cheap_clone(),
logger,
chain_id,
"substreams",
)
})
})
.await
}

pub async fn from_config(
logger: Logger,
config: &crate::config::Config,
Expand Down Expand Up @@ -306,7 +362,7 @@ impl Networks {
config: &Arc<EnvVars>,
node_id: &NodeId,
logger: &Logger,
store: &Store,
store: Arc<BlockStore>,
logger_factory: &LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
chain_head_update_listener: Arc<ChainHeadUpdateListener>,
Expand Down
46 changes: 37 additions & 9 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ pub mod primary {
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
name: &str,
shard: &Shard,
ident: ChainIdentifier,
) -> Result<Chain, StoreError> {
let ident = ChainIdentifier::default();

// For tests, we want to have a chain that still uses the
// shared `ethereum_blocks` table
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -195,6 +194,8 @@ pub struct BlockStore {
/// known to the system at startup, either from configuration or from
/// previous state in the database.
stores: RwLock<HashMap<String, Arc<ChainStore>>>,
// We keep this information so we can create chain stores during startup
shards: Vec<(String, Shard)>,
pools: HashMap<Shard, ConnectionPool>,
sender: Arc<NotificationSender>,
mirror: PrimaryMirror,
Expand All @@ -216,8 +217,8 @@ impl BlockStore {
/// a chain uses the pool from `pools` for the given shard.
pub fn new(
logger: Logger,
// (network, ident, shard)
chains: Vec<(String, Shard)>,
// (network, shard)
shards: Vec<(String, Shard)>,
// shard -> pool
pools: HashMap<Shard, ConnectionPool>,
sender: Arc<NotificationSender>,
Expand All @@ -230,10 +231,12 @@ impl BlockStore {
let mirror = PrimaryMirror::new(&pools);
let existing_chains = mirror.read(|conn| primary::load_chains(conn))?;
let chain_head_cache = TimedCache::new(CHAIN_HEAD_CACHE_TTL);
let chains = shards.clone();

let block_store = Self {
logger,
stores: RwLock::new(HashMap::new()),
shards,
pools,
sender,
mirror,
Expand Down Expand Up @@ -276,11 +279,7 @@ impl BlockStore {
};
block_store.add_chain_store(chain, status, false)?;
}
None => {
let mut conn = block_store.mirror.primary().get()?;
let chain = primary::add_chain(&mut conn, &chain_name, &shard)?;
block_store.add_chain_store(&chain, ChainStatus::Ingestible, true)?;
}
None => {}
};
}

Expand Down Expand Up @@ -529,4 +528,33 @@ impl BlockStoreTrait for BlockStore {
fn chain_store(&self, network: &str) -> Option<Arc<Self::ChainStore>> {
self.store(network)
}

fn create_chain_store(
&self,
network: &str,
ident: ChainIdentifier,
) -> anyhow::Result<Arc<Self::ChainStore>> {
match self.store(network) {
Some(chain_store) => {
return Ok(chain_store);
}
None => {}
}

let mut conn = self.mirror.primary().get()?;
let shard = self
.shards
.iter()
.find_map(|(chain_id, shard)| {
if chain_id.as_str().eq(network) {
Some(shard)
} else {
None
}
})
.ok_or_else(|| anyhow!("unable to find shard for network {}", network))?;
let chain = primary::add_chain(&mut conn, &network, &shard, ident)?;
self.add_chain_store(&chain, ChainStatus::Ingestible, true)
.map_err(anyhow::Error::from)
}
}
21 changes: 16 additions & 5 deletions store/test-store/src/block_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{convert::TryFrom, str::FromStr, sync::Arc};

use graph::blockchain::BlockTime;
use graph::blockchain::{BlockTime, ChainIdentifier};
use lazy_static::lazy_static;

use graph::components::store::BlockStore;
Expand All @@ -14,6 +14,8 @@ use graph::{
use graph_chain_ethereum::codec::{Block, BlockHeader};
use prost_types::Timestamp;

use crate::{GENESIS_PTR, NETWORK_VERSION};

lazy_static! {
// Genesis block
pub static ref GENESIS_BLOCK: FakeBlock = FakeBlock {
Expand Down Expand Up @@ -186,10 +188,19 @@ pub type FakeBlockList = Vec<&'static FakeBlock>;
/// network's genesis block to `genesis_hash`, and head block to
/// `null`
pub async fn set_chain(chain: FakeBlockList, network: &str) -> Vec<(BlockPtr, BlockHash)> {
let store = crate::store::STORE
.block_store()
.chain_store(network)
.unwrap();
let block_store = crate::store::STORE.block_store();
let store = match block_store.chain_store(network) {
Some(cs) => cs,
None => block_store
.create_chain_store(
network,
ChainIdentifier {
net_version: NETWORK_VERSION.to_string(),
genesis_block_hash: GENESIS_PTR.hash.clone(),
},
)
.unwrap(),
};
let chain: Vec<Arc<dyn BlockchainBlock>> = chain
.iter()
.cloned()
Expand Down
Loading

0 comments on commit 738af4b

Please sign in to comment.