Skip to content

Commit

Permalink
Add new metrics to show capacity and items across objects (#20)
Browse files Browse the repository at this point in the history
Signed-off-by: Vanessa Tang <yuetan@amazon.com>
  • Loading branch information
YueTang-Vanessa authored Nov 19, 2024
1 parent 9d61cad commit c18dd1e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 31 deletions.
21 changes: 21 additions & 0 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ impl BloomFilterType {
// Add item.
filter.set(item);
filter.num_items += 1;
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(1);
}
// Non Scaling Filters that are filled to capacity cannot handle more inserts.
Expand Down Expand Up @@ -208,6 +210,9 @@ impl BloomFilterType {
new_filter.set(item);
new_filter.num_items += 1;
self.filters.push(new_filter);

metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(1);
}
Ok(0)
Expand Down Expand Up @@ -289,6 +294,12 @@ impl BloomFilterType {
filter.number_of_bytes(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add(
filter.num_items.into(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed);
}

Ok(item)
Expand Down Expand Up @@ -329,6 +340,8 @@ impl BloomFilter {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed);
fltr
}

Expand Down Expand Up @@ -356,6 +369,10 @@ impl BloomFilter {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed);
fltr
}

Expand Down Expand Up @@ -422,6 +439,10 @@ impl Drop for BloomFilter {
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_sub(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_sub(self.num_items.into(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_sub(self.capacity.into(), std::sync::atomic::Ordering::Relaxed);
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ lazy_static! {
pub static ref BLOOM_NUM_OBJECTS: AtomicU64 = AtomicU64::new(0);
pub static ref BLOOM_OBJECT_TOTAL_MEMORY_BYTES: AtomicUsize = AtomicUsize::new(0);
pub static ref BLOOM_NUM_FILTERS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
pub static ref BLOOM_NUM_ITEMS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
pub static ref BLOOM_CAPACITY_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
}

pub fn bloom_info_handler(ctx: &InfoContext) -> ValkeyResult<()> {
Expand All @@ -27,6 +29,18 @@ pub fn bloom_info_handler(ctx: &InfoContext) -> ValkeyResult<()> {
.load(Ordering::Relaxed)
.to_string(),
)?
.field(
"bloom_num_items_across_objects",
BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.load(Ordering::Relaxed)
.to_string(),
)?
.field(
"bloom_capacity_across_objects",
BLOOM_CAPACITY_ACROSS_OBJECTS
.load(Ordering::Relaxed)
.to_string(),
)?
.build_section()?
.build_info()?;

Expand Down
4 changes: 2 additions & 2 deletions tests/test_aofrewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def test_aofrewrite_bloomfilter_metrics(self):

# Check info for scaled bloomfilter matches metrics data for bloomfilter
new_info_obj = self.client.execute_command(f'BF.INFO key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2, 7500, 21000)

# Check bloomfilter size has increased
assert new_info_obj[3] > info_obj[3]

# Delete the scaled bloomfilter to check both filters are deleted and metrics stats are set accordingly
self.client.execute_command('DEL key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
55 changes: 28 additions & 27 deletions tests/test_bloom_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

def test_basic_command_metrics(self):
# Check that bloom metrics stats start at 0
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0)

# Create a default bloom filter and check its metrics values are correct
assert(self.client.execute_command('BF.ADD key item') == 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)

# Check that other commands don't influence metrics
assert(self.client.execute_command('BF.EXISTS key item') == 1)
Expand All @@ -26,55 +27,55 @@ def test_basic_command_metrics(self):
self.client.execute_command("BF.INFO key")
assert(self.client.execute_command('BF.INSERT key ITEMS item5 item6')== [1, 1])

self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 6, DEFAULT_BLOOM_FILTER_CAPACITY)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 6, DEFAULT_BLOOM_FILTER_CAPACITY)

# Create a new default bloom filter and check metrics again
assert(self.client.execute_command('BF.ADD key2 item') == 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Create a non default filter with BF.RESERVE and check its metrics are correct
assert(self.client.execute_command('BF.RESERVE key3 0.001 2917251') == b'OK')
info_obj = self.client.execute_command('BF.INFO key3')

# We want to check the size of the newly created bloom filter but metrics contains the size of all bloomfilters so we must minus the
# two default bloomfilters we already created
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2 + 2917251)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2 + 2917251)

# Delete a non default key and make sure the metrics stats are still correct
self.client.execute_command('DEL key3')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Create a default filter with BF.INSERT and check its metrics are correct
assert(self.client.execute_command('BF.INSERT key4 ITEMS item1 item2') == [1, 1])
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 9, DEFAULT_BLOOM_FILTER_CAPACITY * 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 9, DEFAULT_BLOOM_FILTER_CAPACITY * 3)

# Delete a default key and make sure the metrics are still correct
self.client.execute_command('UNLINK key')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Create a key then cause it to expire and check if metrics are updated correctly
assert self.client.execute_command('BF.ADD TEST_EXP ITEM') == 1
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 4, DEFAULT_BLOOM_FILTER_CAPACITY * 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 4, DEFAULT_BLOOM_FILTER_CAPACITY * 3)
assert self.client.execute_command('TTL TEST_EXP') == -1
self.verify_bloom_filter_item_existence(self.client, 'TEST_EXP', 'ITEM')
curr_time = int(time.time())
assert self.client.execute_command(f'EXPIREAT TEST_EXP {curr_time + 5}') == 1
wait_for_equal(lambda: self.client.execute_command('BF.EXISTS TEST_EXP ITEM'), 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Flush database so all keys should now be gone and metrics should all be at 0
self.client.execute_command('FLUSHDB')
wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key2 item'), 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0)

def test_scaled_bloomfilter_metrics(self):
self.client.execute_command('BF.RESERVE key1 0.001 7000')
Expand All @@ -89,14 +90,14 @@ def test_scaled_bloomfilter_metrics(self):

# Check info for scaled bloomfilter matches metrics data for bloomfilter
new_info_obj = self.client.execute_command(f'BF.INFO key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2, 7500, 21000)

# Check bloomfilter size has increased
assert new_info_obj[3] > info_obj[3]

# Delete the scaled bloomfilter to check both filters are deleted and metrics stats are set accordingly
self.client.execute_command('DEL key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)


def test_copy_metrics(self):
Expand All @@ -105,12 +106,12 @@ def test_copy_metrics(self):
assert(self.client.execute_command('COPY key{123} copiedkey{123}') == 1)

# Verify that the metrics were updated correctly after copying
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 2, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Perform a FLUSHALL which should set all metrics data to 0
self.client.execute_command('FLUSHALL')
wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key{123} item'), 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)


def test_save_and_restore_metrics(self):
Expand Down Expand Up @@ -138,4 +139,4 @@ def test_save_and_restore_metrics(self):
for i in range(1, len(original_info_obj), 2):
assert original_info_obj[i] == restored_info_obj[i]

self.verify_bloom_metrics(self.client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3)
self.verify_bloom_metrics(new_client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3, 7501, 21000 + DEFAULT_BLOOM_FILTER_CAPACITY)
12 changes: 10 additions & 2 deletions tests/valkey_bloom_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p
)
self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)

def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters):
def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters, expected_num_items, expected_sum_capacity):
"""
Verify the metric values are recorded properly, the expected values are as below
expected_memory: the size of the memory used by the objects
Expand All @@ -152,14 +152,22 @@ def verify_bloom_metrics(self, info_response, expected_memory, expected_num_obje
total_memory_bites = -1
num_objects = -1
num_filters = -1
num_items = -1
sum_capacity = -1
for line in lines:
if line.startswith('bf_bloom_total_memory_bytes:'):
total_memory_bites = int(line.split(':')[1])
elif line.startswith('bf_bloom_num_objects:'):
num_objects = int(line.split(':')[1])
elif line.startswith('bf_bloom_num_filters_across_objects'):
num_filters = int(line.split(':')[1])
elif line.startswith('bf_bloom_num_items_across_objects'):
num_items = int(line.split(':')[1])
elif line.startswith('bf_bloom_capacity_across_objects'):
sum_capacity = int(line.split(':')[1])

assert total_memory_bites == expected_memory
assert num_objects == expected_num_objects
assert num_filters == expected_num_filters
assert num_filters == expected_num_filters
assert num_items == expected_num_items
assert sum_capacity == expected_sum_capacity

0 comments on commit c18dd1e

Please sign in to comment.