Merge pull request #3687 from anoma/bat/feat/apply-snapshots-rev
 Apply snapshot for state sync
mergify[bot] authored Aug 28, 2024
2 parents 693b463 + 7950a76 commit 9274550
Showing 15 changed files with 782 additions and 92 deletions.
7 changes: 7 additions & 0 deletions .changelog/unreleased/improvements/3687-apply-snapshots.md
@@ -0,0 +1,7 @@
- Addresses the remaining points of Issue [\#3307](https://github.com/anoma/namada/issues/3307)

- Implements the `OfferSnapshot` ABCI call
- Implements the `ApplySnapshotChunk` ABCI call
- Adds integration tests

([\#3687](https://github.com/anoma/namada/pull/3687))
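For orientation: CometBFT state sync drives these four ABCI calls in a fixed order. Peers that already have the state serve `ListSnapshots` and `LoadSnapshotChunk`, while the node that is catching up answers `OfferSnapshot` and `ApplySnapshotChunk`. A minimal sketch of the sequence, using illustrative stand-in types rather than the real ABCI bindings:

```rust
// Illustrative stand-ins only; not the real tendermint/ABCI types.
#[derive(Debug, Clone, Copy)]
enum StateSyncCall {
    ListSnapshots,      // served by peers: advertise snapshots + chunk hashes
    OfferSnapshot,      // syncing node: accept or reject an advertised snapshot
    LoadSnapshotChunk,  // served by peers: return one chunk of a snapshot
    ApplySnapshotChunk, // syncing node: verify the chunk and write it to the DB
}

fn main() {
    // Load/Apply repeat once per chunk until the snapshot is fully restored.
    let flow = [
        StateSyncCall::ListSnapshots,
        StateSyncCall::OfferSnapshot,
        StateSyncCall::LoadSnapshotChunk,
        StateSyncCall::ApplySnapshotChunk,
    ];
    for call in flow {
        println!("{call:?}");
    }
}
```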
3 changes: 3 additions & 0 deletions crates/apps_lib/src/config/mod.rs
@@ -127,6 +127,8 @@ pub struct Shell {
/// When set, indicates after how many blocks a new snapshot
/// will be taken (counting from the first block)
pub blocks_between_snapshots: Option<NonZeroU64>,
/// Number of snapshots to keep
pub snapshots_to_keep: Option<NonZeroU64>,
}

impl Ledger {
@@ -156,6 +158,7 @@ impl Ledger {
action_at_height: None,
tendermint_mode: mode,
blocks_between_snapshots: None,
snapshots_to_keep: None,
},
cometbft: tendermint_config,
ethereum_bridge: ethereum_bridge::ledger::Config::default(),
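A hedged sketch of what the two options mean in practice, using a simplified stand-in struct rather than the real `Shell` config. The `height % n == 0` cadence and the "keep all when `None`" default are assumptions for illustration, not taken from this diff:

```rust
use std::num::NonZeroU64;

// Simplified stand-in for the two snapshot settings added here.
struct SnapshotCfg {
    blocks_between_snapshots: Option<NonZeroU64>, // None => snapshots disabled
    snapshots_to_keep: Option<NonZeroU64>,        // None => assumed "keep all"
}

// Assumed cadence check, for illustration only; the real trigger logic
// lives elsewhere in the node.
fn should_snapshot(cfg: &SnapshotCfg, height: u64) -> bool {
    cfg.blocks_between_snapshots
        .map(|n| height % n.get() == 0)
        .unwrap_or(false)
}

fn main() {
    let cfg = SnapshotCfg {
        blocks_between_snapshots: NonZeroU64::new(10_000),
        snapshots_to_keep: NonZeroU64::new(2),
    };
    assert!(should_snapshot(&cfg, 20_000));
    assert!(!should_snapshot(&cfg, 20_001));
    let _ = cfg.snapshots_to_keep; // pruning itself is handled by the node
}
```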
2 changes: 1 addition & 1 deletion crates/merkle_tree/src/lib.rs
@@ -804,7 +804,7 @@ impl<H: StorageHasher + Default> MerkleTree<H> {
}

/// The root hash of the merkle tree as bytes
- #[derive(PartialEq)]
+ #[derive(Debug, PartialEq)]
pub struct MerkleRoot(pub [u8; 32]);

impl From<H256> for MerkleRoot {
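The `Debug` derive looks cosmetic, but it is what allows `MerkleRoot` to appear in `assert_eq!` failures and debug logs, e.g. when checking a restored root. A self-contained illustration (the motivation is a guess; only the derive itself comes from this diff):

```rust
// Local copy of the newtype, for illustration only.
#[derive(Debug, PartialEq)]
struct MerkleRoot(pub [u8; 32]);

fn main() {
    let restored = MerkleRoot([0u8; 32]);
    let expected = MerkleRoot([0u8; 32]);
    // `assert_eq!` needs PartialEq to compare and Debug to print
    // both sides when the comparison fails.
    assert_eq!(restored, expected);
}
```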
16 changes: 8 additions & 8 deletions crates/node/src/lib.rs
@@ -182,16 +182,16 @@ impl Shell {
Ok(Response::CheckTx(self.mempool_validate(&tx.tx, r#type)))
}
Request::ListSnapshots => {
- self.list_snapshots().map(Response::ListSnapshots)
+ Ok(Response::ListSnapshots(self.list_snapshots()))
}
- Request::OfferSnapshot(_) => {
- Ok(Response::OfferSnapshot(Default::default()))
+ Request::OfferSnapshot(req) => {
+ Ok(Response::OfferSnapshot(self.offer_snapshot(req)))
}
- Request::LoadSnapshotChunk(req) => self
- .load_snapshot_chunk(req)
- .map(Response::LoadSnapshotChunk),
- Request::ApplySnapshotChunk(_) => {
- Ok(Response::ApplySnapshotChunk(Default::default()))
+ Request::LoadSnapshotChunk(req) => {
+ Ok(Response::LoadSnapshotChunk(self.load_snapshot_chunk(req)))
+ }
+ Request::ApplySnapshotChunk(req) => {
+ Ok(Response::ApplySnapshotChunk(self.apply_snapshot_chunk(req)))
}
}
}
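Note the shape change in the snapshot arms: the handlers now return responses directly instead of a `Result`, so a snapshot-serving failure degrades to a default response rather than an `Err` that could take the node down. A minimal sketch of that pattern with stand-in types (not the real handlers):

```rust
// Stand-in response type; the real code returns ABCI response structs.
#[derive(Debug, Default)]
struct ListSnapshotsResponse {
    snapshots: Vec<String>,
}

// Infallible handler: any internal failure collapses to an empty response,
// which peers simply treat as "no snapshots available".
fn list_snapshots(files: Result<Vec<String>, std::io::Error>) -> ListSnapshotsResponse {
    let Ok(snapshots) = files else {
        return ListSnapshotsResponse::default();
    };
    ListSnapshotsResponse { snapshots }
}

fn main() {
    let err = Err(std::io::Error::new(std::io::ErrorKind::NotFound, "gone"));
    println!("{:?}", list_snapshots(err)); // empty response, node keeps running
}
```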
13 changes: 13 additions & 0 deletions crates/node/src/shell/mod.rs
@@ -45,6 +45,7 @@ use namada_sdk::eth_bridge::{EthBridgeQueries, EthereumOracleConfig};
use namada_sdk::ethereum_events::EthereumEvent;
use namada_sdk::events::log::EventLog;
use namada_sdk::gas::{Gas, TxGasMeter};
use namada_sdk::hash::Hash;
use namada_sdk::key::*;
use namada_sdk::migrations::ScheduledMigration;
use namada_sdk::parameters::{get_gas_scale, validate_tx_bytes};
@@ -341,6 +342,14 @@ pub enum MempoolTxType {
RecheckTransaction,
}

#[derive(Debug)]
pub struct SnapshotSync {
pub next_chunk: u64,
pub height: BlockHeight,
pub expected: Vec<Hash>,
pub strikes: u64,
}

#[derive(Debug)]
pub struct Shell<D = storage::PersistentDB, H = Sha256Hasher>
where
@@ -373,6 +382,9 @@ where
/// When set, indicates after how many blocks a new snapshot
/// will be taken (counting from the first block)
pub blocks_between_snapshots: Option<NonZeroU64>,
/// Data for a node downloading and applying snapshots as part of
/// the fast sync protocol.
pub syncing: Option<SnapshotSync>,
}

/// Storage key filter to store the diffs into the storage. Return `false` for
@@ -608,6 +620,7 @@ where
event_log: EventLog::default(),
scheduled_migration,
blocks_between_snapshots: config.shell.blocks_between_snapshots,
syncing: None,
};
shell.update_eth_oracle(&Default::default());
shell
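`syncing` behaves as a small state machine: `None` until `OfferSnapshot` accepts an offer, `Some(SnapshotSync)` while chunks are applied in order, and back to `None` on completion or rejection. A simplified sketch of that lifecycle (field names loosely mirror the diff; the real transitions are in snapshots.rs below):

```rust
// Simplified mirror of SnapshotSync: just enough to show the lifecycle.
struct SyncState {
    next_chunk: u64, // index of the next chunk we expect
    total: u64,      // stands in for `expected.len()` in the real struct
}

fn apply_chunk(sync: &mut Option<SyncState>) {
    let done = match sync.as_mut() {
        Some(s) => {
            s.next_chunk += 1; // chunk verified and written
            s.next_chunk == s.total
        }
        None => false,
    };
    if done {
        *sync = None; // snapshot fully applied; resume normal block sync
    }
}

fn main() {
    let mut syncing = Some(SyncState { next_chunk: 0, total: 2 });
    apply_chunk(&mut syncing); // chunk 0 applied
    apply_chunk(&mut syncing); // chunk 1 applied -> sync complete
    assert!(syncing.is_none());
}
```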
238 changes: 222 additions & 16 deletions crates/node/src/shell/snapshots.rs
@@ -1,8 +1,11 @@
use borsh::BorshDeserialize;
use borsh_ext::BorshSerializeExt;
+ use namada_sdk::arith::checked;
use namada_sdk::hash::{Hash, Sha256Hasher};
- use namada_sdk::state::BlockHeight;
+ use namada_sdk::state::{BlockHeight, StorageRead, DB};

- use super::{Error, ShellResult};
+ use super::SnapshotSync;
+ use crate::facade::tendermint::abci::response::ApplySnapshotChunkResult;
use crate::facade::tendermint::abci::types::Snapshot;
use crate::facade::tendermint::v0_37::abci::{
request as tm_request, response as tm_response,
@@ -11,32 +14,39 @@ use crate::shell::Shell;
use crate::storage;
use crate::storage::{DbSnapshot, SnapshotMetadata};

pub const MAX_SENDER_STRIKES: u64 = 5;

impl Shell<storage::PersistentDB, Sha256Hasher> {
/// List the snapshot files held locally. Furthermore, the number
/// of chunks, a hash of each chunk, and a hash of the chunk
- /// metadata are provided so that syncing nodes can verify can verify
+ /// metadata are provided so that syncing nodes can verify
/// snapshots they receive.
- pub fn list_snapshots(&self) -> ShellResult<tm_response::ListSnapshots> {
+ pub fn list_snapshots(&self) -> tm_response::ListSnapshots {
if self.blocks_between_snapshots.is_none() {
- Ok(Default::default())
+ Default::default()
} else {
- let snapshots = DbSnapshot::files(&self.base_dir)
- .map_err(Error::Snapshot)?
+ tracing::info!("Request for snapshots received.");
+ let Ok(snapshots) = DbSnapshot::files(&self.base_dir) else {
+ return Default::default();
+ };
+ let snapshots = snapshots
.into_iter()
.map(|SnapshotMetadata { height, chunks, .. }| {
- let hash = Hash::sha256(chunks.serialize_to_vec()).0;
+ let hashes =
+ chunks.iter().map(|c| c.hash).collect::<Vec<_>>();
+ let hash = Hash::sha256(hashes.serialize_to_vec()).0;
Snapshot {
height: u32::try_from(height.0).unwrap().into(),
format: 0,
#[allow(clippy::cast_possible_truncation)]
chunks: chunks.len() as u32,
hash: hash.into_iter().collect(),
- metadata: Default::default(),
+ metadata: hashes.serialize_to_vec().into(),
}
})
.collect();

- Ok(tm_response::ListSnapshots { snapshots })
+ tm_response::ListSnapshots { snapshots }
}
}

@@ -45,15 +55,211 @@ impl Shell<storage::PersistentDB, Sha256Hasher> {
pub fn load_snapshot_chunk(
&self,
req: tm_request::LoadSnapshotChunk,
- ) -> ShellResult<tm_response::LoadSnapshotChunk> {
- let chunk = DbSnapshot::load_chunk(
+ ) -> tm_response::LoadSnapshotChunk {
+ let Ok(chunk) = DbSnapshot::load_chunk(
BlockHeight(req.height.into()),
u64::from(req.chunk),
&self.base_dir,
- )
- .map_err(Error::Snapshot)?;
- Ok(tm_response::LoadSnapshotChunk {
+ ) else {
+ tracing::debug!(
+ "Received a request for a snapshot we do not possess"
+ );
+ // N.B. if the snapshot is no longer present,
+ // this will not match the hash in the metadata and will
+ // be rejected by syncing nodes. We don't return an error
+ // so as not to crash this node.
+ return Default::default();
+ };
+ tracing::info!(
+ "Loading snapshot at height {}, chunk number {}",
+ req.height,
+ req.chunk,
+ );
+ tm_response::LoadSnapshotChunk {
chunk: chunk.into_iter().collect(),
- })
+ }
}

/// Decide if a snapshot should be accepted to sync the node forward in time
pub fn offer_snapshot(
&mut self,
req: tm_request::OfferSnapshot,
) -> tm_response::OfferSnapshot {
match self.syncing.as_ref() {
None => {
if self.state.get_block_height().unwrap_or_default().0
< u64::from(req.snapshot.height)
{
let Ok(chunks) =
Vec::<Hash>::try_from_slice(&req.snapshot.metadata)
else {
return tm_response::OfferSnapshot::Reject;
};
self.syncing = Some(SnapshotSync {
next_chunk: 0,
height: u64::from(req.snapshot.height).into(),
expected: chunks,
strikes: 0,
});
tracing::info!("Accepting snapshot offer");
tm_response::OfferSnapshot::Accept
} else {
tracing::info!("Rejecting snapshot offer");
tm_response::OfferSnapshot::Reject
}
}
Some(snapshot_sync) => {
if snapshot_sync.height.0 < u64::from(req.snapshot.height) {
let Ok(chunks) =
Vec::<Hash>::try_from_slice(&req.snapshot.metadata)
else {
tracing::info!("Rejecting snapshot offer");
return tm_response::OfferSnapshot::Reject;
};
self.syncing = Some(SnapshotSync {
next_chunk: 0,
height: u64::from(req.snapshot.height).into(),
expected: chunks,
strikes: 0,
});
tracing::info!("Accepting snapshot offer");
tm_response::OfferSnapshot::Accept
} else {
tracing::info!("Rejecting snapshot offer");
tm_response::OfferSnapshot::Reject
}
}
}
}

/// Write a snapshot chunk to the database
pub fn apply_snapshot_chunk(
&mut self,
req: tm_request::ApplySnapshotChunk,
) -> tm_response::ApplySnapshotChunk {
let Some(snapshot_sync) = self.syncing.as_mut() else {
tracing::warn!("Received a snapshot although none were requested");
// if we are not currently syncing, abort this sync protocol
// the syncing status is set by `OfferSnapshot`.
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::Abort,
refetch_chunks: vec![],
reject_senders: vec![],
};
};

// make sure we have been given the correct chunk
if u64::from(req.index) != snapshot_sync.next_chunk {
tracing::error!(
"Received wrong chunk, expected {}, got {}",
snapshot_sync.next_chunk,
req.index,
);
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::Unknown,
refetch_chunks: vec![
u32::try_from(snapshot_sync.next_chunk).unwrap(),
],
reject_senders: vec![],
};
}

let Some(expected_hash) =
snapshot_sync.expected.get(req.index as usize)
else {
tracing::error!(
"Received more chunks than expected; rejecting snapshot"
);
self.syncing = None;
// if we get more chunks than expected, there is something wrong
// with this snapshot and we should reject it.
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::RejectSnapshot,
refetch_chunks: vec![],
reject_senders: vec![],
};
};

// check that the chunk matches the expected hash, otherwise
// re-fetch it in case it was corrupted. If the chunk fails
// to validate too many times, we reject the snapshot and sender.
let chunk_hash = Hash::sha256(&req.chunk);
if *expected_hash != chunk_hash {
tracing::error!(
"Hash of chunk did not match, expected {}, got {}",
expected_hash,
chunk_hash,
);
snapshot_sync.strikes =
checked!(snapshot_sync.strikes + 1).unwrap();
if snapshot_sync.strikes == MAX_SENDER_STRIKES {
snapshot_sync.strikes = 0;
self.syncing = None;

tracing::info!(
"Max number of strikes reached on chunk, rejecting \
snapshot"
);
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::RejectSnapshot,
refetch_chunks: vec![],
reject_senders: vec![req.sender],
};
} else {
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::Retry,
refetch_chunks: vec![req.index],
reject_senders: vec![],
};
}
} else {
snapshot_sync.strikes = 0;
};
// when we first start applying a snapshot,
// clear the existing db.
if req.index == 0 {
self.state.db_mut().clear(snapshot_sync.height).unwrap();
}
// apply snapshot changes to the database
// retry if an error occurs
let mut batch = Default::default();
for (cf, key, value) in DbSnapshot::parse_chunk(&req.chunk) {
if self
.state
.db()
.insert_entry(&mut batch, &cf, &key, value)
.is_err()
{
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::Retry,
refetch_chunks: vec![],
reject_senders: vec![],
};
}
}
if self.state.db().exec_batch(batch).is_err() {
return tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::Retry,
refetch_chunks: vec![],
reject_senders: vec![],
};
}

// increment the chunk counter
snapshot_sync.next_chunk =
checked!(snapshot_sync.next_chunk + 1).unwrap();
// check if all chunks have been applied
if snapshot_sync.next_chunk == snapshot_sync.expected.len() as u64 {
tracing::info!("Snapshot completely applied");
self.syncing = None;
// rebuild the in-memory state
self.state.load_last_state();
}

tm_response::ApplySnapshotChunk {
result: ApplySnapshotChunkResult::Accept,
refetch_chunks: vec![],
reject_senders: vec![],
}
}
}
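The core of `apply_snapshot_chunk` is its verify-retry-reject policy: every chunk must hash to the value advertised in the snapshot metadata, a corrupt chunk is refetched, and `MAX_SENDER_STRIKES` consecutive mismatches reject both the snapshot and its sender. A std-only sketch of that policy (the stand-in `digest` is not SHA-256; the real code uses `Hash::sha256` and the ABCI result types):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const MAX_SENDER_STRIKES: u64 = 5; // same constant as the diff above

// Stand-in digest for the sketch; the real code hashes with SHA-256.
fn digest(bytes: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    bytes.hash(&mut h);
    h.finish()
}

#[derive(Debug, PartialEq)]
enum Verdict {
    Accept,         // chunk matches: apply it and advance next_chunk
    Retry,          // mismatch: ask CometBFT to refetch this chunk
    RejectSnapshot, // too many mismatches: drop the snapshot (and sender)
}

fn check_chunk(expected: u64, chunk: &[u8], strikes: &mut u64) -> Verdict {
    if digest(chunk) == expected {
        *strikes = 0;
        Verdict::Accept
    } else {
        *strikes += 1;
        if *strikes == MAX_SENDER_STRIKES {
            *strikes = 0;
            Verdict::RejectSnapshot
        } else {
            Verdict::Retry
        }
    }
}

fn main() {
    let good = b"chunk-0".as_slice();
    let mut strikes = 0;
    assert_eq!(check_chunk(digest(good), good, &mut strikes), Verdict::Accept);
    assert_eq!(check_chunk(digest(good), b"corrupt", &mut strikes), Verdict::Retry);
}
```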