feat: raft log store (part 2) #90

Merged 1 commit on Apr 7, 2022
25 changes: 17 additions & 8 deletions storage/src/raft_log_store/entry.rs
@@ -5,9 +5,7 @@ use runkv_common::coding::CompressionAlgorithm;

use super::DEFAULT_LOG_BATCH_SIZE;
use crate::raft_log_store::error::RaftLogStoreError;
-use crate::utils::{
-crc32sum, get_length_prefixed_slice, put_length_prefixed_slice, BufExt, BufMutExt,
-};
+use crate::utils::{crc32sum, get_length_prefixed_slice, put_length_prefixed_slice};

#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Entry {
@@ -70,6 +68,7 @@ pub struct RaftLogBatch {
term: u64,
first_index: u64,
offsets: Vec<usize>,
+/// Note: Only used for encoding.
data: Vec<u8>,
}

@@ -101,6 +100,10 @@ impl RaftLogBatch {
self.group
}

+pub fn first_index(&self) -> u64 {
+self.first_index
+}

pub fn term(&self) -> u64 {
self.term
}
@@ -110,6 +113,12 @@ impl RaftLogBatch {
self.offsets.len() - 1
}

+pub fn data_segment_location(&self) -> (usize, usize) {
+let offset = 8 + 8 + 8 + 8 + self.offsets.len() * 4 + 8;
+let len = self.offsets[self.offsets.len() - 1];
+(offset, len)
+}

pub fn location(&self, index: usize) -> (usize, usize) {
debug_assert!(index < self.len() - 1);
let offset = self.offsets[index];
@@ -120,11 +129,11 @@
/// Format:
///
/// ```plain
-/// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 | ... | offset (N-1) | offset N (phantom) |
+/// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 (4B) | ... | offset (N-1) | offset N (phantom) |
/// | data segment len (8B) | data block (compressed) | compression algorithm (1B) | crc32sum (4B) |
/// | <---------- data segment ------------------------------------------->|
/// ```
-pub fn encode(&self, mut buf_meta: &mut Vec<u8>, buf_data: &mut Vec<u8>) {
+pub fn encode(&self, buf_meta: &mut Vec<u8>, buf_data: &mut Vec<u8>) {
debug_assert!(!self.offsets.is_empty());

// Encode meta.
@@ -133,7 +142,7 @@ impl RaftLogBatch {
buf_meta.put_u64_le(self.first_index);
buf_meta.put_u64_le(self.offsets.len() as u64);
for offset in self.offsets.iter() {
-buf_meta.put_var_u32(*offset as u32);
+buf_meta.put_u32_le(*offset as u32);
}

// Encode data.
@@ -160,14 +169,14 @@ }
}

/// Decode meta only. [`RaftLogBatch.data`] will be left empty.
-pub fn decode(mut buf: &mut &[u8]) -> Self {
+pub fn decode(buf: &mut &[u8]) -> Self {
let group = buf.get_u64_le();
let term = buf.get_u64_le();
let first_index = buf.get_u64_le();
let offsets_len = buf.get_u64_le() as usize;
let mut offsets = Vec::with_capacity(offsets_len);
for _ in 0..offsets_len {
-let offset = buf.get_var_u32() as usize;
+let offset = buf.get_u32_le() as usize;
offsets.push(offset);
}
let data_segment_len = buf.get_u64_le() as usize;
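A note on the layout documented above: with offsets now written as fixed-width little-endian `u32` values instead of varints, the start of the data portion becomes a pure function of the field widths preceding it, which is what the new `data_segment_location` computes. A minimal standalone sketch of that arithmetic (the free function and `main` below are illustrative, not part of the crate):

```rust
/// Start and length of the data segment, given a batch's offsets
/// (including the trailing phantom offset). Field widths follow the format
/// comment: group (8) + term (8) + first index (8) + N+1 (8)
/// + (N+1) offsets at 4 bytes each + data segment len (8).
fn data_segment_location(offsets: &[usize]) -> (usize, usize) {
    let offset = 8 + 8 + 8 + 8 + offsets.len() * 4 + 8;
    // The phantom offset marks the end of the appended entry data, so it
    // doubles as the data length here.
    let len = offsets[offsets.len() - 1];
    (offset, len)
}

fn main() {
    // Three entries of 100 bytes each, plus the phantom end offset.
    let offsets = vec![0, 100, 200, 300];
    assert_eq!(data_segment_location(&offsets), (4 * 8 + 4 * 4 + 8, 300));
}
```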
4 changes: 4 additions & 0 deletions storage/src/raft_log_store/error.rs
@@ -2,8 +2,12 @@
pub enum RaftLogStoreError {
#[error("group {0} not exists")]
GroupNotExists(u64),
#[error("group {0} already exists")]
GroupAlreadyExists(u64),
#[error("encode error: {0}")]
EncodeError(String),
#[error("raft log gap exists: [{start}, {end})")]
RaftLogGap { start: u64, end: u64 },
#[error("other: {0}")]
Other(String),
}
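The new `RaftLogGap` variant reports a half-open range `[start, end)` of missing log indexes. A hypothetical contiguity check (not part of this PR) could surface it like this:

```rust
// Hypothetical helper, assuming `RaftLogStoreError` (defined above) is in scope:
// reject a write whose first index would leave a hole after the next expected index.
fn check_contiguous(next_expected: u64, first_index: u64) -> Result<(), RaftLogStoreError> {
    if first_index > next_expected {
        // The missing indexes form the half-open range [next_expected, first_index).
        return Err(RaftLogStoreError::RaftLogGap {
            start: next_expected,
            end: first_index,
        });
    }
    Ok(())
}
```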
36 changes: 18 additions & 18 deletions storage/src/raft_log_store/log.rs
@@ -11,25 +11,25 @@ use crate::entry::Entry;
use crate::error::{Error, Result};

#[derive(Clone, Debug)]
-pub struct PipeLogOptions {
+pub struct LogOptions {
path: String,
log_file_capacity: usize,
}

-struct PipeLogCore {
+struct LogCore {
active_file: File,
frozen_files: Vec<File>,
first_log_file_id: u64,
}

-pub struct PipeLog {
+pub struct Log {
path: String,
log_file_capacity: usize,
-core: Mutex<PipeLogCore>,
+core: Mutex<LogCore>,
}

-impl PipeLog {
-pub async fn open(options: PipeLogOptions) -> Result<Self> {
+impl Log {
+pub async fn open(options: LogOptions) -> Result<Self> {
create_dir_all(&options.path).await?;
let (frozen_files, first_log_file_id) = {
let mut frozen_files = vec![];
@@ -68,7 +68,7 @@ impl PipeLog {
let active_file_id = first_log_file_id + frozen_files.len() as u64 + 1;
let active_file = Self::new_active_file(&options.path, active_file_id).await?;

-let core = PipeLogCore {
+let core = LogCore {
active_file,
frozen_files,
first_log_file_id,
@@ -89,19 +89,19 @@
Ok(())
}

-pub async fn append(&self, entries: Vec<Entry>) -> Result<()> {
+pub async fn push(&self, entry: Entry) -> Result<(usize, usize)> {
let mut guard = self.core.lock().await;
+let start = guard.active_file.metadata().await?.len() as usize;
let mut buf = Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE);
-for entry in entries {
-entry.encode(&mut buf);
-}
+entry.encode(&mut buf);
guard.active_file.write_all(&buf).await?;
guard.active_file.sync_data().await?;
-if guard.active_file.metadata().await?.len() as usize >= self.log_file_capacity {
+let end = guard.active_file.metadata().await?.len() as usize;
+if end >= self.log_file_capacity {
drop(guard);
self.rotate().await?;
}
-Ok(())
+Ok((start, end - start))
}

pub async fn read(&self, log_file_id: u64, offset: u64, len: usize) -> Result<Vec<u8>> {
@@ -119,7 +119,7 @@
}
}

-impl PipeLog {
+impl Log {
async fn rotate(&self) -> Result<()> {
let mut guard = self.core.lock().await;
// Sync old active file.
@@ -172,17 +172,17 @@ mod tests {
#[test(tokio::test)]
async fn test_pipe_log_recovery() {
let tempdir = tempfile::tempdir().unwrap();
-let options = PipeLogOptions {
+let options = LogOptions {
path: tempdir.path().to_str().unwrap().to_string(),
// Estimated size of each compressed entry is 111.
log_file_capacity: 100,
};
-let log = PipeLog::open(options.clone()).await.unwrap();
+let log = Log::open(options.clone()).await.unwrap();
let entries = generate_entries(4, 16, vec![b'x'; 64]);
assert_eq!(entries.len(), 4);

for entry in entries.iter().cloned() {
-log.append(vec![entry]).await.unwrap();
+log.push(entry).await.unwrap();
}
assert_eq!(log.core.lock().await.frozen_files.len(), 4);
let mut buf = vec![];
@@ -202,7 +202,7 @@

// Recover pipe log.
drop(log);
-let log = PipeLog::open(options).await.unwrap();
+let log = Log::open(options).await.unwrap();
assert_eq!(log.core.lock().await.frozen_files.len(), 5);
let mut buf = vec![];
for i in 0..4 {
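Since the batch-oriented `append(Vec<Entry>)` is replaced by a single-entry `push` that returns the entry's `(offset, len)` within the active file, callers that previously appended a batch now loop and collect locations. A hedged usage sketch, assuming the `Log`, `Entry`, and `Result` items shown in this diff are in scope:

```rust
// Illustrative only, not part of the PR.
async fn push_all(log: &Log, entries: Vec<Entry>) -> Result<Vec<(usize, usize)>> {
    let mut locations = Vec::with_capacity(entries.len());
    for entry in entries {
        // Each call encodes one entry, writes and syncs it, and may rotate the
        // active file once it exceeds `log_file_capacity`.
        let location = log.push(entry).await?;
        locations.push(location);
    }
    Ok(locations)
}
```

The returned locations can presumably be fed back to `Log::read` later, together with the id of the file that ends up holding the entry.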