Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable last cache eviction #25520

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ pub struct Config {
action
)]
pub telemetry_endpoint: String,

/// The interval on which to evict expired entries from the Last-N-Value cache, expressed as a
/// human-readable time, e.g., "20s", "1m", "1h".
#[clap(
long = "last-cache-eviction-interval",
env = "INFLUXDB3_LAST_CACHE_EVICTION_INTERVAL",
default_value = "1m",
action
)]
pub last_cache_eviction_interval: humantime::Duration,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -436,15 +446,18 @@ pub async fn command(config: Config) -> Result<()> {
.map_err(Error::InitializePersistedCatalog)?,
);

let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
.map_err(Error::InitializeLastCache)?;
let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
Arc::clone(&catalog) as _,
config.last_cache_eviction_interval.into(),
)
.map_err(Error::InitializeLastCache)?;
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

let write_buffer_impl = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(last_cache),
last_cache,
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ mod tests {
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ mod tests {
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig {
Expand Down
134 changes: 87 additions & 47 deletions influxdb3_write/src/last_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ pub struct CreateCacheArguments {

impl LastCacheProvider {
/// Initialize a [`LastCacheProvider`] from a [`Catalog`]
pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Self, Error> {
let provider = LastCacheProvider {
pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Arc<Self>, Error> {
let provider = Arc::new(LastCacheProvider {
catalog: Arc::clone(&catalog),
cache_map: Default::default(),
};
});
for db_schema in catalog.list_db_schema() {
for table_def in db_schema.tables() {
for (cache_name, cache_def) in table_def.last_caches() {
Expand Down Expand Up @@ -169,6 +169,20 @@ impl LastCacheProvider {
}
}
}

Ok(provider)
}

/// Initialize a [`LastCacheProvider`] from a [`Catalog`], and spawn a
/// background task that periodically evicts expired entries from the cache.
///
/// `eviction_interval` controls how often the background eviction pass runs.
/// Errors from [`Self::new_from_catalog`] are propagated unchanged.
pub fn new_from_catalog_with_background_eviction(
    catalog: Arc<Catalog>,
    eviction_interval: Duration,
) -> Result<Arc<Self>, Error> {
    Self::new_from_catalog(catalog).map(|provider| {
        // The eviction task runs for the life of the process; its join
        // handle is intentionally not retained here.
        background_eviction_process(Arc::clone(&provider), eviction_interval);
        provider
    })
}

Expand Down Expand Up @@ -569,6 +583,22 @@ impl LastCacheProvider {
}
}

/// Spawn a tokio task that evicts expired entries from the given
/// [`LastCacheProvider`] on a fixed cadence.
///
/// Missed ticks are skipped rather than replayed back-to-back, so an
/// eviction pass never runs more frequently than `eviction_interval`.
/// The returned [`tokio::task::JoinHandle`] may be used to await or abort
/// the loop; it runs indefinitely otherwise.
fn background_eviction_process(
    provider: Arc<LastCacheProvider>,
    eviction_interval: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(eviction_interval);
        // Skip (do not burst) ticks that were missed while evicting.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            provider.evict_expired_cache_entries();
        }
    })
}

fn last_cache_schema_from_table_def(
table_def: Arc<TableDefinition>,
key_columns: Vec<ColumnId>,
Expand Down Expand Up @@ -1011,7 +1041,9 @@ impl<'a> ExtendedLastCacheState<'a> {
.state
.as_store()
.expect("should only be calling to_record_batch when using a store");
let n = store.len();
// Determine the number of elements that have not expired up front, so that the value used
// is consistent in the chain of methods used to produce record batches below:
let n_non_expired = store.len();
let extended: Option<Vec<ArrayRef>> = if self.key_column_values.is_empty() {
None
} else {
Expand All @@ -1021,28 +1053,28 @@ impl<'a> ExtendedLastCacheState<'a> {
.map(|value| match value {
KeyValue::String(v) => {
let mut builder = StringBuilder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(v);
}
Arc::new(builder.finish()) as ArrayRef
}
KeyValue::Int(v) => {
let mut builder = Int64Builder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(*v);
}
Arc::new(builder.finish()) as ArrayRef
}
KeyValue::UInt(v) => {
let mut builder = UInt64Builder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(*v);
}
Arc::new(builder.finish()) as ArrayRef
}
KeyValue::Bool(v) => {
let mut builder = BooleanBuilder::new();
for _ in 0..n {
for _ in 0..n_non_expired {
builder.append_value(*v);
}
Arc::new(builder.finish()) as ArrayRef
Expand All @@ -1051,7 +1083,7 @@ impl<'a> ExtendedLastCacheState<'a> {
.collect(),
)
};
store.to_record_batch(table_def, schema, extended)
store.to_record_batch(table_def, schema, extended, n_non_expired)
}
}

Expand Down Expand Up @@ -1317,9 +1349,12 @@ impl LastCacheStore {
}
}

/// Get the number of values in the cache.
/// Get the number of values in the cache that have not expired past the TTL.
fn len(&self) -> usize {
self.instants.len()
self.instants
.iter()
.filter(|i| i.elapsed() < self.ttl)
.count()
}

/// Check if the cache is empty
Expand Down Expand Up @@ -1388,11 +1423,16 @@ impl LastCacheStore {
/// produced set of [`RecordBatch`]es. These are for the scenario where key columns are
/// included in the outputted batches, as the [`LastCacheStore`] only holds the field columns
/// for the cache.
///
/// Accepts an `n_non_expired` argument to indicate the number of non-expired elements in the
/// store. This is passed in vs. calling `self.len()`, since that is already invoked in the
/// calling function, and calling it here _could_ produce a different result.
fn to_record_batch(
&self,
table_def: Arc<TableDefinition>,
schema: ArrowSchemaRef,
extended: Option<Vec<ArrayRef>>,
n_non_expired: usize,
) -> Result<RecordBatch, ArrowError> {
let mut arrays = extended.unwrap_or_default();
if self.accept_new_fields {
Expand All @@ -1408,12 +1448,16 @@ impl LastCacheStore {
continue;
}
arrays.push(self.cache.get(&id).map_or_else(
|| new_null_array(field.data_type(), self.len()),
|c| c.data.as_array(),
|| new_null_array(field.data_type(), n_non_expired),
|c| c.data.as_array(n_non_expired),
));
}
} else {
arrays.extend(self.cache.iter().map(|(_, col)| col.data.as_array()));
arrays.extend(
self.cache
.iter()
.map(|(_, col)| col.data.as_array(n_non_expired)),
);
}
RecordBatch::try_new(schema, arrays)
}
Expand All @@ -1423,7 +1467,7 @@ impl LastCacheStore {
/// Returns whether or not the store is empty after expired entries are removed.
fn remove_expired(&mut self) -> bool {
while let Some(instant) = self.instants.back() {
if instant.elapsed() > self.ttl {
if instant.elapsed() >= self.ttl {
self.instants.pop_back();
} else {
break;
Expand Down Expand Up @@ -1594,43 +1638,47 @@ impl CacheColumnData {
}

/// Produce an arrow [`ArrayRef`] from this column for the sake of producing [`RecordBatch`]es
fn as_array(&self) -> ArrayRef {
///
/// Accepts `n_non_expired` to indicate how many of the first elements in the column buffer to
/// take, i.e., those that have not yet expired. That value is determined externally by the
/// [`LastCacheStore`] that tracks TTL.
fn as_array(&self, n_non_expired: usize) -> ArrayRef {
match self {
CacheColumnData::I64(buf) => {
let mut b = Int64Builder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::U64(buf) => {
let mut b = UInt64Builder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::F64(buf) => {
let mut b = Float64Builder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::String(buf) => {
let mut b = StringBuilder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(v),
None => b.append_null(),
});
Arc::new(b.finish())
}
CacheColumnData::Bool(buf) => {
let mut b = BooleanBuilder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(*v),
None => b.append_null(),
});
Expand All @@ -1639,7 +1687,7 @@ impl CacheColumnData {
CacheColumnData::Tag(buf) => {
let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
StringDictionaryBuilder::new();
buf.iter().for_each(|val| match val {
buf.iter().take(n_non_expired).for_each(|val| match val {
Some(v) => b.append_value(v),
None => b.append_null(),
});
Expand All @@ -1648,14 +1696,16 @@ impl CacheColumnData {
CacheColumnData::Key(buf) => {
let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
StringDictionaryBuilder::new();
buf.iter().for_each(|val| {
buf.iter().take(n_non_expired).for_each(|val| {
b.append_value(val);
});
Arc::new(b.finish())
}
CacheColumnData::Time(buf) => {
let mut b = TimestampNanosecondBuilder::new();
buf.iter().for_each(|val| b.append_value(*val));
buf.iter()
.take(n_non_expired)
.for_each(|val| b.append_value(*val));
Arc::new(b.finish())
}
}
Expand Down Expand Up @@ -1721,7 +1771,7 @@ mod tests {
WriteBufferImpl::new(
persister,
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
LastCacheProvider::new_from_catalog(catalog as _).unwrap(),
time_provider,
crate::test_help::make_exec(),
WalConfig::test_config(),
Expand Down Expand Up @@ -2289,7 +2339,7 @@ mod tests {
Some("cache"),
// use a cache size greater than 1 to ensure the TTL is doing the evicting
Some(10),
Some(Duration::from_millis(50)),
Some(Duration::from_millis(1000)),
Some(vec![
(region_col_id, "region".into()),
(host_col_id, "host".into()),
Expand Down Expand Up @@ -2345,25 +2395,7 @@ mod tests {
);

// wait for the TTL to clear the cache
tokio::time::sleep(Duration::from_millis(100)).await;

// the last cache eviction only happens when writes are flushed out to the buffer. If
// no writes are coming in, the last cache will still have data in it. So, we need to write
// some data to the buffer to trigger the last cache eviction.
wbuf.write_lp(
NamespaceName::new(db_name).unwrap(),
format!(
"\
{tbl_name},region=us,host=b usage=200\n\
"
)
.as_str(),
Time::from_timestamp_nanos(2_000),
false,
Precision::Nanosecond,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;

// Check what is in the last cache:
let batches = wbuf
Expand All @@ -2373,7 +2405,15 @@ mod tests {
.unwrap();

// The cache is completely empty after the TTL evicted data, so it will give back nothing:
assert_batches_sorted_eq!(["++", "++",], &batches);
assert_batches_sorted_eq!(
[
"+--------+------+------+-------+",
"| region | host | time | usage |",
"+--------+------+------+-------+",
"+--------+------+------+-------+",
],
&batches
);

// Ensure that records can be written to the cache again:
wbuf.write_lp(
Expand Down Expand Up @@ -2446,7 +2486,7 @@ mod tests {
tbl_id,
Some("cache"),
None,
Some(Duration::from_millis(50)),
None,
Some(vec![
(component_id_col_id, "component_id".into()),
(active_col_id, "active".into()),
Expand Down
Loading