Skip to content

Commit

Permalink
feat: impl raft log store (part 5) (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored Apr 11, 2022
1 parent d434dff commit 4053d8e
Show file tree
Hide file tree
Showing 10 changed files with 755 additions and 116 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 79 additions & 22 deletions storage/src/raft_log_store/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ use runkv_common::coding::CompressionAlgorithm;
use super::DEFAULT_LOG_BATCH_SIZE;
use crate::error::Result;
use crate::raft_log_store::error::RaftLogStoreError;
use crate::utils::{crc32check, crc32sum, get_length_prefixed_slice, put_length_prefixed_slice};
use crate::utils::{
crc32check, crc32sum, get_length_prefixed_slice, put_length_prefixed_slice, var_u32_len,
BufExt, BufMutExt,
};

#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Entry {
RaftLogBatch(RaftLogBatch),
Truncate(Truncate),
Compact(Compact),
Kv(Kv),
}
Expand Down Expand Up @@ -41,12 +45,16 @@ impl Entry {
buf.put_u8(0);
batch.encode(buf);
}
Self::Compact(compact) => {
Self::Truncate(truncate) => {
buf.put_u8(1);
truncate.encode(buf);
}
Self::Compact(compact) => {
buf.put_u8(2);
compact.encode(buf);
}
Self::Kv(kv) => {
buf.put_u8(2);
buf.put_u8(3);
kv.encode(buf);
}
}
Expand All @@ -55,8 +63,9 @@ impl Entry {
pub fn decode(buf: &mut &[u8]) -> Self {
match buf.get_u8() {
0 => Self::RaftLogBatch(RaftLogBatch::decode(buf)),
1 => Self::Compact(Compact::decode(buf)),
2 => Self::Kv(Kv::decode(buf)),
1 => Self::Truncate(Truncate::decode(buf)),
2 => Self::Compact(Compact::decode(buf)),
3 => Self::Kv(Kv::decode(buf)),
_ => unreachable!(),
}
}
Expand All @@ -68,6 +77,7 @@ pub struct RaftLogBatch {
term: u64,
first_index: u64,
offsets: Vec<usize>,
ctxs: Vec<Vec<u8>>,
data_len: usize,
/// Note: Only used for encoding.
data: Vec<u8>,
Expand All @@ -91,6 +101,7 @@ impl Default for RaftLogBatch {
term: 0,
first_index: 0,
offsets: vec![],
ctxs: vec![],
data_len: 0,
data: Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE),
}
Expand All @@ -116,7 +127,17 @@ impl RaftLogBatch {
}

pub fn data_segment_location(&self) -> (usize, usize) {
let offset = 8 + 8 + 8 + 8 + self.offsets.len() * 4 + 8;
let offset = 8 // group
+ 8 // term
+ 8 // first index
+ 8 // N+1
+ self
.offsets
.iter()
.map(|offset| var_u32_len(*offset as u32))
.sum::<usize>()
+ self.ctxs.iter().map(|ctx| var_u32_len(ctx.len() as u32) + ctx.len()).sum::<usize>()
+ 8; // data segment len
let len = self.data_len;
(offset, len)
}
Expand All @@ -128,6 +149,11 @@ impl RaftLogBatch {
(offset, len)
}

pub fn ctx(&self, index: usize) -> &[u8] {
debug_assert!(index < self.len());
&self.ctxs[index]
}

/// Convert raw data to encoded data.
///
/// Format:
Expand Down Expand Up @@ -161,10 +187,11 @@ impl RaftLogBatch {
///
/// ```plain
/// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 (4B) | ... | offset (N-1) | offset N (phantom) |
/// | ctx 0 | .. | ctx (N-1) | # ctx is a length prefixed buffer
/// | data segment len (8B) | data block (compressed) | compression algorithm (1B) | crc32sum (4B) |
/// | <---------- data segment ------------------------------------------->|
/// ```
fn encode(&self, buf: &mut Vec<u8>) {
fn encode(&self, mut buf: &mut Vec<u8>) {
debug_assert!(!self.offsets.is_empty());

// Encode meta.
Expand All @@ -173,30 +200,39 @@ impl RaftLogBatch {
buf.put_u64_le(self.first_index);
buf.put_u64_le(self.offsets.len() as u64);
for offset in self.offsets.iter() {
buf.put_u32_le(*offset as u32);
buf.put_var_u32(*offset as u32);
}
for ctx in self.ctxs.iter() {
buf.put_length_prefixed_slice(ctx);
}
buf.put_u64_le(self.data_len as u64);
buf.put_slice(&self.data)
}

/// Decode meta only. [`RaftLogBatch.data`] will be left empty.
pub fn decode(buf: &mut &[u8]) -> Self {
pub fn decode(mut buf: &mut &[u8]) -> Self {
let group = buf.get_u64_le();
let term = buf.get_u64_le();
let first_index = buf.get_u64_le();
let offsets_len = buf.get_u64_le() as usize;
let mut offsets = Vec::with_capacity(offsets_len);
for _ in 0..offsets_len {
let offset = buf.get_u32_le() as usize;
let offset = buf.get_var_u32() as usize;
offsets.push(offset);
}
let mut ctxs = Vec::with_capacity(offsets_len - 1);
for _ in 0..offsets_len - 1 {
let ctx = buf.get_length_prefixed_slice();
ctxs.push(ctx);
}
let data_segment_len = buf.get_u64_le() as usize;
buf.consume(data_segment_len);
Self {
group,
term,
first_index,
offsets,
ctxs,
data_len: data_segment_len,
data: vec![],
}
Expand Down Expand Up @@ -240,7 +276,7 @@ pub struct RaftLogBatchBuilder {
}

impl RaftLogBatchBuilder {
pub fn add(&mut self, group: u64, term: u64, index: u64, data: &[u8]) {
pub fn add(&mut self, group: u64, term: u64, index: u64, ctx: &[u8], data: &[u8]) {
debug_assert_ne!(group, 0);
debug_assert_ne!(term, 0);
debug_assert_ne!(index, 0);
Expand All @@ -253,6 +289,7 @@ impl RaftLogBatchBuilder {
self.current.first_index = index;
}
self.current.offsets.push(self.current.data.len());
self.current.ctxs.push(ctx.to_vec());
self.current.data.put_slice(data);
}

Expand Down Expand Up @@ -281,6 +318,25 @@ impl RaftLogBatchBuilder {
}
}

#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Truncate {
pub group: u64,
pub index: u64,
}

impl Truncate {
pub fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u64_le(self.group);
buf.put_u64_le(self.index);
}

pub fn decode(buf: &mut &[u8]) -> Self {
let group = buf.get_u64_le();
let index = buf.get_u64_le();
Self { group, index }
}
}

#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Compact {
pub group: u64,
Expand Down Expand Up @@ -352,21 +408,22 @@ mod tests {
use super::*;

#[test]
#[allow(clippy::type_complexity)]
fn test_log_enc_dec() {
let dataset = vec![
(1, 1, 1, b"data-1-1-1"),
(1, 1, 2, b"data-1-1-2"),
(1, 2, 1, b"data-1-2-1"),
(1, 2, 2, b"data-1-2-2"),
(2, 1, 1, b"data-2-1-1"),
(2, 1, 2, b"data-2-1-2"),
(2, 2, 1, b"data-2-2-1"),
(2, 2, 2, b"data-2-2-2"),
let dataset: Vec<(u64, u64, u64, &'static [u8], &'static [u8])> = vec![
(1, 1, 1, b"aaa", b"data-1-1-1"),
(1, 1, 2, b"", b"data-1-1-2"),
(1, 2, 1, b"ccc", b"data-1-2-1"),
(1, 2, 2, b"ddd", b"data-1-2-2"),
(2, 1, 1, b"", b"data-2-1-1"),
(2, 1, 2, b"fff", b"data-2-1-2"),
(2, 2, 1, b"", b"data-2-2-1"),
(2, 2, 2, b"", b"data-2-2-2"),
];

let mut builder = RaftLogBatchBuilder::default();
for (group, term, index, data) in dataset {
builder.add(group, term, index, data);
for (group, term, index, ctx, data) in dataset {
builder.add(group, term, index, ctx, data);
}
let batches = builder.build();
assert_eq!(batches.len(), 4);
Expand Down
2 changes: 1 addition & 1 deletion storage/src/raft_log_store/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod tests {
for group in 1..=groups as u64 {
let term = 1;
for index in 1..=group_size as u64 {
builder.add(group, term, index, &data);
builder.add(group, term, index, b"some-ctx", &data);
}
}
let batches = builder.build();
Expand Down
Loading

0 comments on commit 4053d8e

Please sign in to comment.