add wildcard queries
marvin-j97 committed Nov 22, 2024
1 parent fad304f commit 2a0fa4f
Showing 8 changed files with 230 additions and 19 deletions.
43 changes: 41 additions & 2 deletions README.md
@@ -30,8 +30,8 @@ Each data point has

- a nanosecond timestamp, which is also its primary key (stored negated and big-endian, because we want to scan from newest to oldest, and forward scans are faster)
- the actual value (float)
-- a tagset (list of key-value pairs, e.g. service=db; env=prod)
-- a metric name (e.g. cpu.total)
+- a tagset (list of key-value pairs, e.g. "service=db; env=prod")
+- a metric name (e.g. "cpu.total")

A `Database` is contained in a single Fjall `Keyspace` and consists of a couple of partitions (prefixed by `_talna#`). This way it can be integrated into an existing application that already uses Fjall.

@@ -106,3 +106,42 @@ println!("{buckets:#?}");
```

![Timeseries parameters](./timeseries.svg)

## Filter query operators

The filter query DSL supports a handful of operators:

### AND

`env:prod AND service:db`

### OR

`db:postgres OR db:mariadb`

### NOT

`!db:postgres AND !db:mariadb`

### Wildcard

`service:db.postgres.v* OR service:db.mariadb.v*`

Note that a wildcard can only appear at the end of a value (it is a prefix match), so tags need to be designed with *increasing* cardinality (hierarchical, most general component first):

`BAD: loc:munich.bavaria.germany.eu.earth`

`GOOD: loc:earth.eu.germany.bavaria.munich`, which allows queries like `loc:earth.eu.germany.*`
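
A rough end-to-end sketch of such a query, borrowing the builder API from the tests in `src/db.rs` further down; the `loc` tagset, the path, and the import lines are illustrative assumptions, not part of this commit:

```rust
use talna::{tagset, Database, MetricName};

fn main() -> talna::Result<()> {
    let db = Database::builder().open("/tmp/talna-example")?;
    let metric = MetricName::try_from("cpu.total").unwrap();

    // hierarchical tag value: most general component first
    db.write_at(
        metric,
        0, // nanosecond timestamp
        25.0,
        tagset!(
            "env" => "prod",
            "loc" => "earth.eu.germany.bavaria.munich",
        ),
    )?;

    // prefix match: selects every series whose `loc` starts with "earth.eu.germany."
    let grouped = db
        .count(metric, "env")
        .filter("loc:earth.eu.germany.*")
        .build()?;

    for (group, mut buckets) in grouped {
        let bucket = buckets.next().unwrap()?;
        println!("{group}: {} data points", bucket.len);
    }

    Ok(())
}
```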

### Nesting

`env:prod AND (service:db OR service:rest-api OR service:graphql-api)`
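
For reference, that nested expression parses into a tree of the `Node` variants added in `src/query/filter.rs` further down; roughly (shape inferred from the parser tests):

```rust
Node::And(vec![
    Node::Eq(Tag { key: "env", value: "prod" }),
    Node::Or(vec![
        Node::Eq(Tag { key: "service", value: "db" }),
        Node::Eq(Tag { key: "service", value: "rest-api" }),
        Node::Eq(Tag { key: "service", value: "graphql-api" }),
    ]),
])
```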

<!-- TODO: 1.0.0 Set,
e.g. service:[db, rest-api, graphql-api]
expands to (x OR y OR z), see nom parser
### Set
`env:prod AND service:[db, rest-api, graphql-api]`
-->
4 changes: 2 additions & 2 deletions billion/src/main.rs
@@ -29,8 +29,8 @@ fn main() -> talna::Result<()> {

{
let db = Database::builder()
-            // TODO: cache currently bloats memory a lot because of Arcs in block cache
-            // TODO: timeseries would probably benefit a lot from a CompressedBlockCache (to skip I/O, but not CPU work)
+            // TODO: 1.0.0 cache currently bloats memory a lot because of Arcs in block cache
+            // timeseries would probably benefit a lot from a CompressedBlockCache (to skip I/O, but not CPU work)
.cache_size_mib(8)
.hyper_mode(true)
.open(path)?;
129 changes: 126 additions & 3 deletions src/db.rs
@@ -190,11 +190,11 @@ impl Database {

let series_ids = filter.evaluate(&self.0.smap, &self.0.tag_index, metric)?;
if series_ids.is_empty() {
log::debug!("Query did not match any series");
log::debug!("Query {filter_expr:?} did not match any series");
return Ok(vec![]);
}

-        log::debug!(
+        log::trace!(
"Querying metric {metric}{{{filter}}} [{min:?}..{max:?}] in series {series_ids:?}"
);

@@ -370,7 +370,7 @@ impl Database {
} else {
// NOTE: Actually create series

-            // TODO: atomic, persistent counter
+            // TODO: 1.0.0 atomic, persistent counter
let next_series_id = self.0.smap.partition.inner().len()? as SeriesId;

log::trace!("Creating series {next_series_id} for permutation {series_key:?}");
@@ -1007,4 +1007,127 @@ mod tests {

Ok(())
}

#[test]
fn test_wildcard() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let db = Database::builder().open(&folder)?;
let metric_name = MetricName::try_from("hello").unwrap();

db.write_at(
metric_name,
0,
4.0,
tagset!(
"env" => "prod",
"service" => "server.nginx",
),
)?;
db.write_at(
metric_name,
0,
4.0,
tagset!(
"env" => "prod",
"service" => "db.bigtable",
),
)?;
db.write_at(
metric_name,
0,
4.0,
tagset!(
"env" => "prod",
"service" => "db.neon",
),
)?;
db.write_at(
metric_name,
0,
4.0,
tagset!(
"env" => "prod",
"service" => "db.postgres.14",
),
)?;
db.write_at(
metric_name,
0,
4.0,
tagset!(
"env" => "prod",
"service" => "db.postgres.15",
),
)?;
db.write_at(
metric_name,
0,
4.0,
tagset!(
"env" => "prod",
"service" => "db.postgres.16",
),
)?;

{
let aggregator = db.count(metric_name, "env").build()?;
assert_eq!(1, aggregator.len());
assert!(aggregator.contains_key("prod"));
for (_, mut aggregator) in aggregator {
let bucket = aggregator.next().unwrap()?;
assert_eq!(6, bucket.len);
}
}

{
let aggregator = db
.count(metric_name, "env")
.filter("service:db.postgres.16")
.build()?;
assert_eq!(1, aggregator.len());
assert!(aggregator.contains_key("prod"));
for (_, mut aggregator) in aggregator {
let bucket = aggregator.next().unwrap()?;
assert_eq!(1, bucket.len);
}
}

{
let aggregator = db
.count(metric_name, "env")
.filter("service:db.postgres.*")
.build()?;
assert_eq!(1, aggregator.len());
assert!(aggregator.contains_key("prod"));
for (_, mut aggregator) in aggregator {
let bucket = aggregator.next().unwrap()?;
assert_eq!(3, bucket.len);
}
}

{
let aggregator = db
.count(metric_name, "env")
.filter("service:db.*")
.build()?;
assert_eq!(1, aggregator.len());
assert!(aggregator.contains_key("prod"));
for (_, mut aggregator) in aggregator {
let bucket = aggregator.next().unwrap()?;
assert_eq!(5, bucket.len);
}
}

{
let aggregator = db.count(metric_name, "env").filter("service:*").build()?;
assert_eq!(1, aggregator.len());
assert!(aggregator.contains_key("prod"));
for (_, mut aggregator) in aggregator {
let bucket = aggregator.next().unwrap()?;
assert_eq!(6, bucket.len);
}
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions src/db_builder.rs
@@ -8,8 +8,8 @@ pub struct Builder {
hyper_mode: bool,
}

-// TODO: prefix bloom filters would be *really* nice
-// TODO: if we can make lsm-tree optimize ranges that have a common prefix
+// TODO: 1.0.0 prefix bloom filters would be *really* nice
+// if we can make lsm-tree optimize ranges that have a common prefix

impl Builder {
pub(crate) fn new() -> Self {
34 changes: 31 additions & 3 deletions src/query/filter.rs
@@ -14,6 +14,7 @@ pub enum Node<'a> {
And(Vec<Self>),
Or(Vec<Self>),
Eq(Tag<'a>),
Wildcard(Tag<'a>),
Not(Box<Self>),
AllStar,
}
@@ -22,6 +23,7 @@ impl<'a> std::fmt::Display for Node<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Node::Eq(leaf) => write!(f, "{}:{}", leaf.key, leaf.value),
Node::Wildcard(leaf) => write!(f, "{}:{}*", leaf.key, leaf.value),
Node::And(nodes) => write!(
f,
"({})",
@@ -88,7 +90,7 @@ pub fn union(vecs: &[Vec<SeriesId>]) -> Vec<SeriesId> {
}

impl<'a> Node<'a> {
-    // TODO: unit test and add benchmark case
+    // TODO: 1.0.0 unit test and add benchmark case
pub fn evaluate(
&self,
smap: &SeriesMapping,
@@ -98,8 +100,10 @@
match self {
Node::AllStar => tag_index.query_eq(metric_name),
Node::Eq(leaf) => {
-                let term = format!("{metric_name}#{}:{}", leaf.key, leaf.value);
-                tag_index.query_eq(&term)
+                tag_index.query_eq(&TagIndex::format_key(metric_name, leaf.key, leaf.value))
            }
+            Node::Wildcard(leaf) => {
+                tag_index.query_prefix(&TagIndex::format_key(metric_name, leaf.key, leaf.value))
+            }
Node::And(children) => {
// TODO: evaluate lazily...
@@ -137,6 +141,7 @@ impl<'a> Node<'a> {

#[derive(Debug)]
pub enum Item<'a> {
Wildcard((&'a str, &'a str)),
Identifier((&'a str, &'a str)),
And,
Or,
@@ -166,6 +171,15 @@ pub fn parse_filter_query(s: &str) -> Result<Node, crate::Error> {
let v = splits.next().expect("should be valid identifier");
output_queue.push_back(Item::Identifier((k, v)));
}
lexer::Token::Wildcard(id) => {
let mut splits = id.split(':');
let k = splits.next().expect("should be valid identifier");
let v = splits
.next()
.expect("should be valid identifier")
.trim_end_matches("*");
output_queue.push_back(Item::Wildcard((k, v)));
}
lexer::Token::And => {
loop {
let Some(top) = op_stack.back() else {
@@ -241,6 +255,9 @@ pub fn parse_filter_query(s: &str) -> Result<Node, crate::Error> {
Item::Identifier((key, value)) => {
buf.push(Node::Eq(Tag { key, value }));
}
Item::Wildcard((key, value)) => {
buf.push(Node::Wildcard(Tag { key, value }));
}
Item::And => {
let Some(b) = buf.pop() else {
return Err(crate::Error::InvalidQuery);
@@ -319,6 +336,17 @@ mod tests {
);
}

#[test_log::test]
fn test_parse_filter_query_wildcard_1() {
assert_eq!(
Node::Wildcard(Tag {
key: "service",
value: "db-"
}),
parse_filter_query("service:db-*").unwrap()
);
}

#[test_log::test]
fn test_intersection() {
assert_eq!(
10 changes: 9 additions & 1 deletion src/query/lexer.rs
@@ -18,10 +18,18 @@ pub enum Token<'a> {
#[token(")")]
ParanClose,

#[regex("[a-zA-Z_-]+:[a-zA-Z0-9_-]+")]
#[regex("[a-zA-Z_-]+:[a-zA-Z0-9_\\-.]*\\*")]
Wildcard(&'a str),

#[regex("[a-zA-Z_-]+:[a-zA-Z0-9_\\-.]+")]
Identifier(&'a str),
}

// TODO: 1.0.0 replace with nom parser

// TODO: 1.0.0 TagSet values should probably also be allowed to be integers
// so we can express something like: give me the AVG response time of all 4xx HTTP responses

pub fn tokenize_filter_query(s: &str) -> impl Iterator<Item = Result<Token, ()>> + '_ {
Token::lexer(s)
}
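
A rough sketch of how the two overlapping rules shake out: `"service:db."` also matches the `Identifier` rule, but the `Wildcard` rule is the only one that can consume the trailing `*`, and logos prefers the longer match. The exact token stream below is an assumption, not a test from this commit:

```rust
let tokens: Vec<Token> = tokenize_filter_query("env:prod AND service:db.*")
    .collect::<Result<_, _>>()
    .unwrap();

// expected: [Identifier("env:prod"), And, Wildcard("service:db.*")]
```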
23 changes: 18 additions & 5 deletions src/tag_index.rs
@@ -66,7 +66,7 @@ impl TagIndex {
term: &str,
series_id: SeriesId,
) -> crate::Result<()> {
log::trace!("indexing {term:?} => {series_id}");
// log::trace!("Indexing {term:?} => {series_id}");

tx.fetch_update(&self.partition, term, |bytes| match bytes {
Some(bytes) => {
@@ -80,7 +80,7 @@
}
postings.push(series_id);

log::trace!("posting list {term:?} is now {postings:?}");
// log::trace!("posting list {term:?} is now {postings:?}");

Some(Self::serialize_postings_list(&postings).into())
}
@@ -90,6 +90,16 @@
Ok(())
}

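    /// Builds an index term in the `{metric}#{key}:{value}` layout that both
    /// `query_eq` and `query_prefix` operate on, e.g.
    /// `format_key("cpu.total", "service", "db")` yields `"cpu.total#service:db"`.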
pub fn format_key(metric_name: &str, key: &str, value: &str) -> String {
let mut s = String::with_capacity(metric_name.len() + 1 + key.len() + 1 + value.len());
s.push_str(metric_name);
s.push('#');
s.push_str(key);
s.push(':');
s.push_str(value);
s
}

pub fn query_eq(&self, term: &str) -> crate::Result<Vec<SeriesId>> {
Ok(self
.partition
@@ -109,12 +119,12 @@
.unwrap_or_default())
}

-    pub fn query_prefix(&self, metric: MetricName, prefix: &str) -> crate::Result<Vec<SeriesId>> {
+    pub fn query_prefix(&self, prefix: &str) -> crate::Result<Vec<SeriesId>> {
let mut ids = vec![];

let read_tx = self.keyspace.read_tx();

-        for kv in read_tx.prefix(&self.partition, format!("{metric}#{prefix}")) {
+        for kv in read_tx.prefix(&self.partition, prefix) {
let (_, v) = kv?;

let mut reader = &v[..];
@@ -189,7 +199,10 @@ mod tests {

tx.commit()?;

-        assert_eq!(vec![0, 3], tag_index.query_prefix(metric, "service:prod-")?);
+        assert_eq!(
+            vec![0, 3],
+            tag_index.query_prefix("cpu.total:service:prod-")?
+        );

Ok(())
}