From bfeacfe7149e02dbbc262a7acf9eb260caaebb54 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 11 Apr 2022 10:50:30 +0800 Subject: [PATCH 1/2] feat: impl raft log store (part 5) --- Cargo.lock | 143 ++++++++- storage/src/raft_log_store/entry.rs | 101 ++++-- storage/src/raft_log_store/log.rs | 2 +- storage/src/raft_log_store/mem.rs | 86 ++++- storage/src/raft_log_store/store.rs | 173 ++++++++--- storage/src/utils/coding.rs | 15 + wheel/Cargo.toml | 4 +- wheel/src/error.rs | 12 +- wheel/src/storage/mod.rs | 2 - wheel/src/storage/raft_log_store.rs | 465 +++++++++++++++++++++++++--- 10 files changed, 887 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37b8915..4877dd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,6 +426,15 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -458,6 +467,9 @@ name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +dependencies = [ + "serde", +] [[package]] name = "bytes-utils" @@ -670,6 +682,72 @@ dependencies = [ "sct", ] +[[package]] +name = "darling" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f2c43f534ea4b0b049015d00269734195e6d3f0f6635cb692251aca6f9f8b3c" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e91455b86830a1c21799d94524df0845183fa55bafd9aa137b01c7d1065fa36" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29b5acf0dea37a7f66f7b25d2c5e93fd46f8f6968b1a5d7a3e02e97768afc95a" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "derive_builder" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13202debe11181040ae9063d739fa32cfcaaebe2275fe387703460ae2365b30" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66e616858f6187ed828df7c64a6d71720d83767a7f19740b2d1b6fe6327b36e5" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58a94ace95092c5acb1e97a7e846b310cfbd499652f72297da7493f618a98d73" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -974,6 +1052,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-serde" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d98b3d9662de70952b14c4840ee0f37e23973542a363e2275f4b9d024ff6cca" +dependencies = [ + "http", + "serde", +] + [[package]] name = "httparse" version = "1.6.0" @@ -1055,6 +1143,12 @@ 
dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indexmap" version = "1.8.0" @@ -1128,6 +1222,35 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "lol-core" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5598e6305939e7f5a0bda6c19f0113f9d58c591dad99ef8e9c82493954a5021" +dependencies = [ + "anyhow", + "async-stream", + "async-trait", + "bincode", + "bytes", + "derive_builder", + "derive_more", + "futures", + "http-serde", + "log", + "phi-detector", + "prost", + "prost-build", + "rand", + "serde", + "serde_bytes", + "tokio", + "tokio-stream", + "tokio-util 0.6.9", + "tonic", + "tonic-build", +] + [[package]] name = "lz4" version = "1.23.3" @@ -1324,7 +1447,7 @@ checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "openraft" version = "0.6.4" -source = "git+https://github.com/datafuselabs/openraft?rev=61c0f4a0fe10ab4e16c8530a52e8bddcd9bb87b1#61c0f4a0fe10ab4e16c8530a52e8bddcd9bb87b1" +source = "git+https://github.com/datafuselabs/openraft?rev=8d3d1285aa93ed00722da4a898adf390ea6f1e82#8d3d1285aa93ed00722da4a898adf390ea6f1e82" dependencies = [ "anyerror", "async-trait", @@ -1427,6 +1550,12 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phi-detector" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bfc027b03922a25a425d0c10acb06f8091b563f9d62420e6ecad87ef8ccdf72" + [[package]] name = "pin-project" version = "1.0.10" @@ -1835,6 +1964,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "bincode", "bytes", "bytesize", "clap", @@ -1842,6 +1972,7 @@ dependencies = [ "humantime", "humantime-serde", "itertools", + "lol-core", "openraft", "parking_lot 0.12.0", "prost", @@ -1984,6 +2115,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.136" @@ -2260,6 +2400,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", + "slab", "tokio", ] diff --git a/storage/src/raft_log_store/entry.rs b/storage/src/raft_log_store/entry.rs index 21a429a..37c8cd9 100644 --- a/storage/src/raft_log_store/entry.rs +++ b/storage/src/raft_log_store/entry.rs @@ -7,11 +7,15 @@ use runkv_common::coding::CompressionAlgorithm; use super::DEFAULT_LOG_BATCH_SIZE; use crate::error::Result; use crate::raft_log_store::error::RaftLogStoreError; -use crate::utils::{crc32check, crc32sum, get_length_prefixed_slice, put_length_prefixed_slice}; +use crate::utils::{ + crc32check, crc32sum, get_length_prefixed_slice, put_length_prefixed_slice, var_u32_len, + BufExt, BufMutExt, +}; #[derive(PartialEq, Eq, Clone, Debug)] pub enum Entry { RaftLogBatch(RaftLogBatch), + Truncate(Truncate), Compact(Compact), Kv(Kv), } @@ -41,12 +45,16 @@ impl Entry { buf.put_u8(0); batch.encode(buf); } - Self::Compact(compact) => { + Self::Truncate(truncate) => { buf.put_u8(1); + truncate.encode(buf); + } + Self::Compact(compact) => { + buf.put_u8(2); compact.encode(buf); } Self::Kv(kv) => { - buf.put_u8(2); + buf.put_u8(3); kv.encode(buf); } } @@ -55,8 +63,9 @@ impl Entry { pub fn decode(buf: &mut &[u8]) -> Self { 
match buf.get_u8() { 0 => Self::RaftLogBatch(RaftLogBatch::decode(buf)), - 1 => Self::Compact(Compact::decode(buf)), - 2 => Self::Kv(Kv::decode(buf)), + 1 => Self::Truncate(Truncate::decode(buf)), + 2 => Self::Compact(Compact::decode(buf)), + 3 => Self::Kv(Kv::decode(buf)), _ => unreachable!(), } } @@ -68,6 +77,7 @@ pub struct RaftLogBatch { term: u64, first_index: u64, offsets: Vec, + ctxs: Vec>, data_len: usize, /// Note: Only used for encoding. data: Vec, @@ -91,6 +101,7 @@ impl Default for RaftLogBatch { term: 0, first_index: 0, offsets: vec![], + ctxs: vec![], data_len: 0, data: Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE), } @@ -116,7 +127,17 @@ impl RaftLogBatch { } pub fn data_segment_location(&self) -> (usize, usize) { - let offset = 8 + 8 + 8 + 8 + self.offsets.len() * 4 + 8; + let offset = 8 // group + + 8 // term + + 8 // first index + + 8 // N+1 + + self + .offsets + .iter() + .map(|offset| var_u32_len(*offset as u32)) + .sum::() + + self.ctxs.iter().map(|ctx| var_u32_len(ctx.len() as u32) + ctx.len()).sum::() + + 8; // data segment len let len = self.data_len; (offset, len) } @@ -128,6 +149,11 @@ impl RaftLogBatch { (offset, len) } + pub fn ctx(&self, index: usize) -> &[u8] { + debug_assert!(index < self.len()); + &self.ctxs[index] + } + /// Convert raw data to encoded data. /// /// Format: @@ -161,10 +187,11 @@ impl RaftLogBatch { /// /// ```plain /// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 (4B) | ... | offset (N-1) | offset N (phantom) | + /// | ctx 0 | .. | ctx (N-1) | # ctx is a length prefixed buffer /// | data segment len (8B) | data block (compressed) | compression algorithm (1B) | crc32sum (4B) | /// | <---------- data segment ------------------------------------------->| /// ``` - fn encode(&self, buf: &mut Vec) { + fn encode(&self, mut buf: &mut Vec) { debug_assert!(!self.offsets.is_empty()); // Encode meta. @@ -173,23 +200,31 @@ impl RaftLogBatch { buf.put_u64_le(self.first_index); buf.put_u64_le(self.offsets.len() as u64); for offset in self.offsets.iter() { - buf.put_u32_le(*offset as u32); + buf.put_var_u32(*offset as u32); + } + for ctx in self.ctxs.iter() { + buf.put_length_prefixed_slice(ctx); } buf.put_u64_le(self.data_len as u64); buf.put_slice(&self.data) } /// Decode meta only. [`RaftLogBatch.data`] will be left empty. 
- pub fn decode(buf: &mut &[u8]) -> Self { + pub fn decode(mut buf: &mut &[u8]) -> Self { let group = buf.get_u64_le(); let term = buf.get_u64_le(); let first_index = buf.get_u64_le(); let offsets_len = buf.get_u64_le() as usize; let mut offsets = Vec::with_capacity(offsets_len); for _ in 0..offsets_len { - let offset = buf.get_u32_le() as usize; + let offset = buf.get_var_u32() as usize; offsets.push(offset); } + let mut ctxs = Vec::with_capacity(offsets_len - 1); + for _ in 0..offsets_len - 1 { + let ctx = buf.get_length_prefixed_slice(); + ctxs.push(ctx); + } let data_segment_len = buf.get_u64_le() as usize; buf.consume(data_segment_len); Self { @@ -197,6 +232,7 @@ impl RaftLogBatch { term, first_index, offsets, + ctxs, data_len: data_segment_len, data: vec![], } @@ -240,7 +276,7 @@ pub struct RaftLogBatchBuilder { } impl RaftLogBatchBuilder { - pub fn add(&mut self, group: u64, term: u64, index: u64, data: &[u8]) { + pub fn add(&mut self, group: u64, term: u64, index: u64, ctx: &[u8], data: &[u8]) { debug_assert_ne!(group, 0); debug_assert_ne!(term, 0); debug_assert_ne!(index, 0); @@ -253,6 +289,7 @@ impl RaftLogBatchBuilder { self.current.first_index = index; } self.current.offsets.push(self.current.data.len()); + self.current.ctxs.push(ctx.to_vec()); self.current.data.put_slice(data); } @@ -281,6 +318,25 @@ impl RaftLogBatchBuilder { } } +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct Truncate { + pub group: u64, + pub index: u64, +} + +impl Truncate { + pub fn encode(&self, buf: &mut Vec) { + buf.put_u64_le(self.group); + buf.put_u64_le(self.index); + } + + pub fn decode(buf: &mut &[u8]) -> Self { + let group = buf.get_u64_le(); + let index = buf.get_u64_le(); + Self { group, index } + } +} + #[derive(PartialEq, Eq, Clone, Debug)] pub struct Compact { pub group: u64, @@ -352,21 +408,22 @@ mod tests { use super::*; #[test] + #[allow(clippy::type_complexity)] fn test_log_enc_dec() { - let dataset = vec![ - (1, 1, 1, b"data-1-1-1"), - (1, 1, 2, b"data-1-1-2"), - (1, 2, 1, b"data-1-2-1"), - (1, 2, 2, b"data-1-2-2"), - (2, 1, 1, b"data-2-1-1"), - (2, 1, 2, b"data-2-1-2"), - (2, 2, 1, b"data-2-2-1"), - (2, 2, 2, b"data-2-2-2"), + let dataset: Vec<(u64, u64, u64, &'static [u8], &'static [u8])> = vec![ + (1, 1, 1, b"aaa", b"data-1-1-1"), + (1, 1, 2, b"", b"data-1-1-2"), + (1, 2, 1, b"ccc", b"data-1-2-1"), + (1, 2, 2, b"ddd", b"data-1-2-2"), + (2, 1, 1, b"", b"data-2-1-1"), + (2, 1, 2, b"fff", b"data-2-1-2"), + (2, 2, 1, b"", b"data-2-2-1"), + (2, 2, 2, b"", b"data-2-2-2"), ]; let mut builder = RaftLogBatchBuilder::default(); - for (group, term, index, data) in dataset { - builder.add(group, term, index, data); + for (group, term, index, ctx, data) in dataset { + builder.add(group, term, index, ctx, data); } let batches = builder.build(); assert_eq!(batches.len(), 4); diff --git a/storage/src/raft_log_store/log.rs b/storage/src/raft_log_store/log.rs index bb88ee7..119e66e 100644 --- a/storage/src/raft_log_store/log.rs +++ b/storage/src/raft_log_store/log.rs @@ -269,7 +269,7 @@ mod tests { for group in 1..=groups as u64 { let term = 1; for index in 1..=group_size as u64 { - builder.add(group, term, index, &data); + builder.add(group, term, index, b"some-ctx", &data); } } let batches = builder.build(); diff --git a/storage/src/raft_log_store/mem.rs b/storage/src/raft_log_store/mem.rs index f45db47..560ed28 100644 --- a/storage/src/raft_log_store/mem.rs +++ b/storage/src/raft_log_store/mem.rs @@ -13,6 +13,7 @@ pub struct EntryIndex { /// Prevent log entries with lower terms added from GC 
shadowing those with the same indices /// but with higher terms. pub term: u64, + pub ctx: Vec, pub file_id: u64, pub block_offset: usize, pub block_len: usize, @@ -87,24 +88,64 @@ impl MemStates { Ok(()) } - pub async fn first_index(&self, group: u64) -> Result { + pub async fn term(&self, group: u64, index: u64) -> Result> { let guard = self.states.read().await; let state = guard .get(&group) .ok_or(RaftLogStoreError::GroupNotExists(group))? .read() .await; - Ok(state.first_index) + if index < state.first_index || index >= state.first_index + state.indices.len() as u64 { + Ok(None) + } else { + let i = (index - state.first_index) as usize; + let term = state.indices[i].term; + Ok(Some(term)) + } } - pub async fn last_index(&self, group: u64) -> Result { + pub async fn ctx(&self, group: u64, index: u64) -> Result>> { let guard = self.states.read().await; let state = guard .get(&group) .ok_or(RaftLogStoreError::GroupNotExists(group))? .read() .await; - Ok(state.first_index + state.indices.len() as u64 - 1) + if index < state.first_index || index >= state.first_index + state.indices.len() as u64 { + Ok(None) + } else { + let i = (index - state.first_index) as usize; + let ctx = state.indices[i].ctx.clone(); + Ok(Some(ctx)) + } + } + + pub async fn first_index(&self, group: u64) -> Result> { + let guard = self.states.read().await; + let state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .read() + .await; + if state.indices.is_empty() { + Ok(Err(state.first_index)) + } else { + Ok(Ok(state.first_index)) + } + } + + pub async fn next_index(&self, group: u64) -> Result> { + let guard = self.states.read().await; + let state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .read() + .await; + if state.indices.is_empty() { + Ok(Err(state.first_index + state.indices.len() as u64)) + } else { + Ok(Ok(state.first_index + state.indices.len() as u64)) + } } /// Append raft log indices. @@ -171,6 +212,37 @@ impl MemStates { Ok(()) } + /// Truncate raft log of given `group` since given `index`. + pub async fn truncate(&self, group: u64, index: u64) -> Result<()> { + let guard = self.states.read().await; + let mut state = guard + .get(&group) + .ok_or(RaftLogStoreError::GroupNotExists(group))? + .write() + .await; + + if index < state.first_index { + return Err(RaftLogStoreError::RaftLogGap { + start: index, + end: state.first_index, + } + .into()); + } + + if index >= state.first_index + state.indices.len() as u64 { + return Err(RaftLogStoreError::RaftLogGap { + start: state.first_index + state.indices.len() as u64, + end: index, + } + .into()); + } + + let len = (index - state.first_index) as usize; + state.indices.truncate(len); + + Ok(()) + } + /// Compact any indices before the given index. 
pub async fn compact(&self, group: u64, index: u64) -> Result<()> { let guard = self.states.read().await; @@ -317,6 +389,11 @@ mod tests { assert!(states.entries(1, 250, usize::MAX).await.is_err()); assert!(states.entries(1, 401, usize::MAX).await.is_err()); + assert!(states.truncate(1, 250).await.is_err()); + assert!(states.truncate(1, 401).await.is_err()); + states.truncate(1, 301).await.unwrap(); + assert_range(&states, 1, 251..301).await; + states.remove_group(1).await.unwrap(); } @@ -355,6 +432,7 @@ mod tests { vec![ EntryIndex { term, + ctx: vec![], file_id: 1, block_offset: 0, block_len: 0, diff --git a/storage/src/raft_log_store/store.rs b/storage/src/raft_log_store/store.rs index 5c8c086..b75c4a0 100644 --- a/storage/src/raft_log_store/store.rs +++ b/storage/src/raft_log_store/store.rs @@ -1,15 +1,23 @@ use std::sync::Arc; use futures_async_stream::for_await; -use itertools::Itertools; use tracing::trace; use super::block_cache::BlockCache; -use super::entry::{Compact, Entry, Kv, RaftLogBatch}; +use super::entry::{Compact, Entry as LogEntry, Kv, RaftLogBatch, Truncate}; use super::log::{Log, LogOptions, LogRef}; use super::mem::{EntryIndex, MemStates}; use crate::error::Result; +#[derive(Clone, Debug)] +pub struct Entry { + pub group: u64, + pub term: u64, + pub index: u64, + pub ctx: Vec, + pub data: Vec, +} + #[derive(Clone, Debug)] pub struct RaftLogStoreOptions { pub log_dir_path: String, @@ -50,38 +58,43 @@ impl RaftLogStore { for item in log.replay() { let (file_id, write_offset, entry) = item?; match entry { - Entry::RaftLogBatch(batch) => { + LogEntry::RaftLogBatch(batch) => { let (data_segment_offset, data_segment_len) = batch.data_segment_location(); let group = batch.group(); let term = batch.term(); let first_index = batch.first_index(); - let locations = (0..batch.len()) - .into_iter() - .map(|i| batch.location(i)) - .collect_vec(); - let indices = locations - .into_iter() - .map(|(offset, len)| EntryIndex { + let block_offset = write_offset + data_segment_offset + 1; + let block_len = data_segment_len; + let mut indices = Vec::with_capacity(batch.len()); + for i in 0..batch.len() { + let (offset, len) = batch.location(i); + let index = EntryIndex { term, + ctx: batch.ctx(i).to_vec(), file_id, - block_offset: write_offset + data_segment_offset + 1, // `1` for entry type - block_len: data_segment_len, + block_offset, + block_len, offset, len, - }) - .collect_vec(); + }; + indices.push(index); + } states.may_add_group(group).await; states.append(group, first_index, indices).await?; } - Entry::Compact(Compact { group, index }) => { + LogEntry::Truncate(Truncate { group, index }) => { + states.may_add_group(group).await; + states.truncate(group, index).await?; + } + LogEntry::Compact(Compact { group, index }) => { states.may_add_group(group).await; states.compact(group, index).await?; } - Entry::Kv(Kv::Put { group, key, value }) => { + LogEntry::Kv(Kv::Put { group, key, value }) => { states.may_add_group(group).await; states.put(group, key, value).await?; } - Entry::Kv(Kv::Delete { group, key }) => { + LogEntry::Kv(Kv::Delete { group, key }) => { states.may_add_group(group).await; states.delete(group, key).await?; } @@ -117,57 +130,97 @@ impl RaftLogStore { let group = batch.group(); let term = batch.term(); let first_index = batch.first_index(); - let locations = (0..batch.len()) - .into_iter() - .map(|i| batch.location(i)) - .collect_vec(); - - let entry = Entry::RaftLogBatch(batch); - let (file_id, write_offset, _write_len) = self.core.log.push(entry).await?; - let 
indices = locations - .into_iter() - .map(|(offset, len)| EntryIndex { + let mut indices = Vec::with_capacity(batch.len()); + for i in 0..batch.len() { + let (offset, len) = batch.location(i); + let index = EntryIndex { term, - file_id, - block_offset: write_offset + data_segment_offset + 1, // `1` for entry type - block_len: data_segment_len, + ctx: batch.ctx(i).to_vec(), + file_id: 0, + block_offset: 0, + block_len: 0, offset, len, - }) - .collect_vec(); + }; + indices.push(index); + } + + let entry = LogEntry::RaftLogBatch(batch); + let (file_id, write_offset, _write_len) = self.core.log.push(entry).await?; + + let block_offset = write_offset + data_segment_offset + 1; + let block_len = data_segment_len; + for index in indices.iter_mut() { + index.file_id = file_id; + index.block_offset = block_offset; + index.block_len = block_len; + } self.core.states.append(group, first_index, indices).await?; Ok(()) } + /// Truncate raft log of given `group` since given `index`. + pub async fn truncate(&self, group: u64, index: u64) -> Result<()> { + self.core + .log + .push(LogEntry::Truncate(Truncate { group, index })) + .await?; + self.core.states.truncate(group, index).await?; + Ok(()) + } + /// Mark all raft log entries before given `index` of the given `group` can be safely deleted. pub async fn compact(&self, group: u64, index: u64) -> Result<()> { self.core .log - .push(Entry::Compact(Compact { group, index })) + .push(LogEntry::Compact(Compact { group, index })) .await?; self.core.states.compact(group, index).await?; Ok(()) } /// Get raft log entries from [`RaftLogStore`]. - pub async fn entries(&self, group: u64, index: u64, max_len: usize) -> Result>> { + pub async fn entries(&self, group: u64, index: u64, max_len: usize) -> Result> { let indices = self.core.states.entries(group, index, max_len).await?; // TODO: Use concurrent operation? 
let mut entries = Vec::with_capacity(indices.len()); - for index in indices { - let entry = self.entry(index).await?; + for (i, ei) in indices.into_iter().enumerate() { + let data = self.entry_data(&ei).await?; + let entry = Entry { + group, + term: ei.term, + index: index + i as u64, + ctx: ei.ctx, + data, + }; entries.push(entry); } Ok(entries) } + pub async fn term(&self, group: u64, index: u64) -> Result> { + self.core.states.term(group, index).await + } + + pub async fn ctx(&self, group: u64, index: u64) -> Result>> { + self.core.states.ctx(group, index).await + } + + pub async fn first_index(&self, group: u64) -> Result> { + self.core.states.first_index(group).await + } + + pub async fn next_index(&self, group: u64) -> Result> { + self.core.states.next_index(group).await + } + pub async fn put(&self, group: u64, key: Vec, value: Vec) -> Result<()> { self.core .log - .push(Entry::Kv(Kv::Put { + .push(LogEntry::Kv(Kv::Put { group, key: key.clone(), value: value.clone(), @@ -180,7 +233,7 @@ impl RaftLogStore { pub async fn delete(&self, group: u64, key: Vec) -> Result<()> { self.core .log - .push(Entry::Kv(Kv::Delete { + .push(LogEntry::Kv(Kv::Delete { group, key: key.clone(), })) @@ -195,7 +248,7 @@ impl RaftLogStore { } impl RaftLogStore { - async fn entry(&self, index: EntryIndex) -> Result> { + async fn entry_data(&self, index: &EntryIndex) -> Result> { trace!("read entry: {:?}", index); let log = self.core.log.clone(); let index_clone = index.clone(); @@ -224,6 +277,7 @@ impl RaftLogStore { #[cfg(test)] mod tests { + use itertools::Itertools; use test_log::test; use super::*; @@ -242,7 +296,7 @@ mod tests { let mut builder = RaftLogBatchBuilder::default(); for group in 1..=4 { for index in 1..=16 { - builder.add(group, 1, index, &data(group, 1, index)); + builder.add(group, 1, index, b"some-ctx", &data(group, 1, index)); } } let batches = builder.build(); @@ -268,7 +322,7 @@ mod tests { for group in 1..=4 { let entries = store.entries(group, 1, usize::MAX).await.unwrap(); assert_eq!( - entries, + entries.into_iter().map(|entry| entry.data).collect_vec(), (1..=16) .into_iter() .map(|index| data(group, 1, index)) @@ -282,7 +336,7 @@ mod tests { for group in 1..=4 { let entries = store.entries(group, 1, usize::MAX).await.unwrap(); assert_eq!( - entries, + entries.into_iter().map(|entry| entry.data).collect_vec(), (1..=16) .into_iter() .map(|index| data(group, 1, index)) @@ -297,7 +351,7 @@ mod tests { assert!(store.entries(group, 8, usize::MAX).await.is_err()); let entries = store.entries(group, 9, usize::MAX).await.unwrap(); assert_eq!( - entries, + entries.into_iter().map(|entry| entry.data).collect_vec(), (9..=16) .into_iter() .map(|index| data(group, 1, index)) @@ -312,13 +366,44 @@ mod tests { assert!(store.entries(group, 8, usize::MAX).await.is_err()); let entries = store.entries(group, 9, usize::MAX).await.unwrap(); assert_eq!( - entries, + entries.into_iter().map(|entry| entry.data).collect_vec(), (9..=16) .into_iter() .map(|index| data(group, 1, index)) .collect_vec() ); } + + for group in 1..=4 { + store.truncate(group, 11).await.unwrap(); + } + + for group in 1..=4 { + assert!(store.entries(group, 8, usize::MAX).await.is_err()); + let entries = store.entries(group, 9, usize::MAX).await.unwrap(); + assert_eq!( + entries.into_iter().map(|entry| entry.data).collect_vec(), + (9..=10) + .into_iter() + .map(|index| data(group, 1, index)) + .collect_vec() + ); + } + + drop(store); + let store = RaftLogStore::open(options.clone()).await.unwrap(); + 
assert_eq!(store.core.log.frozen_file_count().await, 7); + for group in 1..=4 { + assert!(store.entries(group, 8, usize::MAX).await.is_err()); + let entries = store.entries(group, 9, usize::MAX).await.unwrap(); + assert_eq!( + entries.into_iter().map(|entry| entry.data).collect_vec(), + (9..=10) + .into_iter() + .map(|index| data(group, 1, index)) + .collect_vec() + ); + } } #[test(tokio::test)] diff --git a/storage/src/utils/coding.rs b/storage/src/utils/coding.rs index d61737c..317bcbb 100644 --- a/storage/src/utils/coding.rs +++ b/storage/src/utils/coding.rs @@ -1,3 +1,4 @@ +use std::io::Read; use std::{cmp, ptr}; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -44,6 +45,12 @@ pub trait BufMutExt: BufMut { self.put_u8((n >> 28) as u8); } } + + fn put_length_prefixed_slice(&mut self, slice: &[u8]) { + let len = slice.len() as u32; + self.put_var_u32(len); + self.put_slice(slice); + } } pub trait BufExt: Buf { @@ -62,6 +69,14 @@ pub trait BufExt: Buf { } n } + + fn get_length_prefixed_slice(&mut self) -> Vec { + let len = self.get_var_u32() as usize; + let mut reader = self.reader(); + let mut slice = vec![0; len]; + reader.read_exact(&mut slice).unwrap(); + slice + } } impl BufMutExt for &mut T {} diff --git a/wheel/Cargo.toml b/wheel/Cargo.toml index 3120f25..9057946 100644 --- a/wheel/Cargo.toml +++ b/wheel/Cargo.toml @@ -7,13 +7,15 @@ edition = "2021" [dependencies] anyhow = "1.0" async-trait = "0.1" +bincode = "1.3.3" bytes = "1" bytesize = { version = "1.1.0", features = ["serde"] } clap = { version = "3.1.6", features = ["derive"] } humantime = "2.1.0" humantime-serde = "1.1.1" itertools = "0.10.3" -openraft = { git = "https://github.com/datafuselabs/openraft", rev = "61c0f4a0fe10ab4e16c8530a52e8bddcd9bb87b1" } +lol-core = "0.9" +openraft = { git = "https://github.com/datafuselabs/openraft", rev = "8d3d1285aa93ed00722da4a898adf390ea6f1e82" } parking_lot = "0.12" prost = "0.9" runkv-common = { path = "../common" } diff --git a/wheel/src/error.rs b/wheel/src/error.rs index da275df..7012d8c 100644 --- a/wheel/src/error.rs +++ b/wheel/src/error.rs @@ -10,21 +10,27 @@ pub enum Error { TransportError(#[from] tonic::transport::Error), #[error("rpc status error: {0}")] RpcStatus(#[from] Status), + #[error("serde error: {0}")] + SerdeError(String), #[error("other: {0}")] Other(String), } impl Error { pub fn err(e: impl Into>) -> Error { - Error::Other(e.into().to_string()) + Self::Other(e.into().to_string()) } pub fn config_err(e: impl Into>) -> Error { - Error::ConfigError(e.into().to_string()) + Self::ConfigError(e.into().to_string()) } pub fn storage_err(e: runkv_storage::Error) -> Error { - Error::StorageError(e) + Self::StorageError(e) + } + + pub fn serde_err(e: impl Into>) -> Error { + Self::SerdeError(e.into().to_string()) } } diff --git a/wheel/src/storage/mod.rs b/wheel/src/storage/mod.rs index 475b4c9..ac22c4e 100644 --- a/wheel/src/storage/mod.rs +++ b/wheel/src/storage/mod.rs @@ -1,4 +1,2 @@ pub mod lsm_tree; - -#[allow(dead_code)] pub mod raft_log_store; diff --git a/wheel/src/storage/raft_log_store.rs b/wheel/src/storage/raft_log_store.rs index 8d20d40..a892fb8 100644 --- a/wheel/src/storage/raft_log_store.rs +++ b/wheel/src/storage/raft_log_store.rs @@ -1,8 +1,22 @@ use std::io::Cursor; use async_trait::async_trait; +use bytes::{Buf, BufMut}; +use runkv_storage::raft_log_store::entry::RaftLogBatchBuilder; use runkv_storage::raft_log_store::RaftLogStore; use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; + +use crate::error::{Error, Result}; + 
+const VOTE_KEY: &[u8] = b"vote";
+const MEMBERSHIP_KEY: &[u8] = b"membership";
+const APPLIED_STATE_KEY: &[u8] = b"applied_state";
+const SNAPSHOT_META: &[u8] = b"snapshot_meta";
+const SNAPSHOT_DATA: &[u8] = b"snapshot_data";
+const PURGE_LOG_ID_KEY: &[u8] = b"purge_log_id";
+
+const DEFAULT_SNAPSHOT_BUFFER_CAPACITY: usize = 64 << 10;
 
 pub type RaftNodeId = u64;
 
@@ -16,6 +30,7 @@ openraft::declare_raft_types!(
     pub RaftTypeConfig: D = RaftRequest, R = RaftResponse, NodeId = RaftNodeId
 );
 
+#[derive(Clone)]
 pub struct RaftGroupLogStore {
     group: u64,
     core: RaftLogStore,
@@ -27,6 +42,39 @@ impl RaftGroupLogStore {
     }
 }
 
+pub fn storage_io_error(
+    e: Error,
+    subject: openraft::ErrorSubject<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    verb: openraft::ErrorVerb,
+) -> openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId> {
+    openraft::StorageIOError::new(subject, verb, openraft::AnyError::new(&e)).into()
+}
+
+impl RaftGroupLogStore {
+    async fn apply(&self, entry: &openraft::raft::Entry<RaftTypeConfig>) -> Result<RaftResponse> {
+        let resp = match entry.payload {
+            openraft::raft::EntryPayload::Blank => RaftResponse {},
+            openraft::raft::EntryPayload::Normal(ref _request) => {
+                // TODO: impl me!!
+                // TODO: impl me!!
+                // TODO: impl me!!
+                RaftResponse {}
+            }
+            openraft::raft::EntryPayload::Membership(ref membership) => {
+                let effective_membership =
+                    openraft::EffectiveMembership::new(Some(entry.log_id), membership.to_owned());
+                let value = bincode::serialize(&effective_membership).map_err(Error::serde_err)?;
+                self.core
+                    .put(self.group, MEMBERSHIP_KEY.to_vec(), value)
+                    .await
+                    .map_err(Error::storage_err)?;
+                RaftResponse {}
+            }
+        };
+        Ok(resp)
+    }
+}
+
 #[async_trait]
 impl openraft::RaftStorage<RaftTypeConfig> for RaftGroupLogStore {
     type SnapshotData = Cursor<Vec<u8>>;
@@ -37,122 +85,455 @@ impl openraft::RaftStorage<RaftTypeConfig> for RaftGroupLogStore {
     async fn save_vote(
         &mut self,
-        _vote: &openraft::Vote<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
-    ) -> Result<(), openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>>
-    {
-        todo!()
+        vote: &openraft::Vote<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    ) -> core::result::Result<
+        (),
+        openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    > {
+        let err = |e| storage_io_error(e, openraft::ErrorSubject::Vote, openraft::ErrorVerb::Write);
+        let value = bincode::serialize(&vote)
+            .map_err(Error::serde_err)
+            .map_err(err)?;
+        self.core
+            .put(self.group, VOTE_KEY.to_vec(), value)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        Ok(())
     }
 
     async fn read_vote(
         &mut self,
-    ) -> Result<
+    ) -> core::result::Result<
         Option<openraft::Vote<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
     > {
-        todo!()
+        let err = |e| storage_io_error(e, openraft::ErrorSubject::Vote, openraft::ErrorVerb::Read);
+        let buf = self
+            .core
+            .get(self.group, VOTE_KEY.to_vec())
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        match buf {
+            // `save_vote` stores a bare `Vote`, so decode the same type rather than
+            // an `Option<Vote>` (bincode prefixes `Option` with a tag byte).
+            Some(buf) => Ok(Some(
+                bincode::deserialize(&buf)
+                    .map_err(Error::serde_err)
+                    .map_err(err)?,
+            )),
+            None => Ok(None),
+        }
     }
 
+    /// Get the log reader.
+    ///
+    /// The method is intentionally async to give the implementation a chance to use asynchronous
+    /// sync primitives to serialize access to the common internal object, if needed.
     async fn get_log_reader(&mut self) -> Self::LogReader {
-        todo!()
+        self.clone()
     }
 
+    /// Append a payload of entries to the log.
+    ///
+    /// Though the entries will always be presented in order, each entry's index should be used to
+    /// determine its location to be written in the log.
     async fn append_to_log(
         &mut self,
-        _entries: &[&openraft::raft::Entry<RaftTypeConfig>],
-    ) -> Result<(), openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>>
-    {
-        todo!()
+        entries: &[&openraft::raft::Entry<RaftTypeConfig>],
+    ) -> core::result::Result<
+        (),
+        openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    > {
+        let err = |e| storage_io_error(e, openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write);
+
+        let mut builder = RaftLogBatchBuilder::default();
+        for entry in entries.iter() {
+            let data = bincode::serialize(&entry.payload)
+                .map_err(Error::serde_err)
+                .map_err(err)?;
+            // ctx carries the leader node id; term and index are kept by the batch itself.
+            let mut ctx = Vec::with_capacity(8);
+            ctx.put_u64_le(entry.log_id.leader_id.node_id);
+            builder.add(
+                self.group,
+                entry.log_id.leader_id.term,
+                entry.log_id.index,
+                &ctx,
+                &data,
+            );
+        }
+        // Flush the built batches into the shared raft log store.
+        for batch in builder.build() {
+            self.core
+                .append(batch)
+                .await
+                .map_err(Error::storage_err)
+                .map_err(err)?;
+        }
+        Ok(())
     }
 
+    /// Delete conflict log entries since `log_id`, inclusive.
     async fn delete_conflict_logs_since(
         &mut self,
-        _log_id: openraft::LogId<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
-    ) -> Result<(), openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>>
-    {
-        todo!()
+        log_id: openraft::LogId<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    ) -> core::result::Result<
+        (),
+        openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    > {
+        let err = |e, id| {
+            storage_io_error(
+                e,
+                openraft::ErrorSubject::Log(id),
+                openraft::ErrorVerb::Write,
+            )
+        };
+
+        self.core
+            .truncate(self.group, log_id.index)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(|e| err(e, log_id))?;
+        Ok(())
     }
 
+    /// Delete applied log entries up to `log_id`, inclusive.
     async fn purge_logs_upto(
         &mut self,
-        _log_id: openraft::LogId<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
-    ) -> Result<(), openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>>
-    {
-        todo!()
+        log_id: openraft::LogId<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    ) -> core::result::Result<
+        (),
+        openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+    > {
+        let err = |e, id| {
+            storage_io_error(
+                e,
+                openraft::ErrorSubject::Log(id),
+                openraft::ErrorVerb::Write,
+            )
+        };
+
+        // Record purge log id.
+        let value = bincode::serialize(&log_id)
+            .map_err(Error::serde_err)
+            .map_err(|e| err(e, log_id))?;
+        self.core
+            .put(self.group, PURGE_LOG_ID_KEY.to_vec(), value)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(|e| err(e, log_id))?;
+
+        // Perform log compact. `purge` is inclusive while `compact` takes an
+        // exclusive bound, hence the `+ 1`.
+        self.core
+            .compact(self.group, log_id.index + 1)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(|e| err(e, log_id))?;
+
+        Ok(())
     }
 
+    /// Returns the last applied log id which is recorded in state machine, and the last applied
+    /// membership log id and membership config.
+    // NOTE: This can be made into sync, provided all state machines will use atomic read or the
+    // like.
     async fn last_applied_state(
         &mut self,
-    ) -> Result<
+    ) -> core::result::Result<
         (
             Option<openraft::LogId<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>>,
-            openraft::EffectiveMembership<RaftTypeConfig>,
+            openraft::EffectiveMembership<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
         ),
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
     > {
-        todo!()
+        let err =
+            |e| storage_io_error(e, openraft::ErrorSubject::Store, openraft::ErrorVerb::Read);
+
+        let applied_state = self
+            .core
+            .get(self.group, APPLIED_STATE_KEY.to_vec())
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        // The value is the bare `LogId` written by `apply_to_state_machine`.
+        let applied_state = match applied_state {
+            None => None,
+            Some(raw) => Some(
+                bincode::deserialize(&raw)
+                    .map_err(Error::serde_err)
+                    .map_err(err)?,
+            ),
+        };
+
+        let membership = self
+            .core
+            .get(self.group, MEMBERSHIP_KEY.to_vec())
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        let membership = membership.ok_or_else(|| {
+            err(Error::storage_err(runkv_storage::Error::Other(format!(
+                "membership not found in raft log store, group: {}",
+                self.group
+            ))))
+        })?;
+        let membership = bincode::deserialize(&membership)
+            .map_err(Error::serde_err)
+            .map_err(err)?;
+        Ok((applied_state, membership))
     }
 
+    /// Apply the given payload of entries to the state machine.
+    ///
+    /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which
+    /// have been replicated to a quorum of the cluster, will be applied to the state machine.
+    ///
+    /// This is where the business logic of interacting with your application's state machine
+    /// should live. This is 100% application specific. Perhaps this is where an application
+    /// specific transaction is being started, or perhaps committed. This may be where a key/value
+    /// is being stored.
+    ///
+    /// An impl should do:
+    /// - Store the last applied log id.
+    /// - Deal with the EntryPayload::Normal() log, which is business logic log.
+    /// - Deal with EntryPayload::Membership, store the membership config.
+    // TODO The reply should happen asynchronously, somehow. Make this method synchronous and
+    // instead of using the result, pass a channel where to post the completion. The Raft core can
+    // then collect completions on this channel and update the client with the result once all
+    // the preceding operations have been applied to the state machine. This way we'll reach
+    // operation pipelining w/o the need to wait for the completion of each operation inline.
     async fn apply_to_state_machine(
         &mut self,
-        _entries: &[&openraft::raft::Entry<RaftTypeConfig>],
-    ) -> Result<
+        entries: &[&openraft::raft::Entry<RaftTypeConfig>],
+    ) -> core::result::Result<
         Vec<<RaftTypeConfig as openraft::RaftTypeConfig>::R>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
    > {
-        todo!()
+        let err = |e| storage_io_error(e, openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write);
+
+        let mut resps = Vec::with_capacity(entries.len());
+        for entry in entries.iter() {
+            let resp = self.apply(entry).await.map_err(err)?;
+            resps.push(resp);
+        }
+        // Record the last applied log id so that `last_applied_state` can report it.
+        if let Some(&entry) = entries.last() {
+            let value = bincode::serialize(&entry.log_id)
+                .map_err(Error::serde_err)
+                .map_err(err)?;
+            self.core
+                .put(self.group, APPLIED_STATE_KEY.to_vec(), value)
+                .await
+                .map_err(Error::storage_err)
+                .map_err(err)?;
+        }
+        Ok(resps)
     }
 
+    /// Get the snapshot builder for the state machine.
+    ///
+    /// The method is intentionally async to give the implementation a chance to use asynchronous
+    /// sync primitives to serialize access to the common internal object, if needed.
     async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
-        todo!()
+        self.clone()
     }
 
+    /// Create a new blank snapshot, returning a writable handle to the snapshot object.
+    ///
+    /// Raft will use this handle to receive snapshot data.
+    ///
+    /// ### implementation guide
+    /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html)
+    /// for details on log compaction / snapshotting.
     async fn begin_receiving_snapshot(
         &mut self,
-    ) -> Result<
+    ) -> core::result::Result<
         Box<Self::SnapshotData>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
     > {
-        todo!()
+        Ok(Box::new(Cursor::new(Vec::new())))
     }
 
+    /// Install a snapshot which has finished streaming from the cluster leader.
+    ///
+    /// All other snapshots should be deleted at this point.
+    ///
+    /// ### snapshot
+    /// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the
+    /// snapshot.
     async fn install_snapshot(
         &mut self,
-        _meta: &openraft::SnapshotMeta<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
-        _snapshot: Box<Self::SnapshotData>,
-    ) -> Result<
+        meta: &openraft::SnapshotMeta<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
+        mut snapshot: Box<Self::SnapshotData>,
+    ) -> core::result::Result<
         openraft::StateMachineChanges<RaftTypeConfig>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
     > {
-        todo!()
+        let err = |e| {
+            storage_io_error(
+                e,
+                openraft::ErrorSubject::Snapshot(meta.to_owned()),
+                openraft::ErrorVerb::Write,
+            )
+        };
+
+        // TODO: impl me!!
+        // TODO: impl me!!
+        // TODO: impl me!!
+
+        let buf_meta = bincode::serialize(meta)
+            .map_err(Error::serde_err)
+            .map_err(err)?;
+        self.core
+            .put(self.group, SNAPSHOT_META.to_vec(), buf_meta)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+
+        let mut buf_data = Vec::with_capacity(DEFAULT_SNAPSHOT_BUFFER_CAPACITY);
+        snapshot
+            .seek(std::io::SeekFrom::Start(0))
+            .await
+            .map_err(Error::err)
+            .map_err(err)?;
+        snapshot
+            .read_to_end(&mut buf_data)
+            .await
+            .map_err(Error::err)
+            .map_err(err)?;
+        self.core
+            .put(self.group, SNAPSHOT_DATA.to_vec(), buf_data)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+
+        Ok(openraft::StateMachineChanges {
+            last_applied: meta.last_log_id,
+            is_snapshot: true,
+        })
     }
 
+    /// Get a readable handle to the current snapshot, along with its metadata.
+    ///
+    /// ### implementation algorithm
+    /// Implementing this method should be straightforward. Check the configured snapshot
+    /// directory for any snapshot files. A proper implementation will only ever have one
+    /// active snapshot, though another may exist while it is being created. As such, it is
+    /// recommended to use a file naming pattern which will allow for easily distinguishing between
+    /// the current live snapshot, and any new snapshot which is being created.
+    ///
+    /// A proper snapshot implementation will store the term, index and membership config as part
+    /// of the snapshot, which should be decoded for creating this method's response data.
     async fn get_current_snapshot(
         &mut self,
-    ) -> Result<
+    ) -> core::result::Result<
         Option<openraft::storage::Snapshot<RaftTypeConfig, Self::SnapshotData>>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
    > {
-        todo!()
+        let err =
+            |e| storage_io_error(e, openraft::ErrorSubject::Store, openraft::ErrorVerb::Read);
+
+        let meta = self
+            .core
+            .get(self.group, SNAPSHOT_META.to_vec())
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+
+        let meta = match meta {
+            None => return Ok(None),
+            Some(meta) => bincode::deserialize(&meta)
+                .map_err(Error::serde_err)
+                .map_err(err)?,
+        };
+
+        let data = self
+            .core
+            .get(self.group, SNAPSHOT_DATA.to_vec())
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        let data = data.ok_or_else(|| {
+            err(Error::storage_err(runkv_storage::Error::Other(format!(
+                "snapshot data not found in raft log store, group: {}",
+                self.group
+            ))))
+        })?;
+
+        Ok(Some(openraft::storage::Snapshot {
+            meta,
+            snapshot: Box::new(Cursor::new(data)),
+        }))
     }
 }
 
 #[async_trait]
 impl openraft::RaftLogReader<RaftTypeConfig> for RaftGroupLogStore {
+    /// Returns the last deleted log id and the last log id.
+    ///
+    /// The impl should not consider the applied log id in state machine.
+    /// The returned `last_log_id` could be the log id of the last present log entry, or the
+    /// `last_purged_log_id` if there is no entry at all.
+    // NOTE: This can be made into sync, provided all state machines will use atomic read or the
+    // like.
     async fn get_log_state(
         &mut self,
-    ) -> Result<
+    ) -> core::result::Result<
         openraft::storage::LogState<RaftTypeConfig>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
    > {
-        todo!()
+        let err = |e| storage_io_error(e, openraft::ErrorSubject::Store, openraft::ErrorVerb::Read);
+
+        // Get purge log id.
+        let purge_log_id = self
+            .core
+            .get(self.group, PURGE_LOG_ID_KEY.to_vec())
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        // Stored by `purge_logs_upto` as a bare `LogId`.
+        let purge_log_id = match purge_log_id {
+            None => None,
+            Some(raw) => Some(
+                bincode::deserialize(&raw)
+                    .map_err(Error::serde_err)
+                    .map_err(err)?,
+            ),
+        };
+
+        // Get last log id.
+        let next_index = self
+            .core
+            .next_index(self.group)
+            .await
+            .map_err(Error::storage_err)
+            .map_err(err)?;
+        let last_log_id = match next_index {
+            Ok(next_index) => {
+                let last_index = next_index - 1;
+                let term = self
+                    .core
+                    .term(self.group, last_index)
+                    .await
+                    .map_err(Error::storage_err)
+                    .map_err(err)?
+                    .unwrap();
+                let ctx = self
+                    .core
+                    .ctx(self.group, last_index)
+                    .await
+                    .map_err(Error::storage_err)
+                    .map_err(err)?
+                    .unwrap();
+                let node_id = (&ctx[..]).get_u64_le();
+                Some(openraft::LogId {
+                    leader_id: openraft::LeaderId::<RaftNodeId> { term, node_id },
+                    index: last_index,
+                })
+            }
+            Err(_) => purge_log_id,
+        };
+
+        Ok(openraft::storage::LogState {
+            last_purged_log_id: purge_log_id,
+            last_log_id,
+        })
     }
 
+    /// Get a series of log entries from storage.
+    ///
+    /// The start value is inclusive in the search and the stop value is non-inclusive: `[start,
+    /// stop)`.
+    ///
+    /// Entries that are not found are allowed.
     async fn try_get_log_entries<
         RB: std::ops::RangeBounds<u64> + Clone + std::fmt::Debug + Send + Sync,
     >(
         &mut self,
         _range: RB,
-    ) -> Result<
+    ) -> core::result::Result<
         Vec<openraft::raft::Entry<RaftTypeConfig>>,
         openraft::StorageError<<RaftTypeConfig as openraft::RaftTypeConfig>::NodeId>,
     > {
@@ -161,9 +542,17 @@ impl openraft::RaftLogReader<RaftTypeConfig> for RaftGroupLogStore {
 }
 
 #[async_trait]
 impl openraft::RaftSnapshotBuilder<RaftTypeConfig, Cursor<Vec<u8>>> for RaftGroupLogStore {
+    /// Build snapshot
+    ///
+    /// A snapshot has to contain information about exactly all logs up to the last applied.
+    ///
+    /// Building snapshot can be done by:
+    /// - Performing log compaction, e.g. 
merge log entries that operates on the same key, like a + /// LSM-tree does, + /// - or by fetching a snapshot from the state machine. async fn build_snapshot( &mut self, - ) -> Result< + ) -> core::result::Result< openraft::storage::Snapshot>>, openraft::StorageError<::NodeId>, > { From 1eb6119b3465b78ab382b3ba028ca69577785bb0 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 11 Apr 2022 10:56:31 +0800 Subject: [PATCH 2/2] remove lol-core deps --- Cargo.lock | 131 ----------------------------------------------- wheel/Cargo.toml | 1 - 2 files changed, 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4877dd1..b50ce52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,9 +467,6 @@ name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" -dependencies = [ - "serde", -] [[package]] name = "bytes-utils" @@ -682,72 +679,6 @@ dependencies = [ "sct", ] -[[package]] -name = "darling" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f2c43f534ea4b0b049015d00269734195e6d3f0f6635cb692251aca6f9f8b3c" -dependencies = [ - "darling_core", - "darling_macro", -] - -[[package]] -name = "darling_core" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e91455b86830a1c21799d94524df0845183fa55bafd9aa137b01c7d1065fa36" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim", - "syn", -] - -[[package]] -name = "darling_macro" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29b5acf0dea37a7f66f7b25d2c5e93fd46f8f6968b1a5d7a3e02e97768afc95a" -dependencies = [ - "darling_core", - "quote", - "syn", -] - -[[package]] -name = "derive_builder" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d13202debe11181040ae9063d739fa32cfcaaebe2275fe387703460ae2365b30" -dependencies = [ - "derive_builder_macro", -] - -[[package]] -name = "derive_builder_core" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66e616858f6187ed828df7c64a6d71720d83767a7f19740b2d1b6fe6327b36e5" -dependencies = [ - "darling", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "derive_builder_macro" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58a94ace95092c5acb1e97a7e846b310cfbd499652f72297da7493f618a98d73" -dependencies = [ - "derive_builder_core", - "syn", -] - [[package]] name = "derive_more" version = "0.99.17" @@ -1052,16 +983,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-serde" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d98b3d9662de70952b14c4840ee0f37e23973542a363e2275f4b9d024ff6cca" -dependencies = [ - "http", - "serde", -] - [[package]] name = "httparse" version = "1.6.0" @@ -1143,12 +1064,6 @@ dependencies = [ "tokio-io-timeout", ] -[[package]] -name = "ident_case" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" - [[package]] name = "indexmap" version = "1.8.0" @@ -1222,35 +1137,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "lol-core" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"f5598e6305939e7f5a0bda6c19f0113f9d58c591dad99ef8e9c82493954a5021" -dependencies = [ - "anyhow", - "async-stream", - "async-trait", - "bincode", - "bytes", - "derive_builder", - "derive_more", - "futures", - "http-serde", - "log", - "phi-detector", - "prost", - "prost-build", - "rand", - "serde", - "serde_bytes", - "tokio", - "tokio-stream", - "tokio-util 0.6.9", - "tonic", - "tonic-build", -] - [[package]] name = "lz4" version = "1.23.3" @@ -1550,12 +1436,6 @@ dependencies = [ "indexmap", ] -[[package]] -name = "phi-detector" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bfc027b03922a25a425d0c10acb06f8091b563f9d62420e6ecad87ef8ccdf72" - [[package]] name = "pin-project" version = "1.0.10" @@ -1972,7 +1852,6 @@ dependencies = [ "humantime", "humantime-serde", "itertools", - "lol-core", "openraft", "parking_lot 0.12.0", "prost", @@ -2115,15 +1994,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde_bytes" -version = "0.11.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" -dependencies = [ - "serde", -] - [[package]] name = "serde_derive" version = "1.0.136" @@ -2400,7 +2270,6 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "slab", "tokio", ] diff --git a/wheel/Cargo.toml b/wheel/Cargo.toml index 9057946..29666d2 100644 --- a/wheel/Cargo.toml +++ b/wheel/Cargo.toml @@ -14,7 +14,6 @@ clap = { version = "3.1.6", features = ["derive"] } humantime = "2.1.0" humantime-serde = "1.1.1" itertools = "0.10.3" -lol-core = "0.9" openraft = { git = "https://github.com/datafuselabs/openraft", rev = "8d3d1285aa93ed00722da4a898adf390ea6f1e82" } parking_lot = "0.12" prost = "0.9"