Skip to content

Commit

Permalink
Allow snapshot sync to be resumed. (#1453)
Browse files Browse the repository at this point in the history
If all peers become inactive (due to timeouts or other reasons), we now
allow the node to request candidates from all peers again to update the
`active_peers` set, and resume the unfinished chunks if the snapshot
candidate to sync is unchanged.
  • Loading branch information
peilun-conflux authored and Peilun Li committed May 19, 2020
1 parent b1b3459 commit 6e0b6aa
Showing 1 changed file with 12 additions and 25 deletions.
37 changes: 12 additions & 25 deletions core/src/sync/state/snapshot_chunk_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ impl Inner {
if self.current_sync_candidate.as_ref() == Some(&sync_candidate)
&& self.trusted_blame_block == trusted_blame_block
{
// The new candidate is not changed, so we can resume our previous
// sync status with new `active_peers`.
self.status = Status::DownloadingChunks(Instant::now());
return;
}
info!(
Expand Down Expand Up @@ -267,10 +270,6 @@ impl SnapshotChunkSync {

/// Returns the current sync `Status`, read by value from `inner` through
/// its `read()` guard.
pub fn status(&self) -> Status {
    let inner = self.inner.read();
    inner.status
}

/// Returns an owned copy of the `trusted_blame_block` value currently
/// stored in `inner`, obtained through its `read()` guard.
pub fn trusted_blame_block(&self) -> H256 {
    let inner = self.inner.read();
    inner.trusted_blame_block.clone()
}

pub fn handle_snapshot_manifest_response(
&self, ctx: &Context, response: SnapshotManifestResponse,
request: &SnapshotManifestRequest,
Expand Down Expand Up @@ -502,22 +501,13 @@ impl SnapshotChunkSync {
&self, ctx: &Context, chunk_key: ChunkKey, chunk: Chunk,
) -> StorageResult<()> {
let mut inner = self.inner.write();
// status mismatch
let download_start_time = match inner.status {
Status::DownloadingChunks(t) => {
debug!(
"Snapshot chunk received, checkpoint = {:?}, chunk = {:?}",
inner.current_sync_candidate, chunk_key
);
t
}
_ => {
debug!("Snapshot chunk received, but mismatch with current status {:?}", inner.status);
return Ok(());
}
};

// There are two possible reasons:
// If a response is in `downloading_chunks`, we can process
// it regardless of our current status, because we allow chunk requests
// to be resumed if the snapshot to sync is unchanged.
//
// There are two possible reasons that a response is not in
// `downloading_chunks`:
// 1. Received an out-of-date snapshot chunk, e.g. a new era started.
// 2. Duplicated chunks received, and it has been processed.
if inner.downloading_chunks.remove(&chunk_key).is_none() {
Expand All @@ -538,10 +528,7 @@ impl SnapshotChunkSync {

// begin to restore if all chunks downloaded
if inner.downloading_chunks.is_empty() {
debug!(
"Snapshot chunks are all downloaded in {:?}",
download_start_time.elapsed()
);
debug!("Snapshot chunks are all downloaded",);

let snapshot_info = inner.snapshot_info.clone();
// start to restore and update status
Expand Down Expand Up @@ -917,6 +904,7 @@ impl SnapshotChunkSync {
)
{
let mut inner = self.inner.write();
debug!("sync state status before updating: {:?}", *inner);
self.check_timeout(
&mut *inner,
&Context {
Expand All @@ -929,8 +917,6 @@ impl SnapshotChunkSync {
if inner.sync_candidate_manager.is_inactive() {
warn!("current sync candidate becomes inactive: {:?}", inner);
inner.status = Status::Inactive;
inner.current_sync_candidate = None;
inner.trusted_blame_block = Default::default();
}

// If we move into the next era, we should force state_sync to change
Expand Down Expand Up @@ -996,6 +982,7 @@ impl SnapshotChunkSync {
}];
inner.start_sync(current_era_genesis, candidates, io, sync_handler)
}
debug!("sync state status after updating: {:?}", *inner);
}

fn check_timeout(&self, inner: &mut Inner, ctx: &Context) {
Expand Down

0 comments on commit 6e0b6aa

Please sign in to comment.