Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add file backed envelope stacks #4138

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f7652d3
feat(spooler): Add file backed envelope stacks
iambriccardo Oct 14, 2024
2771673
Fix
iambriccardo Oct 14, 2024
c3cc6b0
Fix
iambriccardo Oct 14, 2024
8201530
Fix
iambriccardo Oct 14, 2024
8859b3e
Fix
iambriccardo Oct 14, 2024
121291a
Fix
iambriccardo Oct 15, 2024
c48ddc0
Fix
iambriccardo Oct 15, 2024
421b47b
Merge branch 'master' into riccardo/feat/file-based-stack
iambriccardo Oct 15, 2024
515a925
Fix
iambriccardo Oct 15, 2024
3c1ce77
Fix
iambriccardo Oct 15, 2024
65f9106
Fix
iambriccardo Oct 15, 2024
d04e88f
Fix
iambriccardo Oct 15, 2024
1cb4c3d
Fix
iambriccardo Oct 15, 2024
4b0001e
Fix
iambriccardo Oct 15, 2024
91c9fdf
Fix
iambriccardo Oct 15, 2024
c7e2cdf
Fix
iambriccardo Oct 15, 2024
4699547
Fix
iambriccardo Oct 15, 2024
35fb7e7
Fix
iambriccardo Oct 15, 2024
accca78
Fix
iambriccardo Oct 15, 2024
cfabee3
Fix
iambriccardo Oct 15, 2024
ce9fd0e
first review
jjbayer Oct 15, 2024
acdc6d8
Fix
iambriccardo Oct 15, 2024
a21029a
Merge branch 'riccardo/feat/file-based-stack' of github.com:getsentry…
iambriccardo Oct 15, 2024
15a23ef
Fix
iambriccardo Oct 15, 2024
51156b6
Fix
iambriccardo Oct 15, 2024
ff9b39a
Fix
iambriccardo Oct 15, 2024
1b3bcf1
Fix
iambriccardo Oct 16, 2024
769b26e
Fix
iambriccardo Oct 16, 2024
fb45eee
Fix
iambriccardo Oct 16, 2024
58291b4
Fix
iambriccardo Oct 16, 2024
13d6637
Fix
iambriccardo Oct 16, 2024
1e37866
Fix
iambriccardo Oct 16, 2024
3361eca
Fix
iambriccardo Oct 16, 2024
1305cc1
Merge
iambriccardo Oct 16, 2024
17902ec
Merge
iambriccardo Oct 16, 2024
4e1c601
Fix
iambriccardo Oct 16, 2024
e9fb673
Disable file backed on windows
iambriccardo Oct 16, 2024
8999195
Fix
iambriccardo Oct 18, 2024
d57dcd5
test(buffer): Long-running benchmark (#4149)
jjbayer Oct 18, 2024
c28418e
Fix
iambriccardo Oct 18, 2024
c64a709
Fix
iambriccardo Oct 18, 2024
6c22095
Fix
iambriccardo Oct 18, 2024
e2f6334
Fix
iambriccardo Oct 18, 2024
e41764d
Fix
iambriccardo Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,11 @@ fn spool_max_backpressure_memory_percent() -> f32 {
0.9
}

/// Default for max opened files, 100000.
fn spool_envelopes_max_opened_files() -> usize {
100000
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand Down Expand Up @@ -979,9 +984,15 @@ pub struct EnvelopeSpool {
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
max_backpressure_memory_percent: f32,
/// Maximum number of opened files for the envelope spool.
#[serde(default = "spool_envelopes_max_opened_files")]
max_opened_files: usize,
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
/// The strategy to use for envelope buffering.
#[serde(default)]
buffer_strategy: EnvelopeBufferStrategy,
}

/// Version of the envelope buffering mechanism.
Expand All @@ -1002,6 +1013,19 @@ pub enum EnvelopeSpoolVersion {
V2,
}

/// The strategy to use for envelope buffering.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum EnvelopeBufferStrategy {
/// Use an in-memory buffer for envelopes.
#[default]
Memory,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this default, our currently deployed configs will revert to Memory even if they have a path set.

Bonus points if you can make it an adjacently or untagged enum while maintaining backward compatibility:

Memory,
Sqlite(PathBuf),
FileBacked(PathBuf)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a good point. I will try to make it work.

/// Use a SQLite database for envelope buffering.
Sqlite,
/// Use a file-backed system for envelope buffering.
FileBacked,
}

impl Default for EnvelopeSpool {
fn default() -> Self {
Self {
Expand All @@ -1017,7 +1041,9 @@ impl Default for EnvelopeSpool {
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
max_opened_files: spool_envelopes_max_opened_files(),
version: EnvelopeSpoolVersion::default(),
buffer_strategy: EnvelopeBufferStrategy::default(),
}
}
}
Expand Down Expand Up @@ -2243,6 +2269,16 @@ impl Config {
self.values.spool.envelopes.max_backpressure_memory_percent
}

/// Returns the maximum number of opened files for the envelope spool.
pub fn spool_envelopes_max_opened_files(&self) -> usize {
self.values.spool.envelopes.max_opened_files
}

/// Returns the envelope buffer strategy.
pub fn spool_envelope_buffer_strategy(&self) -> &EnvelopeBufferStrategy {
&self.values.spool.envelopes.buffer_strategy
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
Expand Down
165 changes: 100 additions & 65 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::convert::Infallible;
use std::error::Error;
use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
Expand All @@ -10,14 +9,17 @@ use std::time::Duration;

use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use tokio::time::{timeout, Instant};
use relay_config::{Config, EnvelopeBufferStrategy};
use tokio::time::Instant;

use crate::envelope::Envelope;
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::file_backed::FileBackedEnvelopeStackError;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::file_backed::FileBackedEnvelopeStoreError;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::file_backed::FileBackedStackProvider;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::services::buffer::stack_provider::StackCreationType;
Expand All @@ -42,6 +44,8 @@ pub enum PolymorphicEnvelopeBuffer {
InMemory(EnvelopeBuffer<MemoryStackProvider>),
/// An enveloper buffer that uses sqlite envelopes stacks.
Sqlite(EnvelopeBuffer<SqliteStackProvider>),
/// An envelope buffer that uses file-based envelope stacks.
FileBacked(EnvelopeBuffer<FileBackedStackProvider>),
}

impl PolymorphicEnvelopeBuffer {
Expand All @@ -50,6 +54,7 @@ impl PolymorphicEnvelopeBuffer {
match self {
PolymorphicEnvelopeBuffer::InMemory(_) => true,
PolymorphicEnvelopeBuffer::Sqlite(_) => false,
PolymorphicEnvelopeBuffer::FileBacked(_) => false,
}
}

Expand All @@ -59,14 +64,27 @@ impl PolymorphicEnvelopeBuffer {
config: &Config,
memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
let buffer = if config.spool_envelopes_path().is_some() {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
Self::Sqlite(buffer)
} else {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
Self::InMemory(buffer)
let buffer = match (
config.spool_envelope_buffer_strategy(),
config.spool_envelopes_path(),
) {
(EnvelopeBufferStrategy::Sqlite, Some(_)) => {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
Self::Sqlite(buffer)
}
(EnvelopeBufferStrategy::FileBacked, Some(_)) => {
relay_log::trace!(
"PolymorphicEnvelopeBuffer: initializing file backed envelope buffer"
);
let buffer = EnvelopeBuffer::<FileBackedStackProvider>::new(config).await?;
Self::FileBacked(buffer)
}
_ => {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
Self::InMemory(buffer)
}
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
};

Ok(buffer)
Expand All @@ -75,8 +93,9 @@ impl PolymorphicEnvelopeBuffer {
/// Initializes the envelope buffer.
pub async fn initialize(&mut self) {
match self {
PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
Self::InMemory(buffer) => buffer.initialize().await,
Self::Sqlite(buffer) => buffer.initialize().await,
Self::FileBacked(buffer) => buffer.initialize().await,
}
}

Expand All @@ -86,6 +105,7 @@ impl PolymorphicEnvelopeBuffer {
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
Self::FileBacked(buffer) => buffer.push(envelope).await,
}?;
});
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1);
Expand All @@ -98,6 +118,7 @@ impl PolymorphicEnvelopeBuffer {
match self {
Self::Sqlite(buffer) => buffer.peek(),
Self::InMemory(buffer) => buffer.peek(),
Self::FileBacked(buffer) => buffer.peek(),
}
})
}
Expand All @@ -108,6 +129,7 @@ impl PolymorphicEnvelopeBuffer {
match self {
Self::Sqlite(buffer) => buffer.pop().await,
Self::InMemory(buffer) => buffer.pop().await,
Self::FileBacked(buffer) => buffer.pop().await,
}?
});
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1);
Expand All @@ -122,6 +144,7 @@ impl PolymorphicEnvelopeBuffer {
match self {
Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
Self::FileBacked(buffer) => buffer.mark_ready(project, is_ready),
}
}

Expand All @@ -134,41 +157,55 @@ impl PolymorphicEnvelopeBuffer {
match self {
Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
Self::FileBacked(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
}
}

/// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
pub fn has_capacity(&self) -> bool {
/// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s, `false`
/// otherwise.
pub async fn has_capacity(&self) -> bool {
match self {
Self::Sqlite(buffer) => buffer.has_capacity(),
Self::InMemory(buffer) => buffer.has_capacity(),
Self::Sqlite(buffer) => buffer.has_capacity().await,
Self::InMemory(buffer) => buffer.has_capacity().await,
Self::FileBacked(buffer) => buffer.has_capacity().await,
}
}

/// Shuts down the [`PolymorphicEnvelopeBuffer`].
///
/// Returns `true` if the envelope have been written to storage, and it's safe to drop the
/// buffer, `false` otherwise.
pub async fn shutdown(&mut self) -> bool {
// Currently, we want to flush the buffer only for disk, since the in memory implementation
// tries to not do anything and pop as many elements as possible within the shutdown
// timeout.
let Self::Sqlite(buffer) = self else {
relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
return false;
};
buffer.flush().await;

true
match self {
Self::Sqlite(buffer) => {
buffer.flush().await;
true
}
// With in-memory we don't do anything on shutdown and let the system continue until
// the timeout.
Self::InMemory(_) => false,
// With file-backed all the data is on disk by default so we can safely drop the
// buffer.
Self::FileBacked(_) => true,
}
}
}

/// Error that occurs while interacting with the envelope buffer.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeBufferError {
#[error("sqlite")]
#[error("an error occurred in the sqlite envelope store")]
SqliteStore(#[from] SqliteEnvelopeStoreError),

#[error("sqlite")]
#[error("an error occurred in the sqlite envelope stack")]
SqliteStack(#[from] SqliteEnvelopeStackError),

#[error["an error occurred in the file backed store"]]
FileBackedStore(#[from] FileBackedEnvelopeStoreError),

#[error["an error occurred in the file backed stack"]]
FileBackedStack(#[from] FileBackedEnvelopeStackError),

#[error("failed to push envelope to the buffer")]
PushFailed,
}
Expand Down Expand Up @@ -221,7 +258,6 @@ impl EnvelopeBuffer<MemoryStackProvider> {
}
}

#[allow(dead_code)]
impl EnvelopeBuffer<SqliteStackProvider> {
/// Creates an empty sqlite-based buffer.
pub async fn new(config: &Config) -> Result<Self, EnvelopeBufferError> {
Expand All @@ -235,6 +271,19 @@ impl EnvelopeBuffer<SqliteStackProvider> {
}
}

impl EnvelopeBuffer<FileBackedStackProvider> {
/// Creates an empty file backed buffer.
pub async fn new(config: &Config) -> Result<Self, EnvelopeBufferError> {
Ok(Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: FileBackedStackProvider::new(config).await?,
total_count: Arc::new(AtomicI64::new(0)),
total_count_initialized: false,
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
})
}
}

impl<P: StackProvider> EnvelopeBuffer<P>
where
EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
Expand All @@ -244,9 +293,18 @@ where
pub async fn initialize(&mut self) {
relay_statsd::metric!(timer(RelayTimers::BufferInitialization), {
let initialization_state = self.stack_provider.initialize().await;

// We load the stacks given the supplied project key pairs.
self.load_stacks(initialization_state.project_key_pairs)
.await;
self.load_store_total_count().await;

// We initialize the total count for the store.
self.total_count.store(
initialization_state.store_total_count as i64,
AtomicOrdering::SeqCst,
);
self.total_count_initialized = true;
self.track_total_count();
});
}

Expand Down Expand Up @@ -287,6 +345,7 @@ where
Ok(())
}

/// Peeks at the metadata of the next [`Envelope`] without reading the envelope itself.
pub fn peek(&mut self) -> Option<Peek> {
let (
QueueItem { key, .. },
Expand Down Expand Up @@ -389,8 +448,8 @@ where
}

/// Returns `true` if the underlying storage has the capacity to store more envelopes.
pub fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity()
pub async fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity().await
}

/// Flushes the envelope buffer.
Expand Down Expand Up @@ -465,33 +524,6 @@ where
}
}

/// Loads the total count from the store if it takes less than a specified duration.
///
/// The total count returned by the store is related to the count of elements that the buffer
/// will process, besides the count of elements that will be added and removed during its
/// lifecycle
async fn load_store_total_count(&mut self) {
let total_count = timeout(Duration::from_secs(1), async {
self.stack_provider.store_total_count().await
})
.await;
match total_count {
Ok(total_count) => {
self.total_count
.store(total_count as i64, AtomicOrdering::SeqCst);
self.total_count_initialized = true;
}
Err(error) => {
self.total_count_initialized = false;
relay_log::error!(
error = &error as &dyn Error,
"failed to load the total envelope count of the store",
);
}
};
self.track_total_count();
}

/// Emits a metric to track the total count of envelopes that are in the envelope buffer.
fn track_total_count(&self) {
let total_count = self.total_count.load(AtomicOrdering::SeqCst) as f64;
Expand Down Expand Up @@ -620,6 +652,7 @@ mod tests {
use relay_sampling::DynamicSamplingContext;
use std::str::FromStr;
use std::sync::Arc;
use tempfile::TempDir;
use uuid::Uuid;

use crate::envelope::{Item, ItemType};
Expand Down Expand Up @@ -924,11 +957,13 @@ mod tests {

#[tokio::test]
async fn test_initialize_buffer() {
let path = std::env::temp_dir()
.join(Uuid::new_v4().to_string())
.into_os_string()
.into_string()
.unwrap();
let temp_dir = TempDir::new().unwrap();
let path = temp_dir
.path()
.join("buffer.db")
.to_str()
.unwrap()
.to_string();
let config = mock_config(&path);
let mut store = SqliteEnvelopeStore::prepare(&config).await.unwrap();
let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(&config)
Expand Down
Loading
Loading