From 8d2f0bd677bfaa3e39367065864f319e115d230d Mon Sep 17 00:00:00 2001 From: Andrew Morris Date: Thu, 26 Oct 2023 13:00:40 +1100 Subject: [PATCH] Sled and transactions --- Cargo.lock | 101 +++++++++++++++++++++++++++--- storage/Cargo.toml | 1 + storage/src/lib.rs | 18 ++---- storage/src/memory_backend.rs | 49 +++++++++++++++ storage/src/memory_storage.rs | 28 --------- storage/src/sled_backend.rs | 69 ++++++++++++++++++++ storage/src/storage.rs | 114 +++++++++++++++++++++++++++------- storage/src/tests.rs | 26 ++++++++ 8 files changed, 337 insertions(+), 69 deletions(-) create mode 100644 storage/src/memory_backend.rs delete mode 100644 storage/src/memory_storage.rs create mode 100644 storage/src/sled_backend.rs create mode 100644 storage/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 6cff429..8f16baa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,12 @@ version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "cc" version = "1.0.73" @@ -260,6 +266,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "crossbeam-channel" version = "0.5.4" @@ -374,7 +389,7 @@ checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" dependencies = [ "cfg-if 1.0.0", "num_cpus", - "parking_lot", + "parking_lot 0.12.0", ] [[package]] @@ -442,6 +457,25 @@ dependencies = [ "syn", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.5" @@ -527,6 +561,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "is-macro" version = "0.2.0" @@ -865,6 +908,17 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e72e30578e0d0993c8ae20823dd9cff2bc5517d2f586a8aef462a581e8a03eb" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.0" @@ -872,7 +926,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.2", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", ] [[package]] @@ -1283,6 +1351,22 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", +] + [[package]] name = "smallvec" version = "1.8.0" @@ -1347,6 +1431,7 @@ dependencies = [ "num-bigint", "rand", "serde", + "sled", ] [[package]] @@ -1357,7 +1442,7 @@ checksum = "213494b7a2b503146286049378ce02b482200519accc31872ee8be91fa820a08" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "phf_shared", "precomputed-hash", "serde", @@ -1456,7 +1541,7 @@ dependencies = [ "json_comments", "lru", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "pathdiff", "regex", "serde", @@ -1528,7 +1613,7 @@ dependencies = [ "from_variant", "num-bigint", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "rustc-hash", "serde", "siphasher", @@ -1622,7 +1707,7 @@ dependencies = [ "ahash", "auto_impl", "dashmap 5.2.0", - "parking_lot", + "parking_lot 0.12.0", "rayon", "regex", "serde", @@ -1645,7 +1730,7 @@ dependencies = [ "lru", "normpath", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "path-clean", "pathdiff", "serde", @@ -1665,7 +1750,7 @@ dependencies = [ "arrayvec", "indexmap", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "rayon", "regex", "retain_mut", diff --git a/storage/Cargo.toml b/storage/Cargo.toml index b79a8f4..87afe27 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -10,3 +10,4 @@ serde = { version = "1", features = ["derive"] } bincode = "1" num-bigint = "0.4.3" rand = "0.8.5" +sled = "0.34.7" diff --git a/storage/src/lib.rs b/storage/src/lib.rs index bf92e79..f86e611 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,15 +1,9 @@ -mod memory_storage; +mod memory_backend; mod storage; -#[cfg(test)] -mod tests { - use crate::{memory_storage::MemoryStorage, storage::Storage}; +mod sled_backend; +mod tests; - #[test] - fn number() { - let mut storage = Storage::new(MemoryStorage::new()); - - let key = storage.write_number(123.456); - assert_eq!(storage.read_number(key), Some(123.456)); - } -} +pub use self::storage::{Storage, StorageBackend, StorageKey}; +pub use memory_backend::MemoryBackend; +pub use sled_backend::SledBackend; diff --git a/storage/src/memory_backend.rs b/storage/src/memory_backend.rs new file mode 100644 index 0000000..3fcd297 --- /dev/null +++ b/storage/src/memory_backend.rs @@ -0,0 +1,49 @@ +use std::collections::HashMap; +use std::fmt::Debug as DebugTrait; + +use crate::storage::{StorageBackend, StorageBackendHandle, StorageKey}; + +pub struct MemoryBackend { + data: HashMap>, +} + +impl MemoryBackend { + pub fn new() -> Self { + Self { + data: HashMap::new(), + } + } +} + +impl StorageBackend for MemoryBackend { + type Error = E; + type InTransactionError = E; + type Handle<'a, E> = MemoryStorageHandle<'a>; + + fn transaction(&mut self, f: F) -> Result> + where + F: Fn(&mut Self::Handle<'_, E>) -> Result>, + { + let mut handle = MemoryStorageHandle { storage: self }; + f(&mut handle) + } +} + +pub struct MemoryStorageHandle<'a> { + storage: &'a mut MemoryBackend, +} + +impl<'a, E> StorageBackendHandle<'a, E> for MemoryStorageHandle<'a> { + fn read(&self, key: StorageKey) -> Result>, E> { + Ok(self.storage.data.get(&key).cloned()) + } + + fn write(&mut self, key: StorageKey, data: Option>) -> Result<(), E> { + match data { + Some(data) => self.storage.data.insert(key, data), + None => self.storage.data.remove(&key), + }; + + Ok(()) + } +} diff --git a/storage/src/memory_storage.rs b/storage/src/memory_storage.rs deleted file mode 100644 index 8ece8b6..0000000 --- a/storage/src/memory_storage.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::collections::HashMap; - -use crate::storage::{RawStorage, StorageKey}; - -pub struct MemoryStorage { - data: HashMap>, -} - -impl MemoryStorage { - pub fn new() -> Self { - Self { - data: HashMap::new(), - } - } -} - -impl RawStorage for MemoryStorage { - fn read(&self, key: StorageKey) -> Option> { - self.data.get(&key).cloned() - } - - fn write(&mut self, key: StorageKey, data: Option>) { - match data { - Some(data) => self.data.insert(key, data), - None => self.data.remove(&key), - }; - } -} diff --git a/storage/src/sled_backend.rs b/storage/src/sled_backend.rs new file mode 100644 index 0000000..243838c --- /dev/null +++ b/storage/src/sled_backend.rs @@ -0,0 +1,69 @@ +use std::fmt::Debug as DebugTrait; + +use crate::storage::{StorageBackend, StorageBackendHandle, StorageKey}; + +pub struct SledBackend { + db: sled::Db, +} + +impl SledBackend { + pub fn open

(path: P) -> Result + where + P: AsRef, + { + Ok(Self { + db: sled::open(path)?, + }) + } + + pub fn open_in_memory() -> Result { + Ok(Self { + db: sled::Config::new().temporary(true).open()?, + }) + } +} + +impl StorageBackend for SledBackend { + type Error = sled::transaction::TransactionError; + type InTransactionError = sled::transaction::ConflictableTransactionError; + type Handle<'a, E> = SledBackendHandle<'a>; + + fn transaction(&mut self, f: F) -> Result> + where + F: Fn(&mut Self::Handle<'_, E>) -> Result>, + { + self.db.transaction(|tx| { + let mut handle = SledBackendHandle { tx }; + f(&mut handle) + }) + } +} + +pub struct SledBackendHandle<'a> { + tx: &'a sled::transaction::TransactionalTree, +} + +impl<'a, E> StorageBackendHandle<'a, sled::transaction::ConflictableTransactionError> + for SledBackendHandle<'a> +{ + fn read( + &self, + key: StorageKey, + ) -> Result>, sled::transaction::ConflictableTransactionError> { + let value = self.tx.get(key.to_bytes())?.map(|value| value.to_vec()); + Ok(value) + } + + fn write( + &mut self, + key: StorageKey, + data: Option>, + ) -> Result<(), sled::transaction::ConflictableTransactionError> { + match data { + Some(data) => self.tx.insert(key.to_bytes(), data)?, + None => self.tx.remove(key.to_bytes())?, + }; + + Ok(()) + } +} diff --git a/storage/src/storage.rs b/storage/src/storage.rs index e4f27fc..74b6d78 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -1,27 +1,51 @@ -use std::rc::Rc; +use std::{fmt::Debug as DebugTrait, rc::Rc}; use rand::{rngs::ThreadRng, thread_rng, Rng}; -use serde::{ser::SerializeSeq, Serialize, Serializer}; - #[derive(serde::Serialize, serde::Deserialize, Hash, PartialEq, Eq, Clone, Copy, Debug)] pub struct StorageKey(u64, u64, u64); -pub struct Storage { +impl StorageKey { + pub fn from_bytes(bytes: &[u8]) -> Self { + if bytes.len() > 24 { + panic!("Too many bytes"); + } + + let mut key = [0u8; 24]; + key[..bytes.len()].copy_from_slice(bytes); + bincode::deserialize(&key).unwrap() + } + + pub fn to_bytes(&self) -> Vec { + bincode::serialize(self).unwrap() + } +} + +pub struct Storage { rng: ThreadRng, - rs: RS, + sb: SB, +} + +pub trait StorageBackendHandle<'a, E> { + fn read(&self, key: StorageKey) -> Result>, E>; + fn write(&mut self, key: StorageKey, data: Option>) -> Result<(), E>; } -pub trait RawStorage { - fn read(&self, key: StorageKey) -> Option>; - fn write(&mut self, key: StorageKey, data: Option>); +pub trait StorageBackend { + type Error: DebugTrait; + type InTransactionError; + type Handle<'a, E>: StorageBackendHandle<'a, Self::InTransactionError>; + + fn transaction(&mut self, f: F) -> Result> + where + F: Fn(&mut Self::Handle<'_, E>) -> Result>; } -impl Storage { - pub fn new(rs: RS) -> Self { +impl Storage { + pub fn new(sb: SB) -> Self { Self { rng: thread_rng(), - rs, + sb, } } @@ -29,25 +53,73 @@ impl Storage { StorageKey(self.rng.gen(), self.rng.gen(), self.rng.gen()) } - pub fn write_number(&mut self, number: f64) -> StorageKey { + pub fn write_number(&mut self, number: f64) -> Result> { let mut data = Vec::::new(); data.extend_from_slice(&number.to_le_bytes()); let key = self.random_key(); - self.rs.write(key, Some(data)); - key + + self + .sb + .transaction(|sb| sb.write(key, Some(data.clone())))?; + + Ok(key) } - pub fn read_number(&self, key: StorageKey) -> Option { - let data = self.rs.read(key)?; - let mut bytes = [0u8; 8]; - bytes.copy_from_slice(&data); - Some(f64::from_le_bytes(bytes)) + pub fn read_number(&mut self, key: StorageKey) -> Result, SB::Error<()>> { + self.sb.transaction(|sb| { + let data = match sb.read(key)? { + Some(data) => data, + None => return Ok(None), + }; + + let mut bytes = [0u8; 8]; + bytes.copy_from_slice(&data); + Ok(Some(f64::from_le_bytes(bytes))) + }) + } + + pub fn read(&mut self, key: StorageKey) -> Result, SB::Error<()>> { + self.sb.transaction(|sb| { + let data = match sb.read(key)? { + Some(data) => data, + None => return Ok(None), + }; + + Ok(Some(bincode::deserialize(&data).unwrap())) + }) + } + + pub fn write(&mut self, key: StorageKey, data: &StoredRc) -> Result<(), SB::Error<()>> { + self.sb.transaction(|sb| { + let data = bincode::serialize(&data).unwrap(); + sb.write(key, Some(data)) + }) + } + + pub fn head(&mut self) -> Result, SB::Error<()>> { + self.sb.transaction(|sb| { + let data = match sb.read(StorageKey(0, 0, 0))? { + Some(data) => data, + None => return Ok(None), + }; + + Ok(Some(bincode::deserialize(&data).unwrap())) + }) + } + + pub fn set_head(&mut self, key: StorageKey) -> Result<(), SB::Error<()>> { + self.sb.transaction(|sb| { + // TODO: read old head, deal with ref counts + + let data = bincode::serialize(&key).unwrap(); + sb.write(StorageKey(0, 0, 0), Some(data)) + }) } } #[derive(serde::Serialize, serde::Deserialize)] -struct StoredRc { - ref_count: u64, +pub struct StoredRc { + count: u64, refs: Vec, data: Vec, } diff --git a/storage/src/tests.rs b/storage/src/tests.rs new file mode 100644 index 0000000..9351d4c --- /dev/null +++ b/storage/src/tests.rs @@ -0,0 +1,26 @@ +#[cfg(test)] +mod tests_ { + use crate::{ + memory_backend::MemoryBackend, + sled_backend::SledBackend, + storage::{Storage, StorageBackend}, + }; + + fn run(impl_memory: fn(&mut Storage), impl_sled: fn(&mut Storage)) { + let mut storage = Storage::new(MemoryBackend::new()); + impl_memory(&mut storage); + + let mut storage = Storage::new(SledBackend::open_in_memory().unwrap()); + impl_sled(&mut storage); + } + + #[test] + fn number() { + fn impl_(storage: &mut Storage) { + let key = storage.write_number(123.456).unwrap(); + assert_eq!(storage.read_number(key).unwrap(), Some(123.456)); + } + + run(impl_, impl_); + } +}