Skip to content

Commit

Permalink
Merge branch 'tomas/db-rm-recursion' (#2325)
Browse files Browse the repository at this point in the history
* tomas/db-rm-recursion:
  changelog: add #2325
  fix replay_protection iterator to avoid unmatched keys
  rocksdb: warn on unmatched prefix in iterator
  rocksdb: replace recursion from prefix iterator with a loop
  • Loading branch information
brentstone committed Dec 22, 2023
2 parents c844f52 + 2683a68 commit a73ddb5
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 150 deletions.
5 changes: 5 additions & 0 deletions .changelog/unreleased/bug-fixes/2325-db-rm-recursion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- Fixed DB prefix iterators to avoid iterators with key that don't match the
given prefix, which was triggering recursive call that was growing stack with
every new applied tx and on reading state from disk on start-up. Replaced
recursion from RocksDB that was growing stack size with a loop.
([\#2325](https://github.com/anoma/namada/pull/2325))
13 changes: 3 additions & 10 deletions apps/src/lib/node/ledger/shell/finalize_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2942,9 +2942,7 @@ mod test_finalize_block {
mk_wrapper_tx(&shell, &crate::wallet::defaults::albert_keypair());

let wrapper_hash_key =
replay_protection::get_replay_protection_last_key(
&wrapper_tx.header_hash(),
);
replay_protection::last_key(&wrapper_tx.header_hash());

// merkle tree root before finalize_block
let root_pre = shell.shell.wl_storage.storage.block.tree.root();
Expand Down Expand Up @@ -3051,10 +3049,7 @@ mod test_finalize_block {

// Write wrapper hashes in storage
for tx in [&wrapper, &new_wrapper] {
let hash_subkey =
replay_protection::get_replay_protection_last_subkey(
&tx.header_hash(),
);
let hash_subkey = replay_protection::last_key(&tx.header_hash());
shell
.wl_storage
.storage
Expand Down Expand Up @@ -3193,9 +3188,7 @@ mod test_finalize_block {
&failing_wrapper,
] {
let hash_subkey =
replay_protection::get_replay_protection_last_subkey(
&wrapper.header_hash(),
);
replay_protection::last_key(&wrapper.header_hash());
shell
.wl_storage
.storage
Expand Down
8 changes: 2 additions & 6 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2740,8 +2740,7 @@ mod shell_tests {
let mut batch =
namada::core::ledger::storage::testing::TestStorage::batch();
let wrapper_hash = wrapper.header_hash();
let wrapper_hash_key =
replay_protection::get_replay_protection_last_subkey(&wrapper_hash);
let wrapper_hash_key = replay_protection::last_key(&wrapper_hash);
shell
.wl_storage
.storage
Expand Down Expand Up @@ -2779,10 +2778,7 @@ mod shell_tests {

let inner_tx_hash = wrapper.raw_header_hash();
// Write inner hash in storage
let inner_hash_key =
replay_protection::get_replay_protection_last_subkey(
&inner_tx_hash,
);
let inner_hash_key = replay_protection::last_key(&inner_tx_hash);
shell
.wl_storage
.storage
Expand Down
8 changes: 2 additions & 6 deletions apps/src/lib/node/ledger/shell/prepare_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,7 @@ mod test_prepare_proposal {

// Write wrapper hash to storage
let wrapper_unsigned_hash = wrapper.header_hash();
let hash_key = replay_protection::get_replay_protection_last_key(
&wrapper_unsigned_hash,
);
let hash_key = replay_protection::last_key(&wrapper_unsigned_hash);
shell
.wl_storage
.storage
Expand Down Expand Up @@ -919,9 +917,7 @@ mod test_prepare_proposal {
let inner_unsigned_hash = wrapper.raw_header_hash();

// Write inner hash to storage
let hash_key = replay_protection::get_replay_protection_last_key(
&inner_unsigned_hash,
);
let hash_key = replay_protection::last_key(&inner_unsigned_hash);
shell
.wl_storage
.storage
Expand Down
8 changes: 2 additions & 6 deletions apps/src/lib/node/ledger/shell/process_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1417,9 +1417,7 @@ mod test_process_proposal {
let mut batch =
namada::core::ledger::storage::testing::TestStorage::batch();
let wrapper_unsigned_hash = wrapper.header_hash();
let hash_key = replay_protection::get_replay_protection_last_subkey(
&wrapper_unsigned_hash,
);
let hash_key = replay_protection::last_key(&wrapper_unsigned_hash);
shell
.wl_storage
.storage
Expand Down Expand Up @@ -1543,9 +1541,7 @@ mod test_process_proposal {
// Write inner hash to storage
let mut batch =
namada::core::ledger::storage::testing::TestStorage::batch();
let hash_key = replay_protection::get_replay_protection_last_subkey(
&wrapper.raw_header_hash(),
);
let hash_key = replay_protection::last_key(&wrapper.raw_header_hash());
shell
.wl_storage
.storage
Expand Down
108 changes: 56 additions & 52 deletions apps/src/lib/node/ledger/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use namada::core::ledger::masp_conversions::ConversionState;
use namada::core::types::ethereum_structs;
use namada::eth_bridge::storage::proof::BridgePoolRootProof;
use namada::ledger::eth_bridge::storage::bridge_pool;
use namada::ledger::replay_protection;
use namada::ledger::storage::merkle_tree::{
base_tree_key_prefix, subtree_key_prefix,
};
Expand Down Expand Up @@ -509,7 +510,8 @@ impl RocksDB {
// Delete the tx hashes included in the last block
let reprot_cf = self.get_column_family(REPLAY_PROTECTION_CF)?;
tracing::info!("Removing replay protection hashes");
batch.delete_cf(reprot_cf, "last");
batch
.delete_cf(reprot_cf, replay_protection::last_prefix().to_string());

// Execute next step in parallel
let batch = Mutex::new(batch);
Expand Down Expand Up @@ -1163,11 +1165,10 @@ impl DB for RocksDB {
let replay_protection_cf =
self.get_column_family(REPLAY_PROTECTION_CF)?;

for prefix in ["last", "all"] {
let key = Key::parse(prefix)
.map_err(Error::KeyError)?
.push(&hash.to_string())
.map_err(Error::KeyError)?;
for key in [
replay_protection::last_key(hash),
replay_protection::all_key(hash),
] {
if self
.0
.get_pinned_cf(replay_protection_cf, key.to_string())
Expand Down Expand Up @@ -1546,7 +1547,8 @@ impl<'iter> DBIter<'iter> for RocksDB {
.get_column_family(REPLAY_PROTECTION_CF)
.expect("{REPLAY_PROTECTION_CF} column family should exist");

iter_prefix(self, replay_protection_cf, "last".to_string(), None)
let stripped_prefix = Some(replay_protection::last_prefix());
iter_prefix(self, replay_protection_cf, stripped_prefix.as_ref(), None)
}
}

Expand All @@ -1557,59 +1559,56 @@ fn iter_subspace_prefix<'iter>(
let subspace_cf = db
.get_column_family(SUBSPACE_CF)
.expect("{SUBSPACE_CF} column family should exist");
let db_prefix = "".to_owned();
iter_prefix(
db,
subspace_cf,
db_prefix,
prefix.map(|k| {
if k == &Key::default() {
k.to_string()
} else {
format!("{k}/")
}
}),
)
let stripped_prefix = None;
iter_prefix(db, subspace_cf, stripped_prefix, prefix)
}

fn iter_diffs_prefix<'a>(
db: &'a RocksDB,
height: BlockHeight,
prefix: Option<&'a Key>,
prefix: Option<&Key>,
is_old: bool,
) -> PersistentPrefixIterator<'a> {
let diffs_cf = db
.get_column_family(DIFFS_CF)
.expect("{DIFFS_CF} column family should exist");
let kind = if is_old { "old" } else { "new" };
let db_prefix = format!("{}/{}/", height.0.raw(), kind);
let prefix = prefix.map(|k| {
if k == &Key::default() {
db_prefix.clone()
} else {
format!("{db_prefix}{k}/")
}
});
// get keys without a prefix
iter_prefix(db, diffs_cf, db_prefix, prefix)
let stripped_prefix = Some(
Key::from(height.to_db_key())
.push(&kind.to_string())
.unwrap(),
);
// get keys without the `stripped_prefix`
iter_prefix(db, diffs_cf, stripped_prefix.as_ref(), prefix)
}

/// Create an iterator over key-vals in the given CF matching the given
/// prefix(es). If any, the `stripped_prefix` is matched first and will be
/// removed from the matched keys. If any, the second `prefix` is matched
/// against the stripped keys and remains in the matched keys.
fn iter_prefix<'a>(
db: &'a RocksDB,
cf: &'a ColumnFamily,
db_prefix: String,
prefix: Option<String>,
stripped_prefix: Option<&Key>,
prefix: Option<&Key>,
) -> PersistentPrefixIterator<'a> {
let read_opts = make_iter_read_opts(prefix.clone());
let stripped_prefix = match stripped_prefix {
Some(p) if !p.is_empty() => format!("{p}/"),
_ => "".to_owned(),
};
let prefix = match prefix {
Some(p) if !p.is_empty() => {
format!("{stripped_prefix}{p}/")
}
_ => stripped_prefix.clone(),
};
let read_opts = make_iter_read_opts(Some(prefix.clone()));
let iter = db.0.iterator_cf_opt(
cf,
read_opts,
IteratorMode::From(
prefix.unwrap_or_default().as_bytes(),
Direction::Forward,
),
IteratorMode::From(prefix.as_bytes(), Direction::Forward),
);
PersistentPrefixIterator(PrefixIterator::new(iter, db_prefix))
PersistentPrefixIterator(PrefixIterator::new(iter, stripped_prefix))
}

#[derive(Debug)]
Expand All @@ -1622,21 +1621,26 @@ impl<'a> Iterator for PersistentPrefixIterator<'a> {

/// Returns the next pair and the gas cost
fn next(&mut self) -> Option<(String, Vec<u8>, u64)> {
match self.0.iter.next() {
Some(result) => {
let (key, val) =
result.expect("Prefix iterator shouldn't fail");
let key = String::from_utf8(key.to_vec())
.expect("Cannot convert from bytes to key string");
match key.strip_prefix(&self.0.db_prefix) {
Some(k) => {
loop {
match self.0.iter.next() {
Some(result) => {
let (key, val) =
result.expect("Prefix iterator shouldn't fail");
let key = String::from_utf8(key.to_vec())
.expect("Cannot convert from bytes to key string");
if let Some(k) = key.strip_prefix(&self.0.stripped_prefix) {
let gas = k.len() + val.len();
Some((k.to_owned(), val.to_vec(), gas as _))
return Some((k.to_owned(), val.to_vec(), gas as _));
} else {
tracing::warn!(
"Unmatched prefix \"{}\" in iterator's key \
\"{key}\"",
self.0.stripped_prefix
);
}
None => self.next(),
}
None => return None,
}
None => None,
}
}
}
Expand All @@ -1649,8 +1653,8 @@ fn make_iter_read_opts(prefix: Option<String>) -> ReadOptions {

if let Some(prefix) = prefix {
let mut upper_prefix = prefix.into_bytes();
if let Some(last) = upper_prefix.pop() {
upper_prefix.push(last + 1);
if let Some(last) = upper_prefix.last_mut() {
*last += 1;
read_opts.set_iterate_upper_bound(upper_prefix);
}
}
Expand Down
30 changes: 12 additions & 18 deletions core/src/ledger/replay_protection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,22 @@ use crate::types::storage::Key;

const ERROR_MSG: &str = "Cannot obtain a valid db key";

/// Get the transaction hash key under the `last` subkey
pub fn get_replay_protection_last_subkey(hash: &Hash) -> Key {
Key::parse("last")
.expect(ERROR_MSG)
.push(&hash.to_string())
.expect(ERROR_MSG)
/// Get the transaction hash prefix under the `all` subkey
pub fn all_prefix() -> Key {
Key::parse("all").expect(ERROR_MSG)
}

/// Get the transaction hash key under the `all` subkey
pub fn get_replay_protection_all_subkey(hash: &Hash) -> Key {
Key::parse("all")
.expect(ERROR_MSG)
.push(&hash.to_string())
.expect(ERROR_MSG)
pub fn all_key(hash: &Hash) -> Key {
all_prefix().push(&hash.to_string()).expect(ERROR_MSG)
}

/// Get the full transaction hash prefix under the `last` subkey
pub fn last_prefix() -> Key {
Key::parse("last").expect(ERROR_MSG)
}

/// Get the full transaction hash key under the `last` subkey
pub fn get_replay_protection_last_key(hash: &Hash) -> Key {
Key::parse("replay_protection")
.expect(ERROR_MSG)
.push(&"last".to_string())
.expect(ERROR_MSG)
.push(&hash.to_string())
.expect(ERROR_MSG)
pub fn last_key(hash: &Hash) -> Key {
last_prefix().push(&hash.to_string()).expect(ERROR_MSG)
}
Loading

0 comments on commit a73ddb5

Please sign in to comment.