Skip to content

Commit

Permalink
Merge branch 'master' into v0.5-alpha
Browse files Browse the repository at this point in the history
  • Loading branch information
Peilun Li committed Jun 8, 2020
2 parents 6dce082 + 5abd4a9 commit 14d8245
Show file tree
Hide file tree
Showing 21 changed files with 271 additions and 120 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Here are some guidelines before you start:

* [Conflux Website](https://www.conflux-chain.org/)
* [Conflux Paper](https://arxiv.org/abs/1805.03870)
* [Medium](https://medium.com/@Confluxchain)
* [Medium](https://medium.com/@ConfluxNetwork)

## License

Expand Down
2 changes: 1 addition & 1 deletion client/src/rpc/impls/cfx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ impl Cfx for CfxHandler {
to self.common {
fn best_block_hash(&self) -> JsonRpcResult<RpcH256>;
fn block_by_epoch_number(
&self, epoch_num: EpochNumber, include_txs: bool) -> JsonRpcResult<RpcBlock>;
&self, epoch_num: EpochNumber, include_txs: bool) -> JsonRpcResult<Option<RpcBlock>>;
fn block_by_hash_with_pivot_assumption(
&self, block_hash: RpcH256, pivot_hash: RpcH256, epoch_number: RpcU64)
-> JsonRpcResult<RpcBlock>;
Expand Down
12 changes: 5 additions & 7 deletions client/src/rpc/impls/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl RpcImpl {

pub fn block_by_epoch_number(
&self, epoch_num: EpochNumber, include_txs: bool,
) -> RpcResult<RpcBlock> {
) -> RpcResult<Option<RpcBlock>> {
let consensus_graph = self
.consensus
.as_any()
Expand All @@ -149,14 +149,12 @@ impl RpcImpl {
.get_pivot_hash_from_epoch_number(epoch_height)
.map_err(RpcError::invalid_params)?;

if let Some(block) = self
let maybe_block = self
.data_man
.block_by_hash(&pivot_hash, false /* update_cache */)
{
Ok(RpcBlock::new(&*block, inner, &self.data_man, include_txs))
} else {
Err(RpcError::internal_error())
}
.map(|b| RpcBlock::new(&*b, inner, &self.data_man, include_txs));

Ok(maybe_block)
}

pub fn confirmation_risk_by_hash(
Expand Down
2 changes: 1 addition & 1 deletion client/src/rpc/impls/light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl Cfx for CfxHandler {
delegate! {
to self.common {
fn best_block_hash(&self) -> RpcResult<RpcH256>;
fn block_by_epoch_number(&self, epoch_num: EpochNumber, include_txs: bool) -> RpcResult<RpcBlock>;
fn block_by_epoch_number(&self, epoch_num: EpochNumber, include_txs: bool) -> RpcResult<Option<RpcBlock>>;
fn block_by_hash_with_pivot_assumption(&self, block_hash: RpcH256, pivot_hash: RpcH256, epoch_number: RpcU64) -> RpcResult<RpcBlock>;
fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> RpcResult<Option<RpcBlock>>;
fn blocks_by_epoch(&self, num: EpochNumber) -> RpcResult<Vec<RpcH256>>;
Expand Down
2 changes: 1 addition & 1 deletion client/src/rpc/traits/cfx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait Cfx {
#[rpc(name = "cfx_getBlockByEpochNumber")]
fn block_by_epoch_number(
&self, epoch_number: EpochNumber, include_txs: bool,
) -> JsonRpcResult<Block>;
) -> JsonRpcResult<Option<Block>>;

/// Returns best block hash.
#[rpc(name = "cfx_getBestBlockHash")]
Expand Down
2 changes: 2 additions & 0 deletions core/benchmark/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ serde_json = "1.0"
[patch.crates-io]
parity-snappy = { path = "parity-snappy/rust-snappy" }
bzip2-sys = { git = "https://github.com/alexcrichton/bzip2-rs.git", commit = "a8ee5cb4" }
sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" }


[patch.'https://github.com/paritytech/rust-secp256k1']
# There was a package name change, but it's not possible to redirect eth-secp256k1 to "parity-secp256k1"
Expand Down
67 changes: 38 additions & 29 deletions core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,41 +754,50 @@ impl ConsensusGraph {
}

let blooms = filter.bloom_possibilities();
let mut blocks = vec![];
for epoch_number in from_epoch..(to_epoch + 1) {
if epoch_number <= inner.get_cur_era_genesis_height() {
// Blocks before (including) `cur_era_genesis` does not has
// epoch set in memory, so we should get
// the epoch set from db
let epoch_set = self
.data_man
.executed_epoch_set_hashes_from_db(epoch_number)
.expect("epoch set past checkpoint should exist");
let epoch_hash = epoch_set.last().expect("Not empty");
for hash in &epoch_set {
if self.block_matches_bloom(hash, epoch_hash, &blooms) {
blocks.push(*hash);
(from_epoch..(to_epoch + 1))
.into_par_iter()
.map(|epoch_number| {
let mut blocks = Vec::new();
if epoch_number <= inner.get_cur_era_genesis_height() {
// Blocks before (including) `cur_era_genesis` do not
// have epoch set in memory, so
// we should get the epoch set from db
let epoch_set = self
.data_man
.executed_epoch_set_hashes_from_db(epoch_number)
.expect("epoch set from past era should exist");
let epoch_hash = epoch_set.last().expect("Not empty");
for hash in &epoch_set {
if self
.block_matches_bloom(hash, epoch_hash, &blooms)
{
blocks.push(*hash)
}
}
}
} else {
// Use the epoch set maintained in memory
let epoch_hash = &inner.arena
[inner.get_pivot_block_arena_index(epoch_number)]
.hash;
for index in inner.get_ordered_executable_epoch_blocks(
inner.get_pivot_block_arena_index(epoch_number),
) {
let hash = &inner.arena[*index].hash;
if self.block_matches_bloom(hash, epoch_hash, &blooms) {
blocks.push(*hash);
} else {
// Use the epoch set maintained in memory
let epoch_hash = &inner.arena
[inner.get_pivot_block_arena_index(epoch_number)]
.hash;
for index in inner.get_ordered_executable_epoch_blocks(
inner.get_pivot_block_arena_index(epoch_number),
) {
let hash = &inner.arena[*index].hash;
if self
.block_matches_bloom(hash, epoch_hash, &blooms)
{
blocks.push(*hash);
}
}
}
}
}
blocks
blocks
})
.flatten()
.collect()
} else {
filter.block_hashes.as_ref().unwrap().clone()
};
debug!("get_logs: {} blocks after filter", block_hashes.len());

Ok(self.logs_from_blocks(
block_hashes,
Expand Down
1 change: 1 addition & 0 deletions core/src/storage/impls/storage_db/kvdb_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl KeyValueDbTrait for KvdbRocksdb {
random_crash_if_enabled("rocksdb delete");
let mut transaction = self.kvdb.transaction();
transaction.delete(self.col, key);
self.kvdb.write(transaction)?;
Ok(None)
}

Expand Down
16 changes: 14 additions & 2 deletions core/src/sync/message/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
};
use network::{node_table::NodeId, NetworkContext};
use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
use rlp_derive::{RlpDecodableWrapper, RlpEncodableWrapper};

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum DynamicCapability {
Expand Down Expand Up @@ -90,11 +89,24 @@ impl DynamicCapabilitySet {
}
}

#[derive(Debug, RlpDecodableWrapper, RlpEncodableWrapper)]
#[derive(Debug)]
pub struct DynamicCapabilityChange {
pub changed: DynamicCapability,
}

/// Hand-written replacement for `RlpEncodableWrapper`: the wrapper struct
/// serializes as its single inner `changed` field, with no extra list layer.
impl Encodable for DynamicCapabilityChange {
    fn rlp_append(&self, stream: &mut RlpStream) {
        // `append_internal` writes the inner value directly, preserving the
        // exact wire format the derive macro used to produce.
        stream.append_internal(&self.changed);
    }
}

/// Hand-written replacement for `RlpDecodableWrapper`: decode the RLP item
/// as the inner `DynamicCapability` value and wrap it in the struct.
impl Decodable for DynamicCapabilityChange {
    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
        Ok(DynamicCapabilityChange {
            changed: rlp.as_val()?,
        })
    }
}

impl Handleable for DynamicCapabilityChange {
fn handle(self, ctx: &Context) -> Result<(), Error> {
debug!(
Expand Down
15 changes: 13 additions & 2 deletions core/src/sync/message/new_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@ use crate::sync::{
};
use cfx_types::H256;
use primitives::Block;
use rlp_derive::{RlpDecodableWrapper, RlpEncodableWrapper};
use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};

#[derive(Debug, PartialEq, RlpDecodableWrapper, RlpEncodableWrapper)]
#[derive(Debug, PartialEq)]
pub struct NewBlock {
pub block: Block,
}

/// Hand-written replacement for `RlpEncodableWrapper`: a `NewBlock` message
/// is encoded as its inner `block` payload with no surrounding list.
impl Encodable for NewBlock {
    fn rlp_append(&self, stream: &mut RlpStream) {
        // Append the inner block directly to keep the derive's wire format.
        stream.append_internal(&self.block);
    }
}

/// Hand-written replacement for `RlpDecodableWrapper`: interpret the RLP
/// item as the inner `Block` and wrap it.
impl Decodable for NewBlock {
    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
        Ok(NewBlock {
            block: rlp.as_val()?,
        })
    }
}

impl Handleable for NewBlock {
// TODO This is only used in tests now. Maybe we can add a rpc to send full
// block and remove NEW_BLOCK from p2p
Expand Down
17 changes: 15 additions & 2 deletions core/src/sync/message/new_block_hashes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,26 @@ use crate::sync::{
Error,
};
use cfx_types::H256;
use rlp_derive::{RlpDecodableWrapper, RlpEncodableWrapper};
use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};

#[derive(Debug, PartialEq, RlpDecodableWrapper, RlpEncodableWrapper)]
#[derive(Debug, PartialEq)]
pub struct NewBlockHashes {
pub block_hashes: Vec<H256>,
}

/// Hand-written replacement for `RlpEncodableWrapper`: the message body is
/// exactly the RLP list of block hashes, with no extra wrapping layer.
impl Encodable for NewBlockHashes {
    fn rlp_append(&self, stream: &mut RlpStream) {
        // Emit the inner Vec as a single RLP list, matching the derive.
        stream.append_list(&self.block_hashes);
    }
}

/// Hand-written replacement for `RlpDecodableWrapper`: read the RLP item as
/// a list of `H256` hashes and wrap it.
impl Decodable for NewBlockHashes {
    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
        Ok(NewBlockHashes {
            block_hashes: rlp.as_list()?,
        })
    }
}

impl Handleable for NewBlockHashes {
fn handle(self, ctx: &Context) -> Result<(), Error> {
debug!("on_new_block_hashes, msg={:?}", self);
Expand Down
19 changes: 15 additions & 4 deletions core/src/sync/message/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,28 @@ use network::service::ProtocolVersion;
use primitives::{transaction::TxPropagateId, TransactionWithSignature};
use priority_send_queue::SendQueuePriority;
use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
use rlp_derive::{
RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use rlp_derive::{RlpDecodable, RlpEncodable};
use siphasher::sip::SipHasher24;
use std::{any::Any, collections::HashSet, hash::Hasher, time::Duration};

#[derive(Debug, PartialEq, RlpDecodableWrapper, RlpEncodableWrapper)]
#[derive(Debug, PartialEq)]
pub struct Transactions {
pub transactions: Vec<TransactionWithSignature>,
}

/// Hand-written replacement for `RlpEncodableWrapper`: a `Transactions`
/// message encodes as the bare RLP list of signed transactions.
impl Encodable for Transactions {
    fn rlp_append(&self, stream: &mut RlpStream) {
        // Serialize the inner Vec as one RLP list, matching the derive.
        stream.append_list(&self.transactions);
    }
}

/// Hand-written replacement for `RlpDecodableWrapper`: read the RLP item as
/// a list of `TransactionWithSignature` and wrap it.
impl Decodable for Transactions {
    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
        Ok(Transactions {
            transactions: rlp.as_list()?,
        })
    }
}

impl Handleable for Transactions {
fn handle(self, ctx: &Context) -> Result<(), Error> {
let transactions = self.transactions;
Expand Down
63 changes: 34 additions & 29 deletions core/src/sync/synchronization_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1329,46 +1329,50 @@ impl SynchronizationGraph {
}
}

if let Some(block_header_arc) =
self.data_man.block_header_by_hash(&hash)
{
let mut block_header = block_header_arc.as_ref().clone();
// Only construct synchronization graph if is not header_only.
// Construct both sync and consensus graph if is header_only.
let (insert_result, _) = self.insert_block_header(
&mut block_header,
true, /* need_to_verify */
false, /* bench_mode */
header_only, /* insert_to_consensus */
false, /* persistent */
);
assert!(!insert_result.is_invalid());

let parent = block_header.parent_hash().clone();
let referees = block_header.referee_hashes().clone();

// Construct consensus graph if is not header_only.
if !header_only {
if let Some(block) =
self.data_man.block_by_hash(&hash, false)
{
let result = self.insert_block(
// Insert headers or full blocks depending on our phase.
// Note that if we have headers in db for recover block phase, we
// will not insert the headers, so the blocks can be
// retrieved later.
let get_and_insert = |hash| {
if header_only {
self.data_man.block_header_by_hash(hash).map(|header| {
self.insert_block_header(
&mut header.as_ref().clone(),
true, /* need_to_verify */
false, /* bench_mode */
true, /* insert_to_consensus */
false, /* persistent */
);
header
})
} else {
self.data_man.block_by_hash(hash, false).map(|block| {
let mut header = block.block_header.clone();
self.insert_block_header(
&mut header,
true, /* need_to_verify */
false, /* bench_mode */
false, /* insert_to_consensus */
false, /* persistent */
);
self.insert_block(
block.as_ref().clone(),
true, /* need_to_verify */
false, /* persistent */
true, /* recover_from_db */
);
assert!(result.is_valid());
} else {
missed_hashes.insert(hash);
}
Arc::new(header)
})
}
};

if let Some(block_header) = get_and_insert(&hash) {
let parent = block_header.parent_hash().clone();
let referees = block_header.referee_hashes().clone();
if !visited_blocks.contains(&parent) {
queue.push_back(parent);
visited_blocks.insert(parent);
}

for referee in referees {
if !visited_blocks.contains(&referee) {
queue.push_back(referee);
Expand Down Expand Up @@ -1580,6 +1584,7 @@ impl SynchronizationGraph {
) -> (BlockHeaderInsertionResult, Vec<H256>)
{
let _timer = MeterTimer::time_func(SYNC_INSERT_HEADER.as_ref());
self.statistics.inc_sync_graph_inserted_header_count();
let inner = &mut *self.inner.write();
let hash = header.hash();

Expand Down
2 changes: 1 addition & 1 deletion core/src/sync/synchronization_phases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl SynchronizationPhaseTrait for CatchUpCheckpointPhase {
if let Some(commitment) = sync_handler
.graph
.data_man
.get_epoch_execution_commitment_with_db(&epoch_to_sync)
.load_epoch_execution_commitment_from_db(&epoch_to_sync)
{
info!("CatchUpCheckpointPhase: commitment for epoch {:?} exists, skip state sync. \
commitment={:?}", epoch_to_sync, commitment);
Expand Down
Loading

0 comments on commit 14d8245

Please sign in to comment.