From d4908573a34ff51843822ce1e9af6bf063b23e4d Mon Sep 17 00:00:00 2001 From: zackcam Date: Tue, 26 Nov 2024 01:51:40 +0000 Subject: [PATCH] Updating defragmentation to defrag both BloomFilterType and BloomFilter structs Signed-off-by: zackcam --- src/bloom/data_type.rs | 4 ++-- src/bloom/utils.rs | 33 +++++++++++++++------------ src/wrapper/bloom_callback.rs | 43 ++++++++++++++++++++++++++--------- tests/test_bloom_metrics.py | 2 +- 4 files changed, 53 insertions(+), 29 deletions(-) diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 215f182..c0693d7 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -59,7 +59,6 @@ pub trait ValkeyDataType { impl ValkeyDataType for BloomFilterType { /// Callback to load and parse RDB data of a bloom item and create it. fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option { - let mut filters = Vec::new(); if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION { logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str()); return None; @@ -73,6 +72,7 @@ impl ValkeyDataType for BloomFilterType { let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; + let mut filters = Vec::with_capacity(num_filters as usize); for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; @@ -112,7 +112,7 @@ impl ValkeyDataType for BloomFilterType { num_items as u32, capacity as u32, ); - filters.push(filter); + filters.push(Box::new(filter)); } BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::(), diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 55e327f..4e53f82 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,3 +1,4 @@ +use super::data_type::BLOOM_TYPE_VERSION; use crate::{ configs::{ self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, @@ -6,9 +7,8 
@@ use crate::{ }; use bloomfilter; use serde::{Deserialize, Serialize}; -use std::{mem, sync::atomic::Ordering}; - -use super::data_type::BLOOM_TYPE_VERSION; +use std::{mem, os::raw::c_void, sync::atomic::Ordering}; +use valkey_module::{logging, raw}; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -69,7 +69,7 @@ impl BloomError { pub struct BloomFilterType { pub expansion: u32, pub fp_rate: f64, - pub filters: Vec, + pub filters: Vec>, } impl BloomFilterType { @@ -92,7 +92,7 @@ impl BloomFilterType { ); // Create the bloom filter and add to the main BloomFilter object. - let bloom = BloomFilter::new(fp_rate, capacity); + let bloom: Box = Box::new(BloomFilter::new(fp_rate, capacity)); let filters = vec![bloom]; let bloom = BloomFilterType { expansion, @@ -104,14 +104,14 @@ impl BloomFilterType { /// Create a new BloomFilterType object from an existing one. pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { - let mut filters = Vec::new(); + let mut filters: Vec> = Vec::with_capacity(from_bf.filters.len()); metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::(), std::sync::atomic::Ordering::Relaxed, ); for filter in &from_bf.filters { - let new_filter = BloomFilter::create_copy_from(filter); + let new_filter = Box::new(BloomFilter::create_copy_from(filter)); filters.push(new_filter); } BloomFilterType { @@ -205,7 +205,7 @@ impl BloomFilterType { if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) { return Err(BloomError::ExceedsMaxBloomSize); } - let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity); + let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity)); // Add item. new_filter.set(item); new_filter.num_items += 1; @@ -245,9 +245,10 @@ impl BloomFilterType { 1 => { // always use new version to init bloomFilterType. 
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future. - let (expansion, fp_rate, filters): (u32, f64, Vec) = - match bincode::deserialize::<(u32, f64, Vec)>(&decoded_bytes[1..]) - { + let (expansion, fp_rate, filters): (u32, f64, Vec>) = + match bincode::deserialize::<(u32, f64, Vec>)>( + &decoded_bytes[1..], + ) { Ok(values) => { if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) { return Err(BloomError::BadExpansion); @@ -318,7 +319,7 @@ impl BloomFilterType { /// well within the u32::MAX limit. #[derive(Serialize, Deserialize)] pub struct BloomFilter { - pub bloom: bloomfilter::Bloom<[u8]>, + pub bloom: Box>, pub num_items: u32, pub capacity: u32, } @@ -332,7 +333,7 @@ impl BloomFilter { &configs::FIXED_SEED, ); let fltr = BloomFilter { - bloom, + bloom: Box::new(bloom), num_items: 0, capacity, }; @@ -361,7 +362,7 @@ impl BloomFilter { sip_keys, ); let fltr = BloomFilter { - bloom, + bloom: Box::new(bloom), num_items, capacity, }; @@ -377,7 +378,9 @@ impl BloomFilter { } pub fn number_of_bytes(&self) -> usize { - std::mem::size_of::() + (self.bloom.number_of_bits() / 8) as usize + std::mem::size_of::() + + std::mem::size_of::>() + + (self.bloom.number_of_bits() / 8) as usize } /// Caculates the number of bytes that the bloom filter will require to be allocated. 
diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index caa882b..30f9c5a 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -1,11 +1,12 @@ use crate::bloom; use crate::bloom::data_type::ValkeyDataType; +use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; -use crate::configs; use std::ffi::CString; +use std::mem; use std::os::raw::{c_char, c_int, c_void}; use std::ptr::null_mut; -use std::sync::atomic::Ordering; +use valkey_module::logging; use valkey_module::logging::{log_io_error, ValkeyLogLevel}; use valkey_module::raw; use valkey_module::{RedisModuleDefragCtx, RedisModuleString}; @@ -135,15 +136,35 @@ pub unsafe extern "C" fn bloom_defrag( _from_key: *mut RedisModuleString, value: *mut *mut c_void, ) -> i32 { - let curr_item = &*(*value).cast::(); - if curr_item.memory_usage() - > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as usize - { - return 0; + let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::(); + + let num_filts = bloom_filter_type.filters.len(); + + for _ in 0..num_filts { + let bloom_filter_box = bloom_filter_type.filters.remove(0); + let bloom_filter = Box::into_raw(bloom_filter_box); + let defrag_result = unsafe { + raw::RedisModule_DefragAlloc.unwrap()( + core::ptr::null_mut(), + (bloom_filter as *const BloomFilter as *mut BloomFilter) as *mut c_void, + ) + }; + let mut defragged_filter = Box::from_raw(defrag_result as *mut BloomFilter); + + let inner_bloom = mem::replace( + &mut defragged_filter.bloom, + Box::new(bloomfilter::Bloom::new(1, 1)), + ); + let inner_bloom_ptr = Box::into_raw(inner_bloom); + let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()( + core::ptr::null_mut(), + inner_bloom_ptr as *mut c_void, + ); + defragged_filter.bloom = + Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>); + bloom_filter_type.filters.push(defragged_filter); } - let new_item = 
BloomFilterType::create_copy_from(curr_item); - let bb = Box::new(new_item); - drop(Box::from_raw((*value).cast::())); - *value = Box::into_raw(bb).cast::(); + let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) }; + *value = val; 0 } diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py index 29eb4e6..4f7afb1 100644 --- a/tests/test_bloom_metrics.py +++ b/tests/test_bloom_metrics.py @@ -4,7 +4,7 @@ from valkeytests.conftest import resource_port_tracker from util.waiters import * -DEFAULT_BLOOM_FILTER_SIZE = 179960 +DEFAULT_BLOOM_FILTER_SIZE = 179968 DEFAULT_BLOOM_FILTER_CAPACITY = 100000 class TestBloomMetrics(ValkeyBloomTestCaseBase):