Skip to content

Commit

Permalink
feat: fill cache after append raft log (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored Apr 18, 2022
1 parent df8949e commit 63aaa89
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
17 changes: 17 additions & 0 deletions storage/src/raft_log_store/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use futures::Future;
use moka::future::Cache;
use tracing::trace;

use super::error::RaftLogStoreError;
use super::DEFAULT_LOG_BATCH_SIZE;
Expand All @@ -28,10 +29,21 @@ impl BlockCache {
}

pub fn get(&self, file_id: u64, offset: usize) -> Option<Arc<Vec<u8>>> {
trace!(
file_id = file_id,
offset = offset,
"try get from block cache:"
);
self.inner.get(&BlockIndex { file_id, offset })
}

pub async fn insert(&self, file_id: u64, offset: usize, block: Arc<Vec<u8>>) {
trace!(
file_id = file_id,
offset = offset,
len = block.len(),
"insert to block cache:"
);
self.inner
.insert(BlockIndex { file_id, offset }, block)
.await
Expand All @@ -46,6 +58,11 @@ impl BlockCache {
where
F: Future<Output = Result<Arc<Vec<u8>>>>,
{
trace!(
file_id = file_id,
offset = offset,
"get or insert block cache"
);
match self
.inner
.get_or_try_insert_with(BlockIndex { file_id, offset }, f)
Expand Down
25 changes: 18 additions & 7 deletions storage/src/raft_log_store/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub struct RaftLogBatch {
ctxs: Vec<Vec<u8>>,
data_len: usize,
/// Note: Only used for encoding.
raw: Vec<u8>,
/// Note: Only used for encoding.
data: Vec<u8>,
}

Expand All @@ -109,6 +111,7 @@ impl Default for RaftLogBatch {
offsets: vec![],
ctxs: vec![],
data_len: 0,
raw: Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE),
data: Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE),
}
}
Expand Down Expand Up @@ -160,6 +163,12 @@ impl RaftLogBatch {
&self.ctxs[index]
}

pub fn take_raw(&mut self) -> Vec<u8> {
let mut buf = vec![];
std::mem::swap(&mut self.raw, &mut buf);
buf
}

/// Convert raw data to encoded data.
///
/// Format:
Expand All @@ -171,11 +180,11 @@ impl RaftLogBatch {
let mut buf = {
let mut encoder = lz4::EncoderBuilder::new()
.level(4)
.build(Vec::with_capacity(self.data.len()).writer())
.build(Vec::with_capacity(self.raw.len()).writer())
.map_err(RaftLogStoreError::encode_error)
.unwrap();
encoder
.write(&self.data[..])
.write(&self.raw[..])
.map_err(RaftLogStoreError::encode_error)
.unwrap();
let (writer, result) = encoder.finish();
Expand All @@ -186,7 +195,7 @@ impl RaftLogBatch {
let checksum = crc32sum(&buf);
buf.put_u32_le(checksum);
self.data = buf;
self.data_len = self.data.len()
self.data_len = self.data.len();
}

/// Format:
Expand Down Expand Up @@ -240,6 +249,7 @@ impl RaftLogBatch {
offsets,
ctxs,
data_len: data_segment_len,
raw: vec![],
data: vec![],
}
}
Expand Down Expand Up @@ -296,15 +306,16 @@ impl RaftLogBatchBuilder {
self.current.term = term;
self.current.first_index = index;
}
self.current.offsets.push(self.current.data.len());
self.current.offsets.push(self.current.raw.len());
self.current.ctxs.push(ctx.to_vec());
self.current.data.put_slice(data);
self.current.raw.put_slice(data);
}

/// Build [`RaftLogBatch`]s.
pub fn build(mut self) -> Vec<RaftLogBatch> {
self.may_rotate(0, 0, 0);
for batch in self.batches.iter_mut() {
batch.encode_data()
batch.encode_data();
}
self.batches
}
Expand All @@ -318,7 +329,7 @@ impl RaftLogBatchBuilder {
|| self.current.first_index + self.current.offsets.len() as u64 != index
{
// Phantom offset.
self.current.offsets.push(self.current.data.len());
self.current.offsets.push(self.current.raw.len());
let mut current = RaftLogBatch::default();
std::mem::swap(&mut self.current, &mut current);
self.batches.push(current);
Expand Down
9 changes: 6 additions & 3 deletions storage/src/raft_log_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl RaftLogStore {
}

/// Append raft log batch to [`RaftLogStore`].
pub async fn append(&self, batch: RaftLogBatch) -> Result<()> {
pub async fn append(&self, mut batch: RaftLogBatch) -> Result<()> {
let (data_segment_offset, data_segment_len) = batch.data_segment_location();
let group = batch.group();
let term = batch.term();
Expand All @@ -150,6 +150,7 @@ impl RaftLogStore {
indices.push(index);
}

let raw = batch.take_raw();
let entry = LogEntry::RaftLogBatch(batch);
let (file_id, write_offset, _write_len) = self.core.log.push(entry).await?;

Expand All @@ -161,8 +162,10 @@ impl RaftLogStore {
index.block_len = block_len;
}

// TODO: Fill block cache. Code refactor is needed, at this point, data in log bath has
// already been encoded.
self.core
.block_cache
.insert(file_id, block_offset, Arc::new(raw))
.await;

self.core.states.append(group, first_index, indices).await?;

Expand Down

0 comments on commit 63aaa89

Please sign in to comment.