Rework ObjectPart type
The `ObjectPart` type represents a slice of the content of an S3 object. It carries the information required to identify the object it belongs to and its offset within that object, and it maintains checksums to validate its integrity. The new type is used in the prefetcher and replaces both `ChecksummedBytes` and the original `prefetcher::ObjectPart`.
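For orientation, the minimal sketch below reconstructs what the reworked type exposes, based only on how this commit uses it (`new`, `new_from_inner_data`, `object_id`, `offset`, `len`, `slice`, `into_inner_data`). The field layout, the simplified `ObjectId`, and the placeholder checksum are assumptions made so the sketch compiles on its own; they are not the actual definitions in `mountpoint-s3/src/object`.

```rust
use std::ops::Range;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ObjectId {
    key: String,
    etag: String, // stands in for the real ETag type
}

#[derive(Debug)]
pub struct PartValidationError;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ObjectPart {
    object_id: ObjectId, // which S3 object this slice belongs to (key + ETag)
    offset: u64,         // byte offset of the slice within that object
    data: Vec<u8>,
    checksum: u32,       // integrity checksum over `data`
}

// Placeholder checksum so the sketch is self-contained; the real code uses a
// proper CRC over the data.
fn compute_checksum(data: &[u8]) -> u32 {
    data.iter()
        .fold(0u32, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u32))
}

impl ObjectPart {
    /// Wrap `data` as the slice of `object_id` starting at `offset`,
    /// computing a fresh checksum.
    pub fn new(object_id: ObjectId, offset: u64, data: Vec<u8>) -> Self {
        let checksum = compute_checksum(&data);
        Self { object_id, offset, data, checksum }
    }

    /// Rebuild a part from previously unpacked data and checksum without
    /// recomputing it (as the disk cache does when reading a block back).
    pub fn new_from_inner_data(object_id: ObjectId, offset: u64, data: Vec<u8>, checksum: u32) -> Self {
        Self { object_id, offset, data, checksum }
    }

    pub fn object_id(&self) -> &ObjectId { &self.object_id }
    pub fn offset(&self) -> u64 { self.offset }
    pub fn len(&self) -> usize { self.data.len() }

    /// Narrow this part to a sub-range; the returned part's offset is shifted
    /// by the start of the range so it still addresses the same object bytes.
    pub fn slice(&self, range: Range<usize>) -> Result<Self, PartValidationError> {
        let data = self.data.get(range.clone()).ok_or(PartValidationError)?.to_vec();
        Ok(Self::new(self.object_id.clone(), self.offset + range.start as u64, data))
    }

    /// Validate the checksum and unpack into raw bytes plus checksum (as the
    /// disk cache does before persisting a block).
    pub fn into_inner_data(self) -> Result<(Vec<u8>, u32), PartValidationError> {
        if compute_checksum(&self.data) != self.checksum {
            return Err(PartValidationError);
        }
        Ok((self.data, self.checksum))
    }
}
```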

Signed-off-by: Alessandro Passaro <[email protected]>
passaro committed May 2, 2024
1 parent 2c3a3ea commit 7c0b608
Showing 10 changed files with 538 additions and 488 deletions.
15 changes: 4 additions & 11 deletions mountpoint-s3/src/data_cache.rs
@@ -13,9 +13,8 @@ use thiserror::Error;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
pub use crate::object::ChecksummedBytes;

use crate::object::ObjectId;
use crate::object::{ObjectId, ObjectPart};

/// Indexes blocks within a given object.
pub type BlockIndex = u64;
@@ -48,16 +47,10 @@ pub trait DataCache {
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
) -> DataCacheResult<Option<ChecksummedBytes>>;
) -> DataCacheResult<Option<ObjectPart>>;

/// Put block of data to the cache for the given [ObjectId] and [BlockIndex].
fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
) -> DataCacheResult<()>;
/// Put block of data to the cache.
fn put_block(&self, bytes: ObjectPart, block_idx: BlockIndex) -> DataCacheResult<()>;

/// Returns the block size for the data cache.
fn block_size(&self) -> u64;
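To make the trait change concrete, here is a hedged sketch of how a caller might exercise the revised `DataCache` API, modeled on the test changes in disk_data_cache.rs below. The import paths, the `cache_first_block` helper, and the reuse of the test-only `ETag::for_tests()` constructor are illustrative assumptions rather than API shown in this commit.

```rust
use mountpoint_s3::data_cache::{DataCache, DataCacheResult};
use mountpoint_s3::object::{ObjectId, ObjectPart};
use mountpoint_s3_client::types::ETag;

fn cache_first_block(cache: &impl DataCache) -> DataCacheResult<Option<ObjectPart>> {
    let object_id = ObjectId::new("hello-world".to_string(), ETag::for_tests());

    // The part now carries its own object id and offset, so put_block only
    // needs the block index alongside the data.
    let part = ObjectPart::new(object_id.clone(), 0, "Foo".into());
    cache.put_block(part, 0)?;

    // get_block keeps its signature but returns ObjectPart instead of
    // ChecksummedBytes.
    cache.get_block(&object_id, 0, 0)
}
```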
152 changes: 61 additions & 91 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -14,12 +14,10 @@ use sha2::{Digest, Sha256};
use thiserror::Error;
use tracing::{trace, warn};

use crate::data_cache::DataCacheError;
use crate::object::{IntegrityError, ObjectId};
use crate::data_cache::{BlockIndex, DataCache, DataCacheError, DataCacheResult};
use crate::object::{ObjectId, ObjectPart, PartValidationError};
use crate::sync::Mutex;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};

/// Disk and file-layout versioning.
const CACHE_VERSION: &str = "V1";

@@ -79,7 +77,7 @@ struct DiskBlockHeader {
enum DiskBlockCreationError {
/// Data corruption detected when unpacking bytes and checksum
#[error(transparent)]
IntegrityError(#[from] IntegrityError),
IntegrityError(#[from] PartValidationError),
}

/// Error during access to a [DiskBlock]
@@ -170,15 +168,11 @@ impl DiskBlock {
///
/// This may return an integrity error if the checksummed byte buffer is found to be corrupt.
/// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt.
fn new(
cache_key: ObjectId,
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
) -> Result<Self, DiskBlockCreationError> {
let s3_key = cache_key.key().to_owned();
let etag = cache_key.etag().as_str().to_owned();
let (data, data_checksum) = bytes.into_inner()?;
fn new(block_idx: BlockIndex, bytes: ObjectPart) -> Result<Self, DiskBlockCreationError> {
let s3_key = bytes.object_id().key().to_owned();
let etag = bytes.object_id().etag().clone().into_inner();
let block_offset = bytes.offset();
let (data, data_checksum) = bytes.into_inner_data()?;
let header = DiskBlockHeader::new(block_idx, block_offset, etag, s3_key, data_checksum);

Ok(DiskBlock { data, header })
@@ -192,11 +186,11 @@ impl DiskBlock {
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
) -> Result<ChecksummedBytes, DiskBlockAccessError> {
) -> Result<ObjectPart, DiskBlockAccessError> {
let data_checksum =
self.header
.validate(cache_key.key(), cache_key.etag().as_str(), block_idx, block_offset)?;
let bytes = ChecksummedBytes::new_from_inner_data(self.data.clone(), data_checksum);
let bytes = ObjectPart::new_from_inner_data(cache_key.clone(), block_offset, self.data.clone(), data_checksum);
Ok(bytes)
}
}
@@ -228,7 +222,7 @@ impl DiskDataCache {
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
) -> DataCacheResult<Option<ChecksummedBytes>> {
) -> DataCacheResult<Option<ObjectPart>> {
let mut file = match fs::File::open(path.as_ref()) {
Ok(file) => file,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
@@ -339,13 +333,10 @@ impl DiskDataCache {

/// Hash the cache key using its fields as well as the [CACHE_VERSION].
fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
let s3_key = cache_key.key();
let etag = cache_key.etag();

let mut hasher = Sha256::new();
hasher.update(CACHE_VERSION.as_bytes());
hasher.update(s3_key);
hasher.update(etag.as_str());
hasher.update(cache_key.key());
hasher.update(cache_key.etag().as_str());
hasher.finalize().into()
}

@@ -355,7 +346,7 @@ impl DataCache for DiskDataCache {
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
) -> DataCacheResult<Option<ChecksummedBytes>> {
) -> DataCacheResult<Option<ObjectPart>> {
if block_offset != block_idx * self.config.block_size {
return Err(DataCacheError::InvalidBlockOffset);
}
@@ -395,23 +386,17 @@ impl DataCache for DiskDataCache {
}
}

fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
) -> DataCacheResult<()> {
if block_offset != block_idx * self.config.block_size {
fn put_block(&self, bytes: ObjectPart, block_idx: BlockIndex) -> DataCacheResult<()> {
if bytes.offset() != block_idx * self.config.block_size {
return Err(DataCacheError::InvalidBlockOffset);
}

let bytes_len = bytes.len();
let block_key = DiskBlockKey::new(&cache_key, block_idx);
let block_key = DiskBlockKey::new(bytes.object_id(), block_idx);
let path = self.get_path_for_block_key(&block_key);
trace!(?cache_key, ?path, "new block will be created in disk cache");
trace!(cache_key=?bytes.object_id(), ?path, "new block will be created in disk cache");

let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
let block = DiskBlock::new(block_idx, bytes).map_err(|err| match err {
DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
})?;

@@ -537,8 +522,8 @@ mod tests {
#[test]
fn test_block_format_version_requires_update() {
let cache_key = ObjectId::new("hello-world".to_string(), ETag::for_tests());
let data = ChecksummedBytes::new("Foo".into());
let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
let data = ObjectPart::new(cache_key, 100 * 10, "Foo".into());
let block = DiskBlock::new(100, data).expect("should succeed as data checksum is valid");
let expected_bytes: Vec<u8> = vec![
100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101,
116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128,
Expand Down Expand Up @@ -623,10 +608,6 @@ mod tests {

#[test]
fn test_put_get() {
let data_1 = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let data_3 = ChecksummedBytes::new("Baz".into());

let block_size = 8 * 1024 * 1024;
let cache_directory = tempfile::tempdir().unwrap();
let cache = DiskDataCache::new(
@@ -642,6 +623,10 @@
ETag::for_tests(),
);

let data_1 = ObjectPart::new(cache_key_1.clone(), 0, "Foo".into());
let data_2 = ObjectPart::new(cache_key_2.clone(), 0, "Bar".into());
let data_3 = ObjectPart::new(cache_key_1.clone(), block_size, "Baz".into());

let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible");
assert!(
block.is_none(),
@@ -650,9 +635,7 @@
);

// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.expect("cache should be accessible");
cache.put_block(data_1.clone(), 0).expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.expect("cache should be accessible")
@@ -663,9 +646,7 @@
);

// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.expect("cache should be accessible");
cache.put_block(data_2.clone(), 0).expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.expect("cache should be accessible")
@@ -676,9 +657,7 @@
);

// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.expect("cache should be accessible");
cache.put_block(data_3.clone(), 1).expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.expect("cache should be accessible")
@@ -701,24 +680,24 @@

#[test]
fn test_checksummed_bytes_slice() {
let data = ChecksummedBytes::new("0123456789".into());
let slice = data.slice(1..5);

let cache_directory = tempfile::tempdir().unwrap();
let cache = DiskDataCache::new(
cache_directory.into_path(),
DiskDataCacheConfig {
block_size: 8 * 1024 * 1024,
block_size: 4,
limit: CacheLimit::Unbounded,
},
);
let cache_key = ObjectId::new("a".into(), ETag::for_tests());
let data = ObjectPart::new(cache_key.clone(), 0, "0123456789".into());

cache
.put_block(cache_key.clone(), 0, 0, slice.clone())
.expect("cache should be accessible");
let slice_offset = 4;
let slice = data.slice(4..8).unwrap();
assert_eq!(slice.offset(), slice_offset);

cache.put_block(slice.clone(), 1).expect("cache should be accessible");
let entry = cache
.get_block(&cache_key, 0, 0)
.get_block(&cache_key, 1, 4)
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -735,50 +714,55 @@
const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;

fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
fn create_random(seed: u64, size: usize) -> Bytes {
let mut rng = ChaCha20Rng::seed_from_u64(seed);
let mut body = vec![0u8; size];
rng.fill(&mut body[..]);

ChecksummedBytes::new(body.into())
body.into()
}

fn is_block_in_cache(
cache: &DiskDataCache,
cache_key: &ObjectId,
block_idx: u64,
expected_bytes: &ChecksummedBytes,
expected_bytes: &ObjectPart,
) -> bool {
if let Some(retrieved) = cache
.get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64)
.expect("cache should be accessible")
{
assert_eq!(
retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
expected_bytes
.clone()
.into_bytes()
.expect("original bytes should be valid")
);
assert_eq!(&retrieved, expected_bytes,);
true
} else {
false
}
}

let large_object_key = ObjectId::new("large".into(), ETag::for_tests());
let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
let large_object_blocks: Vec<_> = (0..large_object.len())
.step_by(BLOCK_SIZE)
.map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
.map(|offset| {
ObjectPart::new(
large_object_key.clone(),
offset as u64,
large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))),
)
})
.collect();
let large_object_key = ObjectId::new("large".into(), ETag::for_tests());

let small_object_key = ObjectId::new("small".into(), ETag::for_tests());
let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
let small_object_blocks: Vec<_> = (0..small_object.len())
.step_by(BLOCK_SIZE)
.map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
.map(|offset| {
ObjectPart::new(
small_object_key.clone(),
offset as u64,
small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))),
)
})
.collect();
let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

let cache_directory = tempfile::tempdir().unwrap();
let cache = DiskDataCache::new(
@@ -791,26 +775,12 @@

// Put all of large_object
for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
cache
.put_block(
large_object_key.clone(),
block_idx as u64,
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
)
.unwrap();
cache.put_block(bytes.clone(), block_idx as u64).unwrap();
}

// Put all of small_object
for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
cache
.put_block(
small_object_key.clone(),
block_idx as u64,
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
)
.unwrap();
cache.put_block(bytes.clone(), block_idx as u64).unwrap();
}

let count_small_object_blocks_in_cache = small_object_blocks
Expand All @@ -837,13 +807,13 @@ mod tests {

#[test]
fn data_block_extract_checks() {
let data_1 = ChecksummedBytes::new("Foo".into());

let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());
let cache_key_3 = ObjectId::new("a".into(), ETag::from_str("badetag").unwrap());

let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
let data_1 = ObjectPart::new(cache_key_1.clone(), 0, "Foo".into());

let block = DiskBlock::new(0, data_1.clone()).expect("should have no checksum err");
block
.data(&cache_key_1, 1, 0)
.expect_err("should fail due to incorrect block index");
(Diffs for the remaining 8 changed files are not shown here.)
