From d06ce0bf40af53f722da33141f3e473bf67d5db3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 24 Sep 2020 16:00:41 +0800 Subject: [PATCH 01/17] refactor evict to avoid read disk from pipelog Signed-off-by: Little-Wallace --- src/cache_evict.rs | 99 +++++++++++----------------------------------- src/engine.rs | 21 ++++------ src/pipe_log.rs | 19 +++++---- 3 files changed, 42 insertions(+), 97 deletions(-) diff --git a/src/cache_evict.rs b/src/cache_evict.rs index e5c3afa8..d17f59f4 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -8,12 +8,9 @@ use crossbeam::channel::{bounded, Sender}; use protobuf::Message; use crate::engine::{MemTableAccessor, SharedCacheStats}; -use crate::log_batch::{EntryExt, LogBatch, LogItemContent}; -use crate::pipe_log::{GenericPipeLog, LogQueue}; +use crate::log_batch::EntryExt; use crate::util::{HandyRwLock, Runnable, Scheduler}; -pub const DEFAULT_CACHE_CHUNK_SIZE: usize = 4 * 1024 * 1024; - const HIGH_WATER_RATIO: f64 = 0.9; const LOW_WATER_RATIO: f64 = 0.8; const CHUNKS_SHRINK_TO: usize = 1024; @@ -21,15 +18,11 @@ const CHUNKS_SHRINK_TO: usize = 1024; /// Used in `PipLog` to emit `CacheTask::NewChunk` tasks. pub struct CacheSubmitor { file_num: u64, - offset: u64, - // `chunk_size` is different from `size_tracker`. For a given chunk, - // the former is monotomically increasing, but the latter can decrease. 
- chunk_size: usize, size_tracker: Arc, + group_infos: Vec<(u64, u64)>, scheduler: Scheduler, cache_limit: usize, - chunk_limit: usize, cache_stats: Arc, block_on_full: bool, } @@ -37,18 +30,15 @@ pub struct CacheSubmitor { impl CacheSubmitor { pub fn new( cache_limit: usize, - chunk_limit: usize, scheduler: Scheduler, cache_stats: Arc, ) -> Self { CacheSubmitor { file_num: 0, - offset: 0, - chunk_size: 0, + group_infos: vec![], size_tracker: Arc::new(AtomicUsize::new(0)), scheduler, cache_limit, - chunk_limit, cache_stats, block_on_full: false, } @@ -62,39 +52,24 @@ impl CacheSubmitor { self.block_on_full = false; } - pub fn get_cache_tracker( - &mut self, - file_num: u64, - offset: u64, - size: usize, - ) -> Option> { + pub fn get_cache_tracker(&mut self, file_num: u64, size: usize) -> Option> { if self.cache_limit == 0 { return None; } - - if self.file_num == 0 { + if self.file_num != file_num { self.file_num = file_num; - self.offset = offset; - } - - if self.chunk_size >= self.chunk_limit || self.file_num < file_num { // If all entries are released from cache, the chunk can be ignored. if self.size_tracker.load(Ordering::Relaxed) > 0 { - let mut task = CacheChunk { + let group_infos = std::mem::replace(&mut self.group_infos, vec![]); + let task = CacheChunk { file_num: self.file_num, - base_offset: self.offset, - end_offset: offset, size_tracker: self.size_tracker.clone(), + group_infos, }; - if file_num != self.file_num { - // Log file is switched, use `u64::MAX` as the end. 
- task.end_offset = u64::MAX; - } let _ = self.scheduler.schedule(CacheTask::NewChunk(task)); } - self.reset(file_num, offset); + self.reset(file_num); } - if self.block_on_full { let cache_size = self.cache_stats.cache_size(); if cache_size > self.cache_limit { @@ -104,55 +79,50 @@ impl CacheSubmitor { } } } - - self.chunk_size += size; self.size_tracker.fetch_add(size, Ordering::Release); self.cache_stats.add_mem_change(size); Some(self.size_tracker.clone()) } - fn reset(&mut self, file_num: u64, offset: u64) { + pub fn fill_cache(&mut self, group_id: u64, index: u64) { + self.group_infos.push((group_id, index)); + } + + fn reset(&mut self, file_num: u64) { self.file_num = file_num; - self.offset = offset; - self.chunk_size = 0; self.size_tracker = Arc::new(AtomicUsize::new(0)); } } -pub struct Runner +pub struct Runner where E: Message + Clone, W: EntryExt + 'static, - P: GenericPipeLog, { cache_limit: usize, cache_stats: Arc, chunk_limit: usize, valid_cache_chunks: VecDeque, memtables: MemTableAccessor, - pipe_log: P, } -impl Runner +impl Runner where E: Message + Clone, W: EntryExt + 'static, - P: GenericPipeLog, { pub fn new( cache_limit: usize, cache_stats: Arc, chunk_limit: usize, memtables: MemTableAccessor, - pipe_log: P, - ) -> Runner { + ) -> Runner { Runner { cache_limit, cache_stats, chunk_limit, valid_cache_chunks: Default::default(), memtables, - pipe_log, } } @@ -186,32 +156,9 @@ where _ => return false, }; - let file_num = chunk.file_num; - let read_len = if chunk.end_offset == u64::MAX { - self.pipe_log.file_len(LogQueue::Append, file_num) - chunk.base_offset - } else { - chunk.end_offset - chunk.base_offset - }; - let chunk_content = self - .pipe_log - .fread(LogQueue::Append, file_num, chunk.base_offset, read_len) - .unwrap(); - - let mut reader: &[u8] = chunk_content.as_ref(); - let mut offset = chunk.base_offset; - while let Some(b) = LogBatch::::from_bytes(&mut reader, file_num, offset).unwrap() - { - offset += read_len - reader.len() as 
u64; - for item in b.items { - if let LogItemContent::Entries(entries) = item.content { - let gc_cache_to = match entries.entries.last() { - Some(entry) => W::index(entry) + 1, - None => continue, - }; - if let Some(memtable) = self.memtables.get(item.raft_group_id) { - memtable.wl().compact_cache_to(gc_cache_to); - } - } + for (group_id, index) in chunk.group_infos { + if let Some(memtable) = self.memtables.get(group_id) { + memtable.wl().compact_cache_to(index); } } } @@ -219,11 +166,10 @@ where } } -impl Runnable for Runner +impl Runnable for Runner where E: Message + Clone, W: EntryExt + 'static, - P: GenericPipeLog, { fn run(&mut self, task: CacheTask) -> bool { match task { @@ -253,9 +199,8 @@ pub enum CacheTask { #[derive(Clone)] pub struct CacheChunk { file_num: u64, - base_offset: u64, - end_offset: u64, size_tracker: Arc, + group_infos: Vec<(u64, u64)>, } #[derive(Clone)] diff --git a/src/engine.rs b/src/engine.rs index a9451a14..3c9724cf 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -7,9 +7,7 @@ use std::{fmt, u64}; use log::{info, warn}; use protobuf::Message; -use crate::cache_evict::{ - CacheSubmitor, CacheTask, Runner as CacheEvictRunner, DEFAULT_CACHE_CHUNK_SIZE, -}; +use crate::cache_evict::{CacheSubmitor, CacheTask, Runner as CacheEvictRunner}; use crate::config::{Config, RecoveryMode}; use crate::log_batch::{ self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN, @@ -173,11 +171,10 @@ where } } - if let Some(tracker) = pipe_log.cache_submitor().get_cache_tracker( - file_num, - offset, - encoded_size, - ) { + if let Some(tracker) = pipe_log + .cache_submitor() + .get_cache_tracker(file_num, encoded_size) + { for item in &log_batch.items { if let LogItemContent::Entries(ref entries) = item.content { entries.attach_cache_tracker(tracker.clone()); @@ -291,7 +288,7 @@ where E: Message + Clone, W: EntryExt + 'static, { - fn new_impl(cfg: Config, chunk_limit: usize) -> Result> { + fn new_impl(cfg: Config) -> 
Result> { let cache_limit = cfg.cache_limit.0 as usize; let cache_stats = Arc::new(SharedCacheStats::default()); @@ -301,7 +298,6 @@ where &cfg, CacheSubmitor::new( cache_limit, - chunk_limit, cache_evict_worker.scheduler(), cache_stats.clone(), ), @@ -319,9 +315,8 @@ where let cache_evict_runner = CacheEvictRunner::new( cache_limit, cache_stats.clone(), - chunk_limit, + cfg.target_file_size.0 as usize, memtables.clone(), - pipe_log.clone(), ); cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1))); @@ -351,7 +346,7 @@ where } pub fn new(cfg: Config) -> Engine { - Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap() + Self::new_impl(cfg).unwrap() } } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index dc0cbad9..df626864 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -182,7 +182,9 @@ impl LogManager { if self.active_file_num > 0 { self.truncate_active_log(None)?; } - + if let Some(last_file) = self.all_files.back() { + fsync(last_file.0).map_err(|e| parse_nix_error(e, "fsync"))?; + } self.active_file_num += 1; let mut path = PathBuf::from(&self.dir); @@ -449,18 +451,21 @@ impl GenericPipeLog for PipeLog { // TODO: `pwrite` is performed in the mutex. Is it possible for concurrence? 
let mut cache_submitor = self.cache_submitor.lock().unwrap(); let (cur_file_num, offset, fd) = self.append(LogQueue::Append, &content, &mut sync)?; - let tracker = cache_submitor.get_cache_tracker(cur_file_num, offset, entries_size); - drop(cache_submitor); - if sync { - fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; - } - + let tracker = cache_submitor.get_cache_tracker(cur_file_num, entries_size); for item in &batch.items { if let LogItemContent::Entries(ref entries) = item.content { entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker); + entries.entries.last().map(|e| { + cache_submitor.fill_cache(item.raft_group_id, W::index(e)); + }); } } + drop(cache_submitor); + if sync { + fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + } + *file_num = cur_file_num; return Ok(content.len()); } From eb405162af35c18a4e49f88b66e8f462690e4200 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 24 Sep 2020 16:06:01 +0800 Subject: [PATCH 02/17] small fix Signed-off-by: Little-Wallace --- src/cache_evict.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cache_evict.rs b/src/cache_evict.rs index d17f59f4..669e2137 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -57,7 +57,6 @@ impl CacheSubmitor { return None; } if self.file_num != file_num { - self.file_num = file_num; // If all entries are released from cache, the chunk can be ignored. 
if self.size_tracker.load(Ordering::Relaxed) > 0 { let group_infos = std::mem::replace(&mut self.group_infos, vec![]); @@ -85,7 +84,9 @@ impl CacheSubmitor { } pub fn fill_cache(&mut self, group_id: u64, index: u64) { - self.group_infos.push((group_id, index)); + if self.cache_limit != 0 { + self.group_infos.push((group_id, index)); + } } fn reset(&mut self, file_num: u64) { From cb7075818780a131bb1c4b708e96add48f508427 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 24 Sep 2020 19:30:11 +0800 Subject: [PATCH 03/17] add async interface Signed-off-by: Little-Wallace --- Cargo.toml | 1 + src/cache_evict.rs | 20 ++---- src/engine.rs | 157 ++++++++++++++++++++++++++++++--------------- src/errors.rs | 13 ++++ src/lib.rs | 1 + src/pipe_log.rs | 89 ++++++++++++------------- src/wal.rs | 109 +++++++++++++++++++++++++++++++ 7 files changed, 277 insertions(+), 113 deletions(-) create mode 100644 src/wal.rs diff --git a/Cargo.toml b/Cargo.toml index fd374273..5dbc7cc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ nix = "0.18.0" crossbeam = "0.7" prometheus = { version = "0.10", default-features = false, features = ["nightly"] } thiserror = "1.0" +futures = { version = "0.3", features = ["thread-pool", "compat"] } [dev-dependencies] raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] } diff --git a/src/cache_evict.rs b/src/cache_evict.rs index 669e2137..6c87de00 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -52,7 +52,7 @@ impl CacheSubmitor { self.block_on_full = false; } - pub fn get_cache_tracker(&mut self, file_num: u64, size: usize) -> Option> { + pub fn get_cache_tracker(&mut self, file_num: u64) -> Option> { if self.cache_limit == 0 { return None; } @@ -69,23 +69,15 @@ impl CacheSubmitor { } self.reset(file_num); } - if self.block_on_full { - let cache_size = self.cache_stats.cache_size(); - if cache_size > self.cache_limit { - let (tx, rx) = bounded(1); - if 
self.scheduler.schedule(CacheTask::EvictOldest(tx)).is_ok() { - let _ = rx.recv(); - } - } - } - self.size_tracker.fetch_add(size, Ordering::Release); - self.cache_stats.add_mem_change(size); + Some(self.size_tracker.clone()) } - pub fn fill_cache(&mut self, group_id: u64, index: u64) { + pub fn fill_cache(&mut self, size: usize, group_infos: &mut Vec<(u64, u64)>) { if self.cache_limit != 0 { - self.group_infos.push((group_id, index)); + self.size_tracker.fetch_add(size, Ordering::Release); + self.cache_stats.add_mem_change(size); + self.group_infos.append(group_infos); } } diff --git a/src/engine.rs b/src/engine.rs index 3c9724cf..5b5e6359 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,23 +1,27 @@ use std::io::BufRead; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; +use std::sync::mpsc::{channel, Sender, SendError}; use std::time::{Duration, Instant}; use std::{fmt, u64}; +use std::thread::{Builder as ThreadBuilder, JoinHandle}; + +use futures::channel as future_channel; +use futures::executor::block_on; use log::{info, warn}; use protobuf::Message; use crate::cache_evict::{CacheSubmitor, CacheTask, Runner as CacheEvictRunner}; use crate::config::{Config, RecoveryMode}; -use crate::log_batch::{ - self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN, - HEADER_LEN, -}; +use crate::log_batch::{self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN, HEADER_LEN, LogItem}; use crate::memtable::{EntryIndex, MemTable}; use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION}; use crate::purge::PurgeManager; use crate::util::{HandyRwLock, HashMap, Worker}; use crate::{codec, CacheStats, Result}; +use crate::wal::{WriteTask, LogMsg, WalRunner}; +use crate::errors::Error; const SLOTS_COUNT: usize = 128; @@ -118,6 +122,7 @@ where pipe_log: P, cache_stats: Arc, purge_manager: PurgeManager, + wal_sender: Sender, workers: Arc>, } @@ -133,6 +138,7 
@@ where queue: LogQueue, pipe_log: &P, memtables: &MemTableAccessor, + cache_submitor: &mut CacheSubmitor, recovery_mode: RecoveryMode, ) -> Result<()> { // Get first file number and last file number. @@ -171,9 +177,8 @@ where } } - if let Some(tracker) = pipe_log - .cache_submitor() - .get_cache_tracker(file_num, encoded_size) + if let Some(tracker) = cache_submitor + .get_cache_tracker(file_num) { for item in &log_batch.items { if let LogItemContent::Entries(ref entries) = item.content { @@ -218,14 +223,37 @@ where // 2. Users can call `append` on one raft group concurrently. // Maybe we can improve the implement of "inactive log rewrite" and // forbid concurrent `append` to remove locks here. - fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { - let queue = LogQueue::Append; - let mut file_num = 0; - let bytes = self.pipe_log.write(log_batch, sync, &mut file_num)?; - if file_num > 0 { - apply_to_memtable(&self.memtables, log_batch, queue, file_num); + async fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { + let mut entries_size = 0; + let mut group_infos = vec![]; + for item in log_batch.items.iter() { + if let LogItemContent::Entries(ref entries) = item.content { + group_infos.push((item.raft_group_id, W::index(entries.entries.last().unwrap()))); + } + } + if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { + let (sender, r) = future_channel::oneshot::channel(); + let bytes = content.len(); + let task = WriteTask { + content, + sync, + entries_size, + group_infos, + sender, + }; + self.wal_sender.send(LogMsg::Write(task)).map_err(|_| Error::Stop)?; + let (file_num, offset, tracker) = r.await?; + if file_num > 0 { + for item in log_batch.items.drain(..) 
{ + if let LogItemContent::Entries(ref entries) = item.content { + entries.update_position(LogQueue::Append, file_num, offset, &tracker); + } + apply_item_to_memtable(&self.memtables, item, LogQueue::Append, file_num); + } + } + return Ok(bytes); } - Ok(bytes) + Ok(0) } } @@ -281,6 +309,7 @@ impl SharedCacheStats { struct Workers { cache_evict: Worker, + wal: JoinHandle<()>, } impl Engine @@ -296,14 +325,8 @@ where let pipe_log = PipeLog::open( &cfg, - CacheSubmitor::new( - cache_limit, - cache_evict_worker.scheduler(), - cache_stats.clone(), - ), ) .expect("Open raft log"); - pipe_log.cache_submitor().block_on_full(); let memtables = { let stats = cache_stats.clone(); @@ -321,17 +344,31 @@ where cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1))); let recovery_mode = cfg.recovery_mode; + let mut cache_submitor = CacheSubmitor::new( + cache_limit, + cache_evict_worker.scheduler(), + cache_stats.clone(), + ); + cache_submitor.block_on_full(); Engine::recover( LogQueue::Rewrite, &pipe_log, &memtables, + &mut cache_submitor, RecoveryMode::TolerateCorruptedTailRecords, )?; - Engine::recover(LogQueue::Append, &pipe_log, &memtables, recovery_mode)?; - pipe_log.cache_submitor().nonblock_on_full(); + Engine::recover(LogQueue::Append, &pipe_log, &memtables, &mut cache_submitor, recovery_mode)?; + cache_submitor.nonblock_on_full(); let cfg = Arc::new(cfg); let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); + let (wal_sender, wal_receiver) = channel(); + + let mut wal_runner = WalRunner::new(cache_submitor, pipe_log.clone(), wal_receiver); + let th = ThreadBuilder::new() + .name("wal".to_string()) + .spawn(move || wal_runner.run().unwrap_or_else(|e| warn!("run error because: {}", e))) + .unwrap(); Ok(Engine { cfg, @@ -339,8 +376,10 @@ where pipe_log, cache_stats, purge_manager, + wal_sender, workers: Arc::new(RwLock::new(Workers { cache_evict: cache_evict_worker, + wal: th, })), }) } @@ -475,7 +514,11 @@ where /// 
Write the content of LogBatch into the engine and return written bytes. /// If set sync true, the data will be persisted on disk by `fsync`. pub fn write(&self, log_batch: &mut LogBatch, sync: bool) -> Result { - self.write_impl(log_batch, sync) + block_on(self.write_impl(log_batch, sync)) + } + + pub async fn async_write(&self, log_batch: &mut LogBatch, sync: bool) -> Result { + self.write_impl(log_batch, sync).await } /// Flush stats about EntryCache. @@ -557,6 +600,45 @@ where Ok(e) } +fn apply_item_to_memtable( + memtables: &MemTableAccessor, + item: LogItem, + queue: LogQueue, + file_num: u64, +) where + E: Message + Clone, + W: EntryExt, +{ + let memtable = memtables.get_or_insert(item.raft_group_id); + match item.content { + LogItemContent::Entries(entries_to_add) => { + let entries = entries_to_add.entries; + let entries_index = entries_to_add.entries_index.into_inner(); + if queue == LogQueue::Rewrite { + memtable.wl().append_rewrite(entries, entries_index); + } else { + memtable.wl().append(entries, entries_index); + } + } + LogItemContent::Command(Command::Clean) => { + memtables.remove(item.raft_group_id); + } + LogItemContent::Command(Command::Compact { index }) => { + memtable.wl().compact_to(index); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => { + let (key, value) = (kv.key, kv.value.unwrap()); + match queue { + LogQueue::Append => memtable.wl().put(key, value, file_num), + LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), + } + } + OpType::Del => memtable.wl().delete(kv.key.as_slice()), + }, + } +} + fn apply_to_memtable( memtables: &MemTableAccessor, log_batch: &mut LogBatch, @@ -567,34 +649,7 @@ fn apply_to_memtable( W: EntryExt, { for item in log_batch.items.drain(..) 
{ - let memtable = memtables.get_or_insert(item.raft_group_id); - match item.content { - LogItemContent::Entries(entries_to_add) => { - let entries = entries_to_add.entries; - let entries_index = entries_to_add.entries_index.into_inner(); - if queue == LogQueue::Rewrite { - memtable.wl().append_rewrite(entries, entries_index); - } else { - memtable.wl().append(entries, entries_index); - } - } - LogItemContent::Command(Command::Clean) => { - memtables.remove(item.raft_group_id); - } - LogItemContent::Command(Command::Compact { index }) => { - memtable.wl().compact_to(index); - } - LogItemContent::Kv(kv) => match kv.op_type { - OpType::Put => { - let (key, value) = (kv.key, kv.value.unwrap()); - match queue { - LogQueue::Append => memtable.wl().put(key, value, file_num), - LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), - } - } - OpType::Del => memtable.wl().delete(kv.key.as_slice()), - }, - } + apply_item_to_memtable(memtables, item, queue, file_num); } } diff --git a/src/errors.rs b/src/errors.rs index 8313149f..87ff1762 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,5 +1,6 @@ use std::error; use std::io::Error as IoError; +use futures::channel::oneshot::Canceled; use thiserror::Error; @@ -27,6 +28,18 @@ pub enum Error { StorageUnavailable, #[error("The entry acquired has been compacted")] StorageCompacted, + #[error("write wal failed")] + Wal, + #[error("wal-thread has stopped")] + Stop, + #[error("wal-channel has full")] + Full, +} + +impl From for Error { + fn from(c: Canceled) -> Error { + Error::Wal + } } pub type Result = ::std::result::Result; diff --git a/src/lib.rs b/src/lib.rs index 443abdf4..18266a6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ mod memtable; mod pipe_log; mod purge; mod util; +mod wal; use crate::pipe_log::PipeLog; diff --git a/src/pipe_log.rs b/src/pipe_log.rs index df626864..06ed14a2 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -43,13 +43,18 @@ pub trait GenericPipeLog: Sized + Clone + 
Send { /// Read some bytes from the given position. fn fread(&self, queue: LogQueue, file_num: u64, offset: u64, len: u64) -> Result>; + /// Check whether the size of current write log has reached the rotate limit. + fn switch_log_file( + &self, + queue: LogQueue, + ) -> Result<(u64, Arc)>; + /// Append some bytes to the given queue. fn append( &self, queue: LogQueue, content: &[u8], - sync: &mut bool, - ) -> Result<(u64, u64, Arc)>; + ) -> Result; /// Close the pipe log. fn close(&self) -> Result<()>; @@ -95,8 +100,6 @@ pub trait GenericPipeLog: Sized + Clone + Send { fn latest_file_before(&self, queue: LogQueue, size: usize) -> u64; fn file_len(&self, queue: LogQueue, file_num: u64) -> u64; - - fn cache_submitor(&self) -> MutexGuard; } pub struct LogFd(RawFd); @@ -105,6 +108,9 @@ impl LogFd { fn close(&self) -> Result<()> { close(self.0).map_err(|e| parse_nix_error(e, "close")) } + pub fn sync(&self) -> Result<()> { + fsync(self.0).map_err(|e| parse_nix_error(e, "fsync")) + } } impl Drop for LogFd { @@ -118,7 +124,6 @@ impl Drop for LogFd { struct LogManager { dir: String, rotate_size: usize, - bytes_per_sync: usize, name_suffix: &'static str, pub first_file_num: u64, @@ -168,7 +173,7 @@ impl LogManager { let mut path = PathBuf::from(&self.dir); path.push(logs.last().unwrap()); let fd = Arc::new(LogFd(open_active_file(&path)?)); - fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + fd.sync()?; self.active_log_size = file_len(fd.0)?; self.active_log_capacity = self.active_log_size; @@ -183,7 +188,7 @@ impl LogManager { self.truncate_active_log(None)?; } if let Some(last_file) = self.all_files.back() { - fsync(last_file.0).map_err(|e| parse_nix_error(e, "fsync"))?; + last_file.sync()?; } self.active_file_num += 1; @@ -215,7 +220,7 @@ impl LogManager { // After truncate to 0, write header is necessary. 
offset = write_file_header(active_fd.0)?; } - fsync(active_fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + active_fd.sync()?; self.active_log_size = offset; self.active_log_capacity = offset; self.last_sync_size = self.active_log_size; @@ -243,16 +248,8 @@ impl LogManager { Ok(end_offset) } - fn reach_sync_limit(&self) -> bool { - self.active_log_size - self.last_sync_size >= self.bytes_per_sync - } - - fn on_append(&mut self, content_len: usize, sync: &mut bool) -> Result<(u64, u64, Arc)> { - if self.active_log_size >= self.rotate_size { - self.new_log_file()?; - } + fn on_append(&mut self, content_len: usize) -> Result<(u64, Arc)> { - let active_file_num = self.active_file_num; let active_log_size = self.active_log_size; let fd = self.get_active_fd().unwrap(); @@ -273,11 +270,7 @@ impl LogManager { self.active_log_capacity += alloc_size; } - if *sync || self.reach_sync_limit() { - self.last_sync_size = self.active_log_size; - *sync = true; - } - Ok((active_file_num, active_log_size as u64, fd)) + Ok((active_log_size as u64, fd)) } } @@ -295,11 +288,10 @@ pub struct PipeLog { appender: Arc>, rewriter: Arc>, - cache_submitor: Arc>, } impl PipeLog { - fn new(cfg: &Config, cache_submitor: CacheSubmitor) -> PipeLog { + fn new(cfg: &Config) -> PipeLog { let appender = Arc::new(RwLock::new(LogManager::new(&cfg, LOG_SUFFIX))); let rewriter = Arc::new(RwLock::new(LogManager::new(&cfg, REWRITE_SUFFIX))); PipeLog { @@ -308,11 +300,10 @@ impl PipeLog { bytes_per_sync: cfg.bytes_per_sync.0 as usize, appender, rewriter, - cache_submitor: Arc::new(Mutex::new(cache_submitor)), } } - pub fn open(cfg: &Config, cache_submitor: CacheSubmitor) -> Result { + pub fn open(cfg: &Config) -> Result { let path = Path::new(&cfg.dir); if !path.exists() { info!("Create raft log directory: {}", &cfg.dir); @@ -322,7 +313,7 @@ impl PipeLog { return Err(box_err!("Not directory: {}", &cfg.dir)); } - let pipe_log = PipeLog::new(cfg, cache_submitor); + let pipe_log = PipeLog::new(cfg); let (mut 
min_file_num, mut max_file_num): (u64, u64) = (u64::MAX, 0); let (mut min_rewrite_num, mut max_rewrite_num): (u64, u64) = (u64::MAX, 0); @@ -422,19 +413,30 @@ impl GenericPipeLog for PipeLog { pread_exact(fd.0, offset, len as usize) } + fn switch_log_file( + &self, + queue: LogQueue, + ) -> Result<(u64, Arc)> { + let mut writer = self.mut_queue(queue); + if writer.active_log_size >= self.rotate_size { + writer.new_log_file()?; + } + let fd = writer.get_active_fd().unwrap(); + + Ok((writer.active_file_num, fd)) + } + fn append( &self, queue: LogQueue, content: &[u8], - sync: &mut bool, - ) -> Result<(u64, u64, Arc)> { - let (file_num, offset, fd) = self.mut_queue(queue).on_append(content.len(), sync)?; + ) -> Result { + let (offset, fd) = self.mut_queue(queue).on_append(content.len())?; pwrite_exact(fd.0, offset, content)?; - Ok((file_num, offset, fd)) + Ok(offset) } fn close(&self) -> Result<()> { - let _write_lock = self.cache_submitor.lock().unwrap(); self.appender.wl().truncate_active_log(None)?; self.rewriter.wl().truncate_active_log(None)?; Ok(()) @@ -449,23 +451,17 @@ impl GenericPipeLog for PipeLog { let mut entries_size = 0; if let Some(content) = batch.encode_to_bytes(&mut entries_size) { // TODO: `pwrite` is performed in the mutex. Is it possible for concurrence? 
- let mut cache_submitor = self.cache_submitor.lock().unwrap(); - let (cur_file_num, offset, fd) = self.append(LogQueue::Append, &content, &mut sync)?; - let tracker = cache_submitor.get_cache_tracker(cur_file_num, entries_size); + let (cur_file_num, fd) = self.switch_log_file(LogQueue::Append)?; + let tracker = None; + let offset = self.append(LogQueue::Append, &content)?; for item in &batch.items { if let LogItemContent::Entries(ref entries) = item.content { entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker); - entries.entries.last().map(|e| { - cache_submitor.fill_cache(item.raft_group_id, W::index(e)); - }); } } - - drop(cache_submitor); if sync { fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; } - *file_num = cur_file_num; return Ok(content.len()); } @@ -480,9 +476,10 @@ impl GenericPipeLog for PipeLog { ) -> Result { let mut encoded_size = 0; if let Some(content) = batch.encode_to_bytes(&mut encoded_size) { - let (cur_file_num, offset, fd) = self.append(LogQueue::Rewrite, &content, &mut sync)?; + let (cur_file_num, fd) = self.switch_log_file(LogQueue::Rewrite)?; + let offset = self.append(LogQueue::Rewrite, &content)?; if sync { - fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + fd.sync()?; } for item in &batch.items { if let LogItemContent::Entries(ref entries) = item.content { @@ -501,7 +498,7 @@ impl GenericPipeLog for PipeLog { fn sync(&self, queue: LogQueue) -> Result<()> { if let Some(fd) = self.get_queue(queue).get_active_fd() { - fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + fd.sync()?; } Ok(()) } @@ -566,10 +563,6 @@ impl GenericPipeLog for PipeLog { let fd = self.get_queue(queue).get_fd(file_num).unwrap(); file_len(fd.0).unwrap() as u64 } - - fn cache_submitor(&self) -> MutexGuard { - self.cache_submitor.lock().unwrap() - } } fn generate_file_name(file_num: u64, suffix: &'static str) -> String { diff --git a/src/wal.rs b/src/wal.rs new file mode 100644 index 00000000..6a4367e6 --- /dev/null +++ 
b/src/wal.rs @@ -0,0 +1,109 @@ +use std::sync::{Arc, RwLock}; +use std::sync::atomic::{Ordering, AtomicUsize}; +use std::sync::mpsc::{Receiver, RecvError, TryRecvError}; +use futures::channel::oneshot::Sender; + +use crate::cache_evict::CacheSubmitor; +use crate::log_batch::{LogBatch, EntryExt, LogItem, LogItemContent}; +use crate::engine::MemTableAccessor; +use crate::pipe_log::{LogQueue, GenericPipeLog}; +use crate::errors::Result; +use protobuf::Message; +use log::{info, warn}; + +pub struct WriteTask { + pub content: Vec, + pub group_infos: Vec<(u64, u64)>, + pub entries_size: usize, + pub sync: bool, + pub sender: Sender<(u64, u64, Option>)>, +} + +pub enum LogMsg { + Write(WriteTask), + Stop +} + +pub struct WalRunner + where + E: Message + Clone, + W: EntryExt, + P: GenericPipeLog, +{ + cache_submitter: CacheSubmitor, + pipe_log: P, + receiver: Receiver, +} + +impl WalRunner + where + E: Message + Clone, + W: EntryExt, + P: GenericPipeLog, +{ + pub fn new( + cache_submitter: CacheSubmitor, + pipe_log: P, + receiver: Receiver, + ) -> WalRunner { + WalRunner { + pipe_log, + cache_submitter, + receiver, + } + } +} + + +impl WalRunner + where + E: Message + Clone, + W: EntryExt, + P: GenericPipeLog, +{ + pub fn run(&mut self) -> Result<()>{ + let mut write_ret = vec![]; + let mut sync = false; + let mut entries_size = 0; + let mut run = true; + while run { + let mut task = match self.receiver.recv().unwrap() { + LogMsg::Write(task) => task, + LogMsg::Stop => return Ok(()), + }; + sync = task.sync; + let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap(); + let offset = self.pipe_log.append(LogQueue::Append, &task.content).unwrap(); + write_ret.push((offset, task.sender)); + entries_size = task.entries_size; + let tracker = self.cache_submitter.get_cache_tracker(file_num); + self.cache_submitter.fill_cache(task.entries_size, &mut task.group_infos); + while let Ok(msg) = self.receiver.try_recv() { + let mut task = match msg {
LogMsg::Write(task) => task, + LogMsg::Stop => { + run = false; + break; + }, + }; + if task.sync { + sync = true; + } + self.cache_submitter.fill_cache(task.entries_size, &mut task.group_infos); + let offset = self.pipe_log.append(LogQueue::Append, &task.content).unwrap(); + write_ret.push((offset, task.sender)); + } + if sync { + if let Err(e) = fd.sync() { + warn!("write wal failed because of: {} ", e); + write_ret.clear(); + } + sync = false; + } + for (offset, sender) in write_ret.drain(..) { + let _ = sender.send((file_num, offset, tracker.clone())); + } + } + Ok(()) + } +} From 500dfd8e7eece3283d2007cdae2fa9fbe6f8440d Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 24 Sep 2020 20:02:46 +0800 Subject: [PATCH 04/17] fix type Signed-off-by: Little-Wallace --- src/cache_evict.rs | 26 ++++----------- src/engine.rs | 62 +++++++++++++++++++++++------------ src/errors.rs | 4 +-- src/pipe_log.rs | 33 ++++--------------- src/wal.rs | 80 ++++++++++++++++++---------------------------- 5 files changed, 88 insertions(+), 117 deletions(-) diff --git a/src/cache_evict.rs b/src/cache_evict.rs index 6c87de00..9bc834da 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -4,7 +4,6 @@ use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use crossbeam::channel::{bounded, Sender}; use protobuf::Message; use crate::engine::{MemTableAccessor, SharedCacheStats}; @@ -21,7 +20,7 @@ pub struct CacheSubmitor { size_tracker: Arc, group_infos: Vec<(u64, u64)>, - scheduler: Scheduler, + scheduler: Scheduler, cache_limit: usize, cache_stats: Arc, block_on_full: bool, @@ -30,7 +29,7 @@ pub struct CacheSubmitor { impl CacheSubmitor { pub fn new( cache_limit: usize, - scheduler: Scheduler, + scheduler: Scheduler, cache_stats: Arc, ) -> Self { CacheSubmitor { @@ -65,7 +64,7 @@ impl CacheSubmitor { size_tracker: self.size_tracker.clone(), group_infos, }; - let _ = self.scheduler.schedule(CacheTask::NewChunk(task)); + let _ = 
self.scheduler.schedule(task); } self.reset(file_num); } @@ -159,20 +158,13 @@ where } } -impl Runnable for Runner +impl Runnable for Runner where E: Message + Clone, W: EntryExt + 'static, { - fn run(&mut self, task: CacheTask) -> bool { - match task { - CacheTask::NewChunk(chunk) => self.valid_cache_chunks.push_back(chunk), - CacheTask::EvictOldest(tx) => { - assert!(self.evict_oldest_cache()); - let _ = tx.send(()); - return false; - } - } + fn run(&mut self, chunk: CacheChunk) -> bool { + self.valid_cache_chunks.push_back(chunk); true } fn on_tick(&mut self) { @@ -183,12 +175,6 @@ where } } -#[derive(Clone)] -pub enum CacheTask { - NewChunk(CacheChunk), - EvictOldest(Sender<()>), -} - #[derive(Clone)] pub struct CacheChunk { file_num: u64, diff --git a/src/engine.rs b/src/engine.rs index 5b5e6359..b50f5491 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,10 +1,10 @@ use std::io::BufRead; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, RwLock}; -use std::sync::mpsc::{channel, Sender, SendError}; +use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; use std::{fmt, u64}; -use std::thread::{Builder as ThreadBuilder, JoinHandle}; use futures::channel as future_channel; use futures::executor::block_on; @@ -12,16 +12,19 @@ use futures::executor::block_on; use log::{info, warn}; use protobuf::Message; -use crate::cache_evict::{CacheSubmitor, CacheTask, Runner as CacheEvictRunner}; +use crate::cache_evict::{CacheChunk, CacheSubmitor, Runner as CacheEvictRunner}; use crate::config::{Config, RecoveryMode}; -use crate::log_batch::{self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN, HEADER_LEN, LogItem}; +use crate::errors::Error; +use crate::log_batch::{ + self, Command, CompressionType, EntryExt, LogBatch, LogItem, LogItemContent, OpType, + CHECKSUM_LEN, HEADER_LEN, +}; use crate::memtable::{EntryIndex, MemTable}; use 
crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION}; use crate::purge::PurgeManager; use crate::util::{HandyRwLock, HashMap, Worker}; +use crate::wal::{LogMsg, WalRunner, WriteTask}; use crate::{codec, CacheStats, Result}; -use crate::wal::{WriteTask, LogMsg, WalRunner}; -use crate::errors::Error; const SLOTS_COUNT: usize = 128; @@ -177,14 +180,17 @@ where } } - if let Some(tracker) = cache_submitor - .get_cache_tracker(file_num) - { + if let Some(tracker) = cache_submitor.get_cache_tracker(file_num) { + let mut group_infos = vec![]; for item in &log_batch.items { if let LogItemContent::Entries(ref entries) = item.content { entries.attach_cache_tracker(tracker.clone()); + entries.entries.last().map(|e| { + group_infos.push((item.raft_group_id, W::index(e))) + }); } } + cache_submitor.fill_cache(encoded_size, &mut group_infos); } apply_to_memtable(memtables, &mut log_batch, queue, file_num); offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; @@ -228,7 +234,10 @@ where let mut group_infos = vec![]; for item in log_batch.items.iter() { if let LogItemContent::Entries(ref entries) = item.content { - group_infos.push((item.raft_group_id, W::index(entries.entries.last().unwrap()))); + group_infos.push(( + item.raft_group_id, + W::index(entries.entries.last().unwrap()), + )); } } if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { @@ -241,7 +250,9 @@ where group_infos, sender, }; - self.wal_sender.send(LogMsg::Write(task)).map_err(|_| Error::Stop)?; + self.wal_sender + .send(LogMsg::Write(task)) + .map_err(|_| Error::Stop)?; let (file_num, offset, tracker) = r.await?; if file_num > 0 { for item in log_batch.items.drain(..) 
{ @@ -308,8 +319,8 @@ impl SharedCacheStats { } struct Workers { - cache_evict: Worker, - wal: JoinHandle<()>, + cache_evict: Worker, + wal: Option>, } impl Engine @@ -323,10 +334,7 @@ where let mut cache_evict_worker = Worker::new("cache_evict".to_owned(), None); - let pipe_log = PipeLog::open( - &cfg, - ) - .expect("Open raft log"); + let pipe_log = PipeLog::open(&cfg).expect("Open raft log"); let memtables = { let stats = cache_stats.clone(); @@ -357,7 +365,13 @@ where &mut cache_submitor, RecoveryMode::TolerateCorruptedTailRecords, )?; - Engine::recover(LogQueue::Append, &pipe_log, &memtables, &mut cache_submitor, recovery_mode)?; + Engine::recover( + LogQueue::Append, + &pipe_log, + &memtables, + &mut cache_submitor, + recovery_mode, + )?; cache_submitor.nonblock_on_full(); let cfg = Arc::new(cfg); @@ -367,7 +381,11 @@ where let mut wal_runner = WalRunner::new(cache_submitor, pipe_log.clone(), wal_receiver); let th = ThreadBuilder::new() .name("wal".to_string()) - .spawn(move || wal_runner.run().unwrap_or_else(|e| warn!("run error because: {}", e))) + .spawn(move || { + wal_runner + .run() + .unwrap_or_else(|e| warn!("run error because: {}", e)) + }) .unwrap(); Ok(Engine { @@ -379,7 +397,7 @@ where wal_sender, workers: Arc::new(RwLock::new(Workers { cache_evict: cache_evict_worker, - wal: th, + wal: Some(th), })), }) } @@ -533,7 +551,11 @@ where /// Stop background thread which will keep trying evict caching. 
pub fn stop(&self) { let mut workers = self.workers.wl(); + self.wal_sender.send(LogMsg::Stop).unwrap(); workers.cache_evict.stop(); + if let Some(wal) = workers.wal.take() { + wal.join().unwrap(); + } } } diff --git a/src/errors.rs b/src/errors.rs index 87ff1762..61d0420e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ +use futures::channel::oneshot::Canceled; use std::error; use std::io::Error as IoError; -use futures::channel::oneshot::Canceled; use thiserror::Error; @@ -37,7 +37,7 @@ pub enum Error { } impl From for Error { - fn from(c: Canceled) -> Error { + fn from(_: Canceled) -> Error { Error::Wal } } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 06ed14a2..524a4d6d 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -3,7 +3,7 @@ use std::fs::{self, File}; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read}; use std::os::unix::io::RawFd; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::{cmp, u64}; use log::{info, warn}; @@ -15,7 +15,6 @@ use nix::unistd::{close, fsync, ftruncate, lseek, Whence}; use nix::NixPath; use protobuf::Message; -use crate::cache_evict::CacheSubmitor; use crate::config::Config; use crate::log_batch::{EntryExt, LogBatch, LogItemContent}; use crate::util::HandyRwLock; @@ -44,17 +43,10 @@ pub trait GenericPipeLog: Sized + Clone + Send { fn fread(&self, queue: LogQueue, file_num: u64, offset: u64, len: u64) -> Result>; /// Check whether the size of current write log has reached the rotate limit. - fn switch_log_file( - &self, - queue: LogQueue, - ) -> Result<(u64, Arc)>; + fn switch_log_file(&self, queue: LogQueue) -> Result<(u64, Arc)>; /// Append some bytes to the given queue. - fn append( - &self, - queue: LogQueue, - content: &[u8], - ) -> Result; + fn append(&self, queue: LogQueue, content: &[u8]) -> Result; /// Close the pipe log. 
fn close(&self) -> Result<()>; @@ -123,7 +115,6 @@ impl Drop for LogFd { struct LogManager { dir: String, - rotate_size: usize, name_suffix: &'static str, pub first_file_num: u64, @@ -140,8 +131,6 @@ impl LogManager { fn new(cfg: &Config, name_suffix: &'static str) -> Self { Self { dir: cfg.dir.clone(), - rotate_size: cfg.target_file_size.0 as usize, - bytes_per_sync: cfg.bytes_per_sync.0 as usize, name_suffix, first_file_num: INIT_FILE_NUM, @@ -249,7 +238,6 @@ impl LogManager { } fn on_append(&mut self, content_len: usize) -> Result<(u64, Arc)> { - let active_log_size = self.active_log_size; let fd = self.get_active_fd().unwrap(); @@ -413,10 +401,7 @@ impl GenericPipeLog for PipeLog { pread_exact(fd.0, offset, len as usize) } - fn switch_log_file( - &self, - queue: LogQueue, - ) -> Result<(u64, Arc)> { + fn switch_log_file(&self, queue: LogQueue) -> Result<(u64, Arc)> { let mut writer = self.mut_queue(queue); if writer.active_log_size >= self.rotate_size { writer.new_log_file()?; @@ -426,11 +411,7 @@ impl GenericPipeLog for PipeLog { Ok((writer.active_file_num, fd)) } - fn append( - &self, - queue: LogQueue, - content: &[u8], - ) -> Result { + fn append(&self, queue: LogQueue, content: &[u8]) -> Result { let (offset, fd) = self.mut_queue(queue).on_append(content.len())?; pwrite_exact(fd.0, offset, content)?; Ok(offset) @@ -445,7 +426,7 @@ impl GenericPipeLog for PipeLog { fn write>( &self, batch: &LogBatch, - mut sync: bool, + sync: bool, file_num: &mut u64, ) -> Result { let mut entries_size = 0; @@ -471,7 +452,7 @@ impl GenericPipeLog for PipeLog { fn rewrite>( &self, batch: &LogBatch, - mut sync: bool, + sync: bool, file_num: &mut u64, ) -> Result { let mut encoded_size = 0; diff --git a/src/wal.rs b/src/wal.rs index 6a4367e6..1894ad40 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -1,15 +1,12 @@ -use std::sync::{Arc, RwLock}; -use std::sync::atomic::{Ordering, AtomicUsize}; -use std::sync::mpsc::{Receiver, RecvError, TryRecvError}; use 
futures::channel::oneshot::Sender; +use std::sync::atomic::AtomicUsize; +use std::sync::mpsc::Receiver; +use std::sync::Arc; use crate::cache_evict::CacheSubmitor; -use crate::log_batch::{LogBatch, EntryExt, LogItem, LogItemContent}; -use crate::engine::MemTableAccessor; -use crate::pipe_log::{LogQueue, GenericPipeLog}; use crate::errors::Result; -use protobuf::Message; -use log::{info, warn}; +use crate::pipe_log::{GenericPipeLog, LogQueue}; +use log::warn; pub struct WriteTask { pub content: Vec, @@ -21,31 +18,21 @@ pub struct WriteTask { pub enum LogMsg { Write(WriteTask), - Stop + Stop, } -pub struct WalRunner - where - E: Message + Clone, - W: EntryExt, - P: GenericPipeLog, -{ +pub struct WalRunner { cache_submitter: CacheSubmitor, pipe_log: P, receiver: Receiver, } -impl WalRunner - where - E: Message + Clone, - W: EntryExt, - P: GenericPipeLog, -{ +impl WalRunner

{ pub fn new( cache_submitter: CacheSubmitor, pipe_log: P, receiver: Receiver, - ) -> WalRunner { + ) -> WalRunner

{ WalRunner { pipe_log, cache_submitter, @@ -54,43 +41,39 @@ impl WalRunner } } - -impl WalRunner - where - E: Message + Clone, - W: EntryExt, - P: GenericPipeLog, +impl

WalRunner

+where + P: GenericPipeLog, { - pub fn run(&mut self) -> Result<()>{ + pub fn run(&mut self) -> Result<()> { let mut write_ret = vec![]; - let mut sync = false; - let mut entries_size = 0; - let mut run = false; - while run { - let mut task = match self.receiver.recv().unwrap() { - LogMsg::Write(task) => task, - LogMsg::Stop => return Ok(()), - }; - sync = task.sync; - let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap() - let offset = self.pipe_log.append(LogQueue::Append, &task.content).unwrap(); + while let Ok(LogMsg::Write(mut task)) = self.receiver.recv() { + let mut sync = task.sync; + let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap(); + let offset = self + .pipe_log + .append(LogQueue::Append, &task.content) + .unwrap(); write_ret.push((offset, task.sender)); - entries_size = task.entries_size; - let tracker= self.cache_submitter.get_cache_tracker(file_num); - self.cache_submitter.fill_cache(task.entries_size, &mut task.group_infos); + let tracker = self.cache_submitter.get_cache_tracker(file_num); + self.cache_submitter + .fill_cache(task.entries_size, &mut task.group_infos); while let Ok(msg) = self.receiver.try_recv() { let mut task = match msg { LogMsg::Write(task) => task, LogMsg::Stop => { - run = false; - break; - }, + return Ok(()); + } }; if task.sync { sync = true; } - self.cache_submitter.fill_cache(task.entries_size, &mut task.group_infos); - let offset = self.pipe_log.append(LogQueue::Append, &task.content).unwrap(); + self.cache_submitter + .fill_cache(task.entries_size, &mut task.group_infos); + let offset = self + .pipe_log + .append(LogQueue::Append, &task.content) + .unwrap(); write_ret.push((offset, task.sender)); } if sync { @@ -98,7 +81,6 @@ impl WalRunner warn!("write wal failed because of: {} ", e); write_ret.clear(); } - sync = false; } for (offset, sender) in write_ret.drain(..) 
{ let _ = sender.send((file_num, offset, tracker.clone())); From 7123b53c3f8e70732fe11e7eb1dfee762df62ecc Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 25 Sep 2020 16:49:07 +0800 Subject: [PATCH 05/17] revert evict cache change Signed-off-by: Little-Wallace --- src/cache_evict.rs | 132 +++++++++++++++++++++++++++++++++++---------- src/engine.rs | 31 ++++------- src/pipe_log.rs | 4 +- src/wal.rs | 13 ++--- 4 files changed, 120 insertions(+), 60 deletions(-) diff --git a/src/cache_evict.rs b/src/cache_evict.rs index 9bc834da..7a5f9d4b 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -4,12 +4,16 @@ use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use crossbeam::channel::{bounded, Sender}; use protobuf::Message; use crate::engine::{MemTableAccessor, SharedCacheStats}; -use crate::log_batch::EntryExt; +use crate::log_batch::{EntryExt, LogBatch, LogItemContent}; +use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::{HandyRwLock, Runnable, Scheduler}; +pub const DEFAULT_CACHE_CHUNK_SIZE: usize = 4 * 1024 * 1024; + const HIGH_WATER_RATIO: f64 = 0.9; const LOW_WATER_RATIO: f64 = 0.8; const CHUNKS_SHRINK_TO: usize = 1024; @@ -17,11 +21,15 @@ const CHUNKS_SHRINK_TO: usize = 1024; /// Used in `PipLog` to emit `CacheTask::NewChunk` tasks. pub struct CacheSubmitor { file_num: u64, + offset: u64, + // `chunk_size` is different from `size_tracker`. For a given chunk, + // the former is monotomically increasing, but the latter can decrease. 
+ chunk_size: usize, size_tracker: Arc, - group_infos: Vec<(u64, u64)>, - scheduler: Scheduler, + scheduler: Scheduler, cache_limit: usize, + chunk_limit: usize, cache_stats: Arc, block_on_full: bool, } @@ -29,15 +37,18 @@ pub struct CacheSubmitor { impl CacheSubmitor { pub fn new( cache_limit: usize, - scheduler: Scheduler, + chunk_limit: usize, + scheduler: Scheduler, cache_stats: Arc, ) -> Self { CacheSubmitor { file_num: 0, - group_infos: vec![], + offset: 0, + chunk_size: 0, size_tracker: Arc::new(AtomicUsize::new(0)), scheduler, cache_limit, + chunk_limit, cache_stats, block_on_full: false, } @@ -51,70 +62,95 @@ impl CacheSubmitor { self.block_on_full = false; } - pub fn get_cache_tracker(&mut self, file_num: u64) -> Option> { + pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option> { if self.cache_limit == 0 { return None; } - if self.file_num != file_num { + + if self.file_num == 0 { + self.file_num = file_num; + self.offset = offset; + } + + if self.chunk_size >= self.chunk_limit || self.file_num < file_num { // If all entries are released from cache, the chunk can be ignored. if self.size_tracker.load(Ordering::Relaxed) > 0 { - let group_infos = std::mem::replace(&mut self.group_infos, vec![]); - let task = CacheChunk { + let mut task = CacheChunk { file_num: self.file_num, + base_offset: self.offset, + end_offset: offset, size_tracker: self.size_tracker.clone(), - group_infos, }; - let _ = self.scheduler.schedule(task); + if file_num != self.file_num { + // Log file is switched, use `u64::MAX` as the end. 
+ task.end_offset = u64::MAX; + } + let _ = self.scheduler.schedule(CacheTask::NewChunk(task)); + } + self.reset(file_num, offset); + } + + if self.block_on_full { + let cache_size = self.cache_stats.cache_size(); + if cache_size > self.cache_limit { + let (tx, rx) = bounded(1); + if self.scheduler.schedule(CacheTask::EvictOldest(tx)).is_ok() { + let _ = rx.recv(); + } } - self.reset(file_num); } Some(self.size_tracker.clone()) } - pub fn fill_cache(&mut self, size: usize, group_infos: &mut Vec<(u64, u64)>) { - if self.cache_limit != 0 { - self.size_tracker.fetch_add(size, Ordering::Release); - self.cache_stats.add_mem_change(size); - self.group_infos.append(group_infos); - } + pub fn fill_chunk(&mut self, size: usize) { + self.chunk_size += size; + self.size_tracker.fetch_add(size, Ordering::Release); + self.cache_stats.add_mem_change(size); } - fn reset(&mut self, file_num: u64) { + fn reset(&mut self, file_num: u64, offset: u64) { self.file_num = file_num; + self.offset = offset; + self.chunk_size = 0; self.size_tracker = Arc::new(AtomicUsize::new(0)); } } -pub struct Runner +pub struct Runner where E: Message + Clone, W: EntryExt + 'static, + P: GenericPipeLog, { cache_limit: usize, cache_stats: Arc, chunk_limit: usize, valid_cache_chunks: VecDeque, memtables: MemTableAccessor, + pipe_log: P, } -impl Runner +impl Runner where E: Message + Clone, W: EntryExt + 'static, + P: GenericPipeLog, { pub fn new( cache_limit: usize, cache_stats: Arc, chunk_limit: usize, memtables: MemTableAccessor, - ) -> Runner { + pipe_log: P, + ) -> Runner { Runner { cache_limit, cache_stats, chunk_limit, valid_cache_chunks: Default::default(), memtables, + pipe_log, } } @@ -148,9 +184,32 @@ where _ => return false, }; - for (group_id, index) in chunk.group_infos { - if let Some(memtable) = self.memtables.get(group_id) { - memtable.wl().compact_cache_to(index); + let file_num = chunk.file_num; + let read_len = if chunk.end_offset == u64::MAX { + self.pipe_log.file_len(LogQueue::Append, 
file_num) - chunk.base_offset + } else { + chunk.end_offset - chunk.base_offset + }; + let chunk_content = self + .pipe_log + .fread(LogQueue::Append, file_num, chunk.base_offset, read_len) + .unwrap(); + + let mut reader: &[u8] = chunk_content.as_ref(); + let mut offset = chunk.base_offset; + while let Some(b) = LogBatch::::from_bytes(&mut reader, file_num, offset).unwrap() + { + offset += read_len - reader.len() as u64; + for item in b.items { + if let LogItemContent::Entries(entries) = item.content { + let gc_cache_to = match entries.entries.last() { + Some(entry) => W::index(entry) + 1, + None => continue, + }; + if let Some(memtable) = self.memtables.get(item.raft_group_id) { + memtable.wl().compact_cache_to(gc_cache_to); + } + } } } } @@ -158,13 +217,21 @@ where } } -impl Runnable for Runner +impl Runnable for Runner where E: Message + Clone, W: EntryExt + 'static, + P: GenericPipeLog, { - fn run(&mut self, chunk: CacheChunk) -> bool { - self.valid_cache_chunks.push_back(chunk); + fn run(&mut self, task: CacheTask) -> bool { + match task { + CacheTask::NewChunk(chunk) => self.valid_cache_chunks.push_back(chunk), + CacheTask::EvictOldest(tx) => { + assert!(self.evict_oldest_cache()); + let _ = tx.send(()); + return false; + } + } true } fn on_tick(&mut self) { @@ -175,11 +242,18 @@ where } } +#[derive(Clone)] +pub enum CacheTask { + NewChunk(CacheChunk), + EvictOldest(Sender<()>), +} + #[derive(Clone)] pub struct CacheChunk { file_num: u64, + base_offset: u64, + end_offset: u64, size_tracker: Arc, - group_infos: Vec<(u64, u64)>, } #[derive(Clone)] diff --git a/src/engine.rs b/src/engine.rs index b50f5491..41ef19e0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -12,7 +12,9 @@ use futures::executor::block_on; use log::{info, warn}; use protobuf::Message; -use crate::cache_evict::{CacheChunk, CacheSubmitor, Runner as CacheEvictRunner}; +use crate::cache_evict::{ + CacheSubmitor, CacheTask, Runner as CacheEvictRunner, DEFAULT_CACHE_CHUNK_SIZE, +}; use 
crate::config::{Config, RecoveryMode}; use crate::errors::Error; use crate::log_batch::{ @@ -180,17 +182,12 @@ where } } - if let Some(tracker) = cache_submitor.get_cache_tracker(file_num) { - let mut group_infos = vec![]; + if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) { for item in &log_batch.items { if let LogItemContent::Entries(ref entries) = item.content { entries.attach_cache_tracker(tracker.clone()); - entries.entries.last().map(|e| { - group_infos.push((item.raft_group_id, W::index(e))) - }); } } - cache_submitor.fill_cache(encoded_size, &mut group_infos); } apply_to_memtable(memtables, &mut log_batch, queue, file_num); offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; @@ -231,15 +228,6 @@ where // forbid concurrent `append` to remove locks here. async fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { let mut entries_size = 0; - let mut group_infos = vec![]; - for item in log_batch.items.iter() { - if let LogItemContent::Entries(ref entries) = item.content { - group_infos.push(( - item.raft_group_id, - W::index(entries.entries.last().unwrap()), - )); - } - } if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { let (sender, r) = future_channel::oneshot::channel(); let bytes = content.len(); @@ -247,7 +235,6 @@ where content, sync, entries_size, - group_infos, sender, }; self.wal_sender @@ -319,7 +306,7 @@ impl SharedCacheStats { } struct Workers { - cache_evict: Worker, + cache_evict: Worker, wal: Option>, } @@ -328,7 +315,7 @@ where E: Message + Clone, W: EntryExt + 'static, { - fn new_impl(cfg: Config) -> Result> { + fn new_impl(cfg: Config, chunk_limit: usize) -> Result> { let cache_limit = cfg.cache_limit.0 as usize; let cache_stats = Arc::new(SharedCacheStats::default()); @@ -346,14 +333,16 @@ where let cache_evict_runner = CacheEvictRunner::new( cache_limit, cache_stats.clone(), - cfg.target_file_size.0 as usize, + chunk_limit, memtables.clone(), + pipe_log.clone(), ); 
cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1))); let recovery_mode = cfg.recovery_mode; let mut cache_submitor = CacheSubmitor::new( cache_limit, + chunk_limit, cache_evict_worker.scheduler(), cache_stats.clone(), ); @@ -403,7 +392,7 @@ where } pub fn new(cfg: Config) -> Engine { - Self::new_impl(cfg).unwrap() + Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap() } } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 524a4d6d..1a05210a 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -663,8 +663,8 @@ mod tests { let mut worker = Worker::new("test".to_owned(), None); let stats = Arc::new(SharedCacheStats::default()); - let submitor = CacheSubmitor::new(usize::MAX, 4096, worker.scheduler(), stats); - let log = PipeLog::open(&cfg, submitor).unwrap(); + let submitor = CacheSubmitor::new(4096, worker.scheduler(), stats); + let log = PipeLog::open(&cfg).unwrap(); (log, worker.take_receiver()) } diff --git a/src/wal.rs b/src/wal.rs index 1894ad40..a4cceb7e 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -10,7 +10,6 @@ use log::warn; pub struct WriteTask { pub content: Vec, - pub group_infos: Vec<(u64, u64)>, pub entries_size: usize, pub sync: bool, pub sender: Sender<(u64, u64, Option>)>, @@ -47,7 +46,7 @@ where { pub fn run(&mut self) -> Result<()> { let mut write_ret = vec![]; - while let Ok(LogMsg::Write(mut task)) = self.receiver.recv() { + while let Ok(LogMsg::Write(task)) = self.receiver.recv() { let mut sync = task.sync; let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap(); let offset = self @@ -55,11 +54,10 @@ where .append(LogQueue::Append, &task.content) .unwrap(); write_ret.push((offset, task.sender)); - let tracker = self.cache_submitter.get_cache_tracker(file_num); - self.cache_submitter - .fill_cache(task.entries_size, &mut task.group_infos); + let tracker = self.cache_submitter.get_cache_tracker(file_num, offset); + self.cache_submitter.fill_chunk(task.entries_size); while let Ok(msg) = 
self.receiver.try_recv() { - let mut task = match msg { + let task = match msg { LogMsg::Write(task) => task, LogMsg::Stop => { return Ok(()); @@ -68,8 +66,7 @@ where if task.sync { sync = true; } - self.cache_submitter - .fill_cache(task.entries_size, &mut task.group_infos); + self.cache_submitter.fill_chunk(task.entries_size); let offset = self .pipe_log .append(LogQueue::Append, &task.content) From 729df17e26f6ab52579609dea51e807645ac933b Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 25 Sep 2020 18:39:23 +0800 Subject: [PATCH 06/17] fix test Signed-off-by: Little-Wallace --- src/engine.rs | 1 + src/pipe_log.rs | 17 +++++++++-------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 41ef19e0..bfac2d7e 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -189,6 +189,7 @@ where } } } + cache_submitor.fill_chunk(encoded_size); apply_to_memtable(memtables, &mut log_batch, queue, file_num); offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 1a05210a..349e52c5 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -647,8 +647,7 @@ mod tests { use tempfile::Builder; use super::*; - use crate::cache_evict::{CacheSubmitor, CacheTask}; - use crate::engine::SharedCacheStats; + use crate::cache_evict::CacheTask; use crate::util::{ReadableSize, Worker}; fn new_test_pipe_log( @@ -662,8 +661,6 @@ mod tests { cfg.target_file_size = ReadableSize(rotate_size as u64); let mut worker = Worker::new("test".to_owned(), None); - let stats = Arc::new(SharedCacheStats::default()); - let submitor = CacheSubmitor::new(4096, worker.scheduler(), stats); let log = PipeLog::open(&cfg).unwrap(); (log, worker.take_receiver()) } @@ -697,12 +694,14 @@ mod tests { // generate file 1, 2, 3 let content: Vec = vec![b'a'; 1024]; - let (file_num, offset, _) = pipe_log.append(queue, &content, &mut false).unwrap(); + let (file_num, _) = pipe_log.switch_log_file(queue).unwrap(); + let 
offset = pipe_log.append(queue, &content).unwrap(); assert_eq!(file_num, 1); assert_eq!(offset, header_size); assert_eq!(pipe_log.active_file_num(queue), 1); - let (file_num, offset, _) = pipe_log.append(queue, &content, &mut false).unwrap(); + let (file_num, _) = pipe_log.switch_log_file(queue).unwrap(); + let offset = pipe_log.append(queue, &content).unwrap(); assert_eq!(file_num, 2); assert_eq!(offset, header_size); assert_eq!(pipe_log.active_file_num(queue), 2); @@ -716,11 +715,13 @@ mod tests { // append position let s_content = b"short content".to_vec(); - let (file_num, offset, _) = pipe_log.append(queue, &s_content, &mut false).unwrap(); + let (file_num, _) = pipe_log.switch_log_file(queue).unwrap(); + let offset = pipe_log.append(queue, &s_content).unwrap(); assert_eq!(file_num, 3); assert_eq!(offset, header_size); - let (file_num, offset, _) = pipe_log.append(queue, &s_content, &mut false).unwrap(); + let (file_num, _) = pipe_log.switch_log_file(queue).unwrap(); + let offset = pipe_log.append(queue, &s_content).unwrap(); assert_eq!(file_num, 3); assert_eq!(offset, header_size + s_content.len() as u64); From cc2887579e811359cfd646dd0a77a252807d3291 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 25 Sep 2020 19:28:10 +0800 Subject: [PATCH 07/17] refactor some method Signed-off-by: Little-Wallace --- src/engine.rs | 287 +++++++++++++++++++++++--------------------------- 1 file changed, 131 insertions(+), 156 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index a85e7eab..3f44e5d8 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -137,84 +137,35 @@ where W: EntryExt + 'static, P: GenericPipeLog, { - // Recover from disk. - fn recover( - queue: LogQueue, - pipe_log: &P, - memtables: &MemTableAccessor, - cache_submitor: &mut CacheSubmitor, - recovery_mode: RecoveryMode, - ) -> Result<()> { - // Get first file number and last file number. 
- let first_file_num = pipe_log.first_file_num(queue); - let active_file_num = pipe_log.active_file_num(queue); - - // Iterate and recover from files one by one. - let start = Instant::now(); - for file_num in first_file_num..=active_file_num { - // Read a file. - let content = pipe_log.read_whole_file(queue, file_num)?; - - // Verify file header. - let mut buf = content.as_slice(); - if !buf.starts_with(FILE_MAGIC_HEADER) { - if file_num != active_file_num { - warn!("Raft log header is corrupted at {:?}.{}", queue, file_num); - return Err(box_err!("Raft log file header is corrupted")); + fn apply_to_memtable(&self, item: LogItem, queue: LogQueue, file_num: u64) { + let memtable = self.memtables.get_or_insert(item.raft_group_id); + match item.content { + LogItemContent::Entries(entries_to_add) => { + let entries = entries_to_add.entries; + let entries_index = entries_to_add.entries_index.into_inner(); + if queue == LogQueue::Rewrite { + memtable.wl().append_rewrite(entries, entries_index); } else { - pipe_log.truncate_active_log(queue, Some(0)).unwrap(); - break; + memtable.wl().append(entries, entries_index); } } - - // Iterate all LogBatch in one file. 
- let start_ptr = buf.as_ptr(); - buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len()); - let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64; - loop { - match LogBatch::from_bytes(&mut buf, file_num, offset) { - Ok(Some(mut log_batch)) => { - let mut encoded_size = 0; - for item in &log_batch.items { - if let LogItemContent::Entries(ref entries) = item.content { - encoded_size += entries.encoded_size.get(); - } - } - - if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) { - for item in &log_batch.items { - if let LogItemContent::Entries(ref entries) = item.content { - entries.attach_cache_tracker(tracker.clone()); - } - } - } - cache_submitor.fill_chunk(encoded_size); - apply_to_memtable(memtables, &mut log_batch, queue, file_num); - offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; - } - Ok(None) => { - info!("Recovered raft log {:?}.{}.", queue, file_num); - break; - } - Err(e) => { - warn!( - "Raft log content is corrupted at {:?}.{}:{}, error: {}", - queue, file_num, offset, e - ); - // There may be a pre-allocated space at the tail of the active log. 
- if file_num == active_file_num - && recovery_mode == RecoveryMode::TolerateCorruptedTailRecords - { - pipe_log.truncate_active_log(queue, Some(offset as usize))?; - break; - } - return Err(box_err!("Raft log content is corrupted")); + LogItemContent::Command(Command::Clean) => { + self.memtables.remove(item.raft_group_id); + } + LogItemContent::Command(Command::Compact { index }) => { + memtable.wl().compact_to(index); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => { + let (key, value) = (kv.key, kv.value.unwrap()); + match queue { + LogQueue::Append => memtable.wl().put(key, value, file_num), + LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), } } - } + OpType::Del => memtable.wl().delete(kv.key.as_slice()), + }, } - info!("Recover raft log takes {:?}", start.elapsed()); - Ok(()) } // Write a batch needs 3 steps: @@ -246,7 +197,7 @@ where if let LogItemContent::Entries(ref entries) = item.content { entries.update_position(LogQueue::Append, file_num, offset, &tracker); } - apply_item_to_memtable(&self.memtables, item, LogQueue::Append, file_num); + self.apply_to_memtable(item, LogQueue::Append, file_num); } } return Ok(bytes); @@ -276,6 +227,10 @@ where E: Message + Clone, W: EntryExt + 'static, { + pub fn new(cfg: Config) -> Engine { + Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap() + } + fn new_impl(cfg: Config, chunk_limit: usize) -> Result> { let cache_limit = cfg.cache_limit.0 as usize; let global_stats = Arc::new(GlobalStats::default()); @@ -300,35 +255,42 @@ where ); cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1))); - let recovery_mode = cfg.recovery_mode; let mut cache_submitor = CacheSubmitor::new( cache_limit, chunk_limit, cache_evict_worker.scheduler(), global_stats.clone(), ); + + let cfg = Arc::new(cfg); + let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); + let (wal_sender, wal_receiver) = channel(); + let engine = Engine { + cfg, + 
memtables, + pipe_log, + global_stats, + purge_manager, + wal_sender, + workers: Arc::new(RwLock::new(Workers { + cache_evict: cache_evict_worker, + wal: None, + })), + }; cache_submitor.block_on_full(); - Engine::recover( - LogQueue::Rewrite, - &pipe_log, - &memtables, + engine.recover( &mut cache_submitor, + LogQueue::Rewrite, RecoveryMode::TolerateCorruptedTailRecords, )?; - Engine::recover( - LogQueue::Append, - &pipe_log, - &memtables, + engine.recover( &mut cache_submitor, - recovery_mode, + LogQueue::Append, + engine.cfg.recovery_mode, )?; cache_submitor.nonblock_on_full(); - let cfg = Arc::new(cfg); - let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); - let (wal_sender, wal_receiver) = channel(); - - let mut wal_runner = WalRunner::new(cache_submitor, pipe_log.clone(), wal_receiver); + let mut wal_runner = WalRunner::new(cache_submitor, engine.pipe_log.clone(), wal_receiver); let th = ThreadBuilder::new() .name("wal".to_string()) .spawn(move || { @@ -337,23 +299,89 @@ where .unwrap_or_else(|e| warn!("run error because: {}", e)) }) .unwrap(); - - Ok(Engine { - cfg, - memtables, - pipe_log, - global_stats, - purge_manager, - wal_sender, - workers: Arc::new(RwLock::new(Workers { - cache_evict: cache_evict_worker, - wal: Some(th), - })), - }) + engine.workers.wl().wal = Some(th); + Ok(engine) } + // Recover from disk. + fn recover( + &self, + cache_submitor: &mut CacheSubmitor, + queue: LogQueue, + recovery_mode: RecoveryMode, + ) -> Result<()> { + // Get first file number and last file number. + let first_file_num = self.pipe_log.first_file_num(queue); + let active_file_num = self.pipe_log.active_file_num(queue); - pub fn new(cfg: Config) -> Engine { - Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap() + // Iterate and recover from files one by one. + let start = Instant::now(); + for file_num in first_file_num..=active_file_num { + // Read a file. 
+ let content = self.pipe_log.read_whole_file(queue, file_num)?; + + // Verify file header. + let mut buf = content.as_slice(); + if !buf.starts_with(FILE_MAGIC_HEADER) { + if file_num != active_file_num { + warn!("Raft log header is corrupted at {:?}.{}", queue, file_num); + return Err(box_err!("Raft log file header is corrupted")); + } else { + self.pipe_log.truncate_active_log(queue, Some(0)).unwrap(); + break; + } + } + + // Iterate all LogBatch in one file. + let start_ptr = buf.as_ptr(); + buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len()); + let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64; + loop { + match LogBatch::::from_bytes(&mut buf, file_num, offset) { + Ok(Some(mut log_batch)) => { + let mut encoded_size = 0; + for item in &log_batch.items { + if let LogItemContent::Entries(ref entries) = item.content { + encoded_size += entries.encoded_size.get(); + } + } + + if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) { + for item in &log_batch.items { + if let LogItemContent::Entries(ref entries) = item.content { + entries.attach_cache_tracker(tracker.clone()); + } + } + } + cache_submitor.fill_chunk(encoded_size); + for item in log_batch.items.drain(..) { + self.apply_to_memtable(item, queue, file_num); + } + offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; + } + Ok(None) => { + info!("Recovered raft log {:?}.{}.", queue, file_num); + break; + } + Err(e) => { + warn!( + "Raft log content is corrupted at {:?}.{}:{}, error: {}", + queue, file_num, offset, e + ); + // There may be a pre-allocated space at the tail of the active log. 
+ if file_num == active_file_num + && recovery_mode == RecoveryMode::TolerateCorruptedTailRecords + { + self.pipe_log + .truncate_active_log(queue, Some(offset as usize))?; + break; + } + return Err(box_err!("Raft log content is corrupted")); + } + } + } + } + info!("Recover raft log takes {:?}", start.elapsed()); + Ok(()) } } @@ -568,59 +596,6 @@ where Ok(e) } -fn apply_item_to_memtable( - memtables: &MemTableAccessor, - item: LogItem, - queue: LogQueue, - file_num: u64, -) where - E: Message + Clone, - W: EntryExt, -{ - let memtable = memtables.get_or_insert(item.raft_group_id); - match item.content { - LogItemContent::Entries(entries_to_add) => { - let entries = entries_to_add.entries; - let entries_index = entries_to_add.entries_index.into_inner(); - if queue == LogQueue::Rewrite { - memtable.wl().append_rewrite(entries, entries_index); - } else { - memtable.wl().append(entries, entries_index); - } - } - LogItemContent::Command(Command::Clean) => { - memtables.remove(item.raft_group_id); - } - LogItemContent::Command(Command::Compact { index }) => { - memtable.wl().compact_to(index); - } - LogItemContent::Kv(kv) => match kv.op_type { - OpType::Put => { - let (key, value) = (kv.key, kv.value.unwrap()); - match queue { - LogQueue::Append => memtable.wl().put(key, value, file_num), - LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), - } - } - OpType::Del => memtable.wl().delete(kv.key.as_slice()), - }, - } -} - -fn apply_to_memtable( - memtables: &MemTableAccessor, - log_batch: &mut LogBatch, - queue: LogQueue, - file_num: u64, -) where - E: Message + Clone, - W: EntryExt, -{ - for item in log_batch.items.drain(..) 
{ - apply_item_to_memtable(memtables, item, queue, file_num); - } -} - #[cfg(test)] mod tests { use super::*; From 6eb627ed6c78904c813776ce3061ff5e23bcebe2 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 25 Sep 2020 20:01:51 +0800 Subject: [PATCH 08/17] remove note Signed-off-by: Little-Wallace --- src/engine.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 3f44e5d8..36a58cb1 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -168,15 +168,6 @@ where } } - // Write a batch needs 3 steps: - // 1. find all involved raft groups and then lock their memtables; - // 2. append the log batch to pipe log; - // 3. update all involved memtables. - // The lock logic is a little complex. However it's necessary because - // 1. "Inactive log rewrite" needs to keep logs on pipe log order; - // 2. Users can call `append` on one raft group concurrently. - // Maybe we can improve the implement of "inactive log rewrite" and - // forbid concurrent `append` to remove locks here. 
async fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { let mut entries_size = 0; if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { From 2017cdb46c519224899a56e3545bfeb10ef030f7 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 27 Sep 2020 12:53:13 +0800 Subject: [PATCH 09/17] fix mutable entries index Signed-off-by: Little-Wallace --- src/engine.rs | 12 ++++----- src/log_batch.rs | 64 +++++++++++++++++++++++------------------------- src/pipe_log.rs | 8 +++--- src/purge.rs | 5 ++-- 4 files changed, 43 insertions(+), 46 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 36a58cb1..e1cea450 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -142,7 +142,7 @@ where match item.content { LogItemContent::Entries(entries_to_add) => { let entries = entries_to_add.entries; - let entries_index = entries_to_add.entries_index.into_inner(); + let entries_index = entries_to_add.entries_index; if queue == LogQueue::Rewrite { memtable.wl().append_rewrite(entries, entries_index); } else { @@ -184,8 +184,8 @@ where .map_err(|_| Error::Stop)?; let (file_num, offset, tracker) = r.await?; if file_num > 0 { - for item in log_batch.items.drain(..) { - if let LogItemContent::Entries(ref entries) = item.content { + for mut item in log_batch.items.drain(..) 
{ + if let LogItemContent::Entries(entries) = &mut item.content { entries.update_position(LogQueue::Append, file_num, offset, &tracker); } self.apply_to_memtable(item, LogQueue::Append, file_num); @@ -332,13 +332,13 @@ where let mut encoded_size = 0; for item in &log_batch.items { if let LogItemContent::Entries(ref entries) = item.content { - encoded_size += entries.encoded_size.get(); + encoded_size += entries.encoded_size; } } if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) { - for item in &log_batch.items { - if let LogItemContent::Entries(ref entries) = item.content { + for item in log_batch.items.iter_mut() { + if let LogItemContent::Entries(entries) = &mut item.content { entries.attach_cache_tracker(tracker.clone()); } } diff --git a/src/log_batch.rs b/src/log_batch.rs index 2204564d..7509e8ce 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -1,5 +1,4 @@ use std::borrow::{Borrow, Cow}; -use std::cell::{Cell, RefCell}; use std::io::BufRead; use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; @@ -131,9 +130,9 @@ where { pub entries: Vec, // EntryIndex may be update after write to file. 
- pub entries_index: RefCell>, + pub entries_index: Vec, - pub encoded_size: Cell, + pub encoded_size: usize, } impl PartialEq for Entries { @@ -148,14 +147,14 @@ impl Entries { let (encoded_size, entries_index) = match entries_index { Some(index) => ( index.iter().fold(0, |acc, x| acc + x.len as usize), - RefCell::new(index), + index, ), - None => (0, RefCell::new(vec![EntryIndex::default(); len])), + None => (0, vec![EntryIndex::default(); len]), }; Entries { entries, entries_index, - encoded_size: Cell::new(encoded_size), + encoded_size, } } @@ -190,7 +189,7 @@ impl Entries { Ok(Entries::new(entries, Some(entries_index))) } - pub fn encode_to>(&self, vec: &mut Vec) -> Result<()> { + pub fn encode_to>(&mut self, vec: &mut Vec) -> Result<()> { if self.entries.is_empty() { return Ok(()); } @@ -204,13 +203,12 @@ impl Entries { vec.encode_var_u64(content.len() as u64)?; // file_num = 0 means entry index is not initialized. - let mut entries_index = self.entries_index.borrow_mut(); - if entries_index[i].file_num == 0 { - entries_index[i].index = W::index(&e); + if self.entries_index[i].file_num == 0 { + self.entries_index[i].index = W::index(&e); // This offset doesn't count the header. 
- entries_index[i].offset = vec.len() as u64; - entries_index[i].len = content.len() as u64; - self.encoded_size.update(|x| x + content.len()); + self.entries_index[i].offset = vec.len() as u64; + self.entries_index[i].len = content.len() as u64; + self.encoded_size += content.len(); } vec.extend_from_slice(&content); @@ -219,13 +217,13 @@ impl Entries { } pub fn update_position( - &self, + &mut self, queue: LogQueue, file_num: u64, base: u64, chunk_size: &Option>, ) { - for idx in self.entries_index.borrow_mut().iter_mut() { + for idx in self.entries_index.iter_mut() { debug_assert!(idx.file_num == 0 && idx.base_offset == 0); idx.queue = queue; idx.file_num = file_num; @@ -239,8 +237,8 @@ impl Entries { } } - pub fn attach_cache_tracker(&self, chunk_size: Arc) { - for idx in self.entries_index.borrow_mut().iter_mut() { + pub fn attach_cache_tracker(&mut self, chunk_size: Arc) { + for idx in self.entries_index.iter_mut() { idx.cache_tracker = Some(CacheTracker { chunk_size: chunk_size.clone(), sub_on_drop: idx.len as usize, @@ -248,8 +246,8 @@ impl Entries { } } - fn update_compression_type(&self, compression_type: CompressionType, batch_len: u64) { - for idx in self.entries_index.borrow_mut().iter_mut() { + fn update_compression_type(&mut self, compression_type: CompressionType, batch_len: u64) { + for idx in self.entries_index.iter_mut() { idx.compression_type = compression_type; idx.batch_len = batch_len; } @@ -403,19 +401,19 @@ impl LogItem { } } - pub fn encode_to>(&self, vec: &mut Vec) -> Result<()> { + pub fn encode_to>(&mut self, vec: &mut Vec) -> Result<()> { // layout = { 8 byte id | 1 byte type | item layout } vec.encode_var_u64(self.raft_group_id)?; - match self.content { - LogItemContent::Entries(ref entries) => { + match &mut self.content { + LogItemContent::Entries(entries) => { vec.push(TYPE_ENTRIES); entries.encode_to::(vec)?; } - LogItemContent::Command(ref command) => { + LogItemContent::Command(command) => { vec.push(TYPE_COMMAND); 
command.encode_to(vec); } - LogItemContent::Kv(ref kv) => { + LogItemContent::Kv(kv) => { vec.push(TYPE_KV); kv.encode_to(vec)?; } @@ -573,8 +571,8 @@ where assert!(reader.is_empty()); buf.consume(batch_len); - for item in &log_batch.items { - if let LogItemContent::Entries(ref entries) = item.content { + for item in log_batch.items.iter_mut() { + if let LogItemContent::Entries(entries) = &mut item.content { entries.update_compression_type(batch_type, batch_len as u64); } } @@ -583,7 +581,7 @@ where } // TODO: avoid to write a large batch into one compressed chunk. - pub fn encode_to_bytes(&self, encoded_size: &mut usize) -> Option> { + pub fn encode_to_bytes(&mut self, encoded_size: &mut usize) -> Option> { if self.items.is_empty() { return None; } @@ -592,7 +590,7 @@ where let mut vec = Vec::with_capacity(4096); vec.encode_u64(0).unwrap(); vec.encode_var_u64(self.items.len() as u64).unwrap(); - for item in &self.items { + for item in self.items.iter_mut() { item.encode_to::(&mut vec).unwrap(); } @@ -611,9 +609,9 @@ where vec.as_mut_slice().write_u64::(header).unwrap(); let batch_len = (vec.len() - 8) as u64; - for item in &self.items { - if let LogItemContent::Entries(ref entries) = item.content { - *encoded_size += entries.encoded_size.get(); + for item in self.items.iter_mut() { + if let LogItemContent::Entries(entries) = &mut item.content { + *encoded_size += entries.encoded_size; entries.update_compression_type(compression_type, batch_len as u64); } } @@ -652,11 +650,11 @@ mod tests { fn test_entries_enc_dec() { let pb_entries = vec![Entry::new(); 10]; let file_num = 1; - let entries = Entries::new(pb_entries, None); + let mut entries = Entries::new(pb_entries, None); let mut encoded = vec![]; entries.encode_to::(&mut encoded).unwrap(); - for idx in entries.entries_index.borrow_mut().iter_mut() { + for idx in entries.entries_index.iter_mut() { idx.file_num = file_num; } let mut s = encoded.as_slice(); diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 
1a80ca0f..e4b498d3 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -54,7 +54,7 @@ pub trait GenericPipeLog: Sized + Clone + Send { /// Rewrite a batch into the rewrite queue. fn rewrite>( &self, - batch: &LogBatch, + batch: &mut LogBatch, sync: bool, file_num: &mut u64, ) -> Result; @@ -417,7 +417,7 @@ impl GenericPipeLog for PipeLog { fn rewrite>( &self, - batch: &LogBatch, + batch: &mut LogBatch, sync: bool, file_num: &mut u64, ) -> Result { @@ -428,8 +428,8 @@ impl GenericPipeLog for PipeLog { if sync { fd.sync()?; } - for item in &batch.items { - if let LogItemContent::Entries(ref entries) = item.content { + for item in batch.items.iter_mut() { + if let LogItemContent::Entries(entries) = &mut item.content { entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None); } } diff --git a/src/purge.rs b/src/purge.rs index d6723c1f..1ff95b19 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -142,7 +142,7 @@ where fn rewrite_impl(&self, log_batch: &mut LogBatch, latest_rewrite: u64) -> Result<()> { let mut file_num = 0; - self.pipe_log.rewrite(&log_batch, true, &mut file_num)?; + self.pipe_log.rewrite(log_batch, true, &mut file_num)?; if file_num > 0 { rewrite_to_memtable(&self.memtables, log_batch, file_num, latest_rewrite); } @@ -163,8 +163,7 @@ fn rewrite_to_memtable( let memtable = memtables.get_or_insert(item.raft_group_id); match item.content { LogItemContent::Entries(entries_to_add) => { - let entries_index = entries_to_add.entries_index.into_inner(); - memtable.wl().rewrite(entries_index, latest_rewrite); + memtable.wl().rewrite(entries_to_add.entries_index, latest_rewrite); } LogItemContent::Kv(kv) => match kv.op_type { OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num), From 563912125c7f6da75e7d1e1a5f5f6091958f0bd3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 27 Sep 2020 14:08:23 +0800 Subject: [PATCH 10/17] fix future Signed-off-by: Little-Wallace --- src/engine.rs | 117 
++++++++++++++++++++++++++++------------------- src/log_batch.rs | 5 +- src/purge.rs | 4 +- 3 files changed, 75 insertions(+), 51 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index e1cea450..ca22807f 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -26,6 +26,7 @@ use crate::purge::PurgeManager; use crate::util::{HandyRwLock, HashMap, Worker}; use crate::wal::{LogMsg, WalRunner, WriteTask}; use crate::{codec, CacheStats, GlobalStats, Result}; +use futures::future::{err, ok, BoxFuture}; const SLOTS_COUNT: usize = 128; @@ -131,44 +132,62 @@ where workers: Arc>, } +fn apply_item( + memtables: &MemTableAccessor, + item: LogItem, + queue: LogQueue, + file_num: u64, +) where + E: Message + Clone, + W: EntryExt, +{ + let memtable = memtables.get_or_insert(item.raft_group_id); + match item.content { + LogItemContent::Entries(entries_to_add) => { + let entries = entries_to_add.entries; + let entries_index = entries_to_add.entries_index; + if queue == LogQueue::Rewrite { + memtable.wl().append_rewrite(entries, entries_index); + } else { + memtable.wl().append(entries, entries_index); + } + } + LogItemContent::Command(Command::Clean) => { + memtables.remove(item.raft_group_id); + } + LogItemContent::Command(Command::Compact { index }) => { + memtable.wl().compact_to(index); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => { + let (key, value) = (kv.key, kv.value.unwrap()); + match queue { + LogQueue::Append => memtable.wl().put(key, value, file_num), + LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), + } + } + OpType::Del => memtable.wl().delete(kv.key.as_slice()), + }, + } +} + impl Engine where E: Message + Clone, W: EntryExt + 'static, P: GenericPipeLog, { - fn apply_to_memtable(&self, item: LogItem, queue: LogQueue, file_num: u64) { - let memtable = self.memtables.get_or_insert(item.raft_group_id); - match item.content { - LogItemContent::Entries(entries_to_add) => { - let entries = entries_to_add.entries; - let 
entries_index = entries_to_add.entries_index; - if queue == LogQueue::Rewrite { - memtable.wl().append_rewrite(entries, entries_index); - } else { - memtable.wl().append(entries, entries_index); - } - } - LogItemContent::Command(Command::Clean) => { - self.memtables.remove(item.raft_group_id); - } - LogItemContent::Command(Command::Compact { index }) => { - memtable.wl().compact_to(index); - } - LogItemContent::Kv(kv) => match kv.op_type { - OpType::Put => { - let (key, value) = (kv.key, kv.value.unwrap()); - match queue { - LogQueue::Append => memtable.wl().put(key, value, file_num), - LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), - } - } - OpType::Del => memtable.wl().delete(kv.key.as_slice()), - }, + fn apply_to_memtable(&self, log_batch: &mut LogBatch, queue: LogQueue, file_num: u64) { + for item in log_batch.items.drain(..) { + apply_item(&self.memtables, item, queue, file_num); } } - async fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { + fn write_impl( + &self, + log_batch: &mut LogBatch, + sync: bool, + ) -> BoxFuture<'static, Result> { let mut entries_size = 0; if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { let (sender, r) = future_channel::oneshot::channel(); @@ -179,21 +198,25 @@ where entries_size, sender, }; - self.wal_sender - .send(LogMsg::Write(task)) - .map_err(|_| Error::Stop)?; - let (file_num, offset, tracker) = r.await?; - if file_num > 0 { - for mut item in log_batch.items.drain(..) 
{ - if let LogItemContent::Entries(entries) = &mut item.content { - entries.update_position(LogQueue::Append, file_num, offset, &tracker); + if let Err(_) = self.wal_sender.send(LogMsg::Write(task)) { + return Box::pin(err(Error::Stop)); + } + let memtables = self.memtables.clone(); + let items = std::mem::replace(&mut log_batch.items, vec![]); + return Box::pin(async move { + let (file_num, offset, tracker) = r.await?; + if file_num > 0 { + for mut item in items { + if let LogItemContent::Entries(entries) = &mut item.content { + entries.update_position(LogQueue::Append, file_num, offset, &tracker); + } + apply_item(&memtables, item, LogQueue::Append, file_num); } - self.apply_to_memtable(item, LogQueue::Append, file_num); } - } - return Ok(bytes); + return Ok(bytes); + }); } - Ok(0) + return Box::pin(ok(0)); } } @@ -344,9 +367,7 @@ where } } cache_submitor.fill_chunk(encoded_size); - for item in log_batch.items.drain(..) { - self.apply_to_memtable(item, queue, file_num); - } + self.apply_to_memtable(&mut log_batch, queue, file_num); offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; } Ok(None) => { @@ -504,8 +525,12 @@ where block_on(self.write_impl(log_batch, sync)) } - pub async fn async_write(&self, log_batch: &mut LogBatch, sync: bool) -> Result { - self.write_impl(log_batch, sync).await + pub fn async_write( + &self, + log_batch: &mut LogBatch, + sync: bool, + ) -> BoxFuture<'static, Result> { + self.write_impl(log_batch, sync) } /// Flush stats about EntryCache. 
diff --git a/src/log_batch.rs b/src/log_batch.rs index 7509e8ce..9fc2eae3 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -145,10 +145,7 @@ impl Entries { pub fn new(entries: Vec, entries_index: Option>) -> Entries { let len = entries.len(); let (encoded_size, entries_index) = match entries_index { - Some(index) => ( - index.iter().fold(0, |acc, x| acc + x.len as usize), - index, - ), + Some(index) => (index.iter().fold(0, |acc, x| acc + x.len as usize), index), None => (0, vec![EntryIndex::default(); len]), }; Entries { diff --git a/src/purge.rs b/src/purge.rs index 1ff95b19..ec3a145b 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -163,7 +163,9 @@ fn rewrite_to_memtable( let memtable = memtables.get_or_insert(item.raft_group_id); match item.content { LogItemContent::Entries(entries_to_add) => { - memtable.wl().rewrite(entries_to_add.entries_index, latest_rewrite); + memtable + .wl() + .rewrite(entries_to_add.entries_index, latest_rewrite); } LogItemContent::Kv(kv) => match kv.op_type { OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num), From 38d77cf3d7071ea60c306c131aebd13a3d841042 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 27 Sep 2020 14:22:56 +0800 Subject: [PATCH 11/17] fix mutable Signed-off-by: Little-Wallace --- src/engine.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index ca22807f..6469cc6d 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -527,10 +527,10 @@ where pub fn async_write( &self, - log_batch: &mut LogBatch, + mut log_batch: LogBatch, sync: bool, ) -> BoxFuture<'static, Result> { - self.write_impl(log_batch, sync) + self.write_impl(&mut log_batch, sync) } /// Flush stats about EntryCache. 
From 276a241e194f52179995fd64220846528d647a89 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 14 Oct 2020 17:09:06 +0800 Subject: [PATCH 12/17] fix rewrite bug Signed-off-by: Little-Wallace --- src/engine.rs | 27 +++++++++++++++++------- src/log_batch.rs | 13 +++--------- src/memtable.rs | 2 ++ src/pipe_log.rs | 18 ++++++++-------- src/purge.rs | 53 +++++++++++++++++++++++++++++------------------- 5 files changed, 66 insertions(+), 47 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 36414dbf..3ed673d9 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -22,7 +22,7 @@ use crate::log_batch::{ }; use crate::memtable::{EntryIndex, MemTable}; use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION}; -use crate::purge::PurgeManager; +use crate::purge::{PurgeManager, RemovedMemtables}; use crate::util::{HandyRwLock, HashMap, Worker}; use crate::wal::{LogMsg, WalRunner, WriteTask}; use crate::{codec, CacheStats, GlobalStats, Result}; @@ -136,6 +136,7 @@ where fn apply_item( memtables: &MemTableAccessor, + removed_memtables: &RemovedMemtables, item: LogItem, queue: LogQueue, file_num: u64, @@ -155,9 +156,8 @@ fn apply_item( } } LogItemContent::Command(Command::Clean) => { - if self.memtables.remove(item.raft_group_id).is_some() { - self.purge_manager - .remove_memtable(file_num, item.raft_group_id); + if memtables.remove(item.raft_group_id).is_some() { + removed_memtables.remove_memtable(file_num, item.raft_group_id); } } LogItemContent::Command(Command::Compact { index }) => { @@ -183,8 +183,9 @@ where P: GenericPipeLog, { fn apply_to_memtable(&self, log_batch: &mut LogBatch, queue: LogQueue, file_num: u64) { + let removed = self.purge_manager.get_removed_memtables(); for item in log_batch.items.drain(..) 
{ - apply_item(&self.memtables, item, queue, file_num); + apply_item(&self.memtables, &removed, item, queue, file_num); } } @@ -208,6 +209,7 @@ where } let memtables = self.memtables.clone(); let items = std::mem::replace(&mut log_batch.items, vec![]); + let removed_memtables = self.purge_manager.get_removed_memtables(); return Box::pin(async move { let (file_num, offset, tracker) = r.await?; if file_num > 0 { @@ -215,7 +217,13 @@ where if let LogItemContent::Entries(entries) = &mut item.content { entries.update_position(LogQueue::Append, file_num, offset, &tracker); } - apply_item(&memtables, item, LogQueue::Append, file_num); + apply_item( + &memtables, + &removed_memtables, + item, + LogQueue::Append, + file_num, + ); } } return Ok(bytes); @@ -282,7 +290,12 @@ where ); let cfg = Arc::new(cfg); - let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); + let purge_manager = PurgeManager::new( + cfg.clone(), + memtables.clone(), + pipe_log.clone(), + global_stats.clone(), + ); let (wal_sender, wal_receiver) = channel(); let engine = Engine { cfg, diff --git a/src/log_batch.rs b/src/log_batch.rs index 9fc2eae3..da5a9cf7 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -591,12 +591,7 @@ where item.encode_to::(&mut vec).unwrap(); } - let compression_type = if vec.len() > COMPRESSION_SIZE { - vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4); - CompressionType::Lz4 - } else { - CompressionType::None - }; + let compression_type = CompressionType::None; let checksum = crc32(&vec[8..]); vec.encode_u32_le(checksum).unwrap(); @@ -697,7 +692,7 @@ mod tests { ), ]; - for item in items { + for mut item in items { let mut encoded = vec![]; item.encode_to::(&mut encoded).unwrap(); let mut s = encoded.as_slice(); @@ -728,9 +723,7 @@ mod tests { assert_eq!(batch, decoded_batch); match &decoded_batch.items[0].content { - LogItemContent::Entries(entries) => { - assert_eq!(entries.encoded_size.get(), encoded_size) - } + 
LogItemContent::Entries(entries) => assert_eq!(entries.encoded_size, encoded_size), _ => unreachable!(), } } diff --git a/src/memtable.rs b/src/memtable.rs index d4f10f16..2b6ac55b 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -303,6 +303,8 @@ impl> MemTable { self.entries_index[i + distance].base_offset = ei.base_offset; self.entries_index[i + distance].compression_type = ei.compression_type; self.entries_index[i + distance].batch_len = ei.batch_len; + self.entries_index[i + distance].offset = ei.offset; + self.entries_index[i + distance].len = ei.len; } compacted_rewrite_operations } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 4a2a8e03..3a5809b0 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -760,7 +760,7 @@ mod tests { fn write_to_log( log: &mut PipeLog, submitor: &mut CacheSubmitor, - batch: &LogBatch, + batch: &mut LogBatch, file_num: &mut u64, ) { let mut entries_size = 0; @@ -770,8 +770,8 @@ mod tests { let offset = log.append(LogQueue::Append, &content).unwrap(); let tracker = submitor.get_cache_tracker(cur_file_num, offset); submitor.fill_chunk(entries_size); - for item in &batch.items { - if let LogItemContent::Entries(ref entries) = item.content { + for item in &mut batch.items { + if let LogItemContent::Entries(entries) = &mut item.content { entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker); } } @@ -809,9 +809,9 @@ mod tests { // After 4 batches are written into pipe log, no `CacheTask::NewChunk` // task should be triggered. However the last batch will trigger it. for i in 0..5 { - let log_batch = get_1m_batch(); + let mut log_batch = get_1m_batch(); let mut file_num = 0; - write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num); + write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num); log_batches.push(log_batch); let x = receiver.recv_timeout(Duration::from_millis(100)); if i < 4 { @@ -824,9 +824,9 @@ mod tests { // Write more 2 batches into pipe log. 
A `CacheTask::NewChunk` will be // emit on the second batch because log file is switched. for i in 5..7 { - let log_batch = get_1m_batch(); + let mut log_batch = get_1m_batch(); let mut file_num = 0; - write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num); + write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num); log_batches.push(log_batch); let x = receiver.recv_timeout(Duration::from_millis(100)); if i < 6 { @@ -840,9 +840,9 @@ mod tests { // `CacheTracker`s accociated in `EntryIndex`s are droped. drop(log_batches); for _ in 7..20 { - let log_batch = get_1m_batch(); + let mut log_batch = get_1m_batch(); let mut file_num = 0; - write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num); + write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num); drop(log_batch); assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err()); } diff --git a/src/purge.rs b/src/purge.rs index 18798ee6..c6b56fcb 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -12,6 +12,16 @@ use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::HandyRwLock; use crate::{GlobalStats, Result}; +#[derive(Clone, Default)] +pub struct RemovedMemtables(Arc, u64)>>>); + +impl RemovedMemtables { + pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) { + let mut tables = self.0.lock().unwrap(); + tables.push((Reverse(file_num), raft_group_id)); + } +} + // If a region has some very old raft logs less than this threshold, // rewrite them to clean stale log files ASAP. const REWRITE_ENTRY_COUNT_THRESHOLD: usize = 32; @@ -31,8 +41,7 @@ where global_stats: Arc, // Vector of (file_num, raft_group_id). - #[allow(clippy::type_complexity)] - removed_memtables: Arc, u64)>>>, + removed_memtables: RemovedMemtables, // Only one thread can run `purge_expired_files` at a time. 
purge_mutex: Arc>, @@ -115,9 +124,8 @@ where } } - pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) { - let mut tables = self.removed_memtables.lock().unwrap(); - tables.push((Reverse(file_num), raft_group_id)); + pub fn get_removed_memtables(&self) -> RemovedMemtables { + self.removed_memtables.clone() } // Returns (`latest_needs_rewrite`, `latest_needs_force_compact`). @@ -159,13 +167,16 @@ where assert!(latest_compact <= latest_rewrite); let mut log_batch = LogBatch::::new(); - while let Some(item) = self.removed_memtables.lock().unwrap().pop() { - let (file_num, raft_id) = ((item.0).0, item.1); - if file_num > latest_rewrite { - self.removed_memtables.lock().unwrap().push(item); - break; + { + let mut guard = self.removed_memtables.0.lock().unwrap(); + while let Some(item) = guard.pop() { + let (file_num, raft_id) = ((item.0).0, item.1); + if file_num > latest_rewrite { + guard.push(item); + break; + } + log_batch.clean_region(raft_id); } - log_batch.clean_region(raft_id); } let memtables = self.memtables.collect(|t| { @@ -216,7 +227,6 @@ where Ok(()) } - fn rewrite_to_memtable( &self, log_batch: &mut LogBatch, @@ -227,7 +237,7 @@ where let memtable = self.memtables.get_or_insert(item.raft_group_id); match item.content { LogItemContent::Entries(entries_to_add) => { - let entries_index = entries_to_add.entries_index.into_inner(); + let entries_index = entries_to_add.entries_index; memtable.wl().rewrite(entries_index, latest_rewrite); } LogItemContent::Kv(kv) => match kv.op_type { @@ -294,16 +304,17 @@ mod tests { cfg.dir = dir.path().to_str().unwrap().to_owned(); let engine = RaftLogEngine::new(cfg.clone()); - engine.purge_manager().remove_memtable(3, 10); - engine.purge_manager().remove_memtable(3, 9); - engine.purge_manager().remove_memtable(3, 11); - engine.purge_manager().remove_memtable(2, 9); - engine.purge_manager().remove_memtable(4, 4); - engine.purge_manager().remove_memtable(4, 3); + let tables = 
engine.purge_manager().get_removed_memtables(); + tables.remove_memtable(3, 10); + tables.remove_memtable(3, 9); + tables.remove_memtable(3, 11); + tables.remove_memtable(2, 9); + tables.remove_memtable(4, 4); + tables.remove_memtable(4, 3); - let mut tables = engine.purge_manager().removed_memtables.lock().unwrap(); + let mut guard = tables.0.lock().unwrap(); for (file_num, raft_id) in vec![(2, 9), (3, 11), (3, 10), (3, 9), (4, 4), (4, 3)] { - let item = tables.pop().unwrap(); + let item = guard.pop().unwrap(); assert_eq!((item.0).0, file_num); assert_eq!(item.1, raft_id); } From 23dd4c7f21d22e72dc9024d335a393ed15cfde50 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 16 Oct 2020 16:58:26 +0800 Subject: [PATCH 13/17] fix cache evictor bug Signed-off-by: Little-Wallace --- src/cache_evict.rs | 39 +++++++++++++++++++++++++++------------ src/engine.rs | 22 ++++++++++++++++------ src/memtable.rs | 4 ++++ src/pipe_log.rs | 9 +++++---- src/wal.rs | 45 ++++++++++++++++++++++++++++++--------------- 5 files changed, 82 insertions(+), 37 deletions(-) diff --git a/src/cache_evict.rs b/src/cache_evict.rs index 18d122a0..300127d9 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -5,13 +5,14 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use crossbeam::channel::{bounded, Sender}; +use log::info; use protobuf::Message; use crate::engine::MemTableAccessor; use crate::log_batch::{EntryExt, LogBatch, LogItemContent}; use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::{HandyRwLock, Runnable, Scheduler}; -use crate::GlobalStats; +use crate::{GlobalStats, Result}; pub const DEFAULT_CACHE_CHUNK_SIZE: usize = 4 * 1024 * 1024; @@ -185,19 +186,16 @@ where _ => return false, }; - let file_num = chunk.file_num; - let read_len = if chunk.end_offset == u64::MAX { - self.pipe_log.file_len(LogQueue::Append, file_num) - chunk.base_offset - } else { - chunk.end_offset - chunk.base_offset + let (read_len, chunk_content) = match 
self.read_chunk(&chunk) { + Ok((len, content)) => (len, content), + Err(e) => { + info!("Evictor read chunk {:?} fail: {}", chunk, e); + continue; + } }; - let chunk_content = self - .pipe_log - .fread(LogQueue::Append, file_num, chunk.base_offset, read_len) - .unwrap(); let mut reader: &[u8] = chunk_content.as_ref(); - let mut offset = chunk.base_offset; + let (file_num, mut offset) = (chunk.file_num, chunk.base_offset); while let Some(b) = LogBatch::::from_bytes(&mut reader, file_num, offset).unwrap() { offset += read_len - reader.len() as u64; @@ -216,6 +214,23 @@ where } true } + + fn read_chunk(&self, chunk: &CacheChunk) -> Result<(u64, Vec)> { + let read_len = if chunk.end_offset == u64::MAX { + let file_len = self.pipe_log.file_len(LogQueue::Append, chunk.file_num)?; + file_len - chunk.base_offset + } else { + chunk.end_offset - chunk.base_offset + }; + + let content = self.pipe_log.fread( + LogQueue::Append, + chunk.file_num, + chunk.base_offset, + read_len, + )?; + Ok((read_len, content)) + } } impl Runnable for Runner @@ -249,7 +264,7 @@ pub enum CacheTask { EvictOldest(Sender<()>), } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct CacheChunk { file_num: u64, base_offset: u64, diff --git a/src/engine.rs b/src/engine.rs index 3ed673d9..1b87bb91 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -378,10 +378,14 @@ where } } - if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) { - for item in log_batch.items.iter_mut() { - if let LogItemContent::Entries(entries) = &mut item.content { - entries.attach_cache_tracker(tracker.clone()); + if queue == LogQueue::Append { + if let Some(tracker) = + cache_submitor.get_cache_tracker(file_num, offset) + { + for item in log_batch.items.iter_mut() { + if let LogItemContent::Entries(entries) = &mut item.content { + entries.attach_cache_tracker(tracker.clone()); + } } } } @@ -873,7 +877,10 @@ mod tests { assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1); let active_num = 
engine.pipe_log.active_file_num(LogQueue::Rewrite); - let active_len = engine.pipe_log.file_len(LogQueue::Rewrite, active_num); + let active_len = engine + .pipe_log + .file_len(LogQueue::Rewrite, active_num) + .unwrap(); assert!(active_num > 1 || active_len > 59); // The rewrite queue isn't empty. // All entries should be available. @@ -908,7 +915,10 @@ mod tests { assert!(engine.purge_expired_files().unwrap().is_empty()); let new_active_num = engine.pipe_log.active_file_num(LogQueue::Rewrite); - let new_active_len = engine.pipe_log.file_len(LogQueue::Rewrite, active_num); + let new_active_len = engine + .pipe_log + .file_len(LogQueue::Rewrite, active_num) + .unwrap(); assert!( new_active_num > active_num || (new_active_num == active_num && new_active_len > active_len) diff --git a/src/memtable.rs b/src/memtable.rs index 2b6ac55b..d24bb40a 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -232,6 +232,10 @@ impl> MemTable { ei.queue = LogQueue::Rewrite; } self.append(entries, entries_index); + if let Some(index) = self.entries_index.back().map(|ei| ei.index) { + // Things in rewrite queue won't appear in cache. + self.compact_cache_to(index); + } let new_rewrite_count = self.entries_index.len(); self.adjust_rewrite_count(new_rewrite_count); diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 3a5809b0..4304e590 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -85,7 +85,7 @@ pub trait GenericPipeLog: Sized + Clone + Send { /// Return the last file number before `total - size`. `0` means no such files. 
fn latest_file_before(&self, queue: LogQueue, size: usize) -> u64; - fn file_len(&self, queue: LogQueue, file_num: u64) -> u64; + fn file_len(&self, queue: LogQueue, file_num: u64) -> Result; } pub struct LogFd(RawFd); @@ -512,9 +512,10 @@ impl GenericPipeLog for PipeLog { file_num } - fn file_len(&self, queue: LogQueue, file_num: u64) -> u64 { - let fd = self.get_queue(queue).get_fd(file_num).unwrap(); - file_len(fd.0).unwrap() as u64 + fn file_len(&self, queue: LogQueue, file_num: u64) -> Result { + self.get_queue(queue) + .get_fd(file_num) + .map(|fd| file_len(fd.0).unwrap() as u64) } } diff --git a/src/wal.rs b/src/wal.rs index a4cceb7e..3a9cfac5 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -46,42 +46,57 @@ where { pub fn run(&mut self) -> Result<()> { let mut write_ret = vec![]; + const MAX_WRITE_BUFFER: usize = 2 * 1024 * 1024; // 2MB + let mut write_buffer = Vec::with_capacity(MAX_WRITE_BUFFER); while let Ok(LogMsg::Write(task)) = self.receiver.recv() { let mut sync = task.sync; + let mut entries_size = task.entries_size; let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap(); - let offset = self - .pipe_log - .append(LogQueue::Append, &task.content) - .unwrap(); - write_ret.push((offset, task.sender)); - let tracker = self.cache_submitter.get_cache_tracker(file_num, offset); - self.cache_submitter.fill_chunk(task.entries_size); + write_ret.push((0, task.sender)); + while let Ok(msg) = self.receiver.try_recv() { + if write_buffer.is_empty() { + write_buffer.extend_from_slice(&task.content); + } let task = match msg { LogMsg::Write(task) => task, LogMsg::Stop => { return Ok(()); } }; - if task.sync { + if task.sync && !sync { sync = true; } - self.cache_submitter.fill_chunk(task.entries_size); - let offset = self - .pipe_log - .append(LogQueue::Append, &task.content) - .unwrap(); - write_ret.push((offset, task.sender)); + entries_size += task.entries_size; + write_ret.push((write_buffer.len() as u64, task.sender)); + 
write_buffer.extend_from_slice(&task.content); + if write_buffer.len() >= MAX_WRITE_BUFFER { + break; + } } + let base_offset = if write_buffer.is_empty() { + self.pipe_log + .append(LogQueue::Append, &task.content) + .unwrap() + } else { + self.pipe_log + .append(LogQueue::Append, &write_buffer) + .unwrap() + }; if sync { if let Err(e) = fd.sync() { warn!("write wal failed because of: {} ", e); write_ret.clear(); } } + let tracker = self + .cache_submitter + .get_cache_tracker(file_num, base_offset); + self.cache_submitter.fill_chunk(entries_size); for (offset, sender) in write_ret.drain(..) { - let _ = sender.send((file_num, offset, tracker.clone())); + let _ = sender.send((file_num, base_offset + offset, tracker.clone())); } + write_buffer.clear(); } Ok(()) } From 7b422973b7e1ae51f9b1ee3f13a21acfb10421e3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 20 Oct 2020 14:58:45 +0800 Subject: [PATCH 14/17] add metric Signed-off-by: Little-Wallace --- src/engine.rs | 12 ++++++++-- src/lib.rs | 1 + src/util.rs | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/wal.rs | 31 +++++++++++++++++++++++-- 4 files changed, 104 insertions(+), 4 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 1b87bb91..bdaf9844 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -5,8 +5,8 @@ use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; use std::{fmt, u64}; -use futures::channel as future_channel; use futures::executor::block_on; +use futures::{channel as future_channel, TryFutureExt}; use log::{info, warn}; use protobuf::Message; @@ -23,7 +23,7 @@ use crate::log_batch::{ use crate::memtable::{EntryIndex, MemTable}; use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION}; use crate::purge::{PurgeManager, RemovedMemtables}; -use crate::util::{HandyRwLock, HashMap, Worker}; +use crate::util::{HandyRwLock, HashMap, Statistic, Worker}; use crate::wal::{LogMsg, WalRunner, WriteTask}; use 
crate::{codec, CacheStats, GlobalStats, Result}; use futures::future::{err, ok, BoxFuture}; @@ -570,6 +570,14 @@ where wal.join().unwrap(); } } + + pub fn async_get_metric(&self) -> BoxFuture<'static, Result> { + let (sender, r) = future_channel::oneshot::channel(); + if let Err(_) = self.wal_sender.send(LogMsg::Metric(sender)) { + return Box::pin(err(Error::Stop)); + } + return Box::pin(r.map_err(|_| Error::Stop)); + } } pub fn fetch_entries( diff --git a/src/lib.rs b/src/lib.rs index 9da8305a..ba36843a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ use crate::pipe_log::PipeLog; pub use self::config::{Config, RecoveryMode}; pub use self::errors::{Error, Result}; pub use self::log_batch::{EntryExt, LogBatch}; +pub use self::util::Statistic; pub type RaftLogEngine = self::engine::Engine; #[derive(Clone, Copy, Default)] diff --git a/src/util.rs b/src/util.rs index 89ce7a18..6ef60ca1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -318,3 +318,67 @@ impl Drop for Worker { self.stop(); } } + +#[derive(Clone, Debug, Copy, PartialEq, Default)] +pub struct Statistic { + pub wal_cost: f64, + pub sync_cost: f64, + pub write_cost: f64, + pub max_wal_cost: f64, + pub max_sync_cost: f64, + pub max_write_cost: f64, + pub freq: usize, +} + +fn max(left: f64, right: f64) -> f64 { + if left > right { + left + } else { + right + } +} + +impl Statistic { + pub fn add(&mut self, other: &Self) { + self.wal_cost += other.wal_cost; + self.sync_cost += other.sync_cost; + self.write_cost += other.write_cost; + self.freq += other.freq; + self.max_wal_cost = max(self.max_wal_cost, other.max_wal_cost); + self.max_write_cost = max(self.max_write_cost, other.max_write_cost); + self.max_sync_cost = max(self.max_sync_cost, other.max_sync_cost); + } + + pub fn clear(&mut self) { + self.wal_cost = 0.0; + self.sync_cost = 0.0; + self.write_cost = 0.0; + self.max_wal_cost = 0.0; + self.max_sync_cost = 0.0; + self.max_write_cost = 0.0; + self.freq = 0; + } + + #[inline] + pub fn add_wal(&mut 
self, wal: f64) { + self.wal_cost += wal; + self.max_wal_cost = max(self.max_wal_cost, wal); + } + + #[inline] + pub fn add_write(&mut self, write: f64) { + self.write_cost += write; + self.max_write_cost = max(self.max_write_cost, write); + } + + #[inline] + pub fn add_sync(&mut self, sync: f64) { + self.sync_cost += sync; + self.max_sync_cost = max(self.max_sync_cost, sync); + } + + #[inline] + pub fn add_one(&mut self) { + self.freq += 1; + } +} diff --git a/src/wal.rs b/src/wal.rs index 3a9cfac5..85a85cc0 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -6,7 +6,9 @@ use std::sync::Arc; use crate::cache_evict::CacheSubmitor; use crate::errors::Result; use crate::pipe_log::{GenericPipeLog, LogQueue}; +use crate::util::Statistic; use log::warn; +use std::time::Instant; pub struct WriteTask { pub content: Vec, @@ -17,6 +19,7 @@ pub struct WriteTask { pub enum LogMsg { Write(WriteTask), + Metric(Sender), Stop, } @@ -24,6 +27,7 @@ pub struct WalRunner { cache_submitter: CacheSubmitor, pipe_log: P, receiver: Receiver, + statistic: Statistic, } impl WalRunner

{ @@ -36,6 +40,7 @@ impl WalRunner

{ pipe_log, cache_submitter, receiver, + statistic: Statistic::default(), } } } @@ -46,9 +51,21 @@ where { pub fn run(&mut self) -> Result<()> { let mut write_ret = vec![]; - const MAX_WRITE_BUFFER: usize = 2 * 1024 * 1024; // 2MB + const MAX_WRITE_BUFFER: usize = 1 * 1024 * 1024; // 2MB let mut write_buffer = Vec::with_capacity(MAX_WRITE_BUFFER); - while let Ok(LogMsg::Write(task)) = self.receiver.recv() { + while let Ok(msg) = self.receiver.recv() { + let task = match msg { + LogMsg::Write(task) => task, + LogMsg::Stop => { + return Ok(()); + } + LogMsg::Metric(cb) => { + let _ = cb.send(self.statistic.clone()); + self.statistic.clear(); + continue; + } + }; + let now = Instant::now(); let mut sync = task.sync; let mut entries_size = task.entries_size; let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap(); @@ -63,6 +80,11 @@ where LogMsg::Stop => { return Ok(()); } + LogMsg::Metric(cb) => { + let _ = cb.send(self.statistic.clone()); + self.statistic.clear(); + continue; + } }; if task.sync && !sync { sync = true; @@ -83,6 +105,7 @@ where .append(LogQueue::Append, &write_buffer) .unwrap() }; + let before_sync_cost = now.elapsed().as_secs_f64(); if sync { if let Err(e) = fd.sync() { warn!("write wal failed because of: {} ", e); @@ -93,6 +116,10 @@ where .cache_submitter .get_cache_tracker(file_num, base_offset); self.cache_submitter.fill_chunk(entries_size); + let wal_cost = now.elapsed().as_secs_f64(); + self.statistic.add_wal(wal_cost); + self.statistic.add_sync(wal_cost - before_sync_cost); + self.statistic.add_one(); for (offset, sender) in write_ret.drain(..) 
{ let _ = sender.send((file_num, base_offset + offset, tracker.clone())); } From b2960c67690cd0a2bebddb1cbcc7d9e713907fa9 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 22 Oct 2020 10:32:57 +0800 Subject: [PATCH 15/17] fix duration Signed-off-by: Little-Wallace --- src/engine.rs | 26 +++++++++++++++++++++++--- src/lib.rs | 10 ++++++++++ src/util.rs | 37 +++++++++++++++---------------------- src/wal.rs | 9 +++++---- 4 files changed, 53 insertions(+), 29 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index bdaf9844..ba5e9281 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -5,8 +5,8 @@ use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; use std::{fmt, u64}; +use futures::channel as future_channel; use futures::executor::block_on; -use futures::{channel as future_channel, TryFutureExt}; use log::{info, warn}; use protobuf::Message; @@ -27,6 +27,7 @@ use crate::util::{HandyRwLock, HashMap, Statistic, Worker}; use crate::wal::{LogMsg, WalRunner, WriteTask}; use crate::{codec, CacheStats, GlobalStats, Result}; use futures::future::{err, ok, BoxFuture}; +use std::sync::atomic::Ordering; const SLOTS_COUNT: usize = 128; @@ -195,6 +196,7 @@ where sync: bool, ) -> BoxFuture<'static, Result> { let mut entries_size = 0; + let now = Instant::now(); if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) { let (sender, r) = future_channel::oneshot::channel(); let bytes = content.len(); @@ -210,6 +212,7 @@ where let memtables = self.memtables.clone(); let items = std::mem::replace(&mut log_batch.items, vec![]); let removed_memtables = self.purge_manager.get_removed_memtables(); + let stats = self.global_stats.clone(); return Box::pin(async move { let (file_num, offset, tracker) = r.await?; if file_num > 0 { @@ -226,7 +229,9 @@ where ); } } - return Ok(bytes); + let t = now.elapsed().as_micros(); + stats.add_write_duration_change(t as usize); + Ok(bytes) }); } return Box::pin(ok(0)); @@ -576,7 +581,22 @@ where 
if let Err(_) = self.wal_sender.send(LogMsg::Metric(sender)) { return Box::pin(err(Error::Stop)); } - return Box::pin(r.map_err(|_| Error::Stop)); + let write_count = self.global_stats.write_count.load(Ordering::Relaxed); + let write_cost = self.global_stats.write_cost.load(Ordering::Relaxed); + let max_write_cost = self.global_stats.max_write_cost.load(Ordering::Relaxed); + self.global_stats + .write_count + .fetch_sub(write_count, Ordering::Relaxed); + self.global_stats + .write_cost + .fetch_sub(write_cost, Ordering::Relaxed); + return Box::pin(async move { + let mut stats = r.await?; + // transfer micro to sec + stats.avg_write_cost = write_cost / write_count; + stats.max_write_cost = max_write_cost; + Ok(stats) + }); } } diff --git a/src/lib.rs b/src/lib.rs index ba36843a..dae6151f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,9 @@ pub struct GlobalStats { rewrite_operations: AtomicUsize, // How many compacted operations in the rewrite queue. compacted_rewrite_operations: AtomicUsize, + write_count: AtomicUsize, + write_cost: AtomicUsize, + max_write_cost: AtomicUsize, } impl GlobalStats { @@ -67,6 +70,13 @@ impl GlobalStats { pub fn add_cache_miss(&self, count: usize) { self.cache_miss.fetch_add(count, Ordering::Relaxed); } + pub fn add_write_duration_change(&self, dur: usize) { + self.write_count.fetch_add(1, Ordering::Relaxed); + self.write_cost.fetch_add(dur, Ordering::Relaxed); + if dur > self.max_write_cost.load(Ordering::Relaxed) { + self.max_write_cost.store(dur, Ordering::Relaxed); + } + } pub fn cache_hit(&self) -> usize { self.cache_hit.load(Ordering::Relaxed) diff --git a/src/util.rs b/src/util.rs index 6ef60ca1..c859ddb5 100644 --- a/src/util.rs +++ b/src/util.rs @@ -321,16 +321,16 @@ impl Drop for Worker { #[derive(Clone, Debug, Copy, PartialEq, Default)] pub struct Statistic { - pub wal_cost: f64, - pub sync_cost: f64, - pub write_cost: f64, - pub max_wal_cost: f64, - pub max_sync_cost: f64, - pub max_write_cost: f64, + pub wal_cost: 
usize, + pub sync_cost: usize, + pub avg_write_cost: usize, + pub max_wal_cost: usize, + pub max_sync_cost: usize, + pub max_write_cost: usize, pub freq: usize, } -fn max(left: f64, right: f64) -> f64 { +fn max(left: usize, right: usize) -> usize { if left > right { left } else { @@ -342,7 +342,6 @@ impl Statistic { pub fn add(&mut self, other: &Self) { self.wal_cost += other.wal_cost; self.sync_cost += other.sync_cost; - self.write_cost += other.write_cost; self.freq += other.freq; self.max_wal_cost = max(self.max_wal_cost, other.max_wal_cost); self.max_write_cost = max(self.max_write_cost, other.max_write_cost); @@ -350,29 +349,23 @@ impl Statistic { } pub fn clear(&mut self) { - self.wal_cost = 0.0; - self.sync_cost = 0.0; - self.write_cost = 0.0; - self.max_wal_cost = 0.0; - self.max_sync_cost = 0.0; - self.max_write_cost = 0.0; + self.wal_cost = 0; + self.sync_cost = 0; + self.avg_write_cost = 0; + self.max_wal_cost = 0; + self.max_sync_cost = 0; + self.max_write_cost = 0; self.freq = 0; } #[inline] - pub fn add_wal(&mut self, wal: f64) { + pub fn add_wal(&mut self, wal: usize) { self.wal_cost += wal; self.max_wal_cost = max(self.max_wal_cost, wal); } #[inline] - pub fn add_write(&mut self, write: f64) { - self.write_cost += write; - self.max_write_cost = max(self.max_write_cost, write); - } - - #[inline] - pub fn add_sync(&mut self, sync: f64) { + pub fn add_sync(&mut self, sync: usize) { self.sync_cost += sync; self.max_sync_cost = max(self.max_sync_cost, sync); } diff --git a/src/wal.rs b/src/wal.rs index 85a85cc0..223a321f 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -105,7 +105,7 @@ where .append(LogQueue::Append, &write_buffer) .unwrap() }; - let before_sync_cost = now.elapsed().as_secs_f64(); + let before_sync_cost = now.elapsed().as_micros(); if sync { if let Err(e) = fd.sync() { warn!("write wal failed because of: {} ", e); @@ -116,9 +116,10 @@ where .cache_submitter .get_cache_tracker(file_num, base_offset); 
self.cache_submitter.fill_chunk(entries_size); - let wal_cost = now.elapsed().as_secs_f64(); - self.statistic.add_wal(wal_cost); - self.statistic.add_sync(wal_cost - before_sync_cost); + let wal_cost = now.elapsed().as_micros(); + self.statistic.add_wal(wal_cost as usize); + self.statistic + .add_sync((wal_cost - before_sync_cost) as usize); self.statistic.add_one(); for (offset, sender) in write_ret.drain(..) { let _ = sender.send((file_num, base_offset + offset, tracker.clone())); From 28803f4be860b5e08d25994c5f27507e19fb2eca Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 22 Oct 2020 12:11:21 +0800 Subject: [PATCH 16/17] fix master conflict Signed-off-by: Little-Wallace --- src/cache_evict.rs | 8 +++++--- src/engine.rs | 9 +++------ src/log_batch.rs | 33 +++++++++++++++++++-------------- src/wal.rs | 6 ++---- 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/src/cache_evict.rs b/src/cache_evict.rs index 125d23bf..22fa56ac 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -64,7 +64,7 @@ impl CacheSubmitor { self.block_on_full = false; } - pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option> { + pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option { if self.cache_limit == 0 { return None; } @@ -104,13 +104,15 @@ impl CacheSubmitor { } } - Some(self.size_tracker.clone()) + Some(CacheTracker::new( + self.global_stats.clone(), + self.size_tracker.clone(), + )) } pub fn fill_chunk(&mut self, size: usize) { self.chunk_size += size; self.size_tracker.fetch_add(size, Ordering::Release); - self.global_stats.add_mem_change(size); } fn reset(&mut self, file_num: u64, offset: u64) { diff --git a/src/engine.rs b/src/engine.rs index b427a474..31b502b5 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -195,15 +195,14 @@ where log_batch: &mut LogBatch, sync: bool, ) -> BoxFuture<'static, Result> { - let mut entries_size = 0; let now = Instant::now(); - if let Some(content) = 
log_batch.encode_to_bytes(&mut entries_size) { + if let Some(content) = log_batch.encode_to_bytes() { let (sender, r) = future_channel::oneshot::channel(); let bytes = content.len(); let task = WriteTask { content, sync, - entries_size, + entries_size: log_batch.entries_size(), sender, }; if let Err(_) = self.wal_sender.send(LogMsg::Write(task)) { @@ -378,14 +377,12 @@ where if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) { - let mut encoded_size = 0; for item in log_batch.items.iter_mut() { if let LogItemContent::Entries(entries) = &mut item.content { entries.attach_cache_tracker(tracker.clone()); - encoded_size += entries.encoded_size; } } - cache_submitor.fill_chunk(encoded_size); + cache_submitor.fill_chunk(log_batch.entries_size()); } } self.apply_to_memtable(&mut log_batch, queue, file_num); diff --git a/src/log_batch.rs b/src/log_batch.rs index c085728d..64e045fe 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -139,7 +139,7 @@ impl PartialEq for Entries { impl Entries { pub fn new(entries: Vec, entries_index: Option>) -> Entries { - let entries_index = + let entries_index = entries_index.unwrap_or_else(|| vec![EntryIndex::default(); entries.len()]); Entries { entries, @@ -202,7 +202,7 @@ impl Entries { // This offset doesn't count the header. 
self.entries_index[i].offset = vec.len() as u64; self.entries_index[i].len = content.len() as u64; - *entries_size += entries_index[i].len as usize; + *entries_size += self.entries_index[i].len as usize; } vec.extend_from_slice(&content); @@ -232,12 +232,12 @@ impl Entries { } } - pub fn attach_cache_tracker(&mut self, chunk_size: Arc) { + pub fn attach_cache_tracker(&mut self, tracker: CacheTracker) { for idx in self.entries_index.iter_mut() { - idx.cache_tracker = Some(CacheTracker { - chunk_size: chunk_size.clone(), - sub_on_drop: idx.len as usize, - }); + let mut tkr = tracker.clone(); + tkr.global_stats.add_mem_change(idx.len as usize); + tkr.sub_on_drop = idx.len as usize; + idx.cache_tracker = Some(tkr); } } @@ -466,7 +466,7 @@ where W: EntryExt, { pub items: Vec>, - entries_size: RefCell, + entries_size: usize, _phantom: PhantomData, } @@ -478,7 +478,7 @@ where fn default() -> Self { Self { items: Vec::with_capacity(16), - entries_size: RefCell::new(0), + entries_size: 0, _phantom: PhantomData, } } @@ -496,7 +496,7 @@ where pub fn with_capacity(cap: usize) -> Self { Self { items: Vec::with_capacity(cap), - entries_size: RefCell::new(0), + entries_size: 0, _phantom: PhantomData, } } @@ -576,7 +576,7 @@ where file_num, base_offset, content_offset, - &mut log_batch.entries_size.borrow_mut(), + &mut log_batch.entries_size, )?; log_batch.items.push(item); items_count -= 1; @@ -603,12 +603,17 @@ where let mut vec = Vec::with_capacity(4096); vec.encode_u64(0).unwrap(); vec.encode_var_u64(self.items.len() as u64).unwrap(); - for item in &self.items { - item.encode_to::(&mut vec, &mut *self.entries_size.borrow_mut()) + for item in self.items.iter_mut() { + item.encode_to::(&mut vec, &mut self.entries_size) .unwrap(); } - let compression_type = CompressionType::None; + let compression_type = if vec.len() > COMPRESSION_SIZE { + vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4); + CompressionType::Lz4 + } else { + CompressionType::None + }; let checksum = 
crc32(&vec[8..]); vec.encode_u32_le(checksum).unwrap(); diff --git a/src/wal.rs b/src/wal.rs index 223a321f..d56805dc 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -1,9 +1,7 @@ use futures::channel::oneshot::Sender; -use std::sync::atomic::AtomicUsize; use std::sync::mpsc::Receiver; -use std::sync::Arc; -use crate::cache_evict::CacheSubmitor; +use crate::cache_evict::{CacheSubmitor, CacheTracker}; use crate::errors::Result; use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::Statistic; @@ -14,7 +12,7 @@ pub struct WriteTask { pub content: Vec, pub entries_size: usize, pub sync: bool, - pub sender: Sender<(u64, u64, Option>)>, + pub sender: Sender<(u64, u64, Option)>, } pub enum LogMsg { From f21f0a5eb6ea036533b919a37063cf550acfffa5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 22 Oct 2020 22:41:35 +0800 Subject: [PATCH 17/17] add metric Signed-off-by: Little-Wallace --- src/engine.rs | 17 ++++++++++++++--- src/lib.rs | 16 ++++++++++++---- src/log_batch.rs | 2 +- src/util.rs | 31 +++++++++++++------------------ src/wal.rs | 14 +++++++++----- 5 files changed, 49 insertions(+), 31 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 31b502b5..e78dc8ae 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -214,6 +214,8 @@ where let stats = self.global_stats.clone(); return Box::pin(async move { let (file_num, offset, tracker) = r.await?; + // cacluate memtable cost + let t1 = now.elapsed().as_micros(); if file_num > 0 { for mut item in items { if let LogItemContent::Entries(entries) = &mut item.content { @@ -228,8 +230,8 @@ where ); } } - let t = now.elapsed().as_micros(); - stats.add_write_duration_change(t as usize); + let t2 = now.elapsed().as_micros(); + stats.add_write_duration_change((t2 - t1) as usize, t2 as usize); Ok(bytes) }); } @@ -573,18 +575,27 @@ where } let write_count = self.global_stats.write_count.load(Ordering::Relaxed); let write_cost = self.global_stats.write_cost.load(Ordering::Relaxed); + let mem_cost = 
self.global_stats.mem_cost.load(Ordering::Relaxed); let max_write_cost = self.global_stats.max_write_cost.load(Ordering::Relaxed); + let max_mem_cost = self.global_stats.max_mem_cost.load(Ordering::Relaxed); self.global_stats .write_count .fetch_sub(write_count, Ordering::Relaxed); self.global_stats .write_cost .fetch_sub(write_cost, Ordering::Relaxed); + self.global_stats + .mem_cost + .fetch_sub(mem_cost, Ordering::Relaxed); return Box::pin(async move { let mut stats = r.await?; // transfer micro to sec - stats.avg_write_cost = write_cost / write_count; + if write_count > 0 { + stats.avg_write_cost = write_cost / write_count; + stats.avg_mem_cost = mem_cost / write_count; + } stats.max_write_cost = max_write_cost; + stats.max_mem_cost = max_mem_cost; Ok(stats) }); } diff --git a/src/lib.rs b/src/lib.rs index dae6151f..1720c23d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,8 @@ pub struct GlobalStats { write_count: AtomicUsize, write_cost: AtomicUsize, max_write_cost: AtomicUsize, + mem_cost: AtomicUsize, + max_mem_cost: AtomicUsize, } impl GlobalStats { @@ -70,11 +72,17 @@ impl GlobalStats { pub fn add_cache_miss(&self, count: usize) { self.cache_miss.fetch_add(count, Ordering::Relaxed); } - pub fn add_write_duration_change(&self, dur: usize) { + pub fn add_write_duration_change(&self, memtable_duration: usize, write_duration: usize) { self.write_count.fetch_add(1, Ordering::Relaxed); - self.write_cost.fetch_add(dur, Ordering::Relaxed); - if dur > self.max_write_cost.load(Ordering::Relaxed) { - self.max_write_cost.store(dur, Ordering::Relaxed); + self.write_cost.fetch_add(write_duration, Ordering::Relaxed); + self.mem_cost + .fetch_add(memtable_duration, Ordering::Relaxed); + if write_duration > self.max_write_cost.load(Ordering::Relaxed) { + self.max_write_cost.store(write_duration, Ordering::Relaxed); + } + if memtable_duration > self.max_mem_cost.load(Ordering::Relaxed) { + self.max_write_cost + .store(memtable_duration, Ordering::Relaxed); } } diff 
--git a/src/log_batch.rs b/src/log_batch.rs index 64e045fe..a16d695a 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -673,7 +673,7 @@ mod tests { entries .encode_to::(&mut encoded, &mut entries_size1) .unwrap(); - for idx in entries.entries_index.borrow_mut().iter_mut() { + for idx in entries.entries_index.iter_mut() { idx.file_num = file_num; } let (mut s, mut entries_size2) = (encoded.as_slice(), 0); diff --git a/src/util.rs b/src/util.rs index b1cdc4be..3c2d921d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -324,12 +324,14 @@ impl Drop for Worker { #[derive(Clone, Debug, Copy, PartialEq, Default)] pub struct Statistic { - pub wal_cost: usize, - pub sync_cost: usize, + pub avg_wal_cost: usize, + pub avg_sync_cost: usize, pub avg_write_cost: usize, + pub avg_mem_cost: usize, pub max_wal_cost: usize, pub max_sync_cost: usize, pub max_write_cost: usize, + pub max_mem_cost: usize, pub freq: usize, } @@ -342,18 +344,9 @@ fn max(left: usize, right: usize) -> usize { } impl Statistic { - pub fn add(&mut self, other: &Self) { - self.wal_cost += other.wal_cost; - self.sync_cost += other.sync_cost; - self.freq += other.freq; - self.max_wal_cost = max(self.max_wal_cost, other.max_wal_cost); - self.max_write_cost = max(self.max_write_cost, other.max_write_cost); - self.max_sync_cost = max(self.max_sync_cost, other.max_sync_cost); - } - pub fn clear(&mut self) { - self.wal_cost = 0; - self.sync_cost = 0; + self.avg_wal_cost = 0; + self.avg_sync_cost = 0; self.avg_write_cost = 0; self.max_wal_cost = 0; self.max_sync_cost = 0; @@ -363,18 +356,20 @@ impl Statistic { #[inline] pub fn add_wal(&mut self, wal: usize) { - self.wal_cost += wal; + self.avg_wal_cost += wal; self.max_wal_cost = max(self.max_wal_cost, wal); } #[inline] pub fn add_sync(&mut self, sync: usize) { - self.sync_cost += sync; + self.avg_sync_cost += sync; self.max_sync_cost = max(self.max_sync_cost, sync); } - #[inline] - pub fn add_one(&mut self) { - self.freq += 1; + pub fn divide(&mut self) { + if 
self.freq > 0 { + self.avg_wal_cost /= self.freq; + self.avg_sync_cost /= self.freq; + } } } diff --git a/src/wal.rs b/src/wal.rs index d56805dc..eab62cf0 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -58,8 +58,7 @@ where return Ok(()); } LogMsg::Metric(cb) => { - let _ = cb.send(self.statistic.clone()); - self.statistic.clear(); + self.report(cb); continue; } }; @@ -79,8 +78,7 @@ where return Ok(()); } LogMsg::Metric(cb) => { - let _ = cb.send(self.statistic.clone()); - self.statistic.clear(); + self.report(cb); continue; } }; @@ -118,7 +116,7 @@ where self.statistic.add_wal(wal_cost as usize); self.statistic .add_sync((wal_cost - before_sync_cost) as usize); - self.statistic.add_one(); + self.statistic.freq += 1; for (offset, sender) in write_ret.drain(..) { let _ = sender.send((file_num, base_offset + offset, tracker.clone())); } @@ -126,4 +124,10 @@ where } Ok(()) } + + pub fn report(&mut self, cb: Sender) { + self.statistic.divide(); + let _ = cb.send(self.statistic.clone()); + self.statistic.clear(); + } }