diff --git a/CHANGELOG.md b/CHANGELOG.md
index 16b024d7..492b2c7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
 All notable changes to this project will be documented in this file.
 
+## Version 0.58.5
+
+- Enhanced the bundler engine to create small, fast bundles for debugging and
+  testing, using state proofs instead of full states
+
 ## Version 0.58.4
 
 - Added support for due payment fix
diff --git a/Cargo.toml b/Cargo.toml
index 318dabbc..30bdff8b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 build = 'common/build/build.rs'
 edition = '2021'
 name = 'ever-node'
-version = '0.58.4'
+version = '0.58.5'
 
 [workspace]
 members = [ 'storage' ]
@@ -101,7 +101,6 @@ validator_session = { path = 'validator-session' }
 [dev-dependencies]
 async-trait = '0.1'
 difference = '2.0'
-ed25519-dalek = '1.0.1'
 external-ip = '4.1.0'
 pretty_assertions = '1.3'
 tokio = { features = [ 'macros' ], version = '1.5' }
diff --git a/bin/console.rs b/bin/console.rs
index 82108c31..445846fb 100644
--- a/bin/console.rs
+++ b/bin/console.rs
@@ -934,7 +934,6 @@ async fn main() {
 #[cfg(test)]
 mod test {
-    use super::*;
     use std::{fs, path::Path, sync::Arc, thread};
     use serde_json::json;
@@ -946,13 +945,16 @@ mod test {
         ShardIdent, ShardStateUnsplit, ValidatorDescr, ValidatorSet
     };
     use ever_node::{
-        collator_test_bundle::{create_engine_telemetry, create_engine_allocated},
+        collator_test_bundle::create_engine_allocated,
         config::TonNodeConfig, engine_traits::{EngineAlloc, EngineOperations},
         internal_db::{InternalDbConfig, InternalDb, state_gc_resolver::AllowStateGcSmartResolver},
         network::{control::{ControlServer, DataSource}, node_network::NodeNetwork},
         shard_state::ShardStateStuff, validator::validator_manager::ValidationStatus,
         shard_states_keeper::PinnedShardStateGuard,
     };
+
+    #[cfg(feature = "telemetry")]
+    use ever_node::collator_test_bundle::create_engine_telemetry;
     #[cfg(feature = "telemetry")]
     use ever_node::engine_traits::EngineTelemetry;
diff --git a/src/block.rs b/src/block.rs
index cbd9a234..59b977a6 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -337,7 +337,6 @@ impl BlockStuff {
         Ok(shards)
     }
 
-    #[cfg(feature = "external_db")]
     pub fn top_blocks_all(&self) -> Result<Vec<BlockIdExt>> {
         let mut shards = Vec::new();
         self
diff --git a/src/collator_test_bundle.rs b/src/collator_test_bundle.rs
index 83e9488f..55a6bdb4 100644
--- a/src/collator_test_bundle.rs
+++ b/src/collator_test_bundle.rs
@@ -10,14 +10,17 @@
 * See the License for the specific EVERX DEV software governing permissions and
 * limitations under the License.
*/ +#![allow(unused_imports, unused_assignments, unused_variables, dead_code)] use crate::{ - block::BlockStuff, engine_traits::{EngineAlloc, EngineOperations}, - shard_state::ShardStateStuff, types::top_block_descr::TopBlockDescrStuff, + block::BlockStuff, + config::CollatorConfig, + engine_traits::{EngineAlloc, EngineOperations}, + shard_state::ShardStateStuff, + types::top_block_descr::TopBlockDescrStuff, validator::{ - accept_block::create_top_shard_block_description, BlockCandidate, - out_msg_queue::{OutMsgQueueInfoStuff, CachedStates}, - }, config::CollatorConfig + accept_block::create_top_shard_block_description, collator::{CollateResult, Collator}, out_msg_queue::{CachedStates, OutMsgQueueInfoStuff}, validate_query::ValidateQuery, validator_utils::compute_validator_set_cc, BlockCandidate, CollatorSettings + }, }; #[cfg(feature = "telemetry")] use crate::engine_traits::EngineTelemetry; @@ -29,7 +32,7 @@ use std::{ ops::Deref, sync::{Arc, atomic::AtomicU64} }; use storage::{ - StorageAlloc, TimeChecker, + StorageAlloc, block_handle_db::{BlockHandle, BlockHandleDb, BlockHandleStorage}, node_state_db::NodeStateDb, types::BlockMeta, @@ -37,12 +40,12 @@ use storage::{ #[cfg(feature = "telemetry")] use storage::StorageTelemetry; use ever_block::{ - BlockIdExt, Message, ShardIdent, Serializable, MerkleUpdate, Deserializable, - ValidatorBaseInfo, BlockSignaturesPure, BlockSignatures, HashmapAugType, - TopBlockDescrSet, GlobalCapabilities, OutMsgQueue, + error, fail, read_boc, read_single_root_boc, + BlockIdExt, BlockSignatures, BlockSignaturesPure, Cell, CellType, Deserializable, + GlobalCapabilities, HashmapAugType, HashmapType, MerkleProof, Message, OutMsgQueue, + Result, Serializable, ShardIdent, ShardStateUnsplit, TopBlockDescr, TopBlockDescrSet, + UInt256, UsageTree, ValidatorBaseInfo, ValidatorSet, FundamentalSmcAddresses, }; -use ever_block::{ShardStateUnsplit, TopBlockDescr}; -use ever_block::{UInt256, fail, error, Result, CellType, read_boc, read_single_root_boc}; use crate::engine_traits::RempDuplicateStatus; #[derive(serde::Deserialize, serde::Serialize)] @@ -57,10 +60,10 @@ struct CollatorTestBundleIndexJson { prev_blocks: Vec, created_by: String, rand_seed: String, - #[serde(skip_serializing)] + #[serde(default, skip_serializing)] now: u32, + #[serde(default)] now_ms: u64, - fake: bool, contains_ethalon: bool, #[serde(default)] contains_candidate: bool, @@ -103,7 +106,6 @@ impl TryFrom for CollatorTestBundleIndex { created_by: value.created_by.parse()?, rand_seed: Some(value.rand_seed.parse()?), now_ms: if value.now_ms == 0 { (value.now as u64) * 1000 } else { value.now_ms }, - fake: value.fake, contains_ethalon: value.contains_ethalon, contains_candidate: value.contains_candidate, notes: value.notes, @@ -129,7 +131,6 @@ impl From<&CollatorTestBundleIndex> for CollatorTestBundleIndexJson { }, now: (value.now_ms / 1000) as u32, now_ms: value.now_ms, - fake: value.fake, contains_ethalon: value.contains_ethalon, contains_candidate: value.contains_candidate, notes: String::new(), @@ -149,24 +150,11 @@ struct CollatorTestBundleIndex { created_by: UInt256, rand_seed: Option, now_ms: u64, - fake: bool, contains_ethalon: bool, contains_candidate: bool, notes: String, } -impl CollatorTestBundleIndex { - pub fn oldest_mc_state(&self) -> BlockIdExt { - let mut oldest_mc_state = self.last_mc_state.clone(); - for id in self.mc_states.iter() { - if id.seq_no < oldest_mc_state.seq_no { - oldest_mc_state = id.clone(); - } - } - oldest_mc_state - } -} - fn construct_from_file(path: &str) -> 
Result<(T, UInt256, UInt256)> { let bytes = std::fs::read(path)?; let fh = UInt256::calc_file_hash(&bytes); @@ -225,9 +213,9 @@ pub struct CollatorTestBundle { index: CollatorTestBundleIndex, top_shard_blocks: Vec>, external_messages: Vec<(Arc, UInt256)>, - states: HashMap>, - mc_merkle_updates: HashMap, - blocks: HashMap, + states: HashMap>, // used for loading purposes + state_proofs: HashMap, // merkle proofs for states to lower their size + ethalon_block: Option, candidate: Option, block_handle_storage: BlockHandleStorage, #[cfg(feature = "telemetry")] @@ -290,7 +278,6 @@ impl CollatorTestBundle { created_by: UInt256::default(), rand_seed: None, now_ms, - fake: true, contains_ethalon: false, contains_candidate: false, notes: String::new(), @@ -301,8 +288,9 @@ impl CollatorTestBundle { top_shard_blocks: Default::default(), external_messages: Default::default(), states, - mc_merkle_updates: Default::default(), - blocks: Default::default(), + state_proofs: Default::default(), + ethalon_block: None, + // blocks: Default::default(), block_handle_storage: create_block_handle_storage(), candidate: None, #[cfg(feature = "telemetry")] @@ -313,27 +301,68 @@ impl CollatorTestBundle { }) } - pub fn load(path: &str) -> Result { + fn deserialize_state( + path: &str, + ss_id: &BlockIdExt, + #[cfg(feature = "telemetry")] + telemetry: &EngineTelemetry, + allocated: &EngineAlloc, + ) -> Result> { + let filename = format!("{}/states/{:x}", path, ss_id.root_hash()); + log::info!("Loading state {} from {}", ss_id, filename); + let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; + if ss_id.seq_no() == 0 { + ShardStateStuff::deserialize_zerostate( + ss_id.clone(), + &data, + #[cfg(feature = "telemetry")] + &telemetry, + &allocated + ) + } else if let Ok(proof) = MerkleProof::construct_from_bytes(&data) { + ShardStateStuff::from_state_root_cell( + ss_id.clone(), + proof.proof.virtualize(1), + #[cfg(feature = "telemetry")] + &telemetry, + &allocated + ) + } else { + ShardStateStuff::deserialize_state_inmem( + ss_id.clone(), + Arc::new(data), + #[cfg(feature = "telemetry")] + &telemetry, + &allocated, + &|| false + ) + } + } - if !std::path::Path::new(path).is_dir() { - fail!("Directory not found: {}", path); + pub fn load(path: impl AsRef) -> Result { + + let path = path.as_ref(); + if !path.is_dir() { + fail!("Directory not found: {:?}", path); } + let path = path.to_str().unwrap(); #[cfg(feature = "telemetry")] let telemetry = create_engine_telemetry(); let allocated = create_engine_allocated(); // 🗂 index + // let file = std::fs::File::open(path.join("index.json"))?; let file = std::fs::File::open(format!("{}/index.json", path))?; let index: CollatorTestBundleIndexJson = serde_json::from_reader(file)?; - let mut index: CollatorTestBundleIndex = index.try_into()?; + let index: CollatorTestBundleIndex = index.try_into()?; // ├─📂 top_shard_blocks let mut top_shard_blocks = vec!(); for id in index.top_shard_blocks.iter() { let filename = format!("{}/top_shard_blocks/{:x}", path, id.root_hash()); let tbd = TopBlockDescr::construct_from_file(filename)?; - top_shard_blocks.push(Arc::new(TopBlockDescrStuff::new(tbd, id, index.fake, false)?)); + top_shard_blocks.push(Arc::new(TopBlockDescrStuff::new(tbd, id, true, false)?)); } // to add simple external message: @@ -362,9 +391,13 @@ impl CollatorTestBundle { // ├─📂 states let mut states = HashMap::new(); - // all shardes states - for ss_id in index.neighbors.iter().chain(index.prev_blocks.iter()) { + // all shards and mc states + let 
iter = index.neighbors.iter() + .chain(index.prev_blocks.iter()) + .chain(index.mc_states.iter()); + for ss_id in iter { let filename = format!("{}/states/{:x}", path, ss_id.root_hash()); + log::info!("Loading state {} from {}", ss_id, filename); let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; let ss = if ss_id.seq_no() == 0 { ShardStateStuff::deserialize_zerostate( @@ -374,112 +407,35 @@ impl CollatorTestBundle { &telemetry, &allocated )? - } else { - ShardStateStuff::deserialize_state_inmem( - ss_id.clone(), - Arc::new(data), + } else if let Ok(proof) = MerkleProof::construct_from_bytes(&data) { + ShardStateStuff::from_state_root_cell( + ss_id.clone(), + proof.proof.virtualize(1), #[cfg(feature = "telemetry")] &telemetry, - &allocated, - &|| false + &allocated )? - }; - states.insert(ss_id.clone(), ss); - - } - if index.contains_ethalon && !index.id.shard().is_masterchain() { - let filename = format!("{}/states/{:x}", path, index.id.root_hash()); - let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - states.insert( - index.id.clone(), + } else { ShardStateStuff::deserialize_state_inmem( - index.id.clone(), + ss_id.clone(), Arc::new(data), #[cfg(feature = "telemetry")] &telemetry, &allocated, &|| false )? - ); - } - - // oldest mc state is saved full - let oldest_mc_state_id = index.oldest_mc_state(); - let filename = format!("{}/states/{:x}", path, oldest_mc_state_id.root_hash()); - let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - let oldest_mc_state = if oldest_mc_state_id.seq_no() == 0 { - ShardStateStuff::deserialize_zerostate( - oldest_mc_state_id.clone(), - &data, - #[cfg(feature = "telemetry")] - &telemetry, - &allocated - )? - } else { - ShardStateStuff::deserialize_state_inmem( - oldest_mc_state_id.clone(), - Arc::new(data), - #[cfg(feature = "telemetry")] - &telemetry, - &allocated, - &|| false - )? - }; - let mut prev_state_root = oldest_mc_state.root_cell().clone(); - states.insert(oldest_mc_state_id.clone(), oldest_mc_state); - - // other states are culculated by merkle updates - let mut mc_merkle_updates = HashMap::new(); - for id in index.mc_states.iter() { - if id != &oldest_mc_state_id { - let filename = format!("{}/states/mc_merkle_updates/{:x}", path, id.root_hash()); - mc_merkle_updates.insert( - id.clone(), - MerkleUpdate::construct_from_file(filename)?, - ); - } - } - index.mc_states.sort_by_key(|id| id.seq_no); - for id in index.mc_states.iter() { - if id != &oldest_mc_state_id { - let mu = mc_merkle_updates.get(id).ok_or_else( - || error!("Can't get merkle update {}", id) - )?; - let new_root = mu.apply_for(&prev_state_root)?; - states.insert( - id.clone(), - ShardStateStuff::from_state_root_cell( - id.clone(), - new_root.clone(), - #[cfg(feature = "telemetry")] - &telemetry, - &allocated - )? - ); - prev_state_root = new_root; - } + }; + states.insert(ss_id.clone(), ss); } // ├─📂 blocks - let mut blocks = HashMap::new(); - if index.contains_ethalon { + let ethalon_block = if !index.contains_ethalon { + None + } else { let filename = format!("{}/blocks/{:x}", path, index.id.root_hash()); let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - blocks.insert( - index.id.clone(), - BlockStuff::deserialize_block(index.id.clone(), data)? 
-            );
-        }
-        for id in index.prev_blocks.iter() {
-            if id.seq_no() != 0 {
-                let filename = format!("{}/blocks/{:x}", path, id.root_hash());
-                let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?;
-                blocks.insert(
-                    id.clone(),
-                    BlockStuff::deserialize_block(id.clone(), data)?
-                );
-            }
-        }
+            Some(BlockStuff::deserialize_block(index.id.clone(), data)?)
+        };
 
         let candidate = if !index.contains_candidate {
             None
@@ -500,8 +456,8 @@ impl CollatorTestBundle {
             top_shard_blocks,
             external_messages,
             states,
-            mc_merkle_updates,
-            blocks,
+            state_proofs: Default::default(),
+            ethalon_block,
             block_handle_storage: create_block_handle_storage(),
             candidate,
             #[cfg(feature = "telemetry")]
@@ -520,11 +476,10 @@ impl CollatorTestBundle {
         })
     }
 
+    // returns the ethalon block, or deserializes it from the candidate if present
     pub fn ethalon_block(&self) -> Result<Option<BlockStuff>> {
         if self.index.contains_ethalon {
-            Ok(Some(
-                self.blocks.get(&self.index.id).ok_or_else(|| error!("Index declares contains_ethalon=true but the block is not found"))?.clone()
-            ))
+            Ok(self.ethalon_block.clone())
         } else if let Some(candidate) = self.candidate() {
             Ok(Some(BlockStuff::deserialize_block_checked(self.index.id.clone(), candidate.data.clone())?))
         } else {
@@ -532,33 +487,6 @@ impl CollatorTestBundle {
         }
     }
 
-/* UNUSED
-    pub fn ethalon_state(&self) -> Result<Option<Arc<ShardStateStuff>>> {
-        if self.index.contains_ethalon {
-            Ok(self.states.get(&self.index.id).cloned())
-        } else if let Some(block) = self.ethalon_block()? {
-            let prev_ss_root = match block.construct_prev_id()? {
-                (prev1, Some(prev2)) => {
-                    let ss1 = self.states.get(&prev1).ok_or_else(|| error!("Prev state is not found"))?.root_cell().clone();
-                    let ss2 = self.states.get(&prev2).ok_or_else(|| error!("Prev state is not found"))?.root_cell().clone();
-                    ShardStateStuff::construct_split_root(ss1, ss2)?
-                },
-                (prev, None) => {
-                    self.states.get(&prev).ok_or_else(|| error!("Prev state is not found"))?.root_cell().clone()
-                }
-            };
-            let merkle_update = block
-                .block()?
- .read_state_update()?; - let block_id = block.id().clone(); - let ss_root = merkle_update.apply_for(&prev_ss_root)?; - Ok(Some(ShardStateStuff::new(block_id.clone(), ss_root)?)) - } else { - Ok(None) - } - } -*/ - pub fn block_id(&self) -> &BlockIdExt { &self.index.id } pub fn prev_blocks_ids(&self) -> &Vec { &self.index.prev_blocks } pub fn min_ref_mc_seqno(&self) -> u32 { self.index.min_ref_mc_seqno } @@ -569,33 +497,249 @@ impl CollatorTestBundle { } +struct CollatorTestBundleManager { + engine: Arc, + cached_states: CachedStates, + state_proofs: HashMap, + usage_tree: Option, + block: Option, + shard: ShardIdent, + prev_blocks_ids: Vec, + mc_states: Vec, + last_mc_id: BlockIdExt, // TODO: last_mc_state + oldest_mc_seq_no: u32, + newest_mc_seq_no: u32, +} + +impl CollatorTestBundleManager { + fn new(engine: &Arc) -> Self { + Self { + engine: engine.clone(), + cached_states: CachedStates::new(engine), + state_proofs: Default::default(), + usage_tree: None, + block: None, + shard: ShardIdent::default(), + prev_blocks_ids: vec!(), + mc_states: vec!(), + last_mc_id: BlockIdExt::default(), + oldest_mc_seq_no: 0, + newest_mc_seq_no: 0, + } + } + + async fn load_and_simplify_state(&mut self, id: &BlockIdExt) -> Result<()> { + let state = self.engine.load_state(id).await?; + self.add_simplified_state(state.root_cell(), id, None, None) + } + + fn add_simplified_state( + &mut self, + state_root: &Cell, + id: &BlockIdExt, + usage_tree_opt: Option<&UsageTree>, + min_ref_mc_seqno: Option, + ) -> Result<()> { + let block_opt = if self.prev_blocks_ids.iter().any(|b| b == id) { self.block.as_ref() } else { None }; + CollatorTestBundle::add_simplified_state( + state_root, + &mut self.state_proofs, + id, + block_opt, + usage_tree_opt, + min_ref_mc_seqno + ) + } + + async fn load_and_simplify_merge_states(&mut self, prev_block_id: &BlockIdExt, merge_block_id: &BlockIdExt) -> Result<()> { + let key = self.shard.shard_key(false); + let usage_tree = UsageTree::default(); + let state = self.engine.load_state(merge_block_id).await?; + let state_root = usage_tree.use_cell(state.root_cell().clone(), false); + let mut accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + + let other = self.engine.load_state(prev_block_id).await?; + let state_root = usage_tree.use_cell(other.root_cell().clone(), false); + let other_accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + accounts.merge(&other_accounts, &key)?; + + self.add_simplified_state(state.root_cell(), merge_block_id, Some(&usage_tree), None)?; + self.add_simplified_state(other.root_cell(), prev_block_id, Some(&usage_tree), None)?; + + Ok(()) + } + + async fn prepare_mc_states(&mut self) -> Result<()> { + assert_ne!(self.last_mc_id, Default::default()); + self.oldest_mc_seq_no = self.last_mc_id.seq_no(); + self.newest_mc_seq_no = self.last_mc_id.seq_no(); + for (block_id, _state) in self.state_proofs.iter() { + let state = self.engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut self.cached_states).await?; + for entry in nb.entries() { + self.oldest_mc_seq_no = self.oldest_mc_seq_no.min(entry.mc_seqno()); + self.newest_mc_seq_no = self.newest_mc_seq_no.max(entry.mc_seqno()); + } + } + for mc_seq_no in self.oldest_mc_seq_no..self.newest_mc_seq_no { + let handle = self.engine.find_mc_block_by_seq_no(mc_seq_no).await?; + self.load_and_simplify_state(handle.id()).await?; + self.mc_states.push(handle.id().clone()); + } + 
self.mc_states.push(self.last_mc_id.clone()); + Ok(()) + } + + fn finalize(self) -> Result { + unimplemented!() + } + +} + impl CollatorTestBundle { + fn load_state_internal(&self, block_id: &BlockIdExt) -> Result> { + if let Some(state) = self.states.get(block_id) { + Ok(state.clone()) + } else if let Some(proof) = self.state_proofs.get(block_id) { + ShardStateStuff::from_state_root_cell( + block_id.clone(), + proof.proof.clone().virtualize(1), + #[cfg(feature = "telemetry")] + &self.telemetry, + &self.allocated, + ) + } else { + fail!("bundle doesn't contain state for block {}", block_id) + } + } + + async fn load_and_simplify_state( + engine: &Arc, + state_proofs: &mut HashMap, + id: &BlockIdExt, + block_opt: Option<&BlockStuff>, + ) -> Result<()> { + Self::add_simplified_state( + engine.load_state(id).await?.root_cell(), + state_proofs, + id, + block_opt, + None, + None + ) + } + fn add_simplified_state( + state_root: &Cell, + state_proofs: &mut HashMap, + id: &BlockIdExt, + block_opt: Option<&BlockStuff>, + usage_tree_opt: Option<&UsageTree>, + min_ref_mc_seqno: Option, + ) -> Result<()> { + if state_proofs.get(id).is_some() { + assert!(min_ref_mc_seqno.is_none()); + assert!(block_opt.is_none()); + assert!(usage_tree_opt.is_none()); + log::debug!("state proof already exists {}", id); + return Ok(()); + } + log::debug!("prepare simplified state for {}", id); + // let root_hash = root.repr_hash(); + let usage_tree_local = UsageTree::default(); + let usage_tree = usage_tree_opt.unwrap_or(&usage_tree_local); + let state_root = usage_tree.use_cell(state_root.clone(), false); + let state = ShardStateUnsplit::construct_from_cell(state_root.clone())?; + let mut sub_trees = HashSet::new(); + let accounts = state.read_accounts()?; + let mut smc_addresses = FundamentalSmcAddresses::default(); + if let Some(mut custom) = state.read_custom()? { + if let Some(min_ref_mc_seqno) = min_ref_mc_seqno { + for mc_seqno in min_ref_mc_seqno..id.seq_no { + custom.prev_blocks.get_raw(&mc_seqno)?.unwrap(); + } + // add fake for new block to avoid pruned access + custom.prev_blocks.set( + &id.seq_no, + &Default::default(), + &Default::default() + )?; + + // get all system contracts + smc_addresses = custom.config().fundamental_smc_addr()?; + smc_addresses.add_key(&custom.config().minter_address()?)?; + smc_addresses.add_key(&custom.config().config_address()?)?; + smc_addresses.add_key(&custom.config().elector_address()?)?; + } + // here clear all unnecessary data + custom.prev_blocks = Default::default(); + // serialize struct and store all sub-trees + let cell = custom.serialize()?; + for i in 0..cell.references_count() { + let child = cell.reference(i)?; + for j in 0..child.references_count() { + sub_trees.insert(child.reference(j)?.repr_hash()); + } + } + } + // read all accounts affected in block + if let Some(block) = block_opt { + let extra = block.block()?.read_extra()?; + extra.read_account_blocks()?.iterate_slices(|account_id, _| { + smc_addresses.add_key_serialized(account_id)?; + Ok(true) + })?; + // load all work cells + // log::trace!("traverse accounts"); + // accounts.len()?; + } + smc_addresses.iterate_slices_with_keys(|account_id, _| { + if let (Some(leaf), _) = accounts.clone().set_builder_serialized(account_id, &Default::default(), &Default::default())? { + // if let Some(leaf) = accounts.get_serialized_raw(account_id)? 
{ + sub_trees.insert(leaf.cell().repr_hash()); + } + Ok(true) + })?; + + // don't prune out_msg_queue_info - it could be very big + let hash = state.out_msg_queue_info_cell().repr_hash(); + sub_trees.insert(hash); + let proof = MerkleProof::create_with_subtrees( + &state_root, + |hash| usage_tree.contains(hash), + |hash| sub_trees.contains(hash) + )?; + state_proofs.insert(id.clone(), proof); + Ok(()) + } + // build bundle for a collating (just now) block. // Uses real engine for top shard blocks and external messages. - // Blocks data loading is optional because we sometimes create bundles using a cut database (without blocks). - // Such a bundle will work, but creating merkle updates could be long + // If usage_tree is not present, try to collate block pub async fn build_for_collating_block( - prev_blocks_ids: Vec, engine: &Arc, + prev_blocks_ids: Vec, + usage_tree_opt: Option, ) -> Result { - log::info!("Building for furure block, prev[0]: {}", prev_blocks_ids[0]); // TODO: fill caches states let mut cached_states = CachedStates::new(engine); - // TODO: use cached states instead - let mut states = HashMap::new(); - let shard = if prev_blocks_ids.len() > 1 { prev_blocks_ids[0].shard().merge()? } else { prev_blocks_ids[0].shard().clone() }; - let is_master = shard.is_masterchain(); + let mut state_proofs = HashMap::new(); + let is_master = prev_blocks_ids[0].shard().is_masterchain(); + let shard = if let Some(merge_block_id) = prev_blocks_ids.get(1) { + merge_block_id.shard().merge()? + } else if engine.load_state(&prev_blocks_ids[0]).await?.state()?.before_split() { + prev_blocks_ids[0].shard().split()?.0 + } else { + prev_blocks_ids[0].shard().clone() + }; // // last mc state // let mc_state = engine.load_last_applied_mc_state().await?; let last_mc_id = mc_state.block_id().clone(); - let mut oldest_mc_seq_no = last_mc_id.seq_no(); - let mut newest_mc_seq_no = last_mc_id.seq_no(); // // top shard blocks @@ -614,52 +758,84 @@ impl CollatorTestBundle { 0 ).collect::>(); - // + // + // prev states + // + let (usage_tree, candidate) = if let Some(usage_tree) = usage_tree_opt { + (usage_tree, None) + } else { + // try to collate block + let collate_result = try_collate( + engine, + shard.clone(), + prev_blocks_ids.clone(), + None, + None, + true, + false + ).await?; + (collate_result.usage_tree, collate_result.candidate) + }; + let (id, now_ms, block_opt); + if let Some(candidate) = &candidate { + let block = BlockStuff::deserialize_block(candidate.block_id.clone(), candidate.data.clone())?; + now_ms = block.block()?.read_info()?.gen_utime_ms(); + id = candidate.block_id.clone(); + block_opt = Some(block); + } else { + now_ms = engine.now_ms(); + // now_ms = engine.load_state(&prev_blocks_ids[0]).await?.state_or_queue()?.gen_time_ms() + 1; // TODO: merge? 
+            id = BlockIdExt {
+                shard_id: shard.clone(),
+                seq_no: prev_blocks_ids.iter().map(|id| id.seq_no()).max().unwrap() + 1,
+                root_hash: UInt256::default(),
+                file_hash: UInt256::default(),
+            };
+            block_opt = None;
+        }
+        if let Some(merge_block_id) = prev_blocks_ids.get(1) {
+            let proof = MerkleProof::create(
+                engine.load_state(merge_block_id).await?.root_cell(),
+                |h| usage_tree.contains(h)
+            )?;
+            state_proofs.insert(merge_block_id.clone(), proof);
+        }
+        if !is_master {
+            let proof = MerkleProof::create(
+                engine.load_state(&prev_blocks_ids[0]).await?.root_cell(),
+                |h| usage_tree.contains(h)
+            )?;
+            state_proofs.insert(prev_blocks_ids[0].clone(), proof);
+        }
+
+        //
         // neighbors
         //
         let mut neighbors = vec!();
         let shards = mc_state.shard_hashes()?;
+        // TODO: this can be improved later by collated block
         let neighbor_list = shards.neighbours_for(&shard)?;
         for shard in neighbor_list.iter() {
-            states.insert(shard.block_id().clone(), engine.load_state(shard.block_id()).await?);
+            Self::load_and_simplify_state(engine, &mut state_proofs, shard.block_id(), None).await?;
             neighbors.push(shard.block_id().clone());
         }
-        if shards.is_empty() || mc_state.block_id().seq_no() != 0 {
-            states.insert(last_mc_id.clone(), mc_state);
-        }
 
-        // master blocks's collator uses new neighbours, based on new shaedes config.
+        // a master block's collator uses new neighbours, based on the new shards config.
         // It is difficult to calculate new config there. So add states for all new shard blocks.
         for tsb in top_shard_blocks.iter() {
             let id = tsb.proof_for();
-            if !states.contains_key(id) {
-                states.insert(id.clone(), engine.load_state(id).await?);
+            if !state_proofs.contains_key(id) {
+                Self::load_and_simplify_state(engine, &mut state_proofs, id, None).await?;
                 neighbors.push(id.clone());
             }
         }
 
-        //
-        // prev_blocks & states
-        //
-        let mut blocks = HashMap::new();
-        let handle = engine.load_block_handle(&prev_blocks_ids[0])?
-            .ok_or_else(|| error!("Cannot load handle for prev1 block {}", prev_blocks_ids[0]))?;
-        if let Ok(prev1) = engine.load_block(&handle).await {
-            blocks.insert(prev_blocks_ids[0].clone(), prev1);
-        }
-        states.insert(prev_blocks_ids[0].clone(), engine.load_state(&prev_blocks_ids[0]).await?);
-        if prev_blocks_ids.len() > 1 {
-            let handle = engine.load_block_handle(&prev_blocks_ids[1])?
- .ok_or_else(|| error!("Cannot load handle for prev2 block {}", prev_blocks_ids[1]))?; - if let Ok(prev2) = engine.load_block(&handle).await { - blocks.insert(prev_blocks_ids[1].clone(), prev2); - } - states.insert(prev_blocks_ids[1].clone(), engine.load_state(&prev_blocks_ids[1]).await?); - } - // collect needed mc states - for (_, state) in states.iter() { - let nb = OutMsgQueueInfoStuff::from_shard_state(state, &mut cached_states).await?; + let mut oldest_mc_seq_no = last_mc_id.seq_no(); + let mut newest_mc_seq_no = last_mc_id.seq_no(); + for (block_id, _state_root) in state_proofs.iter() { + let state = engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut cached_states).await?; for entry in nb.entries() { if entry.mc_seqno() < oldest_mc_seq_no { oldest_mc_seq_no = entry.mc_seqno(); @@ -669,38 +845,24 @@ impl CollatorTestBundle { } } - // mc states and merkle updates - let oldest_mc_state = engine.load_state( - engine.find_mc_block_by_seq_no(oldest_mc_seq_no).await?.id() - ).await?; - let mut prev_mc_state = oldest_mc_state.clone(); - let mut mc_states = vec!(oldest_mc_state.block_id().clone()); - states.insert(oldest_mc_state.block_id().clone(), oldest_mc_state); - let mut mc_merkle_updates = HashMap::new(); - - for mc_seq_no in oldest_mc_seq_no + 1..=newest_mc_seq_no { + // + // mc states + // + Self::add_simplified_state( + mc_state.root_cell(), + &mut state_proofs, + mc_state.block_id(), + if is_master { block_opt.as_ref() } else { None }, + if is_master { Some(&usage_tree) } else { None }, + Some(oldest_mc_seq_no), + )?; + let mut mc_states = vec!(mc_state.block_id().clone()); + for mc_seq_no in oldest_mc_seq_no..newest_mc_seq_no { let handle = engine.find_mc_block_by_seq_no(mc_seq_no).await?; - let mc_state = engine.load_state(handle.id()).await?; - let merkle_update = if let Ok(block) = engine.load_block(&handle).await { - block.block()?.read_state_update()? - } else { - let _tc = TimeChecker::new(format!("create merkle update for {}", handle.id()), 30); - // MerkleUpdate::default() - MerkleUpdate::create(prev_mc_state.root_cell(), mc_state.root_cell())? 
- }; - mc_merkle_updates.insert(handle.id().clone(), merkle_update); - prev_mc_state = mc_state.clone(); - states.insert(handle.id().clone(), mc_state); + Self::load_and_simplify_state(engine, &mut state_proofs, handle.id(), None).await?; mc_states.push(handle.id().clone()); } - let id = BlockIdExt { - shard_id: shard, - seq_no: prev_blocks_ids.iter().max_by_key(|id| id.seq_no()).unwrap().seq_no() + 1, - root_hash: UInt256::default(), - file_hash: UInt256::default(), - }; - let index = CollatorTestBundleIndex { id, top_shard_blocks: top_shard_blocks.iter().map(|tsb| tsb.proof_for().clone()).collect(), @@ -712,10 +874,9 @@ impl CollatorTestBundle { prev_blocks: prev_blocks_ids, created_by: UInt256::default(), rand_seed: None, - now_ms: engine.now_ms(), - fake: true, + now_ms, contains_ethalon: false, - contains_candidate: false, + contains_candidate: candidate.is_some(), notes: String::new(), }; @@ -723,11 +884,11 @@ impl CollatorTestBundle { index, top_shard_blocks, external_messages, - states, - mc_merkle_updates, - blocks, + states: Default::default(), + state_proofs, + ethalon_block: None, block_handle_storage: create_block_handle_storage(), - candidate: None, + candidate, #[cfg(feature = "telemetry")] telemetry: create_engine_telemetry(), allocated: create_engine_allocated(), @@ -740,30 +901,28 @@ impl CollatorTestBundle { // Uses real engine for top shard blocks and external messages. // Blocks data loading is optional because we sometimes create bundles using a cut database (without blocks). // Such a bundle will work, but creating merkle updates could be long + pub async fn build_for_validating_block( - shard: ShardIdent, - _min_masterchain_block_id: BlockIdExt, + engine: &Arc, prev_blocks_ids: Vec, candidate: BlockCandidate, - engine: &Arc, ) -> Result { - log::info!("Building for validating block, candidate: {}", candidate.block_id); // TODO: fill caches states let mut cached_states = CachedStates::new(engine); - // TODO: use cached states instead - let mut states = HashMap::new(); - let is_master = shard.is_masterchain(); + let mut state_proofs = HashMap::new(); + let is_master = candidate.block_id.shard().is_masterchain(); + + let block = BlockStuff::deserialize_block_checked(candidate.block_id.clone(), candidate.data.clone())?; + let now_ms = block.block()?.read_info()?.gen_utime_ms(); // // last mc state // let mc_state = engine.load_last_applied_mc_state().await?; let last_mc_id = mc_state.block_id().clone(); - let mut oldest_mc_seq_no = last_mc_id.seq_no(); - let mut newest_mc_seq_no = last_mc_id.seq_no(); // // top shard blocks @@ -777,53 +936,60 @@ impl CollatorTestBundle { // // external messages // - let external_messages = engine.get_external_messages_iterator( - shard.clone(), - 0 - ).collect::>(); + let external_messages = engine.get_external_messages_iterator(candidate.block_id.shard().clone(), 0).collect::>(); + + // + // prev states + // + if let Some(merge_block_id) = prev_blocks_ids.get(1) { + let key = candidate.block_id.shard().shard_key(false); + let usage_tree = UsageTree::default(); + let state = engine.load_state(merge_block_id).await?; + let state_root = usage_tree.use_cell(state.root_cell().clone(), false); + let mut accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + + let other = engine.load_state(&prev_blocks_ids[0]).await?; + let state_root = usage_tree.use_cell(other.root_cell().clone(), false); + let other_accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + accounts.merge(&other_accounts, 
&key)?;
+
+            Self::add_simplified_state(state.root_cell(), &mut state_proofs, merge_block_id, Some(&block), Some(&usage_tree), None)?;
+            Self::add_simplified_state(other.root_cell(), &mut state_proofs, &prev_blocks_ids[0], Some(&block), Some(&usage_tree), None)?;
+        } else if !is_master {
+            Self::load_and_simplify_state(engine, &mut state_proofs, &prev_blocks_ids[0], Some(&block)).await?;
+        }
 
         //
         // neighbors
         //
         let mut neighbors = vec!();
-        let shards = if shard.is_masterchain() {
-            let block = BlockStuff::deserialize_block_checked(candidate.block_id.clone(), candidate.data.clone())?;
+        let shards = if is_master {
             block.shard_hashes()?
         } else {
             mc_state.shard_hashes()?
         };
-        let neighbor_list = shards.neighbours_for(&shard)?;
+        let neighbor_list = shards.neighbours_for(&candidate.block_id.shard())?;
         for shard in neighbor_list.iter() {
-            states.insert(shard.block_id().clone(), engine.load_state(shard.block_id()).await?);
+            Self::load_and_simplify_state(engine, &mut state_proofs, shard.block_id(), None).await?;
             neighbors.push(shard.block_id().clone());
         }
-        if shards.is_empty() || mc_state.block_id().seq_no() != 0 {
-            states.insert(last_mc_id.clone(), mc_state);
-        }
-
-        //
-        // prev_blocks & states
-        //
-        let mut blocks = HashMap::new();
-        let handle = engine.load_block_handle(&prev_blocks_ids[0])?
-            .ok_or_else(|| error!("Cannot load handle for prev1 block {}", prev_blocks_ids[0]))?;
-        if let Ok(prev1) = engine.load_block(&handle).await {
-            blocks.insert(prev_blocks_ids[0].clone(), prev1);
-        }
-        states.insert(prev_blocks_ids[0].clone(), engine.load_state(&prev_blocks_ids[0]).await?);
-        if prev_blocks_ids.len() > 1 {
-            let handle = engine.load_block_handle(&prev_blocks_ids[1])?
-                .ok_or_else(|| error!("Cannot load handle for prev2 block {}", prev_blocks_ids[1]))?;
-            if let Ok(prev2) = engine.load_block(&handle).await {
-                blocks.insert(prev_blocks_ids[1].clone(), prev2);
+        // a master block's collator uses new neighbours, based on the new shards config.
+        // It is difficult to calculate new config there. So add states for all new shard blocks.
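+        // The loop below therefore adds a simplified state proof for every shard
+        // block referenced by a top shard block description that the neighbour
+        // list above has not already covered.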
+ for tsb in top_shard_blocks.iter() { + let id = tsb.proof_for(); + if !state_proofs.contains_key(id) { + Self::load_and_simplify_state(engine, &mut state_proofs, id, None).await?; + neighbors.push(id.clone()); } - states.insert(prev_blocks_ids[1].clone(), engine.load_state(&prev_blocks_ids[1]).await?); } // collect needed mc states - for (_, state) in states.iter() { - let nb = OutMsgQueueInfoStuff::from_shard_state(state, &mut cached_states).await?; + let mut oldest_mc_seq_no = last_mc_id.seq_no(); + let mut newest_mc_seq_no = last_mc_id.seq_no(); + for (block_id, _state_root) in state_proofs.iter() { + let state = engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut cached_states).await?; for entry in nb.entries() { if entry.mc_seqno() < oldest_mc_seq_no { oldest_mc_seq_no = entry.mc_seqno(); @@ -833,32 +999,26 @@ impl CollatorTestBundle { } } - // mc states and merkle updates - let oldest_mc_state = engine.load_state( - engine.find_mc_block_by_seq_no(oldest_mc_seq_no).await?.id() - ).await?; - let mut prev_mc_state = oldest_mc_state.clone(); - let mut mc_states = vec!(oldest_mc_state.block_id().clone()); - states.insert(oldest_mc_state.block_id().clone(), oldest_mc_state); - let mut mc_merkle_updates = HashMap::new(); - - for mc_seq_no in oldest_mc_seq_no + 1..=newest_mc_seq_no { + // + // mc states + // + Self::add_simplified_state( + mc_state.root_cell(), + &mut state_proofs, + mc_state.block_id(), + if is_master { Some(&block) } else { None }, + None, + Some(oldest_mc_seq_no), + )?; + let mut mc_states = vec!(mc_state.block_id().clone()); + for mc_seq_no in oldest_mc_seq_no..newest_mc_seq_no { let handle = engine.find_mc_block_by_seq_no(mc_seq_no).await?; - let mc_state = engine.load_state(handle.id()).await?; - let merkle_update = if let Ok(block) = engine.load_block(&handle).await { - block.block()?.read_state_update()? - } else { - // be careful - creating of new merkle update is very slow - // if some shards were frozen for a long time - MerkleUpdate::create(prev_mc_state.root_cell(), mc_state.root_cell())? - }; - prev_mc_state = mc_state.clone(); - mc_merkle_updates.insert(handle.id().clone(), merkle_update); - states.insert(handle.id().clone(), mc_state); + Self::load_and_simplify_state(engine, &mut state_proofs, handle.id(), None).await?; mc_states.push(handle.id().clone()); } - let b = BlockStuff::deserialize_block_checked(candidate.block_id.clone(), candidate.data.clone())?; + // let mut blocks = HashMap::new(); + // blocks.insert(candidate.block_id.clone(), block); let index = CollatorTestBundleIndex { id: candidate.block_id.clone(), @@ -871,8 +1031,7 @@ impl CollatorTestBundle { prev_blocks: prev_blocks_ids, created_by: candidate.created_by.clone(), rand_seed: None, - now_ms: b.block()?.read_info()?.gen_utime_ms(), - fake: true, + now_ms, contains_ethalon: false, contains_candidate: true, notes: String::new(), @@ -882,9 +1041,9 @@ impl CollatorTestBundle { index, top_shard_blocks, external_messages, - states, - mc_merkle_updates, - blocks, + states: Default::default(), + state_proofs, + ethalon_block: None, block_handle_storage: create_block_handle_storage(), candidate: Some(candidate), #[cfg(feature = "telemetry")] @@ -899,77 +1058,84 @@ impl CollatorTestBundle { // without signatures. 
Ethalon block is included, external messages are taken // from ethalon block pub async fn build_with_ethalon( - block_id: &BlockIdExt, engine: &Arc, + block: BlockStuff, ) -> Result { + log::info!("Building with ethalon {}", block.id()); - log::info!("Building with ethalon {}", block_id); - - let handle = engine.load_block_handle(block_id)?.ok_or_else( - || error!("Cannot load handle for block {}", block_id) - )?; - let block = engine.load_block(&handle).await?; let info = block.block()?.read_info()?; let extra = block.block()?.read_extra()?; // TODO: fill caches states let mut cached_states = CachedStates::new(engine); - // TODO: use cached states instead - let mut states = HashMap::new(); + let mut state_proofs = HashMap::new(); + let is_master = block.id().shard().is_masterchain(); // // last mc state // + let (prev, merge_block_id) = block.construct_prev_id()?; let last_mc_id = if let Some(master_ref) = info.read_master_ref()? { BlockIdExt::from_ext_blk(master_ref.master) } else { - block.construct_prev_id()?.0 + prev.clone() }; - let mut oldest_mc_seq_no = last_mc_id.seq_no(); - let mut newest_mc_seq_no = last_mc_id.seq_no(); + let mc_state = engine.load_state(&last_mc_id).await?; // - // top shard blocks (fake) + // prev states // - let mut shard_blocks_ids = vec![]; - if let Ok(shards) = block.shards() { - shards.iterate_shards(|shard_id, descr| { - shard_blocks_ids.push(BlockIdExt { - shard_id, - seq_no: descr.seq_no, - root_hash: descr.root_hash, - file_hash: descr.file_hash, - }); - Ok(true) - })?; + let mut prev_blocks_ids = vec!(prev.clone()); + if let Some(merge_block_id) = merge_block_id { + let key = block.id().shard().shard_key(false); + let usage_tree = UsageTree::default(); + let state = engine.load_state(&merge_block_id).await?; + let state_root = usage_tree.use_cell(state.root_cell().clone(), false); + let mut accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + + let other = engine.load_state(&prev).await?; + let state_root = usage_tree.use_cell(other.root_cell().clone(), false); + let other_accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + accounts.merge(&other_accounts, &key)?; + + Self::add_simplified_state(state.root_cell(), &mut state_proofs, &merge_block_id, Some(&block), Some(&usage_tree), None)?; + Self::add_simplified_state(other.root_cell(), &mut state_proofs, &prev, Some(&block), Some(&usage_tree), None)?; + prev_blocks_ids.push(merge_block_id); + } else if !is_master { + Self::load_and_simplify_state(engine, &mut state_proofs, &prev, Some(&block)).await?; } + + // + // top shard blocks (fake) + // + let shard_blocks_ids = block.top_blocks_all().unwrap_or_default(); let mut top_shard_blocks = vec![]; let mut top_shard_blocks_ids = vec![]; - let mc_state = engine.load_state(&last_mc_id).await?; for shard_block_id in shard_blocks_ids.iter().filter(|id| id.seq_no() != 0) { let handle = engine.load_block_handle(shard_block_id)? 
.ok_or_else(|| error!("Cannot load handle for shard block {}", shard_block_id))?; - let block = engine.load_block(&handle).await?; - let info = block.block()?.read_info()?; - let prev_blocks_ids = info.read_prev_ids()?; - let base_info = ValidatorBaseInfo::with_params( - info.gen_validator_list_hash_short(), - info.gen_catchain_seqno() - ); - let signatures = BlockSignaturesPure::default(); + if let Ok(block) = engine.load_block(&handle).await { + let info = block.block()?.read_info()?; + let prev_blocks_ids = info.read_prev_ids()?; + let base_info = ValidatorBaseInfo::with_params( + info.gen_validator_list_hash_short(), + info.gen_catchain_seqno() + ); + let signatures = BlockSignaturesPure::default(); - // sometimes some shards don't have new blocks to create TSBD - if let Some(tbd) = create_top_shard_block_description( + // sometimes some shards don't have new blocks to create TSBD + if let Some(tbd) = create_top_shard_block_description( &block, BlockSignatures::with_params(base_info, signatures), &mc_state, // TODO &prev_blocks_ids, engine.deref(), ).await? { - let tbd = TopBlockDescrStuff::new(tbd, block_id, true, false).unwrap(); - top_shard_blocks_ids.push(tbd.proof_for().clone()); - top_shard_blocks.push(Arc::new(tbd)); + let tbd = TopBlockDescrStuff::new(tbd, block.id(), true, false).unwrap(); + top_shard_blocks_ids.push(tbd.proof_for().clone()); + top_shard_blocks.push(Arc::new(tbd)); + } } } @@ -992,47 +1158,19 @@ impl CollatorTestBundle { // neighbors // let mut neighbors = vec!(); - let shards = match block.shard_hashes() { - Ok(shards) => shards, - Err(_) => mc_state.shard_hashes()? - }; - - let neighbor_list = shards.neighbours_for(block_id.shard())?; + let shards = block.shard_hashes().or_else(|_| mc_state.shard_hashes())?; + let neighbor_list = shards.neighbours_for(block.id().shard())?; for shard in neighbor_list.iter() { - states.insert(shard.block_id().clone(), engine.load_state(shard.block_id()).await?); + Self::load_and_simplify_state(engine, &mut state_proofs, shard.block_id(), None).await?; neighbors.push(shard.block_id().clone()); } - if shards.is_empty() || mc_state.block_id().seq_no() != 0 { - states.insert(mc_state.block_id().clone(), mc_state); - } - - // - // prev_blocks & states - // - let mut blocks = HashMap::new(); - let mut prev_blocks_ids = vec!(); - let prev = block.construct_prev_id()?; - let prev1 = engine.load_block_handle(&prev.0)?.ok_or_else( - || error!("Cannot load handle for prev1 block {}", prev.0) - )?; - prev_blocks_ids.push(prev1.id().clone()); - states.insert(prev1.id().clone(), engine.load_state(prev1.id()).await?); - if let Ok(block) = engine.load_block(&prev1).await { - blocks.insert(prev1.id().clone(), block); - } - if let Some(prev2) = prev.1 { - let handle = engine.load_block_handle(&prev2)? 
- .ok_or_else(|| error!("Cannot load handle for prev2 block {}", prev2 ))?; - let prev2 = engine.load_block(&handle).await?; - prev_blocks_ids.push(prev2.id().clone()); - states.insert(prev2.id().clone(), engine.load_state(prev2.id()).await?); - blocks.insert(prev2.id().clone(), prev2); - } - // collect needed mc states - for (_, state) in states.iter() { - let nb = OutMsgQueueInfoStuff::from_shard_state(state, &mut cached_states).await?; + let mut oldest_mc_seq_no = last_mc_id.seq_no(); + let mut newest_mc_seq_no = last_mc_id.seq_no(); + for (block_id, _state) in state_proofs.iter() { + let state = engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut cached_states).await?; for entry in nb.entries() { if entry.mc_seqno() < oldest_mc_seq_no { oldest_mc_seq_no = entry.mc_seqno(); @@ -1042,35 +1180,26 @@ impl CollatorTestBundle { } } - // ethalon block and state - blocks.insert(block_id.clone(), block); - if block_id.shard().is_masterchain() { - if block_id.seq_no() < oldest_mc_seq_no { - oldest_mc_seq_no = block_id.seq_no(); - } else if block_id.seq_no() > newest_mc_seq_no { - newest_mc_seq_no = block_id.seq_no(); - } - } else { - states.insert(block_id.clone(), engine.load_state(block_id).await?); - } - // mc states and merkle updates - let oldest_mc_state = engine.load_state( - engine.find_mc_block_by_seq_no(oldest_mc_seq_no).await?.id() - ).await?; - let mut mc_states = vec!(oldest_mc_state.block_id().clone()); - states.insert(oldest_mc_state.block_id().clone(), oldest_mc_state); - let mut mc_merkle_updates = HashMap::new(); - - for mc_seq_no in oldest_mc_seq_no + 1..=newest_mc_seq_no { + // + // mc states + // + Self::add_simplified_state( + mc_state.root_cell(), + &mut state_proofs, + mc_state.block_id(), + None, + None, + Some(oldest_mc_seq_no) + )?; + let mut mc_states = vec!(mc_state.block_id().clone()); + for mc_seq_no in oldest_mc_seq_no..newest_mc_seq_no { let handle = engine.find_mc_block_by_seq_no(mc_seq_no).await?; - let block = engine.load_block(&handle).await?; - mc_merkle_updates.insert(block.id().clone(), block.block()?.read_state_update()?); - states.insert(block.id().clone(), engine.load_state(block.id()).await?); - mc_states.push(block.id().clone()); + Self::load_and_simplify_state(engine, &mut state_proofs, handle.id(), None).await?; + mc_states.push(handle.id().clone()); } let index = CollatorTestBundleIndex { - id: block_id.clone(), + id: block.id().clone(), top_shard_blocks: top_shard_blocks_ids, external_messages: external_messages_ids, last_mc_state: last_mc_id, @@ -1081,7 +1210,6 @@ impl CollatorTestBundle { created_by: extra.created_by().clone(), rand_seed: Some(extra.rand_seed().clone()), now_ms: info.gen_utime_ms(), - fake: true, contains_ethalon: true, contains_candidate: false, notes: String::new(), @@ -1091,9 +1219,9 @@ impl CollatorTestBundle { index, top_shard_blocks, external_messages, - states, - mc_merkle_updates, - blocks, + states: Default::default(), + state_proofs, + ethalon_block: Some(block), block_handle_storage: create_block_handle_storage(), candidate: None, #[cfg(feature = "telemetry")] @@ -1115,6 +1243,7 @@ impl CollatorTestBundle { let path = format!("{}/top_shard_blocks/", path); std::fs::create_dir_all(&path)?; let filename = format!("{}/{:x}", path, tbd.proof_for().root_hash()); + log::info!("Saving top_shard_blocks {}", filename); tbd.top_block_descr().write_to_file(filename)?; } @@ -1123,47 +1252,34 @@ impl CollatorTestBundle { let path = format!("{}/external_messages/", path); 
            std::fs::create_dir_all(&path)?;
             let filename = format!("{}/{:x}", path, id);
+            log::info!("Saving external message {}", filename);
             m.write_to_file(filename)?;
         }
 
         // ├─📂 states
-        // all shardes states
+        // all state proofs
         let path1 = format!("{}/states/", path);
         std::fs::create_dir_all(&path1)?;
-        for ss_id in self.index.neighbors.iter().chain(self.index.prev_blocks.iter()) {
+        let iter = self.index.neighbors.iter()
+            .chain(self.index.prev_blocks.iter())
+            .chain(self.index.mc_states.iter());
+        for ss_id in iter {
             let filename = format!("{}/{:x}", path1, ss_id.root_hash());
-            self.states.get(ss_id)
+            log::debug!("Saving {} state to {}", ss_id, filename);
+            let now = std::time::Instant::now();
+            self.state_proofs.get(ss_id)
                 .ok_or_else(|| error!("Bundle's internal error (state {})", ss_id))?
-                .write_to(&mut File::create(filename)?)?;
-        }
-        // ethalon state
-        if self.index.contains_ethalon && !self.index.id.shard().is_masterchain() {
-            let filename = format!("{}/{:x}", path1, self.index.id.root_hash());
-            self.states.get(&self.index.id)
-                .ok_or_else(|| error!("Bundle's internal error (state {})", self.index.id))?
-                .write_to(&mut File::create(filename)?)?;
-        }
-        // oldest mc state is saved full
-        let oldest_mc_state = self.index.oldest_mc_state();
-        let filename = format!("{}/{:x}", path1, oldest_mc_state.root_hash());
-        self.states.get(&oldest_mc_state)
-            .ok_or_else(|| error!("Bundle's internal error (state {})", oldest_mc_state))?
-            .write_to(&mut File::create(filename)?)?;
-
-        // merkle updates for all other mc states
-        let path1 = format!("{}/states/mc_merkle_updates/", path);
-        std::fs::create_dir_all(&path1)?;
-        for (id, mu) in self.mc_merkle_updates.iter() {
-            let filename = format!("{}/{:x}", path1, id.root_hash());
-            mu.write_to_file(filename)?;
+                .write_to_file(&filename)?;
+            log::debug!("Saved {} state to {} in {} ms", ss_id, filename, now.elapsed().as_millis());
         }
 
         // ├─📂 blocks
-        for (id, b) in self.blocks.iter() {
+        if let Some(block) = &self.ethalon_block {
             let path = format!("{}/blocks/", path);
             std::fs::create_dir_all(&path)?;
-            let filename = format!("{}/{:x}", path, id.root_hash());
-            b.write_to(&mut File::create(filename)?)?;
+            let filename = format!("{}/{:x}", path, block.id().root_hash());
+            log::info!("Saving ethalon block {}", filename);
+            block.write_to(&mut File::create(filename)?)?;
         }
 
         // candidate
@@ -1210,9 +1326,7 @@ impl CollatorTestBundle {
     pub fn set_notes(&mut self, notes: String) { self.index.notes = notes }
 
     fn get_messages(&self, remp: bool) -> Result<Vec<(Arc<Message>, UInt256)>> {
-        let remp_enabled = self.states
-            .get(&self.index.last_mc_state)
-            .ok_or_else(|| error!("Can't load last ms block to read config"))?
+        let remp_enabled = self.load_state_internal(&self.index.last_mc_state)?
.config_params()?.has_capability(GlobalCapabilities::CapRemp); if remp_enabled == remp { @@ -1242,8 +1356,7 @@ impl EngineOperations for CollatorTestBundle { None )?; if let Some(handle) = handle { - if self.blocks.contains_key(id) && (id != &self.index.id) { - handle.set_data(); + if self.states.contains_key(id) { handle.set_state(); handle.set_block_applied(); } @@ -1255,16 +1368,14 @@ impl EngineOperations for CollatorTestBundle { async fn load_state(&self, block_id: &BlockIdExt) -> Result> { if *block_id != self.index.id { - if let Some(s) = self.states.get(block_id) { - return Ok(s.clone()); - } + return self.load_state_internal(&block_id) } fail!("bundle doesn't contain state for block {}", block_id) } async fn load_block(&self, handle: &BlockHandle) -> Result { if *handle.id() != self.index.id { - if let Some(s) = self.blocks.get(handle.id()) { + if let Some(s) = &self.ethalon_block { return Ok(s.clone()); } } @@ -1272,11 +1383,7 @@ impl EngineOperations for CollatorTestBundle { } async fn load_last_applied_mc_state(&self) -> Result> { - if let Some(s) = self.states.get(&self.index.last_mc_state) { - Ok(s.clone()) - } else { - fail!("bundle doesn't contain state for block {}", &self.index.last_mc_state) - } + self.load_state_internal(&self.index.last_mc_state) } async fn wait_state( @@ -1289,7 +1396,7 @@ impl EngineOperations for CollatorTestBundle { } async fn find_mc_block_by_seq_no(&self, seq_no: u32) -> Result> { - for (id, _block) in self.blocks.iter() { + for (id, _block) in self.states.iter() { if (id.seq_no() != seq_no) || !id.shard().is_masterchain() { continue } @@ -1312,21 +1419,24 @@ impl EngineOperations for CollatorTestBundle { _: &Arc, _: Option<&mut u32>, ) -> Result>> { - if self.top_shard_blocks.len() > 0 { + if !self.top_shard_blocks.is_empty() { return Ok(self.top_shard_blocks.clone()); } else if let Some(candidate) = self.candidate() { - let collated_roots = read_boc(&candidate.collated_data)?.roots; - for i in 0..collated_roots.len() { - let croot = collated_roots[i].clone(); - if croot.cell_type() == CellType::Ordinary { - let mut res = vec!(); - let top_shard_descr_dict = TopBlockDescrSet::construct_from_cell(croot)?; - top_shard_descr_dict.collection().iterate(|tbd| { - let id = tbd.0.proof_for().clone(); - res.push(Arc::new(TopBlockDescrStuff::new(tbd.0, &id, true, false)?)); - Ok(true) - })?; - return Ok(res); + log::info!("candidate.collated_data.len(): {}", candidate.collated_data.len()); + if !candidate.collated_data.is_empty() { + let collated_roots = read_boc(&candidate.collated_data)?.roots; + for i in 0..collated_roots.len() { + let croot = collated_roots[i].clone(); + if croot.cell_type() == CellType::Ordinary { + let mut res = vec!(); + let top_shard_descr_dict = TopBlockDescrSet::construct_from_cell(croot)?; + top_shard_descr_dict.collection().iterate(|tbd| { + let id = tbd.0.proof_for().clone(); + res.push(Arc::new(TopBlockDescrStuff::new(tbd.0, &id, true, false)?)); + Ok(true) + })?; + return Ok(res); + } } } } @@ -1397,3 +1507,82 @@ impl EngineOperations for CollatorTestBundle { None } } + +pub async fn try_collate( + engine: &Arc, + shard: ShardIdent, + prev_blocks_ids: Vec, + created_by_opt: Option, + rand_seed_opt: Option, + skip_state_update: bool, + check_validation: bool, +) -> Result { + let mc_state = engine.load_last_applied_mc_state().await?; + let mc_state_extra = mc_state.shard_state_extra()?; + let mut cc_seqno_with_delta = 0; + let cc_seqno_from_state = if shard.is_masterchain() { + mc_state_extra.validator_info.catchain_seqno + 
} else { + mc_state_extra.shards.calc_shard_cc_seqno(&shard)? + }; + let nodes = compute_validator_set_cc( + &mc_state, + &shard, + prev_blocks_ids.iter().map(|id| id.seq_no()).max().unwrap() + 1, + cc_seqno_from_state, + &mut cc_seqno_with_delta + )?; + let validator_set = ValidatorSet::with_cc_seqno(0, 0, 0, cc_seqno_with_delta, nodes)?; + + // log::debug!("{}", block_stuff.id()); + + log::info!("TRY COLLATE block {}", shard); + + let min_mc_seq_no = if prev_blocks_ids[0].seq_no() == 0 { + 0 + } else { + let state = engine.load_state(&prev_blocks_ids[0]).await?; + state.state()?.min_ref_mc_seqno() + }; + + let collator_settings = CollatorSettings { + skip_state_update, + ..Default::default() + }; + let collator = Collator::new( + shard.clone(), + min_mc_seq_no, + prev_blocks_ids.clone(), + validator_set.clone(), + created_by_opt.unwrap_or_default(), + engine.clone(), + rand_seed_opt, + collator_settings, + )?; + let collate_result = collator.collate().await?; + + if let Some(candidate) = &collate_result.candidate { + if check_validation { + // let new_block = Block::construct_from_bytes(&candidate.data).unwrap(); + + // std::fs::write(&format!("{}/state_candidate.json", RES_PATH), ever_block_json::debug_state(new_state.clone())?)?; + // std::fs::write(&format!("{}/block_candidate.json", RES_PATH), ever_block_json::debug_block_full(new_block)?)?; + + let validator_query = ValidateQuery::new( + shard.clone(), + min_mc_seq_no, + prev_blocks_ids.clone(), + candidate.clone(), + validator_set.clone(), + engine.clone(), + true, + true, + None, + ); + validator_query.try_validate().await?; + } + Ok(collate_result) + } else { + Err(collate_result.error.unwrap()) + } +} diff --git a/src/internal_db/mod.rs b/src/internal_db/mod.rs index 5d53faa1..77b41765 100644 --- a/src/internal_db/mod.rs +++ b/src/internal_db/mod.rs @@ -15,14 +15,14 @@ use crate::{ block::BlockStuff, block_proof::BlockProofStuff, engine_traits::EngineAlloc, error::NodeError, shard_state::ShardStateStuff, types::top_block_descr::{TopBlockDescrId, TopBlockDescrStuff}, internal_db::restore::check_db, - }; #[cfg(feature = "telemetry")] use crate::engine_traits::EngineTelemetry; use std::{ - cmp::min, collections::{HashMap, HashSet}, io::Cursor, mem::size_of, path::{Path, PathBuf}, - sync::{Arc, atomic::{AtomicBool, AtomicU32, Ordering}}, time::{UNIX_EPOCH, Duration}, ops::Deref + collections::{HashMap, HashSet}, io::Cursor, mem::size_of, ops::Deref, + path::{Path, PathBuf}, sync::{atomic::{AtomicBool, AtomicU32, Ordering}, Arc}, + time::{Duration, UNIX_EPOCH} }; use storage::{ StorageAlloc, TimeChecker, @@ -35,9 +35,9 @@ use storage::{ use storage::shardstate_db_async::{self, AllowStateGcResolver, ShardStateDb}; #[cfg(feature = "telemetry")] use storage::StorageTelemetry; -use ever_block::{Block, BlockIdExt, INVALID_WORKCHAIN_ID, CellsFactory}; use ever_block::{ - error, fail, Result, UInt256, Cell, BocWriterStack, MAX_SAFE_DEPTH, DoneCellsStorage, + error, fail, Block, BlockIdExt, BocWriterStack, Cell, CellsFactory, DoneCellsStorage, + Result, ShardIdent, UInt256, INVALID_WORKCHAIN_ID, MAX_SAFE_DEPTH }; /// Full node state keys @@ -199,6 +199,27 @@ pub struct InternalDb { #[allow(dead_code)] impl InternalDb { + #[cfg(test)] + pub async fn only_cells_db( + config: InternalDbConfig, + #[cfg(feature = "telemetry")] + telemetry: Arc, + allocated: Arc, + ) -> Result { + Self::construct( + config, + false, + false, + false, + true, + &|| Ok(()), + None, + #[cfg(feature = "telemetry")] + telemetry, + allocated + ).await + } + pub async fn 
with_update( config: InternalDbConfig, restore_db_enabled: bool, @@ -210,43 +231,35 @@ impl InternalDb { telemetry: Arc, allocated: Arc, ) -> Result { - let mut db = Self::construct( + Self::construct( config, + restore_db_enabled, + force_check_db, allow_update, + false, + check_stop, + is_broken, #[cfg(feature = "telemetry")] telemetry, allocated, - ).await?; - let version = db.resolve_db_version()?; - if version != CURRENT_DB_VERSION { - if allow_update { - db = update::update(db, version, check_stop, is_broken, force_check_db, - restore_db_enabled).await? - } else { - fail!( - "DB version {} does not correspond to current supported one {}.", - version, - CURRENT_DB_VERSION - ) - } - } else { - log::info!("DB VERSION {}", version); - // TODO correct workchain id needed here, but it will be known later - db = check_db(db, 0, restore_db_enabled, force_check_db, check_stop, is_broken).await?; - } - Ok(db) + ).await } async fn construct( config: InternalDbConfig, + restore_db_enabled: bool, + force_check_db: bool, allow_update: bool, + read_only: bool, + check_stop: &(dyn Fn() -> Result<()> + Sync), + is_broken: Option<&AtomicBool>, #[cfg(feature = "telemetry")] telemetry: Arc, allocated: Arc, ) -> Result { let mut hi_perf_cfs = HashSet::new(); hi_perf_cfs.insert(CELLS_CF_NAME.to_string()); - let db = RocksDb::with_options(config.db_directory.as_str(), "db", hi_perf_cfs, false)?; + let db = RocksDb::with_options(config.db_directory.as_str(), "db", hi_perf_cfs, read_only)?; let db_catchain = RocksDb::with_path(config.db_directory.as_str(), "catchains")?; let block_handle_db = Arc::new( BlockHandleDb::with_db(db.clone(), "block_handle_db", true)? @@ -334,7 +347,21 @@ impl InternalDb { telemetry, allocated }; - + let version = db.resolve_db_version()?; + let db = if version == CURRENT_DB_VERSION { + log::info!("DB VERSION {}", version); + // TODO correct workchain id needed here, but it will be known later + check_db(db, 0, restore_db_enabled, force_check_db, check_stop, is_broken).await? + } else if allow_update { + update::update(db, version, check_stop, is_broken, force_check_db, + restore_db_enabled).await? 
+ } else { + fail!( + "DB version {} does not correspond to current supported one {}.", + version, + CURRENT_DB_VERSION + ) + }; Ok(db) } @@ -610,6 +637,34 @@ impl InternalDb { } } + #[cfg(test)] + /// searches for a block in the previous blocks db + /// be careful, it can be slow + pub fn find_block_by_seq_no(&self, shard: &ShardIdent, seqno: u32) -> Result> { + let _tc = TimeChecker::new(format!("find_block_by_seq_no {}", seqno), 300); + let mut found = None; + self.prev1_block_db.for_each(&mut |_key, val| { + let id = BlockIdExt::deserialize(&mut Cursor::new(&val))?; + if id.shard() == shard && id.seq_no() == seqno { + found = Some(id); + Ok(false) + } else { + Ok(true) + } + })?; + if let Some(id) = found { + self.load_block_handle(&id)?.ok_or_else( + || error!("Cannot load handle for master block {}", id) + ) + } else { + fail!("Can't find block with seqno {} in shard {}", seqno, shard) + } + } + + pub fn enum_shardstate_db(&self) -> Result<()> { + self.shard_state_dynamic_db.enum_shardstate_db() + } + pub async fn store_block_proof( &self, id: &BlockIdExt, @@ -871,7 +926,7 @@ impl InternalDb { if offset == full_lenth { Ok(vec![]) } else { - let length = min(length, full_lenth - offset); + let length = length.min(full_lenth - offset); let data = self.shard_state_persistent_db.read_file_part(id, offset, length).await?; Ok(data) } diff --git a/src/network/control.rs index 23bdf761..33b4672f 100644 --- a/src/network/control.rs +++ b/src/network/control.rs @@ -541,7 +541,9 @@ impl ControlQuerySubscriber { async fn prepare_bundle(&self, block_id: BlockIdExt) -> Result { if let DataSource::Engine(ref engine) = self.data_source { - let bundle = CollatorTestBundle::build_with_ethalon(&block_id, engine).await?; + let handle = engine.load_block_handle(&block_id)?.ok_or_else(|| error!("Block handle for {} not found", block_id))?; + let block = engine.load_block(&handle).await?; + let bundle = CollatorTestBundle::build_with_ethalon(engine, block).await?; tokio::task::spawn_blocking(move || { bundle.save("target/bundles").ok(); }); @@ -549,10 +551,10 @@ impl ControlQuerySubscriber { Ok(Success::Engine_Validator_Success) } - async fn prepare_future_bundle(&self, prev_block_ids: Vec) -> Result { + async fn prepare_future_bundle(&self, prev_blocks_ids: Vec) -> Result { if let DataSource::Engine(ref engine) = self.data_source { let bundle = CollatorTestBundle::build_for_collating_block( - prev_block_ids, engine + engine, prev_blocks_ids, None ).await?; tokio::task::spawn_blocking(move || { bundle.save("target/bundles").ok(); @@ -703,6 +705,7 @@ impl ControlQuerySubscriber { self.add_validator_bls_key( query.permanent_key_hash.as_slice(), query.key_hash.as_slice(), query.ttl ).await?, + #[cfg(feature = "telemetry")] None ), Err(query) => query diff --git a/src/network/neighbours.rs index 9f638a68..3d8f350d 100644 --- a/src/network/neighbours.rs +++ b/src/network/neighbours.rs @@ -471,6 +471,7 @@ impl Neighbours { }); } + #[cfg(feature = "telemetry")] pub fn log_neighbors_stat(&self) { log::debug!( target: "telemetry", diff --git a/src/network/node_network.rs index 8355ad36..c3f9e513 100644 --- a/src/network/node_network.rs +++ b/src/network/node_network.rs @@ -271,6 +271,7 @@ impl NodeNetwork { } } + #[cfg(feature = "telemetry")] pub fn log_neighbors_stat(&self) { for guard in self.overlays.iter() { guard.val().peers().log_neighbors_stat(); diff --git a/src/network/remp.rs index e2c6fa94..7393a1a6
100644 --- a/src/network/remp.rs +++ b/src/network/remp.rs @@ -182,6 +182,11 @@ impl RempNode { self.telemetry.set(telemetry).map_err(|_| error!("Can't set telemetry"))?; Ok(()) } + + #[cfg(all(test,feature = "telemetry"))] + pub fn telemetry(&self) -> Arc { + self.telemetry.get().unwrap().clone() + } } #[async_trait::async_trait] diff --git a/src/network/tests/test_remp.rs b/src/network/tests/test_remp.rs index 175b0c45..0d1d20b3 100644 --- a/src/network/tests/test_remp.rs +++ b/src/network/tests/test_remp.rs @@ -1,7 +1,9 @@ use crate::{ network::remp::{RempNode, RempMessagesSubscriber, RempReceiptsSubscriber, ReceiptStuff}, - test_helper::{get_adnl_config, init_test_log}, validator::telemetry::RempCoreTelemetry + test_helper::{get_adnl_config, init_test_log}, }; +#[cfg(feature = "telemetry")] +use crate::validator::telemetry::RempCoreTelemetry; use adnl::node::AdnlNode; use std::{ @@ -16,7 +18,7 @@ use ever_block::{fail, KeyId, Result, UInt256}; const KEY_TAG: usize = 0; -async fn init_remp_node(ip: &str) -> Result<(Arc, Arc, Arc)> { +async fn init_remp_node(ip: &str) -> Result<(Arc, Arc)> { let config = get_adnl_config("target/remp", ip, vec![KEY_TAG], true).await.unwrap(); let node = AdnlNode::with_config(config).await.unwrap(); let remp = Arc::new(RempNode::new(node.clone(), KEY_TAG)?); @@ -26,9 +28,10 @@ async fn init_remp_node(ip: &str) -> Result<(Arc, Arc, Arc Result<()> { init_test_log(); - let (node1, remp1, telemetry1) = init_remp_node("127.0.0.1:4191").await?; - let (node2, remp2, telemetry2) = init_remp_node("127.0.0.1:4192").await?; + let (node1, remp1) = init_remp_node("127.0.0.1:4191").await?; + let (node2, remp2) = init_remp_node("127.0.0.1:4192").await?; let peer1 = node2.add_peer( node2.key_by_tag(KEY_TAG).unwrap().id(), node1.ip_address(), @@ -114,8 +117,10 @@ async fn test_remp_client_compact_protocol() -> Result<()> { assert!(s1.got_receipts.load(Ordering::Relaxed) > 0); assert!(s2.got_receipts.load(Ordering::Relaxed) > 0); - log::info!("\n1{}", telemetry1.report()); - log::info!("\n\n2{}", telemetry2.report()); + #[cfg(feature = "telemetry")] + log::info!("\n1{}", remp1.telemetry().report()); + #[cfg(feature = "telemetry")] + log::info!("\n\n2{}", remp2.telemetry().report()); node1.stop().await; node2.stop().await; @@ -162,6 +167,7 @@ async fn test_remp_receipts_send_worker() -> Result<()> { sender.clone(), receiver, receipts_in_channel.clone(), + #[cfg(feature = "telemetry")] telemetry.clone() ); @@ -203,6 +209,7 @@ async fn test_remp_receipts_send_worker() -> Result<()> { log::info!("sent packages {}", sender.sent_receipts.lock().unwrap().len()); log::info!("sent receipts {}", sender.sent_receipts.lock().unwrap().iter().map(|p| p.receipts().len()).sum::()); + #[cfg(feature = "telemetry")] log::info!("{}", telemetry.report()); Ok(()) diff --git a/src/tests/test_control.rs b/src/tests/test_control.rs index b4692acf..a6b7d25c 100644 --- a/src/tests/test_control.rs +++ b/src/tests/test_control.rs @@ -36,7 +36,7 @@ use std::{ }; use storage::block_handle_db::BlockHandle; use ton_api::{ - serialize_boxed, tag_from_boxed_type, AnyBoxedSerialize, + serialize_boxed, AnyBoxedSerialize, ton::{ self, TLObject, accountaddress::AccountAddress, engine::validator::{ControlQueryError, KeyHash, Stats}, @@ -50,6 +50,8 @@ use ton_api::{ }; use ton_api::ton::raw::ShardAccountMeta; use ton_api::ton::rpc::raw::{GetAccountMetaByBlock, GetShardAccountMeta}; +#[cfg(feature = "telemetry")] +use ton_api::tag_from_boxed_type; use ever_block::{ Account, BlockIdExt, ConfigParamEnum, 
ConfigParams, Deserializable, generate_test_account_by_init_code_hash, Message, Serializable, ShardIdent diff --git a/src/tests/test_helper.rs b/src/tests/test_helper.rs index 5f02e442..3ba7936e 100644 --- a/src/tests/test_helper.rs +++ b/src/tests/test_helper.rs @@ -15,7 +15,7 @@ use crate::{ block::BlockStuff, block_proof::BlockProofStuff, collator_test_bundle::create_engine_allocated, - config::{CollatorConfig, TonNodeConfig}, + config::{CollatorConfig, TonNodeConfig}, full_node::apply_block::apply_block, internal_db::{ LAST_APPLIED_MC_BLOCK, SHARD_CLIENT_MC_BLOCK, @@ -535,6 +535,7 @@ impl TestEngine { } pub async fn change_mc_state(&self, mc_state_id: &BlockIdExt) -> Result<()> { + log::debug!("Changing last masterchain state to {}", mc_state_id); let mc_state = self.db.load_shard_state_dynamic(&mc_state_id)?; self.save_last_applied_mc_block_id(&mc_state_id)?; self.shard_blocks.update_shard_blocks(&mc_state).await?; @@ -545,11 +546,19 @@ impl TestEngine { let mc_state_id = self.db.load_full_node_state(LAST_APPLIED_MC_BLOCK)?.unwrap(); let mc_state = self.load_state(&mc_state_id).await?; let (_, mc_state_id, _) = mc_state.shard_state_extra()?.prev_blocks.get(&mc_seq_no)?.unwrap().master_block_id(); - log::debug!("Changing last masterchain state to {}", mc_state_id); self.change_mc_state(&mc_state_id).await?; Ok(mc_state_id) } + pub async fn change_mc_state_by_prev_blocks_ids(&self, prev_blocks_ids: &[BlockIdExt]) -> Result { + let mut mc_seq_no = u32::MAX; + for block_id in prev_blocks_ids { + let handle = self.load_block_handle(block_id)?.unwrap(); + mc_seq_no = mc_seq_no.min(handle.masterchain_ref_seq_no()); + } + self.change_mc_state_by_seqno(mc_seq_no).await + } + pub async fn load_block_by_id(&self, id: &BlockIdExt) -> Result { let handle = self.load_block_handle(id)?.ok_or_else( || error!("Cannot load handle for block {}", id) @@ -700,8 +709,9 @@ impl TestEngine { CollatorSettings::default(), )?; - let (block_candidate, new_state) = collator.collate().await?; - + let collate_result = collator.collate().await?; + let block_candidate = collate_result.candidate.unwrap(); + if let Some(res_path) = &self.res_path { let new_block = Block::construct_from_bytes(&block_candidate.data)?; @@ -751,7 +761,7 @@ impl TestEngine { )?; std::fs::write( &format!("{}/state_candidate.txt", res_path), - debug_state(new_state.clone())? + debug_state(collate_result.new_state.clone().unwrap())? 
)?; // std::fs::write( @@ -1088,6 +1098,18 @@ impl EngineOperations for TestEngine { ) -> Result<()> { self.ext_messages.complete_messages(to_delay, to_delete, self.now()) } + fn get_remp_messages(&self, _shard: &ShardIdent) -> Result, UInt256)>> { + Ok(Vec::new()) + } + fn finalize_remp_messages( + &self, + _block: BlockIdExt, + _accepted: Vec, + _rejected: Vec<(UInt256, String)>, + _ignored: Vec, + ) -> Result<()> { + Ok(()) + } async fn get_shard_blocks( &self, last_mc_state: &Arc, diff --git a/src/tests/test_remp_client.rs b/src/tests/test_remp_client.rs index ca60d6ea..f281f436 100644 --- a/src/tests/test_remp_client.rs +++ b/src/tests/test_remp_client.rs @@ -1,6 +1,6 @@ use crate::{ block::BlockStuff, engine_traits::EngineOperations, - full_node::remp_client::{RempClient}, shard_state::ShardStateStuff, + full_node::remp_client::RempClient, shard_state::ShardStateStuff, validator::validator_utils::get_adnl_id, }; #[cfg(feature = "telemetry")] @@ -776,6 +776,7 @@ async fn test_remp_client() -> Result<()> { block_handle_storage: crate::collator_test_bundle::create_block_handle_storage(), sent_remp_messages: AtomicU32::new(0), signed_remp_messages: AtomicU32::new(0), + #[cfg(feature = "telemetry")] telemetry: RempClientTelemetry::default(), }); diff --git a/src/types/accounts.rs b/src/types/accounts.rs index 96ab50fe..e0ac47df 100644 --- a/src/types/accounts.rs +++ b/src/types/accounts.rs @@ -60,6 +60,7 @@ impl ShardAccountStuff { } else { let shard_acc = ShardAccount::with_account_root(self.account_root(), self.last_trans_hash.clone(), self.last_trans_lt); let value = shard_acc.write_to_new_cell()?; + log::trace!("Updating account {:x} in shard state", self.account_addr()); new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?; } AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update) @@ -111,7 +112,7 @@ impl ShardAccountStuff { let account = self.read_account()?; let new_libs = account.libraries(); if new_libs.root() != self.orig_libs.root() { - new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| { + self.orig_libs.scan_diff(&new_libs, |key: UInt256, old, new| { let old = old.unwrap_or_default(); let new = new.unwrap_or_default(); if old.is_public_library() && !new.is_public_library() { diff --git a/src/validator/accept_block.rs b/src/validator/accept_block.rs index ba5f7d97..649745d1 100644 --- a/src/validator/accept_block.rs +++ b/src/validator/accept_block.rs @@ -584,24 +584,23 @@ pub async fn create_top_shard_block_description( fn find_known_ancestors( block: &BlockStuff, - mc_state: &ShardStateStuff) - -> Result)>> { + mc_state: &ShardStateStuff +) -> Result)>> { let block_descr = fmt_block_id_short(block.id()); let master_ref = block.block()?.read_info()?.read_master_ref()? .ok_or_else(|| error!("Block {} doesn't have `master_ref`", block.id()))?.master; let shard = block.id().shard(); - let mc_state_extra = mc_state.state()?.read_custom()? 
- .ok_or_else(|| error!("State for {} doesn't have McStateExtra", mc_state.block_id()))?; + let shards = mc_state.shards()?; let mut ancestors = vec!(); let oldest_ancestor_seqno; - match mc_state_extra.shards().find_shard(shard) { + match shards.find_shard(shard) { Ok(None) => { let (a1, a2) = shard.split()?; - let ancestor1 = mc_state_extra.shards().get_shard(&a1)?; - let ancestor2 = mc_state_extra.shards().find_shard(&a2)?; + let ancestor1 = shards.get_shard(&a1)?; + let ancestor2 = shards.find_shard(&a2)?; if let (Some(ancestor1), Some(ancestor2)) = (ancestor1, ancestor2) { log::trace!(target: "validator", "({}): found two ancestors: {} and {}", block_descr, ancestor1.shard(), ancestor2.shard()); diff --git a/src/validator/collator.rs b/src/validator/collator.rs index f484b182..400090f6 100644 --- a/src/validator/collator.rs +++ b/src/validator/collator.rs @@ -31,7 +31,7 @@ use crate::{ }, validator::{ BlockCandidate, CollatorSettings, McData, - out_msg_queue::{MsgQueueManager, OutMsgQueueInfoStuff}, + out_msg_queue::{MsgQueueManager, OutMsgQueueInfoStuff}, validator_utils::calc_subset_for_masterchain }, CHECK, @@ -40,7 +40,6 @@ use adnl::common::Wait; use futures::try_join; use rand::Rng; use std::{ - cmp::{max, min}, collections::{BinaryHeap, HashMap, HashSet}, ops::Deref, sync::{ @@ -50,21 +49,22 @@ use std::{ time::{Duration, Instant}, }; use ever_block::{ - AddSub, BlkPrevInfo, Block, BlockCreateStats, BlockExtra, BlockIdExt, BlockInfo, CommonMsgInfo, - ConfigParams, CopyleftRewards, CreatorStats, CurrencyCollection, Deserializable, ExtBlkRef, - FutureSplitMerge, GlobalCapabilities, GlobalVersion, Grams, HashmapAugType, InMsg, InMsgDescr, - InternalMessageHeader, KeyExtBlkRef, KeyMaxLt, Libraries, McBlockExtra, McShardRecord, - McStateExtra, MerkleUpdate, Message, MsgAddressInt, OutMsg, OutMsgDescr, OutMsgQueueKey, - ParamLimitIndex, Serializable, ShardAccount, ShardAccountBlocks, ShardAccounts, ShardDescr, - ShardFees, ShardHashes, ShardIdent, ShardStateSplit, ShardStateUnsplit, TopBlockDescrSet, - Transaction, TransactionTickTock, UnixTime32, ValidatorSet, ValueFlow, WorkchainDescr, - Workchains, Account, AccountIdPrefixFull, OutQueueUpdates, OutMsgQueueInfo, MASTERCHAIN_ID + error, fail, write_boc, Account, AccountId, AccountIdPrefixFull, AddSub, BlkPrevInfo, Block, + BlockCreateStats, BlockError, BlockExtra, BlockIdExt, BlockInfo, Cell, CommonMsgInfo, + ConfigParams, CopyleftRewards, CreatorStats, CurrencyCollection, Deserializable, ExceptionCode, + ExtBlkRef, Failure, FutureSplitMerge, GlobalCapabilities, GlobalVersion, Grams, HashmapAugType, + HashmapType, InMsg, InMsgDescr, InternalMessageHeader, KeyExtBlkRef, KeyMaxLt, Libraries, + McBlockExtra, McShardRecord, McStateExtra, MerkleUpdate, Message, MsgAddressInt, OutMsg, + OutMsgDescr, OutMsgQueueInfo, OutMsgQueueKey, OutQueueUpdates, ParamLimitIndex, ProcessedInfoKey, + ProcessedUpto, Result, Serializable, ShardAccount, ShardAccountBlocks, ShardAccounts, ShardDescr, + ShardFees, ShardHashes, ShardIdent, ShardStateSplit, ShardStateUnsplit, SliceData, TopBlockDescrSet, + Transaction, TransactionTickTock, UInt256, UnixTime32, UsageTree, ValidatorSet, ValueFlow, + WorkchainDescr, Workchains, MASTERCHAIN_ID }; use ever_executor::{ BlockchainConfig, ExecuteParams, OrdinaryTransactionExecutor, TickTockTransactionExecutor, TransactionExecutor, }; -use ever_block::{error, fail, AccountId, Cell, HashmapType, Result, UInt256, UsageTree, SliceData}; use crate::validator::validator_utils::is_remp_enabled; @@ -117,8 +117,8 @@ impl 
PrevData { let mut overload_history = 0; let mut underload_history = 0; if let Some(state) = states.get(1) { - gen_utime = std::cmp::max(gen_utime, state.state()?.gen_time()); - gen_lt = std::cmp::max(gen_lt, state.state()?.gen_lt()); + gen_utime = gen_utime.max(state.state()?.gen_time()); + gen_lt = gen_lt.max(state.state()?.gen_lt()); let key = state.shard().merge()?.shard_key(false); accounts.merge(&state.state()?.read_accounts()?, &key)?; total_validator_fees.add(state.state()?.total_validator_fees())?; @@ -204,6 +204,8 @@ impl PartialOrd for NewMessage { } struct CollatorData { + collated_block_descr: Arc, + // lists, empty by default in_msgs: InMsgDescr, out_msgs: OutMsgDescr, @@ -276,10 +278,12 @@ impl CollatorData { usage_tree: UsageTree, prev_data: &PrevData, is_masterchain: bool, + collated_block_descr: Arc, ) -> Result { let limits = Arc::new(config.raw_config().block_limits(is_masterchain)?); let split_queues = !config.has_capability(GlobalCapabilities::CapNoSplitOutQueue); let ret = Self { + collated_block_descr, in_msgs: InMsgDescr::default(), out_msgs: OutMsgDescr::default(), accounts: ShardAccountBlocks::default(), @@ -369,7 +373,7 @@ impl CollatorData { /// add in and out messages from to block, and to new message queue fn new_transaction(&mut self, transaction: &Transaction, tr_cell: Cell, in_msg_opt: Option<&InMsg>) -> Result<()> { // log::trace!( - // "new transaction, message {:x}\n{}", + // "{} new transaction, message {:x}\n{}", self.collated_block_descr, // in_msg_opt.map(|m| m.message_cell().unwrap().repr_hash()).unwrap_or_default(), // ever_block_json::debug_transaction(transaction.clone()).unwrap_or_default(), // ); @@ -431,9 +435,9 @@ impl CollatorData { // let mut data = self.out_msg_queue_info.del_message(key)?; // let created_lt = u64::construct_from(&mut data)?; // let enq = MsgEnqueueStuff::construct_from(&mut data, created_lt)?; - // let data = ever_block::write_boc(&enq.message_cell())?; - // log::debug!("del_out_msg_from_state {:x} size {}", key, data.len()); - log::debug!("del_out_msg_from_state {:x}", key); + // let data = write_boc(&enq.message_cell())?; + // log::debug!("{} del_out_msg_from_state {:x} size {}", self.collated_block_descr, key, data.len()); + log::debug!("{} del_out_msg_from_state {:x}", self.collated_block_descr, key); self.dequeue_count += 1; self.out_msg_queue_info.del_message(key)?; self.block_limit_status.register_out_msg_queue_op( @@ -574,7 +578,7 @@ impl CollatorData { } fn update_min_mc_seqno(&mut self, mc_seqno: u32) -> u32 { - let min_ref_mc_seqno = min(self.min_ref_mc_seqno.unwrap_or(std::u32::MAX), mc_seqno); + let min_ref_mc_seqno = self.min_ref_mc_seqno.unwrap_or(std::u32::MAX).min(mc_seqno); self.min_ref_mc_seqno = Some(min_ref_mc_seqno); min_ref_mc_seqno } @@ -752,25 +756,38 @@ impl ExecutionManager { Ok(()) } + /// starts a new transaction execution task + /// if a pruned-cell access error occurs, returns false pub async fn execute( &mut self, account_id: AccountId, msg: AsyncMessage, prev_data: &PrevData, collator_data: &mut CollatorData, - ) -> Result<()> { + ) -> Result { log::trace!("{}: execute (adding into queue): {:x}", self.collated_block_descr, account_id); if let Some((sender, _handle)) = self.changed_accounts.get(&account_id) { self.wait_tr.request(); sender.send(Arc::new(msg))?; } else { - let shard_acc = if let Some(shard_acc) = prev_data.accounts().account(&account_id)?
{ - shard_acc - } else if let AsyncMessage::Ext(_, msg_id) = msg { - collator_data.rejected_ext_messages.push((msg_id, format!("account {:x} not found", account_id))); - return Ok(()); // skip external messages for unexisting accounts - } else { - ShardAccount::default() + let shard_acc = match prev_data.accounts().account(&account_id) { + Ok(Some(shard_acc)) => shard_acc, + Ok(None) => { + if let AsyncMessage::Ext(_, msg_id) = msg { + collator_data.rejected_ext_messages.push((msg_id, format!("account {:x} not found", account_id))); + return Ok(true); // skip external messages for nonexistent accounts + } else { + ShardAccount::default() + } + } + Err(err) => { + // this code keeps collator bundles from failing on access to pruned messages + if err.downcast_ref() == Some(&ExceptionCode::PrunedCellAccess) { + return Ok(false); + } else { + return Err(err); + } + } }; let (sender, handle) = self.start_account_job( account_id.clone(), @@ -783,7 +800,7 @@ impl ExecutionManager { self.check_parallel_transactions(collator_data).await?; - Ok(()) + Ok(true) } fn start_account_job( @@ -986,6 +1003,13 @@ impl ExecutionManager { } } +pub struct CollateResult { + pub candidate: Option, + pub new_state: Option, + pub usage_tree: UsageTree, + pub error: Failure, +} +
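A minimal sketch of how callers consume the new `CollateResult` defined above — the same match-on-`candidate` pattern the `run_collate_query` and `try_collate` hunks in this patch use. The helper name `take_candidate` is hypothetical, for illustration only:

```rust
// Hypothetical helper (not part of this patch): the CollateResult contract
// is that exactly one of `candidate` / `error` is set, while `usage_tree`
// is always returned so a failed collation can still become a test bundle.
fn take_candidate(collate_result: CollateResult) -> Result<BlockCandidate> {
    match collate_result.candidate {
        Some(candidate) => Ok(candidate),           // new_state is populated too
        None => Err(collate_result.error.unwrap()), // a failed collation sets error
    }
}
```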
pub struct Collator { engine: Arc, shard: ShardIdent, @@ -1028,7 +1052,7 @@ impl Collator { let new_block_seqno = match prev_blocks_ids.len() { 1 => prev_blocks_ids[0].seq_no() + 1, - 2 => max(prev_blocks_ids[0].seq_no(), prev_blocks_ids[1].seq_no()) + 1, + 2 => prev_blocks_ids[0].seq_no().max(prev_blocks_ids[1].seq_no()) + 1, _ => fail!("`prev_blocks_ids` has invlid length"), }; @@ -1105,7 +1129,7 @@ impl Collator { }) } - pub async fn collate(mut self) -> Result<(BlockCandidate, ShardStateUnsplit)> { + pub async fn collate(mut self) -> Result { log::info!( "{}: COLLATE min_mc_seqno = {}, prev_blocks_ids: {} {}", self.collated_block_descr, @@ -1119,7 +1143,7 @@ impl Collator { let mut attempt = 0; let mut duration; // inside the loop try to collate new block - let (candidate, state, exec_manager) = loop { + let (collate_result, exec_manager) = loop { let attempt_started = Instant::now(); @@ -1142,15 +1166,22 @@ impl Collator { })?; // load messages and process them to produce block candidate - let result = self.do_collate(&mc_data, &prev_data, &mut collator_data).await - .map_err(|e| { + let result = self.do_collate(&mc_data, &prev_data, &mut collator_data).await; + duration = attempt_started.elapsed().as_millis() as u32; + match result { + Err(e) => { log::warn!("{}: COLLATION FAILED: TIME: {}ms do_collate: {:?}", self.collated_block_descr, self.started.elapsed().as_millis(), e); - e - }); - duration = attempt_started.elapsed().as_millis() as u32; - if let Some(result) = result? { - break result; + let collate_result = CollateResult { + candidate: None, + new_state: None, + usage_tree: collator_data.usage_tree, + error: Some(e), + }; + return Ok(collate_result); + } + Ok(Some(result)) => break result, + Ok(None) => () } // sleep after empty collation to respect the collation time iterval @@ -1172,20 +1203,21 @@ impl Collator { let pruned_count = collator_data.estimate_pruned_count(); let estimate_size = collator_data.block_limit_status.estimate_block_size(None, pruned_count) as usize; - log::info!( - "{}: ASYNC COLLATED SIZE: {} ESTIMATEED SIZE: {} GAS: {} TIME: {}ms GAS_RATE: {} TRANS: {}ms ID: {}", - self.collated_block_descr, - candidate.data.len(), - estimate_size, - collator_data.block_limit_status.gas_used(), - duration, - ratio, - exec_manager.total_trans_duration.load(Ordering::Relaxed) / 1000, - candidate.block_id, - ); - - if estimate_size > 400_000 && 100 * estimate_size.abs_diff(candidate.data.len()) / estimate_size > 5 { - log::warn!("{}: diff is too much", self.collated_block_descr) + if let Some(candidate) = &collate_result.candidate { + log::info!( + "{}: ASYNC COLLATED SIZE: {} ESTIMATED SIZE: {} GAS: {} TIME: {}ms GAS_RATE: {} TRANS: {}ms ID: {}", + self.collated_block_descr, + candidate.data.len(), + estimate_size, + collator_data.block_limit_status.gas_used(), + duration, + ratio, + exec_manager.total_trans_duration.load(Ordering::Relaxed) / 1000, + candidate.block_id, + ); + if estimate_size > 400_000 && 100 * estimate_size.abs_diff(candidate.data.len()) / estimate_size > 5 { + log::warn!("{}: diff is too much", self.collated_block_descr) + } } #[cfg(feature = "log_metrics")] @@ -1199,7 +1231,7 @@ impl Collator { collator_data.execute_count, collator_data.block_limit_status.gas_used(), ratio, - candidate.data.len(), + collate_result.candidate.as_ref().map(|c| c.data.len()).unwrap_or(0), duration, ); @@ -1212,7 +1244,7 @@ impl Collator { collator_data.block_limit_status.gas_used() ); - Ok((candidate, state)) + Ok(collate_result) } async fn import_data(&self) -> Result { @@ -1238,7 +1270,7 @@ impl Collator { let top_shard_blocks_descr = Vec::new(); - break Ok(ImportedData { + return Ok(ImportedData { mc_state, prev_states, prev_ext_blocks_refs, @@ -1281,6 +1313,7 @@ impl Collator { usage_tree, &prev_data, is_masterchain, + self.collated_block_descr.clone(), )?; if !self.shard.is_masterchain() { let (now_upper_limit, before_split, _accept_msgs) = check_this_shard_mc_info( @@ -1335,7 +1368,7 @@ impl Collator { mc_data: &McData, prev_data: &PrevData, collator_data: &mut CollatorData, - ) -> Result> { + ) -> Result> { log::debug!("{}: do_collate", self.collated_block_descr); let remp_messages = if is_remp_enabled(self.engine.clone(), mc_data.config()) { @@ -1412,8 +1445,9 @@ impl Collator { if !self.after_split || !collator_data.split_queues { // import inbound internal messages, process or transit let now = std::time::Instant::now(); - self.process_inbound_internal_messages(prev_data, collator_data, &output_queue_manager, - &mut exec_manager).await?; + let result = self.process_inbound_internal_messages( + prev_data, collator_data, &output_queue_manager, &mut exec_manager + ).await; log::debug!("{}: TIME: process_inbound_internal_messages {}ms;", self.collated_block_descr, now.elapsed().as_millis()); @@ -1527,10 +1561,11 @@ impl Collator { //collator_data.block_limit_status.dump_block_size(); // serialize everything - let result = self.finalize_block( - mc_data, prev_data, collator_data, exec_manager, new_state_copyleft_rewards).await?; + let (collate_result, exec_manager) =
self.finalize_block( + mc_data, prev_data, collator_data, exec_manager, new_state_copyleft_rewards + ).await?; - Ok(Some(result)) + Ok(Some((collate_result, exec_manager))) } async fn clean_out_msg_queue( @@ -1607,8 +1642,8 @@ impl Collator { ); } prev_state.proc_info()?.iterate_slices_with_keys(|ref mut key, ref mut value| { - let key = ever_block::ProcessedInfoKey::construct_from(key)?; - let value = ever_block::ProcessedUpto::construct_from(value)?; + let key = ProcessedInfoKey::construct_from(key)?; + let value = ProcessedUpto::construct_from(value)?; log::trace!( "{}: prev processed upto {} {:x} - {} {:x}", self.collated_block_descr, @@ -1663,7 +1698,7 @@ impl Collator { Ok(mc_data) } - fn unpack_last_state(&self, mc_data: &McData, prev_states: &Vec>) -> Result { + fn unpack_last_state(&self, mc_data: &McData, prev_states: &[Arc]) -> Result { log::trace!("{}: unpack_last_state", self.collated_block_descr); for state in prev_states.iter() { self.check_one_state(mc_data, state)?; @@ -1735,9 +1770,9 @@ impl Collator { // consider unixtime and lt from previous block(s) of the same shardchain let prev_now = prev_data.prev_state_utime(); - let prev = max(mc_data.state().state()?.gen_time(), prev_now); + let prev = mc_data.state().state()?.gen_time().max(prev_now); log::trace!("{}: init_utime prev_time: {}", self.collated_block_descr, prev); - let time = max(prev + 1, self.engine.now()); + let time = self.engine.now().max(prev + 1); Ok(time) } @@ -1808,9 +1843,9 @@ impl Collator { log::trace!("{}: init_lt", self.collated_block_descr); let mut start_lt = if !self.shard.is_masterchain() { - max(mc_data.state().state()?.gen_lt(), prev_data.prev_state_lt()) + mc_data.state().state()?.gen_lt().max(prev_data.prev_state_lt()) } else { - max(mc_data.state().state()?.gen_lt(), collator_data.shards_max_end_lt()) + mc_data.state().state()?.gen_lt().max(collator_data.shards_max_end_lt()) }; let align = mc_data.get_lt_align(); @@ -1859,8 +1894,8 @@ impl Collator { let mut shards = mc_data.state().shards()?.clone(); let wc_set = mc_data.config().workchains()?; wc_set.iterate_with_keys(|wc_id: i32, wc_info| { - log::trace!(" - {}: adjust_shard_config workchain {wc_id}, active {}, enabled_since {} (now {})", + log::trace!( + "{}: adjust_shard_config workchain {wc_id}, active {}, enabled_since {} (now {})", self.collated_block_descr, wc_info.active(), wc_info.enabled_since, @@ -1987,7 +2022,7 @@ impl Collator { prev_descr.descr.reg_mc_seqno = self.new_block_id_part.seq_no; descr.descr.reg_mc_seqno = self.new_block_id_part.seq_no; - let end_lt = max(prev_descr.descr.end_lt, descr.descr.end_lt); + let end_lt = prev_descr.descr.end_lt.max(descr.descr.end_lt); if let Err(e) = self.update_shard_block_info2( collator_data.shards_mut()?, prev_descr.clone(), descr.clone(), @@ -2320,8 +2355,18 @@ impl Collator { log::debug!("{}: process_inbound_internal_messages", self.collated_block_descr); let mut iter = output_queue_manager.merge_out_queue_iter(&self.shard)?; while let Some(k_v) = iter.next() { - let (key, enq, created_lt, block_id) = k_v?; - if !collator_data.split_queues && !block_id.shard().contains_full_prefix(&enq.cur_prefix()) { + let (key, enq, created_lt, nb_shard) = match k_v { + Ok(k_v) => k_v, + Err(err) => { + // this code keeps collator bundles from failing on access to pruned messages + if err.downcast_ref() == Some(&ExceptionCode::PrunedCellAccess) { + break + } else { + return Err(err) + } + } + }; + if !collator_data.split_queues && !nb_shard.contains_full_prefix(&enq.cur_prefix()) { // this
message was left from split result continue; } @@ -2336,7 +2381,7 @@ impl Collator { self.collated_block_descr, key.hash ); } else { - self.check_inbound_internal_message(&key, &enq, created_lt, block_id.shard()) + self.check_inbound_internal_message(&key, &enq, created_lt, &nb_shard) .map_err(|err| error!("problem processing internal inbound message \ with hash {:x} : {}", key.hash, err))?; let our = self.shard.contains_full_prefix(&enq.cur_prefix()); @@ -2345,7 +2390,9 @@ impl Collator { let account_id = enq.dst_account_id()?; log::debug!("{}: message {:x} sent to execution to account {:x}", self.collated_block_descr, key.hash, account_id); let msg = AsyncMessage::Int(enq, our); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + break; + } } else { // println!("{:x} {:#}", key, enq); // println!("cur: {}, dst: {}", enq.cur_prefix(), enq.dst_prefix()); @@ -2442,7 +2489,9 @@ impl Collator { log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, msg_id); let (_, account_id) = header.dst.extract_std_address(true)?; let msg = AsyncMessage::Ext(msg.deref().clone(), msg_id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + break; + } } else { // usually node collates more than one shard, the message can belong another one, // so we can't postpone it @@ -2495,7 +2544,9 @@ impl Collator { let (_, account_id) = header.dst.extract_std_address(true)?; log::trace!("{}: remp message {:x} sent to execution", self.collated_block_descr, id); let msg = AsyncMessage::Ext(msg.deref().clone(), id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + break; + } } } else { log::warn!( @@ -2547,7 +2598,9 @@ impl Collator { collator_data.update_last_proc_int_msg((created_lt, hash))?; let msg = AsyncMessage::New(env, tr_cell); log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, key.hash); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + enqueue_only = true; + } }; self.check_stop_flag()?; } @@ -2665,7 +2718,9 @@ impl Collator { hdr.created_lt = collator_data.start_lt()?; hdr.created_at = UnixTime32::new(collator_data.gen_utime); let msg = Message::with_int_header(hdr); - exec_manager.execute(account_id, AsyncMessage::Copyleft(msg), prev_data, collator_data).await?; + if !exec_manager.execute(account_id, AsyncMessage::Copyleft(msg), prev_data, collator_data).await? 
{ + break; + } self.check_stop_flag()?; } @@ -2687,7 +2742,7 @@ impl Collator { collator_data: &mut CollatorData, mut exec_manager: ExecutionManager, new_state_copyleft_rewards: CopyleftRewards, - ) -> Result<(BlockCandidate, ShardStateUnsplit, ExecutionManager)> { + ) -> Result<(CollateResult, ExecutionManager)> { log::trace!("{}: finalize_block", self.collated_block_descr); let (want_split, overload_history) = collator_data.want_split(); @@ -2798,7 +2853,8 @@ impl Collator { log::trace!("{}: finalize_block: calc new state", self.collated_block_descr); // Calc new state, then state update - log::trace!("copyleft rewards count from workchains: {}", collator_data.get_workchains_copyleft_rewards().len()?); + log::trace!("{}: copyleft rewards count from workchains: {}", + self.collated_block_descr, collator_data.get_workchains_copyleft_rewards().len()?); if self.shard.is_masterchain() && !value_flow.copyleft_rewards.is_empty() { log::warn!("copyleft rewards in masterchain must be empty") } @@ -2842,8 +2898,8 @@ impl Collator { .read_out_msg_queue_info()? .proc_info() .iterate_slices_with_keys(|ref mut key, ref mut value| { - let key = ever_block::ProcessedInfoKey::construct_from(key)?; - let value = ever_block::ProcessedUpto::construct_from(value)?; + let key = ProcessedInfoKey::construct_from(key)?; + let value = ProcessedUpto::construct_from(value)?; log::trace!( "{}: new processed upto {} {:x} - {} {:x}", self.collated_block_descr, @@ -2913,7 +2969,7 @@ impl Collator { log::trace!("{}: finalize_block: fill block candidate", self.collated_block_descr); let cell = new_block.serialize()?; block_id.root_hash = cell.repr_hash(); - let data = ever_block::write_boc(&cell)?; + let data = write_boc(&cell)?; block_id.file_hash = UInt256::calc_file_hash(&data); // !!!! DEBUG !!!! 
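The finalize_block hunks below return a `CollateResult` and, when `skip_state_update` is set, skip the Merkle-update step. A hedged sketch of the caller side, mirroring the `try_collate` helper earlier in this patch (setting the flag to `true` is an assumption for a bundle-driven run):

```rust
// Sketch: collating from a test bundle. The bundle's state is a Merkle
// proof, so a full state update cannot be computed from it; with
// skip_state_update set, finalize_block substitutes a default update.
let collator_settings = CollatorSettings {
    skip_state_update: true,
    ..Default::default()
};
```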
@@ -2969,7 +3025,13 @@ impl Collator { collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count, collator_data.transit_count, collator_data.remove_count, candidate.data.len() ); - Ok((candidate, new_state, exec_manager)) + let collate_result = CollateResult { + candidate: Some(candidate), + new_state: Some(new_state), + usage_tree: std::mem::take(&mut collator_data.usage_tree), + error: None, + }; + Ok((collate_result, exec_manager)) } fn _check_visited_integrity(cell: &Cell, visited: &HashSet, visited_from_root: &mut HashSet) { @@ -3016,13 +3078,18 @@ impl Collator { // Self::_check_visited_integrity(&prev_data.state_root, &visited, &mut visited_from_root); // assert_eq!(visited.len(), visited_from_root.len()); - let now = std::time::Instant::now(); - let state_update = MerkleUpdate::create_fast( - &prev_data.state_root, - new_ss_root, - |h| collator_data.usage_tree.contains(h) || collator_data.imported_visited.contains(h) - )?; - log::trace!("{}: TIME: merkle update creating {}ms;", self.collated_block_descr, now.elapsed().as_millis()); + let state_update; + if !self.collator_settings.skip_state_update { + let now = std::time::Instant::now(); + state_update = MerkleUpdate::create_fast( + &prev_data.state_root, + new_ss_root, + |h| collator_data.usage_tree.contains(h) || collator_data.imported_visited.contains(h) + )?; + log::trace!("{}: TIME: merkle update creating {}ms;", self.collated_block_descr, now.elapsed().as_millis()); + } else { + state_update = Default::default(); + } // let new_root2 = state_update.apply_for(&prev_data.state_root)?; // assert_eq!(new_root2.repr_hash(), new_ss_root.repr_hash()); @@ -3239,12 +3306,12 @@ impl Collator { // temp code, delete after iterate_shards_with_siblings_mut let mut changed_shards = HashMap::new(); collator_data.shards()?.iterate_shards_with_siblings(|shard, mut descr, mut sibling| { - min_ref_mc_seqno = min(min_ref_mc_seqno, descr.min_ref_mc_seqno); + min_ref_mc_seqno = min_ref_mc_seqno.min(descr.min_ref_mc_seqno); let unchanged_sibling = sibling.clone(); let updated_sibling = if let Some(sibling) = sibling.as_mut() { - min_ref_mc_seqno = min(min_ref_mc_seqno, sibling.min_ref_mc_seqno); + min_ref_mc_seqno = min_ref_mc_seqno.min(sibling.min_ref_mc_seqno); self.update_one_shard( &shard.sibling(), sibling, diff --git a/src/validator/fabric.rs b/src/validator/fabric.rs index f79b805f..75f731a7 100644 --- a/src/validator/fabric.rs +++ b/src/validator/fabric.rs @@ -38,7 +38,7 @@ pub async fn run_validate_query_any_candidate( let info = real_block.read_info()?; let prev = info.read_prev_ids()?; let mc_state = engine.load_last_applied_mc_state().await?; - let min_masterchain_block_id = mc_state.find_block_id(info.min_ref_mc_seqno())?; + let min_mc_seq_no = info.min_ref_mc_seqno(); let mut cc_seqno_with_delta = 0; if let Some(mc_state_extra) = mc_state.state()?.read_custom()? 
{ let cc_seqno_from_state = if shard.is_masterchain() { @@ -62,7 +62,7 @@ pub async fn run_validate_query_any_candidate( run_validate_query( shard, SystemTime::now(), - min_masterchain_block_id, + min_mc_seq_no, prev, block, validator_set, @@ -78,7 +78,7 @@ pub async fn run_validate_query_any_candidate( pub async fn run_validate_query( shard: ShardIdent, _min_ts: SystemTime, - min_masterchain_block_id: BlockIdExt, + min_mc_seq_no: u32, prev: Vec, block: super::BlockCandidate, set: ValidatorSet, @@ -95,7 +95,7 @@ pub async fn run_validate_query( "({}): before validator query shard: {}, min: {}, seqno: {}", next_block_descr, shard, - min_masterchain_block_id, + min_mc_seq_no, seqno + 1 ); @@ -107,7 +107,7 @@ pub async fn run_validate_query( let validator_result = if !test_bundles_config.is_enable() { ValidateQuery::new( shard.clone(), - min_masterchain_block_id.seq_no(), + min_mc_seq_no, prev, block, set, @@ -119,7 +119,7 @@ pub async fn run_validate_query( } else { let query = ValidateQuery::new( shard.clone(), - min_masterchain_block_id.seq_no(), + min_mc_seq_no, prev.clone(), block.clone(), set, @@ -136,11 +136,10 @@ pub async fn run_validate_query( if !CollatorTestBundle::exists(test_bundles_config.path(), &id) { let path = test_bundles_config.path().to_string(); let engine = engine.clone(); - let shard = shard.clone(); tokio::spawn( async move { match CollatorTestBundle::build_for_validating_block( - shard, min_masterchain_block_id, prev, block, &engine + &engine, prev, block ).await { Err(e) => log::error!( "({}): Error while test bundle for {} building: {}", next_block_descr, id, e @@ -233,61 +232,66 @@ pub async fn run_collate_query ( None, CollatorSettings::default() )?; - let collator_result = collator.collate().await; + let collate_result = collator.collate().await; let labels = [("shard", shard.to_string())]; #[cfg(not(feature = "statsd"))] metrics::decrement_gauge!("run_collators", 1.0, &labels); + let mut usage_tree_opt = None; - match collator_result { - Ok((candidate, _)) => { - metrics::increment_counter!("successful_collations", &labels); - - return Ok(validator_query_candidate_to_validator_block_candidate(collator_id, candidate)) + let err = match collate_result { + Ok(collate_result) => { + if let Some(candidate) = collate_result.candidate { + metrics::increment_counter!("successful_collations", &labels); + + return Ok(validator_query_candidate_to_validator_block_candidate(collator_id, candidate)) + } else { + usage_tree_opt = Some(collate_result.usage_tree); + collate_result.error.unwrap() + } } - Err(err) => { - let labels = [("shard", shard.to_string())]; - metrics::increment_counter!("failed_collations", &labels); - let test_bundles_config = &engine.test_bundles_config().collator; + Err(err) => err + }; + let labels = [("shard", shard.to_string())]; + metrics::increment_counter!("failed_collations", &labels); + let test_bundles_config = &engine.test_bundles_config().collator; - let err_str = if test_bundles_config.is_enable() { - err.to_string() - } else { - String::default() - }; + let err_str = if test_bundles_config.is_enable() { + err.to_string() + } else { + String::default() + }; - #[cfg(feature = "telemetry")] - engine.collator_telemetry().failed_attempt(&shard, &err_str); + #[cfg(feature = "telemetry")] + engine.collator_telemetry().failed_attempt(&shard, &err_str); - if test_bundles_config.is_enable() { - if test_bundles_config.need_to_build_for(&err_str) { - let id = BlockIdExt { - shard_id: shard, - seq_no: prev.iter().max_by_key(|id| 
id.seq_no()).unwrap().seq_no() + 1, - root_hash: UInt256::default(), - file_hash: UInt256::default(), - }; - if !CollatorTestBundle::exists(test_bundles_config.path(), &id) { - let path = test_bundles_config.path().to_string(); - let engine = engine.clone(); - tokio::spawn(async move { - match CollatorTestBundle::build_for_collating_block(prev, &engine).await { - Err(e) => log::error!("({}): Error while test bundle for {} building: {}", next_block_descr, id, e), - Ok(mut b) => { - b.set_notes(err_str.to_string()); - if let Err(e) = b.save(&path) { - log::error!("({}): Error while test bundle for {} saving: {}", next_block_descr, id, e); - } else { - log::info!("({}): Built test bundle for {}", next_block_descr, id); - } - } + if test_bundles_config.is_enable() { + if test_bundles_config.need_to_build_for(&err_str) { + let id = BlockIdExt { + shard_id: shard, + seq_no: prev.iter().max_by_key(|id| id.seq_no()).unwrap().seq_no() + 1, + root_hash: UInt256::default(), + file_hash: UInt256::default(), + }; + if !CollatorTestBundle::exists(test_bundles_config.path(), &id) { + let path = test_bundles_config.path().to_string(); + let engine = engine.clone(); + tokio::spawn(async move { + match CollatorTestBundle::build_for_collating_block(&engine, prev, usage_tree_opt).await { + Err(e) => log::error!("({}): Error while test bundle for {} building: {}", next_block_descr, id, e), + Ok(mut b) => { + b.set_notes(err_str.to_string()); + if let Err(e) = b.save(&path) { + log::error!("({}): Error while test bundle for {} saving: {}", next_block_descr, id, e); + } else { + log::info!("({}): Built test bundle for {}", next_block_descr, id); } - }); + } } - } + }); } - return Err(err); } } + Err(err) } diff --git a/src/validator/mod.rs index 1b4ed909..624ae290 100644 --- a/src/validator/mod.rs +++ b/src/validator/mod.rs @@ -39,8 +39,8 @@ pub mod slashing; mod verification; use std::sync::Arc; -use ever_block::{Result, UInt256, error}; use ever_block::{ + error, Result, UInt256, BlkMasterInfo, BlockIdExt, ConfigParams, CurrencyCollection, ExtBlkRef, McStateExtra, Libraries, }; @@ -60,6 +60,9 @@ pub struct CollatorSettings { pub want_split: Option, pub want_merge: Option, pub is_fake: bool, + // for collator test bundles we don't need to calculate the state update + // because the state is a Merkle proof + pub skip_state_update: bool, } impl CollatorSettings { diff --git a/src/validator/out_msg_queue.rs index 5992902a..64ca8898 100644 --- a/src/validator/out_msg_queue.rs +++ b/src/validator/out_msg_queue.rs @@ -257,8 +257,7 @@ impl OutMsgQueueInfoStuff { 0x5777784F96FB1CFFu64, "05aa297e3a2e003e1449e1297742d64f188985dc029c620edc84264f9786c0c3".parse().unwrap() ); - let key = SliceData::load_bitstring(key.write_to_new_cell()?)?; - out_queue.remove(key)?; + out_queue.remove(key.write_to_bitstring()?)?; } let ihr_pending = out_queue_info.ihr_pending().clone(); @@ -1403,14 +1402,14 @@ impl CachedStates { impl MsgQueueManager { /// create iterator for merging all output messages from all neighbors to our shard - pub fn merge_out_queue_iter(&self, shard: &ShardIdent) -> Result> { + pub fn merge_out_queue_iter(&self, shard: &ShardIdent) -> Result> { MsgQueueMergerIterator::from_manager(self, shard) } /// find enquque message and return it with neighbor id - pub fn find_message(&self, key: &OutMsgQueueKey, prefix: &AccountIdPrefixFull) -> Result<(Option, Option)> { + pub fn find_message(&self, key: &OutMsgQueueKey, prefix: &AccountIdPrefixFull) -> Result<(Option,
Option)> { for nb in &self.neighbors { if !nb.is_disabled() && nb.shard().contains_full_prefix(prefix) { - return Ok((Some(nb.block_id().clone()), nb.message(key)?)) + return Ok((Some(nb.shard().clone()), nb.message(key)?)) } } Ok((None, None)) @@ -1497,23 +1496,15 @@ pub struct MsgQueueMergerIterator { roots: Vec>, } -impl MsgQueueMergerIterator { +impl MsgQueueMergerIterator { pub fn from_manager(manager: &MsgQueueManager, shard: &ShardIdent) -> Result { let shard_prefix = shard.shard_key(true); let mut roots = vec![]; for nb in manager.neighbors.iter().filter(|nb| !nb.is_disabled()) { - let out_queue_short = if let Ok(full_queue) = nb.out_queue() { - let mut q = full_queue.clone(); - q.into_subtree_with_prefix(&shard_prefix, &mut 0)?; - q - } else { - let mut q = nb.out_queue_part()?.clone(); - q.into_subtree_with_prefix(&shard_prefix, &mut 0)?; - q - }; + let out_queue_short = nb.out_queue().or_else(|_| nb.out_queue_part())? + .subtree_with_prefix(&shard_prefix, &mut 0)?; if let Some(cell) = out_queue_short.data() { - roots.push(RootRecord::from_cell(cell, out_queue_short.bit_len(), nb.block_id().clone())?); - // roots.push(RootRecord::new(lt, cursor, bit_len, key, nb.block_id().clone())); + roots.push(RootRecord::from_cell(cell, out_queue_short.bit_len(), nb.shard().clone())?); } } if !roots.is_empty() { diff --git a/src/validator/tests/test_collator.rs b/src/validator/tests/test_collator.rs index 7560ce27..ef0c455c 100644 --- a/src/validator/tests/test_collator.rs +++ b/src/validator/tests/test_collator.rs @@ -13,13 +13,11 @@ use super::*; use crate::{ - collator_test_bundle::CollatorTestBundle, engine_traits::EngineOperations, + block::BlockStuff, + collator_test_bundle::{try_collate, CollatorTestBundle}, + engine_traits::EngineOperations, + test_helper::{compare_blocks, init_test_log, TestEngine}, types::messages::{count_matching_bits, MsgEnvelopeStuff}, - validator::{ - CollatorSettings, collator, - validate_query::ValidateQuery, - validator_utils::compute_validator_set_cc, - }, }; use ever_block::Result; use pretty_assertions::assert_eq; @@ -27,8 +25,8 @@ use std::sync::Arc; const RES_PATH: &'static str = "target/cmp"; -async fn try_collate_by_bundle(bundle: Arc) -> Result<(Block, ShardStateUnsplit)> { - try_collate_by_engine( +async fn try_collate_by_bundle(bundle: Arc) -> Result { + let collate_result = try_collate_by_engine( bundle.clone(), bundle.block_id().shard().clone(), bundle.prev_blocks_ids().clone(), @@ -36,8 +34,21 @@ async fn try_collate_by_bundle(bundle: Arc) -> Result<(Block match bundle.ethalon_block()? { Some(block) => Some(block.block()?.read_extra().unwrap().rand_seed().clone()), None => bundle.rand_seed().cloned() + }, + true, + ).await?; + if let Some(ethalon_block) = bundle.ethalon_block()? { + let mut block = match &collate_result.candidate { + Some(candidate) => { + Block::construct_from_bytes(&candidate.data)? 
+ } + None => return Err(collate_result.error.unwrap()) + }; + if let Err(result) = compare_blocks(ethalon_block.block()?, &mut block) { + panic!("Blocks are not equal: {}", result); } - ).await + } + Ok(collate_result) } async fn try_collate_by_engine( @@ -46,71 +57,14 @@ async fn try_collate_by_engine( prev_blocks_ids: Vec, created_by_opt: Option, rand_seed_opt: Option, -) -> Result<(Block, ShardStateUnsplit)> { + skip_state_update: bool, +) -> Result { std::fs::create_dir_all(RES_PATH).ok(); - let mc_state = engine.load_last_applied_mc_state().await?; - let mc_state_extra = mc_state.shard_state_extra()?; - let mut cc_seqno_with_delta = 0; - let cc_seqno_from_state = if shard.is_masterchain() { - mc_state_extra.validator_info.catchain_seqno - } else { - mc_state_extra.shards.calc_shard_cc_seqno(&shard)? - }; - let nodes = compute_validator_set_cc( - &mc_state, - &shard, - prev_blocks_ids.iter().max_by_key(|id1| id1.seq_no).unwrap().seq_no() + 1, - cc_seqno_from_state, - &mut cc_seqno_with_delta - )?; - let validator_set = ValidatorSet::with_cc_seqno(0, 0, 0, cc_seqno_with_delta, nodes)?; - - // log::debug!("{}", block_stuff.id()); - - log::info!("TRY COLLATE block {}", shard); - - let min_mc_seq_no = if prev_blocks_ids[0].seq_no() == 0 { - 0 - } else { - let state = engine.load_state(&prev_blocks_ids[0]).await?; - state.state()?.min_ref_mc_seqno() - }; - - let collator = collator::Collator::new( - shard.clone(), - min_mc_seq_no, - prev_blocks_ids.clone(), - validator_set.clone(), - created_by_opt.unwrap_or_default(), - engine.clone(), - rand_seed_opt, - CollatorSettings::default(), - )?; - let (block_candidate, new_state) = collator.collate().await?; - - let new_block = Block::construct_from_bytes(&block_candidate.data)?; - - // std::fs::write(&format!("{}/state_candidate.json", RES_PATH), ever_block_json::debug_state(new_state.clone())?)?; - // std::fs::write(&format!("{}/block_candidate.json", RES_PATH), ever_block_json::debug_block_full(&new_block)?)?; - - let validator_query = ValidateQuery::new( - shard.clone(), - min_mc_seq_no, - prev_blocks_ids.clone(), - block_candidate.clone(), - validator_set.clone(), - engine.clone(), - true, - true, - None, - ); - validator_query.try_validate().await?; - Ok((new_block, new_state)) + try_collate(&engine, shard, prev_blocks_ids, created_by_opt, rand_seed_opt, skip_state_update, false).await } #[tokio::test(flavor = "multi_thread")] async fn test_collate_first_block() { - std::fs::create_dir_all(RES_PATH).ok(); // init_test_log(); let bundle = Arc::new(CollatorTestBundle::build_with_zero_state( "src/tests/static/zerostate.boc", diff --git a/src/validator/tests/test_message_cache.rs b/src/validator/tests/test_message_cache.rs index fec7d46d..6d7b057c 100644 --- a/src/validator/tests/test_message_cache.rs +++ b/src/validator/tests/test_message_cache.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use openssl::rand::rand_bytes; use rand::{Rng, thread_rng}; +#[cfg(feature = "telemetry")] use adnl::telemetry::Metric; use ton_api::ton::ton_node::{RempMessageLevel, RempMessageStatus, rempmessagestatus::RempAccepted}; use ever_block::{BlockIdExt, ShardIdent}; diff --git a/src/validator/tests/test_out_msg_queue.rs b/src/validator/tests/test_out_msg_queue.rs index 823a9270..af7a9ea1 100644 --- a/src/validator/tests/test_out_msg_queue.rs +++ b/src/validator/tests/test_out_msg_queue.rs @@ -410,7 +410,7 @@ fn test_clean_queue() { continue; } - queue.remove(SliceData::load_builder(key.write_to_new_cell()?)?)?; + 
queue.remove(key.write_to_bitstring()?)?; } queue.after_remove()?; diff --git a/src/validator/validate_query.rs b/src/validator/validate_query.rs index 144d79b4..a48a3c8d 100644 --- a/src/validator/validate_query.rs +++ b/src/validator/validate_query.rs @@ -2750,12 +2750,12 @@ impl ValidateQuery { enq: MsgEnqueueStuff, created_lt: u64, key: &OutMsgQueueKey, - nb_block_id: &BlockIdExt, + nb_shard: &ShardIdent, ) -> Result { if created_lt != enq.created_lt() { reject_query!("EnqueuedMsg with key {:x} in outbound queue of our neighbor {} \ pretends to have been created at lt {} but its actual creation lt is {}", - key, nb_block_id, created_lt, enq.created_lt()) + key, nb_shard, created_lt, enq.created_lt()) } CHECK!(base.shard().contains_full_prefix(&enq.next_prefix())); @@ -2771,7 +2771,7 @@ impl ValidateQuery { // just check that we have not imported it once again if in_msg.is_some() { reject_query!("have an InMsg entry for processing again already processed EnqueuedMsg with key {:x} \ - of neighbor {}", key, nb_block_id) + of neighbor {}", key, nb_shard) } if base.shard().contains_full_prefix(&enq.cur_prefix()) { // if this message comes from our own outbound queue, we must have dequeued it @@ -2799,7 +2799,7 @@ impl ValidateQuery { // log::error!(target: "validate_query", "internal inconsistency: new ProcessedInfo claims \ // to have processed all messages up to ({},{}) but we had somehow already processed a message ({},{}) \ // from OutMsgQueue of neighbor {} key {}", self.claimed_proc_lt, self.claimed_proc_hash.to_hex_string(), - // created_lt, key.hash.to_hex_string(), nb_block_id, key.to_hex_string()); + // created_lt, key.hash.to_hex_string(), nb_shard, key.to_hex_string()); // return Ok(false) // } // Ok(true) @@ -2814,7 +2814,7 @@ impl ValidateQuery { to have processed all messages up to ({},{:x}), but we had somehow processed in this block \ a message ({},{:x}) from OutMsgQueue of neighbor {} key {:x}", claimed_proc_lt, claimed_proc_hash, - created_lt, key, nb_block_id, key) + created_lt, key, nb_shard, key) } } // must have a msg_import_fin or msg_import_tr InMsg record @@ -2823,15 +2823,15 @@ impl ValidateQuery { Some(InMsg::Transit(info)) => info.in_envelope_message_hash(), None => reject_query!("there is no InMsg entry for processing EnqueuedMsg with key {:x} \ of neighbor {} which is claimed to be processed by new ProcessedInfo of this block", - key, nb_block_id), + key, nb_shard), _ => reject_query!("expected either a msg_import_fin or a msg_import_tr InMsg record \ for processing EnqueuedMsg with key {:x} of neighbor {} which is claimed to be processed \ - by new ProcessedInfo of this block", key, nb_block_id) + by new ProcessedInfo of this block", key, nb_shard) }; if hash != enq.envelope_hash() { reject_query!("InMsg record for processing EnqueuedMsg with key {:x} of neighbor {} \ which is claimed to be processed by new ProcessedInfo of this block contains a reference \ - to a different MsgEnvelope", key, nb_block_id); + to a different MsgEnvelope", key, nb_shard); } // all other checks have been done while checking InMsgDescr Ok(true) @@ -2848,7 +2848,7 @@ impl ValidateQuery { base.next_block_descr, claimed_proc_lt, claimed_proc_hash, created_lt, key.hash, - nb_block_id, key); + nb_shard, key); } } Ok(false) @@ -2860,18 +2860,18 @@ impl ValidateQuery { log::debug!(target: "validate_query", "({}): check_in_queue len: {}", base.next_block_descr, manager.neighbors().len()); let mut iter = manager.merge_out_queue_iter(base.shard())?; while let Some(k_v) = iter.next() { - let 
(msg_key, enq, lt, nb_block_id) = k_v?; - if !base.split_queues && !nb_block_id.shard().contains_full_prefix(enq.cur_prefix()) { + let (msg_key, enq, lt, nb_shard) = k_v?; + if !base.split_queues && !nb_shard.contains_full_prefix(enq.cur_prefix()) { // this case from shard split result without splitting queues continue; } log::debug!(target: "validate_query", "({}): processing inbound message with \ - (lt,hash)=({},{:x}) from neighbor - {}", base.next_block_descr, lt, msg_key.hash, nb_block_id); + (lt,hash)=({},{:x}) from neighbor - {}", base.next_block_descr, lt, msg_key.hash, nb_shard); // if (verbosity > 3) { // std::cerr << "inbound message: lt=" << kv->lt from=" << kv->source key=" << kv->key.to_hex_string() msg="; // block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg)); // } - match Self::check_neighbor_outbound_message_processed(base, manager, enq, lt, &msg_key, &nb_block_id) { + match Self::check_neighbor_outbound_message_processed(base, manager, enq, lt, &msg_key, &nb_shard) { Err(err) => { // if (verbosity > 1) { // std::cerr << "invalid neighbor outbound message: lt=" << kv->lt from=" << kv->source @@ -2879,7 +2879,7 @@ impl ValidateQuery { // block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg)); // } reject_query!("error processing outbound internal message {:x} of neighbor {} : {}", - msg_key.hash, nb_block_id, err) + msg_key.hash, nb_shard, err) } Ok(false) => return Ok(false), _ => () diff --git a/src/validator/validator_group.rs b/src/validator/validator_group.rs index 94ead864..fc5ba876 100644 --- a/src/validator/validator_group.rs +++ b/src/validator/validator_group.rs @@ -854,7 +854,7 @@ impl ValidatorGroup { run_validate_query( self.shard().clone(), min_ts, - mc.clone(), + mc.seq_no(), prev_block_ids, candidate.clone(), self.validator_set.clone(), diff --git a/storage/src/archives/package.rs b/storage/src/archives/package.rs index 59327f8b..72e765e5 100644 --- a/storage/src/archives/package.rs +++ b/storage/src/archives/package.rs @@ -153,7 +153,7 @@ impl Package { .read(true) .write(!read_only || create) .create(create) - .open(&path).await?) + .open(path).await?) } pub async fn open_file(&self) -> Result { diff --git a/storage/src/shardstate_db_async.rs b/storage/src/shardstate_db_async.rs index db8867b4..505bac88 100644 --- a/storage/src/shardstate_db_async.rs +++ b/storage/src/shardstate_db_async.rs @@ -408,7 +408,6 @@ impl ShardStateDb { Arc::clone(&self.shardstate_db) } - #[cfg(test)] pub fn enum_shardstate_db(&self) -> Result<()> { self.shardstate_db.for_each(&mut |_key, val| { let db_entry = DbEntry::from_slice(val)?; @@ -511,8 +510,7 @@ impl ShardStateDb { )?) } - pub fn create_hashed_cell_storage( - &self,) -> Result { + pub fn create_hashed_cell_storage(&self) -> Result { Ok(CellByHashStorageAdapter::new( self.dynamic_boc_db.clone(), false