From 2822c69b2e6db4c66dc99d55a1a2f13168bec208 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Thu, 7 Apr 2022 14:10:10 +0800 Subject: [PATCH] feat: raft log store (part 2) --- storage/src/raft_log_store/entry.rs | 25 ++- storage/src/raft_log_store/error.rs | 4 + storage/src/raft_log_store/log.rs | 36 ++-- storage/src/raft_log_store/mem.rs | 250 ++++++++++++++++++++++++++-- storage/src/raft_log_store/mod.rs | 1 - storage/src/raft_log_store/store.rs | 77 ++++++++- 6 files changed, 346 insertions(+), 47 deletions(-) diff --git a/storage/src/raft_log_store/entry.rs b/storage/src/raft_log_store/entry.rs index a077268..3418933 100644 --- a/storage/src/raft_log_store/entry.rs +++ b/storage/src/raft_log_store/entry.rs @@ -5,9 +5,7 @@ use runkv_common::coding::CompressionAlgorithm; use super::DEFAULT_LOG_BATCH_SIZE; use crate::raft_log_store::error::RaftLogStoreError; -use crate::utils::{ - crc32sum, get_length_prefixed_slice, put_length_prefixed_slice, BufExt, BufMutExt, -}; +use crate::utils::{crc32sum, get_length_prefixed_slice, put_length_prefixed_slice}; #[derive(PartialEq, Eq, Clone, Debug)] pub enum Entry { @@ -70,6 +68,7 @@ pub struct RaftLogBatch { term: u64, first_index: u64, offsets: Vec, + /// Note: Only used for encoding. data: Vec, } @@ -101,6 +100,10 @@ impl RaftLogBatch { self.group } + pub fn first_index(&self) -> u64 { + self.first_index + } + pub fn term(&self) -> u64 { self.term } @@ -110,6 +113,12 @@ impl RaftLogBatch { self.offsets.len() - 1 } + pub fn data_segment_location(&self) -> (usize, usize) { + let offset = 8 + 8 + 8 + 8 + self.offsets.len() * 4 + 8; + let len = self.offsets[self.offsets.len() - 1]; + (offset, len) + } + pub fn location(&self, index: usize) -> (usize, usize) { debug_assert!(index < self.len() - 1); let offset = self.offsets[index]; @@ -120,11 +129,11 @@ impl RaftLogBatch { /// Format: /// /// ```plain - /// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 | ... | offset (N-1) | offset N (phantom) | + /// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 (4B) | ... | offset (N-1) | offset N (phantom) | /// | data segment len (8B) | data block (compressed) | compression algorithm (1B) | crc32sum (4B) | /// | <---------- data segment ------------------------------------------->| /// ``` - pub fn encode(&self, mut buf_meta: &mut Vec, buf_data: &mut Vec) { + pub fn encode(&self, buf_meta: &mut Vec, buf_data: &mut Vec) { debug_assert!(!self.offsets.is_empty()); // Encode meta. @@ -133,7 +142,7 @@ impl RaftLogBatch { buf_meta.put_u64_le(self.first_index); buf_meta.put_u64_le(self.offsets.len() as u64); for offset in self.offsets.iter() { - buf_meta.put_var_u32(*offset as u32); + buf_meta.put_u32_le(*offset as u32); } // Encode data. @@ -160,14 +169,14 @@ impl RaftLogBatch { } /// Decode meta only. [`RaftLogBatch.data`] will be left empty. - pub fn decode(mut buf: &mut &[u8]) -> Self { + pub fn decode(buf: &mut &[u8]) -> Self { let group = buf.get_u64_le(); let term = buf.get_u64_le(); let first_index = buf.get_u64_le(); let offsets_len = buf.get_u64_le() as usize; let mut offsets = Vec::with_capacity(offsets_len); for _ in 0..offsets_len { - let offset = buf.get_var_u32() as usize; + let offset = buf.get_u32_le() as usize; offsets.push(offset); } let data_segment_len = buf.get_u64_le() as usize; diff --git a/storage/src/raft_log_store/error.rs b/storage/src/raft_log_store/error.rs index 745057b..e5c804b 100644 --- a/storage/src/raft_log_store/error.rs +++ b/storage/src/raft_log_store/error.rs @@ -2,8 +2,12 @@ pub enum RaftLogStoreError { #[error("group {0} not exists")] GroupNotExists(u64), + #[error("group {0} already exists")] + GroupAlreadyExists(u64), #[error("encode error: {0}")] EncodeError(String), + #[error("raft log gap exists: [{start}, {end})")] + RaftLogGap { start: u64, end: u64 }, #[error("other: {0}")] Other(String), } diff --git a/storage/src/raft_log_store/log.rs b/storage/src/raft_log_store/log.rs index ee0bc3e..df38a3c 100644 --- a/storage/src/raft_log_store/log.rs +++ b/storage/src/raft_log_store/log.rs @@ -11,25 +11,25 @@ use crate::entry::Entry; use crate::error::{Error, Result}; #[derive(Clone, Debug)] -pub struct PipeLogOptions { +pub struct LogOptions { path: String, log_file_capacity: usize, } -struct PipeLogCore { +struct LogCore { active_file: File, frozen_files: Vec, first_log_file_id: u64, } -pub struct PipeLog { +pub struct Log { path: String, log_file_capacity: usize, - core: Mutex, + core: Mutex, } -impl PipeLog { - pub async fn open(options: PipeLogOptions) -> Result { +impl Log { + pub async fn open(options: LogOptions) -> Result { create_dir_all(&options.path).await?; let (frozen_files, first_log_file_id) = { let mut frozen_files = vec![]; @@ -68,7 +68,7 @@ impl PipeLog { let active_file_id = first_log_file_id + frozen_files.len() as u64 + 1; let active_file = Self::new_active_file(&options.path, active_file_id).await?; - let core = PipeLogCore { + let core = LogCore { active_file, frozen_files, first_log_file_id, @@ -89,19 +89,19 @@ impl PipeLog { Ok(()) } - pub async fn append(&self, entries: Vec) -> Result<()> { + pub async fn push(&self, entry: Entry) -> Result<(usize, usize)> { let mut guard = self.core.lock().await; + let start = guard.active_file.metadata().await?.len() as usize; let mut buf = Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE); - for entry in entries { - entry.encode(&mut buf); - } + entry.encode(&mut buf); guard.active_file.write_all(&buf).await?; guard.active_file.sync_data().await?; - if guard.active_file.metadata().await?.len() as usize >= self.log_file_capacity { + let end = guard.active_file.metadata().await?.len() as usize; + if end >= self.log_file_capacity { drop(guard); self.rotate().await?; } - Ok(()) + Ok((start, end - start)) } pub async fn read(&self, log_file_id: u64, offset: u64, len: usize) -> Result> { @@ -119,7 +119,7 @@ impl PipeLog { } } -impl PipeLog { +impl Log { async fn rotate(&self) -> Result<()> { let mut guard = self.core.lock().await; // Sync old active file. @@ -172,17 +172,17 @@ mod tests { #[test(tokio::test)] async fn test_pipe_log_recovery() { let tempdir = tempfile::tempdir().unwrap(); - let options = PipeLogOptions { + let options = LogOptions { path: tempdir.path().to_str().unwrap().to_string(), // Estimated size of each compressed entry is 111. log_file_capacity: 100, }; - let log = PipeLog::open(options.clone()).await.unwrap(); + let log = Log::open(options.clone()).await.unwrap(); let entries = generate_entries(4, 16, vec![b'x'; 64]); assert_eq!(entries.len(), 4); for entry in entries.iter().cloned() { - log.append(vec![entry]).await.unwrap(); + log.push(entry).await.unwrap(); } assert_eq!(log.core.lock().await.frozen_files.len(), 4); let mut buf = vec![]; @@ -202,7 +202,7 @@ mod tests { // Recover pipe log. drop(log); - let log = PipeLog::open(options).await.unwrap(); + let log = Log::open(options).await.unwrap(); assert_eq!(log.core.lock().await.frozen_files.len(), 5); let mut buf = vec![]; for i in 0..4 { diff --git a/storage/src/raft_log_store/mem.rs b/storage/src/raft_log_store/mem.rs index 5ff7874..6c24f40 100644 --- a/storage/src/raft_log_store/mem.rs +++ b/storage/src/raft_log_store/mem.rs @@ -1,38 +1,29 @@ -use std::collections::BTreeMap; +use std::collections::btree_map::{BTreeMap, Entry}; use tokio::sync::RwLock; use super::error::RaftLogStoreError; use crate::error::Result; +const DEFAULT_INDICES_INIT_CAPACITY: usize = 1024; + +#[derive(Clone, Copy, Debug)] pub struct EntryIndex { /// Prevent log entries with lower terms added from GC shadowing those with the same indices /// but with higher terms. pub term: u64, - pub block_offset: u64, + pub block_offset: usize, pub block_len: usize, - pub offset: u64, + pub offset: usize, pub len: usize, } pub struct MemState { - group: u64, first_index: u64, indices: Vec, kvs: BTreeMap, Vec>, } -impl MemState { - pub fn new(group: u64, last_index: u64, init_capacity: usize) -> Self { - Self { - group, - first_index: last_index, - indices: Vec::with_capacity(init_capacity), - kvs: BTreeMap::default(), - } - } -} - pub struct MemStates { /// Mapping [`group`] to [`MemState`]. states: RwLock>>, @@ -47,6 +38,38 @@ impl Default for MemStates { } impl MemStates { + pub async fn add_group(&self, group: u64, first_index: u64) -> Result<()> { + let mut guard = self.states.write().await; + match guard.entry(group) { + Entry::Occupied(_) => return Err(RaftLogStoreError::GroupAlreadyExists(group).into()), + Entry::Vacant(v) => { + v.insert(RwLock::new(MemState { + first_index, + indices: Vec::with_capacity(DEFAULT_INDICES_INIT_CAPACITY), + kvs: BTreeMap::default(), + })); + } + } + Ok(()) + } + + /// # Safety + /// + /// Removed group needs to be guaranteed never be used again. + pub async fn remove_group(&self, group: u64) -> Result<()> { + let mut guard = self.states.write().await; + match guard.entry(group) { + Entry::Occupied(o) => { + let mut state = o.into_mut().write().await; + state.first_index = u64::MAX; + state.indices.clear(); + state.kvs.clear(); + } + Entry::Vacant(_) => return Err(RaftLogStoreError::GroupNotExists(group).into()), + } + Ok(()) + } + pub async fn first_index(&self, group: u64) -> Result { let guard = self.states.read().await; let state = guard @@ -66,4 +89,201 @@ impl MemStates { .await; Ok(state.first_index + state.indices.len() as u64 - 1) } + + /// Append raft log indices. + pub async fn append( + &self, + group: u64, + mut first_index: u64, + mut indices: Vec, + ) -> Result<()> { + debug_assert!(!indices.is_empty()); + let guard = self.states.read().await; + let mut state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .write() + .await; + + let state_next_index = state.first_index + state.indices.len() as u64; + + // Return error if there is gap in raft log. + if first_index > state_next_index { + return Err(RaftLogStoreError::RaftLogGap { + start: state_next_index, + end: first_index, + } + .into()); + } + + // Ignore outdated indices. + if first_index < state.first_index { + indices.drain(..(state.first_index - first_index) as usize); + first_index = state.first_index; + if indices.is_empty() { + return Ok(()); + } + } + + // Directly append new indices. + if (state_next_index as usize - first_index as usize) < indices.len() { + let next_indices = indices.drain((state_next_index - first_index) as usize..); + state.indices.extend(next_indices) + } + + // Update overlapping indices. + let overlap_start = (first_index - state.first_index) as usize; + let overlap_end = overlap_start + indices.len(); + for (indices_i, state_indices_i) in (overlap_start..overlap_end).enumerate() { + let state_index = &mut state.indices[state_indices_i]; + let index = &mut indices[indices_i]; + + // Ignore outdated rewrite indices. + if state_index.term > index.term { + continue; + } + + *state_index = *index; + } + + Ok(()) + } + + /// Compact any indices before the given index. + pub async fn compact(&self, group: u64, index: u64) -> Result<()> { + let guard = self.states.read().await; + let mut state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .write() + .await; + + // Ignore outdated compact command. + if index <= state.first_index { + return Ok(()); + } + + // Return error if there is gap in raft log. + if index > state.first_index + state.indices.len() as u64 { + return Err(RaftLogStoreError::RaftLogGap { + start: state.first_index + state.indices.len() as u64, + end: index, + } + .into()); + } + + // Truncate indices. + let len = (index - state.first_index) as usize; + state.indices.drain(..len); + state.first_index = index; + + Ok(()) + } + + pub async fn put(&self, group: u64, key: Vec, value: Vec) -> Result<()> { + let guard = self.states.read().await; + let mut state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .write() + .await; + state.kvs.insert(key, value); + Ok(()) + } + + pub async fn delete(&self, group: u64, key: Vec) -> Result<()> { + let guard = self.states.read().await; + let mut state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .write() + .await; + state.kvs.remove(&key); + Ok(()) + } + + pub async fn get(&self, group: u64, key: Vec) -> Result>> { + let guard = self.states.read().await; + let state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .read() + .await; + Ok(state.kvs.get(&key).cloned()) + } +} + +#[cfg(test)] +mod tests { + + use std::ops::Range; + + use test_log::test; + + use super::*; + + #[test(tokio::test)] + async fn test_raft_log() { + let states = MemStates::default(); + states.add_group(1, 1).await.unwrap(); + states.append(1, 1, gen_indices(1, 100)).await.unwrap(); + assert_range(&states, 1, 1..101).await; + assert!(states.append(1, 102, gen_indices(1, 100)).await.is_err()); + assert_range(&states, 1, 1..101).await; + states.append(1, 101, gen_indices(1, 100)).await.unwrap(); + assert_range(&states, 1, 1..201).await; + states.compact(1, 101).await.unwrap(); + assert_range(&states, 1, 101..201).await; + states.append(1, 51, gen_indices(1, 200)).await.unwrap(); + assert_range(&states, 1, 101..251).await; + states.compact(1, 251).await.unwrap(); + assert_range(&states, 1, 251..251).await; + assert!(states.compact(1, 252).await.is_err()); + states.compact(1, 101).await.unwrap(); + states.remove_group(1).await.unwrap(); + } + + #[test(tokio::test)] + async fn test_kv() { + let states = MemStates::default(); + states.add_group(1, 1).await.unwrap(); + states.put(1, b"k1".to_vec(), b"v1".to_vec()).await.unwrap(); + assert_eq!( + states.get(1, b"k1".to_vec()).await.unwrap(), + Some(b"v1".to_vec()) + ); + states.put(1, b"k1".to_vec(), b"v2".to_vec()).await.unwrap(); + assert_eq!( + states.get(1, b"k1".to_vec()).await.unwrap(), + Some(b"v2".to_vec()) + ); + states.delete(1, b"k1".to_vec()).await.unwrap(); + assert_eq!(states.get(1, b"k1".to_vec()).await.unwrap(), None); + states.remove_group(1).await.unwrap(); + } + + async fn assert_range(target: &MemStates, group: u64, range: Range) { + let guard = target.states.read().await; + let state = guard.get(&group).unwrap().read().await; + assert_eq!( + ( + state.first_index, + state.first_index + state.indices.len() as u64 + ), + (range.start, range.end) + ); + } + + fn gen_indices(term: u64, len: usize) -> Vec { + vec![ + EntryIndex { + term, + block_offset: 0, + block_len: 0, + offset: 0, + len: 0, + }; + len + ] + } } diff --git a/storage/src/raft_log_store/mod.rs b/storage/src/raft_log_store/mod.rs index c454bd2..2a16416 100644 --- a/storage/src/raft_log_store/mod.rs +++ b/storage/src/raft_log_store/mod.rs @@ -1,7 +1,6 @@ pub mod entry; pub mod error; pub mod log; -#[allow(dead_code)] pub mod mem; #[allow(dead_code)] pub mod store; diff --git a/storage/src/raft_log_store/store.rs b/storage/src/raft_log_store/store.rs index 7d8b634..62661dc 100644 --- a/storage/src/raft_log_store/store.rs +++ b/storage/src/raft_log_store/store.rs @@ -1,10 +1,14 @@ use std::sync::Arc; -use crate::log::PipeLog; -use crate::mem::MemStates; +use itertools::Itertools; + +use crate::entry::{Entry, RaftLogBatch}; +use crate::error::Result; +use crate::log::Log; +use crate::mem::{EntryIndex, MemStates}; struct RaftLogStoreCore { - pipe_log: PipeLog, + log: Log, states: MemStates, } @@ -14,9 +18,72 @@ pub struct RaftLogStore { } impl RaftLogStore { - pub fn new(pipe_log: PipeLog, states: MemStates) -> Self { + pub fn new(pipe_log: Log, states: MemStates) -> Self { Self { - core: Arc::new(RaftLogStoreCore { pipe_log, states }), + core: Arc::new(RaftLogStoreCore { + log: pipe_log, + states, + }), } } + + pub async fn append_log(&self, batch: RaftLogBatch) -> Result<()> { + let (data_segment_offset, data_segment_len) = batch.data_segment_location(); + let _group = batch.group(); + let term = batch.term(); + let _first_index = batch.first_index(); + let locations = (0..batch.len()) + .into_iter() + .map(|i| batch.location(i)) + .collect_vec(); + + let entry = Entry::RaftLogBatch(batch); + let (write_offset, _write_len) = self.core.log.push(entry).await?; + + let _indices = locations + .into_iter() + .map(|(offset, len)| EntryIndex { + term, + block_offset: write_offset + data_segment_offset, + block_len: data_segment_len, + offset, + len, + }) + .collect_vec(); + + // TODO: Append MemState. + + Ok(()) + } + + pub fn get_log(&self) { + todo!() + } + + pub fn put(&self) { + todo!() + } + + pub fn delete(&self) { + todo!() + } + + pub fn get(&self) { + todo!() + } +} + +#[cfg(test)] +mod tests { + + use test_log::test; + + use super::*; + + fn is_send_sync() {} + + #[test] + fn ensure_send_sync() { + is_send_sync::() + } }