From fd8dd015d28f0bcd5954bb0eddb5332b58312cc7 Mon Sep 17 00:00:00 2001 From: Andrii Savytskyi Date: Mon, 2 Sep 2024 13:59:39 +0300 Subject: [PATCH 1/3] feat: new 'VecPool' --- fastrace/benches/object_pool.rs | 9 +- fastrace/src/collector/global_collector.rs | 21 +- fastrace/src/util/mod.rs | 42 ++- fastrace/src/util/object_pool.rs | 287 +++++++++++++-------- 4 files changed, 216 insertions(+), 143 deletions(-) diff --git a/fastrace/benches/object_pool.rs b/fastrace/benches/object_pool.rs index 9705fda..8093fef 100644 --- a/fastrace/benches/object_pool.rs +++ b/fastrace/benches/object_pool.rs @@ -4,20 +4,19 @@ use criterion::criterion_group; use criterion::criterion_main; use criterion::BatchSize; use criterion::Criterion; -use fastrace::util::object_pool::Pool; +use fastrace::util::object_pool::GlobalVecPool; fn bench_alloc_vec(c: &mut Criterion) { let mut bgroup = c.benchmark_group("Vec::with_capacity"); for cap in &[1, 10, 100, 1000, 10000, 100000] { - let vec_pool: Pool> = Pool::new(Vec::new, Vec::clear); - let mut puller = vec_pool.puller(512); - fastrace::util::object_pool::enable_reuse_in_current_thread(); + static VEC_POOL: GlobalVecPool = GlobalVecPool::new(); + let mut puller = VEC_POOL.new_local(512); bgroup.bench_function(format!("object-pool/{}", cap), |b| { b.iter_batched( || (), |_| { - let mut vec = puller.pull(); + let mut vec = puller.take(); if vec.capacity() < *cap { vec.reserve(*cap); } diff --git a/fastrace/src/collector/global_collector.rs b/fastrace/src/collector/global_collector.rs index 18862e8..c00cb28 100644 --- a/fastrace/src/collector/global_collector.rs +++ b/fastrace/src/collector/global_collector.rs @@ -27,7 +27,6 @@ use crate::collector::TraceId; use crate::local::local_collector::LocalSpansInner; use crate::local::raw_span::RawKind; use crate::local::raw_span::RawSpan; -use crate::util::object_pool; use crate::util::spsc::Receiver; use crate::util::spsc::Sender; use crate::util::spsc::{self}; @@ -230,24 +229,20 @@ impl GlobalCollector { { std::thread::Builder::new() .name("fastrace-global-collector".to_string()) - .spawn(move || { - loop { - let begin_instant = Instant::now(); - GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands(); - std::thread::sleep( - config - .report_interval - .saturating_sub(begin_instant.elapsed()), - ); - } + .spawn(move || loop { + let begin_instant = Instant::now(); + GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands(); + std::thread::sleep( + config + .report_interval + .saturating_sub(begin_instant.elapsed()), + ); }) .unwrap(); } } fn handle_commands(&mut self) { - object_pool::enable_reuse_in_current_thread(); - debug_assert!(self.start_collects.is_empty()); debug_assert!(self.drop_collects.is_empty()); debug_assert!(self.commit_collects.is_empty()); diff --git a/fastrace/src/util/mod.rs b/fastrace/src/util/mod.rs index 4ae9fa3..d9af7af 100644 --- a/fastrace/src/util/mod.rs +++ b/fastrace/src/util/mod.rs @@ -9,52 +9,48 @@ pub mod tree; use std::borrow::Cow; use std::cell::RefCell; -use once_cell::sync::Lazy; - use crate::collector::CollectTokenItem; use crate::local::raw_span::RawSpan; -use crate::util::object_pool::Pool; -use crate::util::object_pool::Puller; -use crate::util::object_pool::Reusable; +use crate::util::object_pool::GlobalVecPool; +use crate::util::object_pool::LocalVecPool; +use crate::util::object_pool::ReusableVec; -static RAW_SPANS_POOL: Lazy>> = Lazy::new(|| Pool::new(Vec::new, Vec::clear)); -static COLLECT_TOKEN_ITEMS_POOL: Lazy>> = - Lazy::new(|| Pool::new(Vec::new, Vec::clear)); -#[allow(clippy::type_complexity)] -static PROPERTIES_POOL: Lazy, Cow<'static, str>)>>> = - Lazy::new(|| Pool::new(Vec::new, Vec::clear)); +static RAW_SPANS_POOL: GlobalVecPool = GlobalVecPool::new(); +static COLLECT_TOKEN_ITEMS_POOL: GlobalVecPool = GlobalVecPool::new(); +static PROPERTIES_POOL: GlobalVecPool<(Cow<'static, str>, Cow<'static, str>)> = + GlobalVecPool::new(); thread_local! { - static RAW_SPANS_PULLER: RefCell>> = RefCell::new(RAW_SPANS_POOL.puller(512)); - static COLLECT_TOKEN_ITEMS_PULLER: RefCell>> = RefCell::new(COLLECT_TOKEN_ITEMS_POOL.puller(512)); + static RAW_SPANS_PULLER: RefCell> = RefCell::new(RAW_SPANS_POOL.new_local(512)); + static COLLECT_TOKEN_ITEMS_PULLER: RefCell> = RefCell::new(COLLECT_TOKEN_ITEMS_POOL.new_local(512)); #[allow(clippy::type_complexity)] - static PROPERTIES_PULLER: RefCell, Cow<'static, str>)>>> = RefCell::new(PROPERTIES_POOL.puller(512)); + static PROPERTIES_PULLER: RefCell, Cow<'static, str>)>> = RefCell::new(PROPERTIES_POOL.new_local(512)); } -pub type RawSpans = Reusable<'static, Vec>; -pub type CollectToken = Reusable<'static, Vec>; -pub type Properties = Reusable<'static, Vec<(Cow<'static, str>, Cow<'static, str>)>>; +pub type RawSpans = ReusableVec; +pub type CollectToken = ReusableVec; +pub type Properties = ReusableVec<(Cow<'static, str>, Cow<'static, str>)>; impl Default for RawSpans { fn default() -> Self { RAW_SPANS_PULLER - .try_with(|puller| puller.borrow_mut().pull()) - .unwrap_or_else(|_| Reusable::new(&*RAW_SPANS_POOL, vec![])) + .try_with(|puller| puller.borrow_mut().take()) + .unwrap_or_else(|_| Self::new(&RAW_SPANS_POOL, Vec::new())) } } impl Default for Properties { fn default() -> Self { PROPERTIES_PULLER - .try_with(|puller| puller.borrow_mut().pull()) - .unwrap_or_else(|_| Reusable::new(&*PROPERTIES_POOL, vec![])) + .try_with(|puller| puller.borrow_mut().take()) + .unwrap_or_else(|_| Self::new(&PROPERTIES_POOL, Vec::new())) } } fn new_collect_token(items: impl IntoIterator) -> CollectToken { let mut token = COLLECT_TOKEN_ITEMS_PULLER - .try_with(|puller| puller.borrow_mut().pull()) - .unwrap_or_else(|_| Reusable::new(&*COLLECT_TOKEN_ITEMS_POOL, vec![])); + .try_with(|puller| puller.borrow_mut().take()) + .unwrap_or_else(|_| CollectToken::new(&COLLECT_TOKEN_ITEMS_POOL, Vec::new())); token.extend(items); token } diff --git a/fastrace/src/util/object_pool.rs b/fastrace/src/util/object_pool.rs index 7ac67dd..ee2405a 100644 --- a/fastrace/src/util/object_pool.rs +++ b/fastrace/src/util/object_pool.rs @@ -1,153 +1,236 @@ -// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + fmt, + mem::{self, ManuallyDrop}, + ops, slice, + sync::Mutex, +}; + +#[must_use] +pub struct GlobalVecPool +where + T: 'static, +{ + storage: Mutex>>, +} -use std::cell::Cell; -use std::mem::ManuallyDrop; -use std::ops::Deref; -use std::ops::DerefMut; +impl GlobalVecPool { + pub const fn new() -> Self { + Self { + storage: Mutex::new(Vec::new()), + } + } -use parking_lot::Mutex; + pub const fn new_local(&'static self, capacity: usize) -> LocalVecPool { + debug_assert!(capacity > 0, "storage capacity cannot be zero"); -thread_local! { - static REUSABLE: Cell = const { Cell::new(false) }; -} + LocalVecPool { + global_pool: self, + storage: Vec::new(), + // lazy allocation - only if thread uses pool + capacity, + } + } -pub fn enable_reuse_in_current_thread() { - REUSABLE.with(|r| r.set(true)); -} + fn fill_empty_local(&'static self, local_storage: &mut Vec>) { + debug_assert!(local_storage.is_empty(), "local storage must be empty"); + debug_assert!( + local_storage.capacity() != 0, + "local storage must have capacity" + ); -fn is_reusable() -> bool { - REUSABLE.with(|r| r.get()) -} + let needs = local_storage.capacity(); -pub struct Pool { - // The objects in the pool ready to be reused. - // The mutex should only be visited in the global collector, which is guaranteed by - // `is_reusable`, so it should not have synchronization overhead. - objects: Mutex>, - init: fn() -> T, - reset: fn(&mut T), -} + let mut storage = self.storage.lock().expect("not poisoned"); -impl Pool { - #[inline] - pub fn new(init: fn() -> T, reset: fn(&mut T)) -> Pool { - Pool { - objects: Mutex::new(Vec::new()), - init, - reset, + let available = storage.len(); + + // try to take `1..needs` from storage. All with non-zero capacity. + if available != 0 { + let range = available.saturating_sub(needs)..; + + let vecs = storage.drain(range); + + local_storage.extend(vecs); + + return; + } + + drop(storage); + + // init with zero-capacity vectors + local_storage.resize_with(needs, || Vec::new()); + } + + fn consume_local(&'static self, local_storage: &mut Vec>) { + let Some(first) = local_storage.first() else { + return; + }; + + // local storage contains vectors from global pool with non-zero + // capacity or zero-capacity vectors (see fill_empty_local). + + if first.capacity() == 0 { + // all elements with zero capacity + return; } + + self.storage + .lock() + .expect("not poisoned") + .extend(local_storage.drain(..)); } - #[inline] - fn batch_pull<'a>(&'a self, n: usize, buffer: &mut Vec>) { - let mut objects = self.objects.lock(); - let len = objects.len(); - buffer.extend( - objects - .drain(len.saturating_sub(n)..) - .map(|obj| Reusable::new(self, obj)), - ); - drop(objects); - buffer.resize_with(n, || Reusable::new(self, (self.init)())); + fn recycle(&'static self, mut data: Vec) { + debug_assert!(data.capacity() != 0, "vec must have capacity"); + + data.clear(); + + self.storage.lock().expect("not poisoned").push(data); } - pub fn puller(&self, buffer_size: usize) -> Puller { - assert!(buffer_size > 0); - Puller { - pool: self, - buffer: Vec::with_capacity(buffer_size), - buffer_size, + pub fn take(&'static self) -> ReusableVec { + let mut storage = self.storage.lock().expect("not poisoned"); + + if let Some(data) = storage.pop() { + return ReusableVec::new(self, data); } + + drop(storage); + + ReusableVec::new(self, Vec::new()) } - #[inline] - pub fn recycle(&self, mut obj: T) { - if is_reusable() { - (self.reset)(&mut obj); - self.objects.lock().push(obj) + /// Create a new `ReusableVec` as a stub. + /// + /// Capacity must be 0 when object is ready to be dropped, otherwise it + /// will be recycled what may cause memory leaks. + pub const fn stub(&'static self) -> ReusableVec { + ReusableVec::stub(self) + } +} + +#[must_use] +pub struct LocalVecPool { + global_pool: &'static GlobalVecPool, + storage: Vec>, + capacity: usize, +} + +impl LocalVecPool { + pub fn take(&mut self) -> ReusableVec { + if self.storage.is_empty() { + if self.storage.capacity() == 0 { + self.storage.reserve_exact(self.capacity); + } + + self.global_pool.fill_empty_local(&mut self.storage); } + + ReusableVec::new(self.global_pool, self.storage.pop().expect("not empty")) } } -pub struct Puller<'a, T> { - pool: &'a Pool, - buffer: Vec>, - buffer_size: usize, +impl Drop for LocalVecPool { + fn drop(&mut self) { + self.global_pool.consume_local(&mut self.storage); + } } -impl<'a, T> Puller<'a, T> { - #[inline] - pub fn pull(&mut self) -> Reusable<'a, T> { - self.buffer.pop().unwrap_or_else(|| { - self.pool.batch_pull(self.buffer_size, &mut self.buffer); - self.buffer.pop().unwrap() - }) +#[must_use] +pub struct ReusableVec { + global_pool: &'static GlobalVecPool, + data: ManuallyDrop>, + #[cfg(debug_assertions)] + is_stub: bool, +} + +impl PartialEq for ReusableVec +where + T: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + self.data.eq(&other.data) } } -pub struct Reusable<'a, T> { - pool: &'a Pool, - obj: ManuallyDrop, +impl fmt::Debug for ReusableVec +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.data.fmt(f) + } } -impl<'a, T> Reusable<'a, T> { - #[inline] - pub fn new(pool: &'a Pool, obj: T) -> Self { +impl ReusableVec { + pub fn new(global_pool: &'static GlobalVecPool, data: Vec) -> Self { + debug_assert!(data.is_empty(), "vec must be empty"); + Self { - pool, - obj: ManuallyDrop::new(obj), + global_pool, + data: ManuallyDrop::new(data), + #[cfg(debug_assertions)] + is_stub: false, + } + } + + const fn stub(global_pool: &'static GlobalVecPool) -> Self { + Self { + global_pool, + data: ManuallyDrop::new(Vec::new()), + #[cfg(debug_assertions)] + is_stub: true, } } #[inline] - pub fn into_inner(mut self) -> T { + pub fn into_inner(mut self) -> Vec { unsafe { - let obj = ManuallyDrop::take(&mut self.obj); - std::mem::forget(self); + let obj = ManuallyDrop::take(&mut self.data); + + mem::forget(self); + obj } } } -impl<'a, T> std::fmt::Debug for Reusable<'a, T> -where T: std::fmt::Debug -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.obj.fmt(f) - } -} +impl Drop for ReusableVec { + fn drop(&mut self) { + // SAFETY: first call + let data = unsafe { ManuallyDrop::take(&mut self.data) }; -impl<'a, T> std::cmp::PartialEq for Reusable<'a, T> -where T: std::cmp::PartialEq -{ - fn eq(&self, other: &Self) -> bool { - T::eq(self, other) + if data.capacity() != 0 { + #[cfg(debug_assertions)] + debug_assert!(!self.is_stub, "stubs cannot recycles"); + + self.global_pool.recycle(data); + } } } -impl<'a, T> std::cmp::Eq for Reusable<'a, T> where T: std::cmp::Eq {} - -impl<'a, T> Deref for Reusable<'a, T> { - type Target = T; +impl<'a, T> IntoIterator for &'a ReusableVec { + type IntoIter = slice::Iter<'a, T>; + type Item = &'a T; #[inline] - fn deref(&self) -> &Self::Target { - &self.obj + fn into_iter(self) -> Self::IntoIter { + self.iter() } } -impl<'a, T> DerefMut for Reusable<'a, T> { - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.obj +impl ops::Deref for ReusableVec { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.data } } -impl<'a, T> Drop for Reusable<'a, T> { - #[inline] - fn drop(&mut self) { - unsafe { - self.pool.recycle(ManuallyDrop::take(&mut self.obj)); - } +impl ops::DerefMut for ReusableVec { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data } } From ccc82484a6ba4d3fd5bff6e9ba91c3a73b66e604 Mon Sep 17 00:00:00 2001 From: Andrii Savytskyi Date: Mon, 2 Sep 2024 14:16:40 +0300 Subject: [PATCH 2/3] chore: fmt --- fastrace/src/collector/global_collector.rs | 18 ++++++++++-------- fastrace/src/util/object_pool.rs | 21 +++++++++------------ 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/fastrace/src/collector/global_collector.rs b/fastrace/src/collector/global_collector.rs index c00cb28..34ed8da 100644 --- a/fastrace/src/collector/global_collector.rs +++ b/fastrace/src/collector/global_collector.rs @@ -229,14 +229,16 @@ impl GlobalCollector { { std::thread::Builder::new() .name("fastrace-global-collector".to_string()) - .spawn(move || loop { - let begin_instant = Instant::now(); - GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands(); - std::thread::sleep( - config - .report_interval - .saturating_sub(begin_instant.elapsed()), - ); + .spawn(move || { + loop { + let begin_instant = Instant::now(); + GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands(); + std::thread::sleep( + config + .report_interval + .saturating_sub(begin_instant.elapsed()), + ); + } }) .unwrap(); } diff --git a/fastrace/src/util/object_pool.rs b/fastrace/src/util/object_pool.rs index ee2405a..5c17baf 100644 --- a/fastrace/src/util/object_pool.rs +++ b/fastrace/src/util/object_pool.rs @@ -1,16 +1,15 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{ - fmt, - mem::{self, ManuallyDrop}, - ops, slice, - sync::Mutex, -}; +use std::fmt; +use std::mem::ManuallyDrop; +use std::mem::{self}; +use std::ops; +use std::slice; +use std::sync::Mutex; #[must_use] pub struct GlobalVecPool -where - T: 'static, +where T: 'static { storage: Mutex>>, } @@ -147,8 +146,7 @@ pub struct ReusableVec { } impl PartialEq for ReusableVec -where - T: PartialEq, +where T: PartialEq { fn eq(&self, other: &Self) -> bool { self.data.eq(&other.data) @@ -156,8 +154,7 @@ where } impl fmt::Debug for ReusableVec -where - T: fmt::Debug, +where T: fmt::Debug { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.data.fmt(f) From 7314adb83dd82b84bed1d6407f3652ea91d879e3 Mon Sep 17 00:00:00 2001 From: Andrii Savytskyi Date: Mon, 2 Sep 2024 14:21:49 +0300 Subject: [PATCH 3/3] feat: impl Default trait for GlobalVecPool --- fastrace/src/util/object_pool.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fastrace/src/util/object_pool.rs b/fastrace/src/util/object_pool.rs index 5c17baf..185bf65 100644 --- a/fastrace/src/util/object_pool.rs +++ b/fastrace/src/util/object_pool.rs @@ -14,6 +14,12 @@ where T: 'static storage: Mutex>>, } +impl Default for GlobalVecPool { + fn default() -> Self { + Self::new() + } +} + impl GlobalVecPool { pub const fn new() -> Self { Self {