Skip to content

Commit

Permalink
update chain_store to load ident from db
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed May 16, 2024
1 parent c9784a8 commit c828c7d
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 38 deletions.
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ impl Blockchain for Chain {
.map_err(Into::into)
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
Expand Down
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ impl Blockchain for Chain {
.map_err(Into::into)
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
Expand Down
12 changes: 5 additions & 7 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ pub struct Chain {
logger_factory: LoggerFactory,
pub name: ChainId,
node_id: NodeId,
chain_identifier: Arc<ChainIdentifier>,
registry: Arc<MetricsRegistry>,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
Expand Down Expand Up @@ -332,12 +331,10 @@ impl Chain {
polling_ingestor_interval: Duration,
is_ingestible: bool,
) -> Self {
let chain_identifier = Arc::new(chain_store.chain_identifier().clone());
Chain {
logger_factory,
name,
node_id,
chain_identifier,
registry,
client,
chain_store,
Expand Down Expand Up @@ -488,23 +485,24 @@ impl Blockchain for Chain {
self.block_refetcher.get_block(self, logger, cursor).await
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
let call_cache = Arc::new(BufferedCallCache::new(self.call_cache.cheap_clone()));
let chain_ident = self.chain_store.chain_identifier()?;

let builder = self.runtime_adapter_builder.build(
self.eth_adapters.cheap_clone(),
call_cache.cheap_clone(),
self.chain_identifier.cheap_clone(),
Arc::new(chain_ident.clone()),
);
let eth_call_gas = eth_call_gas(&self.chain_identifier);
let eth_call_gas = eth_call_gas(&chain_ident);

let decoder_hook = crate::data_source::DecoderHook::new(
self.eth_adapters.cheap_clone(),
call_cache,
eth_call_gas,
);

(builder, decoder_hook)
Ok((builder, decoder_hook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
Expand Down
4 changes: 2 additions & 2 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ impl Blockchain for Chain {
.await
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
Expand Down
6 changes: 4 additions & 2 deletions chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ impl Blockchain for Chain {
.await
}

fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(
&self,
) -> graph::anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ impl Blockchain for Chain {
number,
})
}
fn runtime(&self) -> (Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook) {
(Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook)
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)> {
Ok((Arc::new(NoopRuntimeAdapter::default()), NoopDecoderHook))
}

fn chain_client(&self) -> Arc<ChainClient<Self>> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64;
block_stream_metrics.deployment_head.set(deployment_head);

let (runtime_adapter, decoder_hook) = chain.runtime();
let (runtime_adapter, decoder_hook) = chain.runtime()?;
let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new(
runtime_adapter,
self.link_resolver.cheap_clone(),
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:
volumes:
- ./data/ipfs:/data/ipfs:Z
postgres:
image: postgres:14
image: postgres
ports:
- '5432:5432'
command:
Expand Down
7 changes: 5 additions & 2 deletions graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
bail,
components::{
link_resolver::LinkResolver,
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator},
Expand Down Expand Up @@ -371,8 +372,10 @@ impl Blockchain for MockBlockchain {
todo!()
}

fn runtime(&self) -> (std::sync::Arc<dyn RuntimeAdapter<Self>>, Self::DecoderHook) {
todo!()
fn runtime(
&self,
) -> anyhow::Result<(std::sync::Arc<dyn RuntimeAdapter<Self>>, Self::DecoderHook)> {
bail!("mock has no runtime adapter")
}

fn chain_client(&self) -> Arc<ChainClient<MockBlockchain>> {
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {

fn is_refetch_block_required(&self) -> bool;

fn runtime(&self) -> (Arc<dyn RuntimeAdapter<Self>>, Self::DecoderHook);
fn runtime(&self) -> anyhow::Result<(Arc<dyn RuntimeAdapter<Self>>, Self::DecoderHook)>;

fn chain_client(&self) -> Arc<ChainClient<Self>>;

Expand Down
6 changes: 6 additions & 0 deletions graph/src/blockchain/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ pub struct ChainIdentifier {
pub genesis_block_hash: BlockHash,
}

impl ChainIdentifier {
pub fn is_default(&self) -> bool {
ChainIdentifier::default().eq(self)
}
}

impl Default for ChainIdentifier {
fn default() -> Self {
Self {
Expand Down
19 changes: 15 additions & 4 deletions graph/src/components/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ struct Ident {

#[derive(Error, Debug, Clone)]
pub enum IdentValidatorError {
#[error("database error: {0}")]
UnknownError(String),
#[error("Store ident wasn't set")]
UnsetIdent,
#[error("the net version for chain {chain_id} has changed from {store_net_version} to {chain_net_version} since the last time we ran")]
Expand All @@ -84,6 +86,12 @@ pub enum IdentValidatorError {
UnavailableStore(ChainId),
}

impl From<anyhow::Error> for IdentValidatorError {
fn from(value: anyhow::Error) -> Self {
IdentValidatorError::UnknownError(value.to_string())
}
}

#[async_trait]
/// IdentValidator validates that the provided chain ident matches the expected value for a certain
/// chain_id. This is probably only going to matter for the ChainStore but this allows us to decouple
Expand All @@ -105,9 +113,11 @@ impl<T: ChainStoreTrait, B: BlockStoreTrait<ChainStore = T>> IdentValidator for
let network_chain = self
.chain_store(&chain_id)
.ok_or_else(|| IdentValidatorError::UnavailableStore(chain_id.clone()))?;
let store_ident = network_chain.chain_identifier();
let store_ident = network_chain
.chain_identifier()
.map_err(IdentValidatorError::from)?;

if store_ident == &ChainIdentifier::default() {
if store_ident == ChainIdentifier::default() {
return Err(IdentValidatorError::UnsetIdent);
}

Expand Down Expand Up @@ -494,13 +504,14 @@ impl<T: NetIdentifiable + 'static> Inner<T> {
},
});
}
IdentValidatorError::UnavailableStore(_) => {
e @ IdentValidatorError::UnavailableStore(_)
| e @ IdentValidatorError::UnknownError(_) => {
*status = GenesisCheckStatus::TemporaryFailure {
checked_at: Utc::now(),
};

return Err(ProviderManagerError::Unknown(crate::anyhow::anyhow!(
"chain store unavailable"
e.to_string()
)));
}
},
Expand Down
5 changes: 4 additions & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,10 @@ pub trait ChainStore: Send + Sync + 'static {
async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>;

/// Return the chain identifier for this store.
fn chain_identifier(&self) -> &ChainIdentifier;
fn chain_identifier(&self) -> Result<ChainIdentifier, Error>;

/// Update the chain identifier for this store.
fn set_chain_identifier(&self, ident: ChainIdentifier) -> Result<(), Error>;
}

pub trait EthereumCallCache: Send + Sync + 'static {
Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ pub fn change_block_cache_shard(
.chain_store(&chain_name)
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
let new_name = format!("{}-old", &chain_name);
let ident = chain_store.chain_identifier()?;

conn.transaction(|conn| -> Result<(), StoreError> {
let ident = chain_store.chain_identifier.clone();
let shard = Shard::new(shard.to_string())?;

let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident)?;
Expand Down
1 change: 0 additions & 1 deletion store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ impl BlockStore {
logger,
chain.name.clone(),
chain.storage.clone(),
&ident,
status,
sender,
pool,
Expand Down
50 changes: 41 additions & 9 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use graph::prometheus::{CounterVec, GaugeVec};
use graph::slog::Logger;
use graph::stable_hash::crypto_stable_hash;
use graph::util::herd_cache::HerdCache;
use itertools::Itertools as _;

use std::{
collections::HashMap,
Expand Down Expand Up @@ -1612,8 +1613,6 @@ pub struct ChainStore {
pool: ConnectionPool,
pub chain: String,
pub(crate) storage: data::Storage,
pub chain_identifier: ChainIdentifier,
genesis_block_ptr: BlockPtr,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
// TODO: We currently only use this cache for
Expand All @@ -1631,7 +1630,6 @@ impl ChainStore {
logger: Logger,
chain: String,
storage: data::Storage,
net_identifier: &ChainIdentifier,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
pool: ConnectionPool,
Expand All @@ -1646,10 +1644,8 @@ impl ChainStore {
pool,
chain,
storage,
genesis_block_ptr: BlockPtr::new(net_identifier.genesis_block_hash.clone(), 0),
status,
chain_head_update_sender,
chain_identifier: net_identifier.clone(),
recent_blocks_cache,
lookup_herd,
}
Expand Down Expand Up @@ -1828,7 +1824,18 @@ impl ChainStore {
#[async_trait]
impl ChainStoreTrait for ChainStore {
fn genesis_block_ptr(&self) -> Result<BlockPtr, Error> {
Ok(self.genesis_block_ptr.clone())
let mut conn = self.pool.get()?;
let hashes = self
.storage
.block_hashes_by_block_number(&mut conn, &self.chain, 10)?;

hashes
.into_iter()
.find_or_first(|h: &BlockHash| !h.as_slice().eq(H256::zero().as_bytes()))
.ok_or_else(|| {
graph::anyhow::anyhow!("expected chain to have at least the genesis block")
})
.map(|hash| BlockPtr { hash, number: 0 })
}

async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
Expand Down Expand Up @@ -1869,6 +1876,7 @@ impl ChainStoreTrait for ChainStore {

let (missing, ptr) = {
let chain_store = self.clone();
let genesis_block_ptr = self.genesis_block_ptr()?.hash_as_h256();
self.pool
.with_conn(move |conn, _| {
let candidate = chain_store
Expand All @@ -1887,7 +1895,7 @@ impl ChainStoreTrait for ChainStore {
&chain_store.chain,
first_block as i64,
ptr.hash_as_h256(),
chain_store.genesis_block_ptr.hash_as_h256(),
genesis_block_ptr,
)
.map_err(CancelableError::from)?
{
Expand Down Expand Up @@ -2249,8 +2257,32 @@ impl ChainStoreTrait for ChainStore {
.await
}

fn chain_identifier(&self) -> &ChainIdentifier {
&self.chain_identifier
fn set_chain_identifier(&self, ident: ChainIdentifier) -> Result<(), Error> {
use public::ethereum_networks as n;

let mut conn = self.pool.get()?;
diesel::update(n::table.filter(n::name.eq(&self.chain)))
.set((
n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()),
n::net_version.eq(ident.net_version),
))
.execute(&mut conn)?;

Ok(())
}

fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
let mut conn = self.pool.get()?;
use public::ethereum_networks as n;
let (genesis_block_hash, net_version) = n::table
.select((n::genesis_block_hash, n::net_version))
.filter(n::name.eq(&self.chain))
.get_result::<(BlockHash, String)>(&mut conn)?;

Ok(ChainIdentifier {
net_version,
genesis_block_hash,
})
}
}

Expand Down

0 comments on commit c828c7d

Please sign in to comment.