Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc-eth-types+api): add latest chain state tracking to EthStateCacheService - v2 #13482

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
58 changes: 33 additions & 25 deletions crates/rpc/rpc-eth-api/src/helpers/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,24 @@ pub trait EthBlocks: LoadBlock {
.map(|block| block.body.transactions().len()))
}

let block_hash = match self
.provider()
.block_hash_for_id(block_id)
.map_err(Self::Error::from_eth_err)?
{
Some(block_hash) => block_hash,
None => return Ok(None),
let maybe_block = if block_id.is_latest() {
self.cache().latest_block_with_senders().await.map_err(Self::Error::from_eth_err)?
} else {
match self
.provider()
.block_hash_for_id(block_id)
.map_err(Self::Error::from_eth_err)?
{
Some(block_hash) => self
.cache()
.get_sealed_block_with_senders(block_hash)
.await
.map_err(Self::Error::from_eth_err)?,
None => None,
}
};

Ok(self
.cache()
.get_sealed_block_with_senders(block_hash)
.await
.map_err(Self::Error::from_eth_err)?
.map(|b| b.body.transactions().len()))
Ok(maybe_block.map(|b| b.body.transactions().len()))
}
}

Expand Down Expand Up @@ -240,20 +243,25 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt {
None => Ok(None),
};
}

let block_hash = match self
.provider()
.block_hash_for_id(block_id)
.map_err(Self::Error::from_eth_err)?
{
Some(block_hash) => block_hash,
None => return Ok(None),
let maybe_block = if block_id.is_latest() {
self.cache().latest_block_with_senders().await.map_err(Self::Error::from_eth_err)?
} else {
// If not latest, get block by hash
match self
.provider()
.block_hash_for_id(block_id)
.map_err(Self::Error::from_eth_err)?
{
Some(block_hash) => self
.cache()
.get_sealed_block_with_senders(block_hash)
.await
.map_err(Self::Error::from_eth_err)?,
None => None,
}
};

self.cache()
.get_sealed_block_with_senders(block_hash)
.await
.map_err(Self::Error::from_eth_err)
Ok(maybe_block)
}
}
}
40 changes: 32 additions & 8 deletions crates/rpc/rpc-eth-types/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,11 @@ type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSend
///
/// This is the frontend for the async caching service which manages cached data on a different
/// task.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct EthStateCache<B: Block, R> {
to_service: UnboundedSender<CacheAction<B, R>>,
}

impl<B: Block, R> Clone for EthStateCache<B, R> {
fn clone(&self) -> Self {
Self { to_service: self.to_service.clone() }
}
}

impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
/// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
fn create<Provider, Tasks>(
Expand All @@ -95,6 +89,7 @@ impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
latest_chain_change: None,
};
let cache = Self { to_service };
(cache, service)
Expand Down Expand Up @@ -186,6 +181,18 @@ impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}

/// Returns the most recent canonical block from the cache, if available.
/// Used to efficiently handle latest block requests and avoid race conditions during chain
/// reorgs.
/// Returns `None` if no canonical chain is tracked or during reorgs.
pub async fn latest_block_with_senders(
&self,
) -> ProviderResult<Option<Arc<SealedBlockWithSenders<B>>>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetLatestBlockWithSenders { response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
}

/// A task than manages caches for data required by the `eth` rpc implementation.
Expand Down Expand Up @@ -236,6 +243,8 @@ pub(crate) struct EthStateCacheService<
action_task_spawner: Tasks,
/// Rate limiter
rate_limiter: Arc<Semaphore>,
/// Tracks latest canonical chain state for cache consistency
latest_chain_change: Option<ChainChange<Provider::Block, Provider::Receipt>>,
}

impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
Expand Down Expand Up @@ -348,6 +357,15 @@ where
}
Some(action) => {
match action {
CacheAction::GetLatestBlockWithSenders { response_tx } => {
let latest_block = this
.latest_chain_change
.as_ref()
.and_then(|chain_change| chain_change.blocks.last())
.cloned()
.map(Arc::new);
let _ = response_tx.send(Ok(latest_block));
}
CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(Some(block)));
Expand Down Expand Up @@ -458,7 +476,8 @@ where
}
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
this.latest_chain_change = Some(chain_change.clone());
for block in chain_change.clone().blocks {
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}

Expand Down Expand Up @@ -499,6 +518,9 @@ enum CacheAction<B: Block, R> {
block_hash: B256,
response_tx: BlockWithSendersResponseSender<B>,
},
GetLatestBlockWithSenders {
response_tx: BlockWithSendersResponseSender<B>,
},
GetHeader {
block_hash: B256,
response_tx: HeaderResponseSender<B::Header>,
Expand Down Expand Up @@ -527,12 +549,14 @@ enum CacheAction<B: Block, R> {
},
}

#[derive(Clone, Debug)]
struct BlockReceipts<R> {
block_hash: B256,
receipts: Vec<Option<R>>,
}

/// A change of the canonical chain
#[derive(Debug, Clone)]
struct ChainChange<B: Block, R> {
blocks: Vec<SealedBlockWithSenders<B>>,
receipts: Vec<BlockReceipts<R>>,
Expand Down
Loading