feat: metadata cache provider and datafusion trait impls #25566
Conversation
Force-pushed from 5d159c9 to 583f69e
```rust
/// Compare the configuration of a given cache, producing a helpful error message if they differ
pub(crate) fn compare_config(&self, other: &Self) -> Result<(), anyhow::Error> {
    if self.max_cardinality != other.max_cardinality {
        bail!(
            "incompatible `max_cardinality`, expected {}, got {}",
            self.max_cardinality,
            other.max_cardinality
        );
    }
    if self.max_age != other.max_age {
        bail!(
            "incompatible `max_age`, expected {}, got {}",
            self.max_age.as_secs(),
            other.max_age.as_secs()
        )
    }
    if self.column_ids != other.column_ids {
        bail!(
            "incompatible column id selection, expected {}, got {}",
            self.column_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(","),
            other
                .column_ids
                .iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(","),
        )
    }
    Ok(())
}
```
This is newly added to the core cache code, as it is used on cache creation in the `MetaCacheProvider` to give friendly errors if users attempt to overwrite an existing cache.
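A minimal sketch of how that creation path might use this check; the `caches` field and the `create_cache` method shape are assumptions for illustration, not the PR's actual structure:

```rust
impl MetaCacheProvider {
    /// Hypothetical creation path: if a cache with this name already exists,
    /// compare configurations instead of silently overwriting it.
    pub(crate) fn create_cache(
        &mut self,
        name: Arc<str>,
        new_cache: MetaCache,
    ) -> Result<(), anyhow::Error> {
        if let Some(existing) = self.caches.get(name.as_ref()) {
            // Same name: succeed only if the definition matches exactly,
            // otherwise surface the friendly error from compare_config.
            existing.compare_config(&new_cache)?;
            return Ok(());
        }
        self.caches.insert(name, new_cache);
        Ok(())
    }
}
```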
```rust
/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`]
///
/// This is intended to be derived from a set of filter expressions in Datafusion by analyzing
/// them with a `LiteralGuarantee`.
///
/// This uses a `BTreeSet` to store the values so that they are iterated over in sorted order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum Predicate {
    In(BTreeSet<Value>),
    NotIn(BTreeSet<Value>),
}
```
This type was changed from the previous enum that also contained `Eq` and `NotEq` variants. Those were no longer needed with how predicates are pushed down from DataFusion (see the `table_function.rs` module).
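For intuition, evaluating one of these predicates against a single cache value could look like the following sketch (the `matches` helper is hypothetical, not code from the PR):

```rust
impl Predicate {
    /// Hypothetical helper: does a single cache value satisfy this predicate?
    fn matches(&self, value: &Value) -> bool {
        match self {
            // IN: the value must be one of the listed literals
            Predicate::In(values) => values.contains(value),
            // NOT IN: the value must not be one of the listed literals
            Predicate::NotIn(values) => !values.contains(value),
        }
    }
}
```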
Force-pushed from 44d0ec2 to 9dc7cc8
Force-pushed from 9dc7cc8 to 088df39
"| us-west | d |", | ||
"+---------+------+", | ||
], | ||
}, |
All of these test cases were removed because the `Eq` and `NotEq` variants were removed from the `Predicate` type.
Nice PR - comments were very handy to follow the changes. I learnt a few DF tricks as well when going through this. 🙏 Just left a few queries for my understanding.
```diff
@@ -691,6 +692,7 @@ pub struct TableDefinition {
     pub column_map: BiHashMap<ColumnId, Arc<str>>,
     pub series_key: Option<Vec<ColumnId>>,
     pub last_caches: HashMap<Arc<str>, LastCacheDefinition>,
+    pub meta_caches: HashMap<Arc<str>, MetaCacheDefinition>,
```
I think we need to clear out the meta cache when a table or db is deleted? Maybe address this in one of the follow-up PRs.
I will see if I can address this now; since I had to merge in your delete work anyway, it shouldn't be too hard to make this additional change.
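A rough sketch of what that cleanup could look like on the provider; the `cache_map` field and method names are hypothetical, though `DbId` and `TableId` are existing influxdb3 identifier types:

```rust
impl MetaCacheProvider {
    /// Hypothetical hook: drop all metadata caches for a deleted table.
    pub fn delete_caches_for_table(&mut self, db_id: &DbId, table_id: &TableId) {
        if let Some(tables) = self.cache_map.get_mut(db_id) {
            // removes every cache registered for the table
            tables.remove(table_id);
        }
    }

    /// Hypothetical hook: drop all metadata caches for a deleted database.
    pub fn delete_caches_for_db(&mut self, db_id: &DbId) {
        self.cache_map.remove(db_id);
    }
}
```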
```rust
write!(f, " predicates=[")?;
let mut p_iter = predicates.iter();
while let Some((col_id, predicate)) = p_iter.next() {
    write!(f, "[{col_id} {predicate}]")?;
```
I like that we have a custom display to plug into `explain` output. Maybe we could swap the column id for the column name here?

`predicates=[[0 IN (us-east)], [1 IN (a,b)]]`

probably reads better (imo) as

`predicates=[[region IN (us-east)], [host IN (a,b)]]`

Just a UX thing: when there are many columns involved, this might be a little difficult to understand. If `col_id` is useful in `explain`, then we could add that info here like `region$0` or some variation of it?
Good call. I think I could do this by passing the `Arc<TableDefinition>` forward to the `MetaCacheExec` so it only pulls the column names when needed, e.g., for the `EXPLAIN` output. The table definition is already there, so it should not be too tricky, and yeah - given that part of the motivation for this whole execution plan implementation is to make it readable to the operator, I will get this in.
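A minimal sketch of that idea, assuming `MetaCacheExec` holds an `Arc<TableDefinition>` and that the `column_id_to_name` lookup is available on it (treat both as assumptions here):

```rust
// Inside a hypothetical display impl for MetaCacheExec:
write!(f, " predicates=[")?;
for (col_id, predicate) in predicates.iter() {
    // Resolve the column name from the table definition, falling back to the
    // raw id if the column is somehow unknown.
    let name: Arc<str> = self
        .table_def
        .column_id_to_name(col_id) // assumed lookup on TableDefinition
        .unwrap_or_else(|| col_id.to_string().into());
    // Prints e.g. [region@0 IN (us-east)], keeping the id alongside the name.
    write!(f, "[{name}@{col_id} {predicate}]")?;
}
write!(f, "]")?;
```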
```rust
// discard those predicates and have DataFusion handle the filtering.
//
// This is a conservative approach; it may be that we can combine multiple literal guarantees on
// a single column, but thus far, from testing in the parent module, this does not seem necessary.
```
This whole thing is quite interesting, thanks for summarizing it. I'd need to go through this closely to understand it fully, but I think I generally get the idea.
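As a sketch of the mechanism being discussed (not the PR's exact code - the `value_from_scalar` helper is a stub, and the import paths are from recent DataFusion releases and may differ by version), deriving per-column predicates from DataFusion's guarantee analysis might look like:

```rust
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;

use datafusion::physical_expr::utils::guarantees::{Guarantee, LiteralGuarantee};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::scalar::ScalarValue;

/// Sketch: analyze pushed-down physical filter expressions and build one
/// `Predicate` per column, conservatively dropping any column that ends up
/// with more than one guarantee rather than trying to combine them.
fn predicates_from_filters(
    filters: &[Arc<dyn PhysicalExpr>],
) -> HashMap<String, Predicate> {
    let mut predicates: HashMap<String, Predicate> = HashMap::new();
    let mut conflicted: BTreeSet<String> = BTreeSet::new();
    for expr in filters {
        for guarantee in LiteralGuarantee::analyze(expr) {
            let column = guarantee.column.name().to_string();
            let values: BTreeSet<Value> = guarantee
                .literals
                .iter()
                .filter_map(value_from_scalar)
                .collect();
            let predicate = match guarantee.guarantee {
                Guarantee::In => Predicate::In(values),
                Guarantee::NotIn => Predicate::NotIn(values),
            };
            // A second guarantee on the same column: mark it conflicted and
            // let DataFusion do the filtering (the conservative approach).
            if predicates.insert(column.clone(), predicate).is_some() {
                conflicted.insert(column);
            }
        }
    }
    for column in conflicted {
        predicates.remove(&column);
    }
    predicates
}

/// Assumed helper (stub): convert a DataFusion literal into the cache's Value.
fn value_from_scalar(scalar: &ScalarValue) -> Option<Value> {
    match scalar {
        ScalarValue::Utf8(Some(s)) => Some(Value::from(s.as_str())),
        _ => None, // the cache only stores string (tag) values
    }
}
```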
```rust
/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to
/// the node in the next level of the tree.
#[derive(Debug, Default)]
struct Node(BTreeMap<Value, (i64, Option<Node>)>);
```
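For intuition, each level of this tree corresponds to one cached column, so producing rows is a depth-first walk that extends a prefix of values at each level. A hypothetical helper (not the PR's code, and assuming `Value: Clone`):

```rust
impl Node {
    /// Hypothetical depth-first walk: collect every root-to-leaf path as a
    /// row of column values, in the sorted order the BTreeMap provides.
    fn rows(&self, prefix: &mut Vec<Value>, out: &mut Vec<Vec<Value>>) {
        for (value, (_last_seen, child)) in &self.0 {
            prefix.push(value.clone());
            match child {
                // Interior level: recurse into the next column's node.
                Some(node) => node.rows(prefix, out),
                // Leaf level: the prefix now holds one complete row.
                None => out.push(prefix.clone()),
            }
            prefix.pop();
        }
    }
}
```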
Just for my clarity, does this mean each level in the tree will correspond to a column (in turn a `BTreeMap`)? If that's the case,

- does the filtering take advantage of the fact that it doesn't need to scan subsequent levels if the query doesn't include them? For e.g. `select region from metadata_cache('cpu')` will not require `host` to be scanned. It sounds like something we'll do, but I couldn't figure out where we short circuit it.
- do we need to support out of band queries, like `select host from metadata_cache('cpu')` (note: there's no where clause for `region`)?
> does the filtering take advantage of the fact that it doesn't need to scan subsequent levels if the query doesn't include them? For e.g. `select region from metadata_cache('cpu')` will not require `host` to be scanned. It sounds like something we'll do, but I couldn't figure out where we short circuit it.

It does not. That would be "projection pushdown", which we could accomplish by handling the `projection` arg passed in the `TableProvider::scan` method. I did not attempt that yet. I will add an issue to tackle this, since it is definitely doable but trickier than I want to handle on this PR, and some requirements might need refining.

> do we need to support out of band queries, like `select host from metadata_cache('cpu')` (note: there's no where clause for `region`)?

It does handle this now, and there is a test that does this, but it is doing a full scan of the cache hierarchy, and as I note in that test, DataFusion then does the filtering/projection - so this is not optimal.

We might be able to improve handling of this if we tackle projection pushdown, but we will need to decide some things, e.g., whether the output should be sorted or not.
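For reference, the hook being described is the `projection` argument on `TableProvider::scan`. A simplified sketch of where that would land - the `MetaCacheFunctionProvider` name is assumed, and the `scan` signature shown is from recent DataFusion versions (older releases take `&SessionState` instead of `&dyn Session`):

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical provider returned by the UDTF for a single cache.
struct MetaCacheFunctionProvider {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for MetaCacheFunctionProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Projection pushdown would start here: `projection` lists the
        // indices of the columns the query actually needs, so a walk of the
        // cache tree could stop at the deepest projected level instead of
        // scanning everything. (Not implemented in the PR; sketched only.)
        let _ = (projection, filters);
        todo!("build a MetaCacheExec from the projection and filters")
    }
}
```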
"| us-east | b |", | ||
"+---------+------+", | ||
], | ||
explain_contains: "MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]", |
These `explain_contains` are really neat 👍
Yes, this is super helpful for testing. I originally had the entire `EXPLAIN` output in the test, but unfortunately it was flaky since some things differ in CI 😔. There might be some helpers in DataFusion that we could use to normalize the `EXPLAIN` output; I will need to dig for that.
@praveen-influx - on further thought, I have made some forward progress on subsequent issues, so I want to get this merged. I opened issues to address the feedback you gave:
Closes #25543
Closes #25544
This adds the `MetaCacheProvider` for managing metadata caches in the `influxdb3` instance. This includes APIs to create caches through the WAL as well as from a catalog on initialization, to write data into the managed caches, and to query data out of them.

The query side is fairly involved, relying on DataFusion's `TableFunctionImpl` and `TableProvider` traits to make querying the cache using a user-defined table function (UDTF) possible.

The predicate code was modified to only support two kinds of predicates: IN and NOT IN. This simplifies the code and maps nicely onto the DataFusion `LiteralGuarantee`, which we leverage to derive the predicates from the incoming queries.

`MetaCacheExec`, a custom `ExecutionPlan` implementation, was added specifically for the metadata cache; it can report the predicates that are pushed down to the cache during query planning/execution.

A big set of tests was added to check that queries are working and that predicates are being pushed down properly.
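As a usage sketch of that UDTF wiring (the `MetaCacheFunction` shape is an assumption; `register_udtf` is the standard DataFusion hook, and the `TableFunctionImpl` import path has moved between modules across DataFusion releases):

```rust
use std::sync::Arc;

use datafusion::catalog::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::prelude::SessionContext;

/// Hypothetical UDTF: resolves a metadata cache by table name and returns a
/// TableProvider over it.
struct MetaCacheFunction {
    // e.g. a handle back to the MetaCacheProvider would live here
}

impl TableFunctionImpl for MetaCacheFunction {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        // The first argument names the table whose metadata cache to query,
        // e.g. metadata_cache('cpu'); argument parsing elided in this sketch.
        let _table_name = args.first();
        todo!("look up the cache and return a TableProvider over it")
    }
}

fn register(ctx: &SessionContext) {
    // After registration the cache is queryable as a table function:
    //   SELECT * FROM metadata_cache('cpu') WHERE region = 'us-east'
    ctx.register_udtf("metadata_cache", Arc::new(MetaCacheFunction {}));
}
```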
Additional Notes

This PR also shuffles the `meta_cache` module of the `influxdb3_cache` crate around so that the primary `mod.rs` just contains tests, with the following additional modules:

- `cache.rs`: (existing, with some changes) core cache implementation - I left a couple of comments on where the main changes were made to that code
- `provider.rs`: (new) contains code for the new `MetaCacheProvider`
- `table_function.rs`: (new) contains code for the DataFusion trait implementations