Skip to content

Commit

Permalink
feat: configurable last cache eviction (#25520)
Browse files Browse the repository at this point in the history
* refactor: make last cache eviction optional

This changes how the last cache is evicted. It will no longer run eviction
on writes to the cache, instead, there is an optional method to create a
last cache provider that will run eviction in a background task on a specified
interval.

Instead, when records are read out of the cache, only entries that have
not yet expired are included in the results.

This should reduce locks on the cache and hopefully improve performance.

* feat: configurable last cache eviction interval

* docs: clean up var names, code docs, and comments
  • Loading branch information
hiltontj authored Nov 6, 2024
1 parent da294a2 commit 391b67f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 61 deletions.
19 changes: 16 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ pub struct Config {
action
)]
pub telemetry_endpoint: String,

/// The interval on which to evict expired entries from the Last-N-Value cache, expressed as a
/// human-readable time, e.g., "20s", "1m", "1h".
#[clap(
long = "last-cache-eviction-interval",
env = "INFLUXDB3_LAST_CACHE_EVICTION_INTERVAL",
default_value = "1m",
action
)]
pub last_cache_eviction_interval: humantime::Duration,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -436,15 +446,18 @@ pub async fn command(config: Config) -> Result<()> {
.map_err(Error::InitializePersistedCatalog)?,
);

let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
.map_err(Error::InitializeLastCache)?;
let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
Arc::clone(&catalog) as _,
config.last_cache_eviction_interval.into(),
)
.map_err(Error::InitializeLastCache)?;
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

let write_buffer_impl = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(last_cache),
last_cache,
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ mod tests {
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ mod tests {
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig {
Expand Down
134 changes: 87 additions & 47 deletions influxdb3_write/src/last_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ pub struct CreateCacheArguments {

impl LastCacheProvider {
/// Initialize a [`LastCacheProvider`] from a [`Catalog`]
pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Self, Error> {
let provider = LastCacheProvider {
pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Arc<Self>, Error> {
let provider = Arc::new(LastCacheProvider {
catalog: Arc::clone(&catalog),
cache_map: Default::default(),
};
});
for db_schema in catalog.list_db_schema() {
for table_def in db_schema.tables() {
for (cache_name, cache_def) in table_def.last_caches() {
Expand Down Expand Up @@ -169,6 +169,20 @@ impl LastCacheProvider {
}
}
}

Ok(provider)
}

/// Create a [`LastCacheProvider`] from a [`Catalog`], additionally spawning
/// a background task that evicts expired entries from the cache on the
/// given `eviction_interval`.
///
/// NOTE(review): the eviction task's `JoinHandle` is discarded here, so the
/// task cannot be joined or cancelled later — confirm it is intended to run
/// for the life of the process.
pub fn new_from_catalog_with_background_eviction(
    catalog: Arc<Catalog>,
    eviction_interval: Duration,
) -> Result<Arc<Self>, Error> {
    let provider = Self::new_from_catalog(catalog)?;
    background_eviction_process(Arc::clone(&provider), eviction_interval);
    Ok(provider)
}

Expand Down Expand Up @@ -569,6 +583,22 @@ impl LastCacheProvider {
}
}

/// Spawn a detached tokio task that periodically calls
/// `evict_expired_cache_entries` on the given [`LastCacheProvider`].
///
/// Returns the task's [`tokio::task::JoinHandle`]; the task loops forever,
/// ticking once per `eviction_interval`.
fn background_eviction_process(
    provider: Arc<LastCacheProvider>,
    eviction_interval: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(eviction_interval);
        // If a tick is missed (e.g., eviction ran long), skip ahead to the
        // next interval boundary rather than firing a burst of catch-up ticks.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            provider.evict_expired_cache_entries();
        }
    })
}

fn last_cache_schema_from_table_def(
table_def: Arc<TableDefinition>,
key_columns: Vec<ColumnId>,
Expand Down Expand Up @@ -1011,7 +1041,9 @@ impl<'a> ExtendedLastCacheState<'a> {
.state
.as_store()
.expect("should only be calling to_record_batch when using a store");
let n = store.len();
// Determine the number of elements that have not expired up front, so that the value used
// is consistent in the chain of methods used to produce record batches below:
let n_non_expired = store.len();
let extended: Option<Vec<ArrayRef>> = if self.key_column_values.is_empty() {
None
} else {
Expand All @@ -1021,28 +1053,28 @@ impl<'a> ExtendedLastCacheState<'a> {
.map(|value| match value {
KeyValue::String(v) => {
let mut builder = StringBuilder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(v);
}
Arc::new(builder.finish()) as ArrayRef
}
KeyValue::Int(v) => {
let mut builder = Int64Builder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(*v);
}
Arc::new(builder.finish()) as ArrayRef
}
KeyValue::UInt(v) => {
let mut builder = UInt64Builder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(*v);
}
Arc::new(builder.finish()) as ArrayRef
}
KeyValue::Bool(v) => {
let mut builder = BooleanBuilder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(*v);
}
Arc::new(builder.finish()) as ArrayRef
Expand All @@ -1051,7 +1083,7 @@ impl<'a> ExtendedLastCacheState<'a> {
.collect(),
)
};
store.to_record_batch(table_def, schema, extended)
store.to_record_batch(table_def, schema, extended, n_non_expired)
}
}

Expand Down Expand Up @@ -1317,9 +1349,12 @@ impl LastCacheStore {
}
}

/// Get the number of values in the cache.
/// Get the number of values in the cache that have not expired past the TTL.
fn len(&self) -> usize {
self.instants.len()
self.instants
.iter()
.filter(|i| i.elapsed() < self.ttl)
.count()
}

/// Check if the cache is empty
Expand Down Expand Up @@ -1388,11 +1423,16 @@ impl LastCacheStore {
/// produced set of [`RecordBatch`]es. These are for the scenario where key columns are
/// included in the outputted batches, as the [`LastCacheStore`] only holds the field columns
/// for the cache.
///
/// Accepts an `n_non_expired` argument to indicate the number of non-expired elements in the
/// store. This is passed in vs. calling `self.len()`, since that is already invoked in the
/// calling function, and calling it here _could_ produce a different result.
fn to_record_batch(
&self,
table_def: Arc<TableDefinition>,
schema: ArrowSchemaRef,
extended: Option<Vec<ArrayRef>>,
n_non_expired: usize,
) -> Result<RecordBatch, ArrowError> {
let mut arrays = extended.unwrap_or_default();
if self.accept_new_fields {
Expand All @@ -1408,12 +1448,16 @@ impl LastCacheStore {
continue;
}
arrays.push(self.cache.get(&id).map_or_else(
|| new_null_array(field.data_type(), self.len()),
|c| c.data.as_array(),
|| new_null_array(field.data_type(), n_non_expired),
|c| c.data.as_array(n_non_expired),
));
}
} else {
arrays.extend(self.cache.iter().map(|(_, col)| col.data.as_array()));
arrays.extend(
self.cache
.iter()
.map(|(_, col)| col.data.as_array(n_non_expired)),
);
}
RecordBatch::try_new(schema, arrays)
}
Expand All @@ -1423,7 +1467,7 @@ impl LastCacheStore {
/// Returns whether or not the store is empty after expired entries are removed.
fn remove_expired(&mut self) -> bool {
while let Some(instant) = self.instants.back() {
if instant.elapsed() > self.ttl {
if instant.elapsed() >= self.ttl {
self.instants.pop_back();
} else {
break;
Expand Down Expand Up @@ -1594,43 +1638,47 @@ impl CacheColumnData {
}

/// Produce an arrow [`ArrayRef`] from this column for the sake of producing [`RecordBatch`]es
fn as_array(&self) -> ArrayRef {
///
/// Accepts `n_non_expired` to indicate how many of the first elements in the column buffer to
/// take, i.e., those that have not yet expired. That value is determined externally by the
/// [`LastCacheStore`] that tracks TTL.
fn as_array(&self, n_non_expired: usize) -> ArrayRef {
match self {
CacheColumnData::I64(buf) => {
let mut b = Int64Builder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::U64(buf) => {
let mut b = UInt64Builder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::F64(buf) => {
let mut b = Float64Builder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::String(buf) => {
let mut b = StringBuilder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::Bool(buf) => {
let mut b = BooleanBuilder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Expand All @@ -1639,7 +1687,7 @@ impl CacheColumnData {
CacheColumnData::Tag(buf) => {
let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
StringDictionaryBuilder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(v),
None => b.append_null(),
});
Expand All @@ -1648,14 +1696,16 @@ impl CacheColumnData {
CacheColumnData::Key(buf) => {
let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
StringDictionaryBuilder::new();
buf.iter().for_each(|val| {
buf.iter().take(n_non_expired).for_each(|val| {
b.append_value(val);
});
Arc::new(b.finish())
}
CacheColumnData::Time(buf) => {
let mut b = TimestampNanosecondBuilder::new();
buf.iter().for_each(|val| b.append_value(*val));
buf.iter()
.take(n_non_expired)
.for_each(|val| b.append_value(*val));
Arc::new(b.finish())
}
}
Expand Down Expand Up @@ -1721,7 +1771,7 @@ mod tests {
WriteBufferImpl::new(
persister,
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
time_provider,
crate::test_help::make_exec(),
WalConfig::test_config(),
Expand Down Expand Up @@ -2289,7 +2339,7 @@ mod tests {
Some("cache"),
// use a cache size greater than 1 to ensure the TTL is doing the evicting
Some(10),
Some(Duration::from_millis(50)),
Some(Duration::from_millis(1000)),
Some(vec![
(region_col_id, "region".into()),
(host_col_id, "host".into()),
Expand Down Expand Up @@ -2345,25 +2395,7 @@ mod tests {
);

// wait for the TTL to clear the cache
tokio::time::sleep(Duration::from_millis(100)).await;

// the last cache eviction only happens when writes are flushed out to the buffer. If
// no writes are coming in, the last cache will still have data in it. So, we need to write
// some data to the buffer to trigger the last cache eviction.
wbuf.write_lp(
NamespaceName::new(db_name).unwrap(),
format!(
"\
{tbl_name},region=us,host=b usage=200\n\
"
)
.as_str(),
Time::from_timestamp_nanos(2_000),
false,
Precision::Nanosecond,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;

// Check what is in the last cache:
let batches = wbuf
Expand All @@ -2373,7 +2405,15 @@ mod tests {
.unwrap();

// The cache is completely empty after the TTL evicted data, so it will give back nothing:
assert_batches_sorted_eq!(["++", "++",], &batches);
assert_batches_sorted_eq!(
[
"+--------+------+------+-------+",
"| region | host | time | usage |",
"+--------+------+------+-------+",
"+--------+------+------+-------+",
],
&batches
);

// Ensure that records can be written to the cache again:
wbuf.write_lp(
Expand Down Expand Up @@ -2446,7 +2486,7 @@ mod tests {
tbl_id,
Some("cache"),
None,
Some(Duration::from_millis(50)),
None,
Some(vec![
(component_id_col_id, "component_id".into()),
(active_col_id, "active".into()),
Expand Down
Loading

0 comments on commit 391b67f

Please sign in to comment.