Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl sst uploader and fix bugs #56

Merged
merged 1 commit into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ tokio = { version = "1", features = [
"macros",
"time",
] }
tracing = "0.1"
32 changes: 0 additions & 32 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,6 @@ mod error;
mod lsm_tree;
mod object_store;

use async_trait::async_trait;
use bytes::Bytes;
pub use error::*;
pub use lsm_tree::*;
pub use object_store::*;

#[async_trait]
pub trait LsmTree: Send + Sync {
/// Put a new `key` `value` pair into LSM-Tree with given `timestamp`.
///
/// # Safety
///
/// The interface exposes `timestamp` to user for the compatibility with upper system. It's
/// caller's responsibility to ensure that the new timestamp is higher than the old one on the
/// same key. Otherwise there will be consistency problems.
async fn put(&self, key: &Bytes, value: &Bytes, timestamp: u64) -> Result<()>;

/// Delete a the given `key` in LSM-Tree by tombstone with given `timestamp`.
///
/// # Safety
///
/// The interface exposes `timestamp` to user for the compatibility with upper system. It's
/// caller's responsibility to ensure that the new timestamp is higher than the old one on the
/// same key. Otherwise there will be consistency problems.
async fn delete(&self, key: &Bytes, timestamp: u64) -> Result<()>;

/// Get the value of the given `key` in LSM-Tree with given `timestamp`.
///
/// # Safety
///
/// The interface exposes `timestamp` to user for the compatibility with upper system. It's
/// caller's responsibility to ensure that the new timestamp is higher than the old one on the
/// same key. Otherwise there will be consistency problems.
async fn get(&self, key: &Bytes, timestamp: u64) -> Result<Option<Bytes>>;
}
8 changes: 8 additions & 0 deletions storage/src/lsm_tree/components/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ impl Memtable {
pub(in crate::lsm_tree) fn iter(&self) -> IterRef<Skiplist<Comparator>, Comparator> {
self.inner.iter()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

pub fn unwrap(self) -> Skiplist<Comparator> {
self.inner
}
}

#[cfg(test)]
Expand Down
42 changes: 39 additions & 3 deletions storage/src/lsm_tree/components/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::lsm_tree::{
DEFAULT_BLOCK_SIZE, DEFAULT_BLOOM_FALSE_POSITIVE, DEFAULT_ENTRY_SIZE, DEFAULT_RESTART_INTERVAL,
DEFAULT_SSTABLE_META_SIZE, DEFAULT_SSTABLE_SIZE, TEST_DEFAULT_RESTART_INTERVAL,
};
use crate::utils::user_key;
use crate::Result;

/// [`BlockMeta`] contains block metadata, served as a part of [`Sstable`] meta.
Expand Down Expand Up @@ -93,6 +94,10 @@ impl Sstable {
self.meta.is_overlap_with_range(range)
}

pub fn is_overlap_with_user_key_range(&self, user_key_range: RangeInclusive<&[u8]>) -> bool {
self.meta.is_overlap_with_user_key_range(user_key_range)
}

/// Judge whether the given `key` may be in the sstable with bloom filter.
pub fn may_contain_key(&self, key: &[u8]) -> bool {
self.meta.may_contain_key(key)
Expand Down Expand Up @@ -167,8 +172,31 @@ impl SstableMeta {
}

fn is_overlap_with_range(&self, range: RangeInclusive<&Bytes>) -> bool {
self.block_metas.first().as_ref().unwrap().first_key <= range.end()
&& self.block_metas.last().as_ref().unwrap().last_key >= range.start()
// println!("range: {:?}", range);
// println!(
// "first: {:?}",
// self.block_metas.first().as_ref().unwrap().first_key
// );
// println!(
// "last: {:?}",
// self.block_metas.last().as_ref().unwrap().last_key
// );

!(self.block_metas.first().as_ref().unwrap().first_key > range.end()
|| self.block_metas.last().as_ref().unwrap().last_key < range.start())
}

fn is_overlap_with_user_key_range(&self, user_key_range: RangeInclusive<&[u8]>) -> bool {
let first_user_key = user_key(&self.block_metas.first().as_ref().unwrap().first_key);
let last_user_key = user_key(&self.block_metas.last().as_ref().unwrap().last_key);
// println!("range: {:?}", user_key_range);
// println!("first: {:?}", Bytes::copy_from_slice(first_user_key));
// println!("last: {:?}", Bytes::copy_from_slice(last_user_key));
// println!(
// "result: {}",
// !(&first_user_key > user_key_range.end() || &last_user_key < user_key_range.start())
// );
!(&first_user_key > user_key_range.end() || &last_user_key < user_key_range.start())
}

/// Judge whether the given `key` may be in the sstable with bloom filter.
Expand Down Expand Up @@ -244,7 +272,7 @@ impl SstableBuilder {
self.block_builder = Some(BlockBuilder::new(BlockBuilderOptions {
capacity: self.options.capacity,
restart_interval: self.options.restart_interval,
compression_algorithm: self.options.compression_algorithm.clone(),
compression_algorithm: self.options.compression_algorithm,
}));
self.block_metas.push(BlockMeta {
offset: self.buf.len(),
Expand Down Expand Up @@ -320,6 +348,14 @@ impl SstableBuilder {
block_meta.len = self.buf.len() - block_meta.offset;
self.last_full_key.clear();
}

pub fn len(&self) -> usize {
self.user_key_hashes.len()
}

pub fn is_empty(&self) -> bool {
self.user_key_hashes.is_empty()
}
}

#[cfg(test)]
Expand Down
Loading