From 2a0fa4fd06f4fe4aaf2191613d59a87f6d95a62c Mon Sep 17 00:00:00 2001
From: marvin-j97
Date: Fri, 22 Nov 2024 19:18:03 +0100
Subject: [PATCH] add wildcard queries

---
 README.md           |  43 ++++++++++++++-
 billion/src/main.rs |   4 +-
 src/db.rs           | 129 ++++++++++++++++++++++++++++++++++++++++++--
 src/db_builder.rs   |   4 +-
 src/query/filter.rs |  34 ++++++++++--
 src/query/lexer.rs  |  10 +++-
 src/tag_index.rs    |  23 ++++++--
 src/tag_sets.rs     |   2 +-
 8 files changed, 230 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index b20c507..778a75b 100644
--- a/README.md
+++ b/README.md
@@ -30,8 +30,8 @@ Each data point has
 
 - a nanosecond timestamp, which is also its primary key (big-endian stored negated, because we want to scan from newest to oldest, and forwards scans are faster)
 - the actual value (float)
-- a tagset (list of key-value pairs, e.g. “service=db; env=prod”)
-- a metric name (e.g. “cpu.total”)
+- a tagset (list of key-value pairs, e.g. "service=db; env=prod")
+- a metric name (e.g. "cpu.total")
 
 A `Database` is contained in a single Fjall `Keyspace` and consists of a couple of partitions (prefixed by `_talna#`). This way it can be integrated in an existing application using Fjall.
 
@@ -106,3 +106,42 @@ println!("{buckets:#?}");
 ```
 
 ![Timeseries parameters](./timeseries.svg)
+
+## Filter query operators
+
+The filter query DSL supports the following operators:
+
+### AND
+
+`env:prod AND service:db`
+
+### OR
+
+`db:postgres OR db:mariadb`
+
+### NOT
+
+`!db:postgres AND !db:mariadb`
+
+### Wildcard
+
+`service:db.postgres.v* OR service:db.mariadb.v*`
+
+Note that a wildcard can only appear at the *end* of a value, so hierarchical tag values should be ordered from most general to most specific (i.e. *increasing* cardinality from left to right):
+
+Bad: `loc:munich.bavaria.germany.eu.earth`
+
+Good: `loc:earth.eu.germany.bavaria.munich`, which allows queries like `loc:earth.eu.germany.*`
+
+### Nesting
+
+`env:prod AND (service:db OR service:rest-api OR service:graphql-api)`
+
diff --git a/billion/src/main.rs b/billion/src/main.rs
index 1c2f098..3049e6b 100644
--- a/billion/src/main.rs
+++ b/billion/src/main.rs
@@ -29,8 +29,8 @@ fn main() -> talna::Result<()> {
 
     {
         let db = Database::builder()
-            // TODO: cache currently bloats memory a lot because of Arcs in block cache
-            // TODO: timeseries would probably benefit a lot from a CompressedBlockCache (to skip I/O, but not CPU work)
+            // TODO: 1.0.0 cache currently bloats memory a lot because of Arcs in block cache
+            // timeseries would probably benefit a lot from a CompressedBlockCache (to skip I/O, but not CPU work)
             .cache_size_mib(8)
             .hyper_mode(true)
             .open(path)?;
diff --git a/src/db.rs b/src/db.rs
index daef86d..24b4cb5 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -190,11 +190,11 @@ impl Database {
         let series_ids = filter.evaluate(&self.0.smap, &self.0.tag_index, metric)?;
 
         if series_ids.is_empty() {
-            log::debug!("Query did not match any series");
+            log::debug!("Query {filter_expr:?} did not match any series");
             return Ok(vec![]);
         }
 
-        log::debug!(
+        log::trace!(
             "Querying metric {metric}{{{filter}}} [{min:?}..{max:?}] in series {series_ids:?}"
         );
 
@@ -370,7 +370,7 @@ impl Database {
         } else {
             // NOTE: Actually create series
 
-            // TODO: atomic, persistent counter
+            // TODO: 1.0.0 atomic, persistent counter
             let next_series_id = self.0.smap.partition.inner().len()? as SeriesId;
 
             log::trace!("Creating series {next_series_id} for permutation {series_key:?}");
@@ -1007,4 +1007,127 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_wildcard() -> crate::Result<()> {
+        let folder = tempfile::tempdir()?;
+        let db = Database::builder().open(&folder)?;
+        let metric_name = MetricName::try_from("hello").unwrap();
+
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "env" => "prod",
+                "service" => "server.nginx",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "env" => "prod",
+                "service" => "db.bigtable",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "env" => "prod",
+                "service" => "db.neon",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "env" => "prod",
+                "service" => "db.postgres.14",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "env" => "prod",
+                "service" => "db.postgres.15",
+            ),
+        )?;
+        db.write_at(
+            metric_name,
+            0,
+            4.0,
+            tagset!(
+                "env" => "prod",
+                "service" => "db.postgres.16",
+            ),
+        )?;
+
+        {
+            let aggregator = db.count(metric_name, "env").build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("prod"));
+            for (_, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+                assert_eq!(6, bucket.len);
+            }
+        }
+
+        {
+            let aggregator = db
+                .count(metric_name, "env")
+                .filter("service:db.postgres.16")
+                .build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("prod"));
+            for (_, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+                assert_eq!(1, bucket.len);
+            }
+        }
+
+        {
+            let aggregator = db
+                .count(metric_name, "env")
+                .filter("service:db.postgres.*")
+                .build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("prod"));
+            for (_, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+                assert_eq!(3, bucket.len);
+            }
+        }
+
+        {
+            let aggregator = db
+                .count(metric_name, "env")
+                .filter("service:db.*")
+                .build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("prod"));
+            for (_, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+                assert_eq!(5, bucket.len);
+            }
+        }
+
+        {
+            let aggregator = db.count(metric_name, "env").filter("service:*").build()?;
+            assert_eq!(1, aggregator.len());
+            assert!(aggregator.contains_key("prod"));
+            for (_, mut aggregator) in aggregator {
+                let bucket = aggregator.next().unwrap()?;
+                assert_eq!(6, bucket.len);
+            }
+        }
+
+        Ok(())
+    }
 }
diff --git a/src/db_builder.rs b/src/db_builder.rs
index 50616f0..eaaff43 100644
--- a/src/db_builder.rs
+++ b/src/db_builder.rs
@@ -8,8 +8,8 @@ pub struct Builder {
     hyper_mode: bool,
 }
 
-// TODO: prefix bloom filters would be *really* nice
-// TODO: if we can make lsm-tree optimize ranges that have a common prefix
+// TODO: 1.0.0 prefix bloom filters would be *really* nice
+// if we can make lsm-tree optimize ranges that have a common prefix
 
 impl Builder {
     pub(crate) fn new() -> Self {
diff --git a/src/query/filter.rs b/src/query/filter.rs
index 93abbaf..42fab73 100644
--- a/src/query/filter.rs
+++ b/src/query/filter.rs
@@ -14,6 +14,7 @@ pub enum Node<'a> {
     And(Vec<Node<'a>>),
     Or(Vec<Node<'a>>),
     Eq(Tag<'a>),
+    Wildcard(Tag<'a>),
     Not(Box<Node<'a>>),
     AllStar,
 }
@@ -22,6 +23,7 @@ impl<'a> std::fmt::Display for Node<'a> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Node::Eq(leaf) => write!(f, "{}:{}", leaf.key, leaf.value),
+            Node::Wildcard(leaf) => write!(f, "{}:{}*", leaf.key, leaf.value),
             Node::And(nodes) => write!(
                 f,
"({})", @@ -88,7 +90,7 @@ pub fn union(vecs: &[Vec]) -> Vec { } impl<'a> Node<'a> { - // TODO: unit test and add benchmark case + // TODO: 1.0.0 unit test and add benchmark case pub fn evaluate( &self, smap: &SeriesMapping, @@ -98,8 +100,10 @@ impl<'a> Node<'a> { match self { Node::AllStar => tag_index.query_eq(metric_name), Node::Eq(leaf) => { - let term = format!("{metric_name}#{}:{}", leaf.key, leaf.value); - tag_index.query_eq(&term) + tag_index.query_eq(&TagIndex::format_key(metric_name, leaf.key, leaf.value)) + } + Node::Wildcard(leaf) => { + tag_index.query_prefix(&TagIndex::format_key(metric_name, leaf.key, leaf.value)) } Node::And(children) => { // TODO: evaluate lazily... @@ -137,6 +141,7 @@ impl<'a> Node<'a> { #[derive(Debug)] pub enum Item<'a> { + Wildcard((&'a str, &'a str)), Identifier((&'a str, &'a str)), And, Or, @@ -166,6 +171,15 @@ pub fn parse_filter_query(s: &str) -> Result { let v = splits.next().expect("should be valid identifier"); output_queue.push_back(Item::Identifier((k, v))); } + lexer::Token::Wildcard(id) => { + let mut splits = id.split(':'); + let k = splits.next().expect("should be valid identifier"); + let v = splits + .next() + .expect("should be valid identifier") + .trim_end_matches("*"); + output_queue.push_back(Item::Wildcard((k, v))); + } lexer::Token::And => { loop { let Some(top) = op_stack.back() else { @@ -241,6 +255,9 @@ pub fn parse_filter_query(s: &str) -> Result { Item::Identifier((key, value)) => { buf.push(Node::Eq(Tag { key, value })); } + Item::Wildcard((key, value)) => { + buf.push(Node::Wildcard(Tag { key, value })); + } Item::And => { let Some(b) = buf.pop() else { return Err(crate::Error::InvalidQuery); @@ -319,6 +336,17 @@ mod tests { ); } + #[test_log::test] + fn test_parse_filter_query_wildcard_1() { + assert_eq!( + Node::Wildcard(Tag { + key: "service", + value: "db-" + }), + parse_filter_query("service:db-*").unwrap() + ); + } + #[test_log::test] fn test_intersection() { assert_eq!( diff --git a/src/query/lexer.rs b/src/query/lexer.rs index 3719ee5..cf67141 100644 --- a/src/query/lexer.rs +++ b/src/query/lexer.rs @@ -18,10 +18,18 @@ pub enum Token<'a> { #[token(")")] ParanClose, - #[regex("[a-zA-Z_-]+:[a-zA-Z0-9_-]+")] + #[regex("[a-zA-Z_-]+:[a-zA-Z0-9_\\-.]*\\*")] + Wildcard(&'a str), + + #[regex("[a-zA-Z_-]+:[a-zA-Z0-9_\\-.]+")] Identifier(&'a str), } +// TODO: 1.0.0 replace with nom parser + +// TODO: 1.0.0 TagSet values should probably also be allowed to be integers +// so we can something like: give me the AVG response time of all 4xx HTTP responses + pub fn tokenize_filter_query(s: &str) -> impl Iterator> + '_ { Token::lexer(s) } diff --git a/src/tag_index.rs b/src/tag_index.rs index c4b2bc6..e4e4b92 100644 --- a/src/tag_index.rs +++ b/src/tag_index.rs @@ -66,7 +66,7 @@ impl TagIndex { term: &str, series_id: SeriesId, ) -> crate::Result<()> { - log::trace!("indexing {term:?} => {series_id}"); + // log::trace!("Indexing {term:?} => {series_id}"); tx.fetch_update(&self.partition, term, |bytes| match bytes { Some(bytes) => { @@ -80,7 +80,7 @@ impl TagIndex { } postings.push(series_id); - log::trace!("posting list {term:?} is now {postings:?}"); + // log::trace!("posting list {term:?} is now {postings:?}"); Some(Self::serialize_postings_list(&postings).into()) } @@ -90,6 +90,16 @@ impl TagIndex { Ok(()) } + pub fn format_key(metric_name: &str, key: &str, value: &str) -> String { + let mut s = String::with_capacity(metric_name.len() + 1 + key.len() + 1 + value.len()); + s.push_str(metric_name); + s.push('#'); + s.push_str(key); + 
+        s.push(':');
+        s.push_str(value);
+        s
+    }
+
     pub fn query_eq(&self, term: &str) -> crate::Result<Vec<SeriesId>> {
         Ok(self
             .partition
@@ -109,12 +119,12 @@ impl TagIndex {
             .unwrap_or_default())
     }
 
-    pub fn query_prefix(&self, metric: MetricName, prefix: &str) -> crate::Result<Vec<SeriesId>> {
+    pub fn query_prefix(&self, prefix: &str) -> crate::Result<Vec<SeriesId>> {
         let mut ids = vec![];
 
         let read_tx = self.keyspace.read_tx();
 
-        for kv in read_tx.prefix(&self.partition, format!("{metric}#{prefix}")) {
+        for kv in read_tx.prefix(&self.partition, prefix) {
             let (_, v) = kv?;
 
             let mut reader = &v[..];
@@ -189,7 +199,10 @@ mod tests {
 
         tx.commit()?;
 
-        assert_eq!(vec![0, 3], tag_index.query_prefix(metric, "service:prod-")?);
+        assert_eq!(
+            vec![0, 3],
+            tag_index.query_prefix("cpu.total#service:prod-")?
+        );
 
         Ok(())
     }
diff --git a/src/tag_sets.rs b/src/tag_sets.rs
index a843d99..51d9528 100644
--- a/src/tag_sets.rs
+++ b/src/tag_sets.rs
@@ -23,7 +23,7 @@ impl TagSets {
     }
 
     pub fn insert(&self, tx: &mut WriteTransaction, series_id: SeriesId, tags: &str) {
-        log::trace!("storing tag set {series_id:?} => {tags:?}");
+        // log::trace!("Storing tag set {series_id:?} => {tags:?}");
 
         tx.insert(&self.partition, series_id.to_be_bytes(), tags);
     }
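
A minimal usage sketch of the new wildcard filter, mirroring `test_wildcard` above; it only uses calls that appear in this patch (`Database::builder`, `write_at`, `tagset!`, and the `count(...).filter(...).build()` aggregator) and assumes `tempfile` is available, as in the test:

```rust
use talna::{tagset, Database, MetricName};

fn main() -> talna::Result<()> {
    let folder = tempfile::tempdir()?;
    let db = Database::builder().open(&folder)?;
    let metric = MetricName::try_from("cpu.total").unwrap();

    // Hierarchical tag values, most general segment first,
    // so a trailing wildcard can select whole subtrees.
    db.write_at(metric, 0, 4.0, tagset!("env" => "prod", "service" => "db.postgres.15"))?;
    db.write_at(metric, 0, 2.0, tagset!("env" => "prod", "service" => "db.mariadb.11"))?;
    db.write_at(metric, 0, 8.0, tagset!("env" => "prod", "service" => "server.nginx"))?;

    // "service:db.*" matches both database series, but not nginx.
    let aggregator = db.count(metric, "env").filter("service:db.*").build()?;
    assert!(aggregator.contains_key("prod"));
    for (_, mut buckets) in aggregator {
        let bucket = buckets.next().unwrap()?;
        assert_eq!(2, bucket.len);
    }

    Ok(())
}
```

Because the lexer only accepts a trailing `*`, a wildcard evaluates to a single prefix scan over the tag index (`Node::Wildcard` formats the term via `TagIndex::format_key` and calls `query_prefix`), which is why the README section recommends ordering tag segments from most general to most specific.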