Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl raft log store (part 5) #97

Merged
merged 2 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 79 additions & 22 deletions storage/src/raft_log_store/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ use runkv_common::coding::CompressionAlgorithm;
use super::DEFAULT_LOG_BATCH_SIZE;
use crate::error::Result;
use crate::raft_log_store::error::RaftLogStoreError;
use crate::utils::{crc32check, crc32sum, get_length_prefixed_slice, put_length_prefixed_slice};
use crate::utils::{
crc32check, crc32sum, get_length_prefixed_slice, put_length_prefixed_slice, var_u32_len,
BufExt, BufMutExt,
};

#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Entry {
RaftLogBatch(RaftLogBatch),
Truncate(Truncate),
Compact(Compact),
Kv(Kv),
}
Expand Down Expand Up @@ -41,12 +45,16 @@ impl Entry {
buf.put_u8(0);
batch.encode(buf);
}
Self::Compact(compact) => {
Self::Truncate(truncate) => {
buf.put_u8(1);
truncate.encode(buf);
}
Self::Compact(compact) => {
buf.put_u8(2);
compact.encode(buf);
}
Self::Kv(kv) => {
buf.put_u8(2);
buf.put_u8(3);
kv.encode(buf);
}
}
Expand All @@ -55,8 +63,9 @@ impl Entry {
pub fn decode(buf: &mut &[u8]) -> Self {
match buf.get_u8() {
0 => Self::RaftLogBatch(RaftLogBatch::decode(buf)),
1 => Self::Compact(Compact::decode(buf)),
2 => Self::Kv(Kv::decode(buf)),
1 => Self::Truncate(Truncate::decode(buf)),
2 => Self::Compact(Compact::decode(buf)),
3 => Self::Kv(Kv::decode(buf)),
_ => unreachable!(),
}
}
Expand All @@ -68,6 +77,7 @@ pub struct RaftLogBatch {
term: u64,
first_index: u64,
offsets: Vec<usize>,
ctxs: Vec<Vec<u8>>,
data_len: usize,
/// Note: Only used for encoding.
data: Vec<u8>,
Expand All @@ -91,6 +101,7 @@ impl Default for RaftLogBatch {
term: 0,
first_index: 0,
offsets: vec![],
ctxs: vec![],
data_len: 0,
data: Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE),
}
Expand All @@ -116,7 +127,17 @@ impl RaftLogBatch {
}

pub fn data_segment_location(&self) -> (usize, usize) {
let offset = 8 + 8 + 8 + 8 + self.offsets.len() * 4 + 8;
let offset = 8 // group
+ 8 // term
+ 8 // first index
+ 8 // N+1
+ self
.offsets
.iter()
.map(|offset| var_u32_len(*offset as u32))
.sum::<usize>()
+ self.ctxs.iter().map(|ctx| var_u32_len(ctx.len() as u32) + ctx.len()).sum::<usize>()
+ 8; // data segment len
let len = self.data_len;
(offset, len)
}
Expand All @@ -128,6 +149,11 @@ impl RaftLogBatch {
(offset, len)
}

pub fn ctx(&self, index: usize) -> &[u8] {
debug_assert!(index < self.len());
&self.ctxs[index]
}

/// Convert raw data to encoded data.
///
/// Format:
Expand Down Expand Up @@ -161,10 +187,11 @@ impl RaftLogBatch {
///
/// ```plain
/// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 (4B) | ... | offset (N-1) | offset N (phantom) |
/// | ctx 0 | .. | ctx (N-1) | # ctx is a length prefixed buffer
/// | data segment len (8B) | data block (compressed) | compression algorithm (1B) | crc32sum (4B) |
/// | <---------- data segment ------------------------------------------->|
/// ```
fn encode(&self, buf: &mut Vec<u8>) {
fn encode(&self, mut buf: &mut Vec<u8>) {
debug_assert!(!self.offsets.is_empty());

// Encode meta.
Expand All @@ -173,30 +200,39 @@ impl RaftLogBatch {
buf.put_u64_le(self.first_index);
buf.put_u64_le(self.offsets.len() as u64);
for offset in self.offsets.iter() {
buf.put_u32_le(*offset as u32);
buf.put_var_u32(*offset as u32);
}
for ctx in self.ctxs.iter() {
buf.put_length_prefixed_slice(ctx);
}
buf.put_u64_le(self.data_len as u64);
buf.put_slice(&self.data)
}

/// Decode meta only. [`RaftLogBatch.data`] will be left empty.
pub fn decode(buf: &mut &[u8]) -> Self {
pub fn decode(mut buf: &mut &[u8]) -> Self {
let group = buf.get_u64_le();
let term = buf.get_u64_le();
let first_index = buf.get_u64_le();
let offsets_len = buf.get_u64_le() as usize;
let mut offsets = Vec::with_capacity(offsets_len);
for _ in 0..offsets_len {
let offset = buf.get_u32_le() as usize;
let offset = buf.get_var_u32() as usize;
offsets.push(offset);
}
let mut ctxs = Vec::with_capacity(offsets_len - 1);
for _ in 0..offsets_len - 1 {
let ctx = buf.get_length_prefixed_slice();
ctxs.push(ctx);
}
let data_segment_len = buf.get_u64_le() as usize;
buf.consume(data_segment_len);
Self {
group,
term,
first_index,
offsets,
ctxs,
data_len: data_segment_len,
data: vec![],
}
Expand Down Expand Up @@ -240,7 +276,7 @@ pub struct RaftLogBatchBuilder {
}

impl RaftLogBatchBuilder {
pub fn add(&mut self, group: u64, term: u64, index: u64, data: &[u8]) {
pub fn add(&mut self, group: u64, term: u64, index: u64, ctx: &[u8], data: &[u8]) {
debug_assert_ne!(group, 0);
debug_assert_ne!(term, 0);
debug_assert_ne!(index, 0);
Expand All @@ -253,6 +289,7 @@ impl RaftLogBatchBuilder {
self.current.first_index = index;
}
self.current.offsets.push(self.current.data.len());
self.current.ctxs.push(ctx.to_vec());
self.current.data.put_slice(data);
}

Expand Down Expand Up @@ -281,6 +318,25 @@ impl RaftLogBatchBuilder {
}
}

#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Truncate {
pub group: u64,
pub index: u64,
}

impl Truncate {
pub fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u64_le(self.group);
buf.put_u64_le(self.index);
}

pub fn decode(buf: &mut &[u8]) -> Self {
let group = buf.get_u64_le();
let index = buf.get_u64_le();
Self { group, index }
}
}

#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Compact {
pub group: u64,
Expand Down Expand Up @@ -352,21 +408,22 @@ mod tests {
use super::*;

#[test]
#[allow(clippy::type_complexity)]
fn test_log_enc_dec() {
let dataset = vec![
(1, 1, 1, b"data-1-1-1"),
(1, 1, 2, b"data-1-1-2"),
(1, 2, 1, b"data-1-2-1"),
(1, 2, 2, b"data-1-2-2"),
(2, 1, 1, b"data-2-1-1"),
(2, 1, 2, b"data-2-1-2"),
(2, 2, 1, b"data-2-2-1"),
(2, 2, 2, b"data-2-2-2"),
let dataset: Vec<(u64, u64, u64, &'static [u8], &'static [u8])> = vec![
(1, 1, 1, b"aaa", b"data-1-1-1"),
(1, 1, 2, b"", b"data-1-1-2"),
(1, 2, 1, b"ccc", b"data-1-2-1"),
(1, 2, 2, b"ddd", b"data-1-2-2"),
(2, 1, 1, b"", b"data-2-1-1"),
(2, 1, 2, b"fff", b"data-2-1-2"),
(2, 2, 1, b"", b"data-2-2-1"),
(2, 2, 2, b"", b"data-2-2-2"),
];

let mut builder = RaftLogBatchBuilder::default();
for (group, term, index, data) in dataset {
builder.add(group, term, index, data);
for (group, term, index, ctx, data) in dataset {
builder.add(group, term, index, ctx, data);
}
let batches = builder.build();
assert_eq!(batches.len(), 4);
Expand Down
2 changes: 1 addition & 1 deletion storage/src/raft_log_store/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod tests {
for group in 1..=groups as u64 {
let term = 1;
for index in 1..=group_size as u64 {
builder.add(group, term, index, &data);
builder.add(group, term, index, b"some-ctx", &data);
}
}
let batches = builder.build();
Expand Down
Loading