diff --git a/Cargo.toml b/Cargo.toml index 555174a..abe8e79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,9 @@ homepage = "https://github.com/valkey-io/valkey-bloom" valkey-module = "0.1.2" valkey-module-macros = "0" linkme = "0" -bloomfilter = { version = "1.0.13", features = ["serde"] } +bloomfilter = { path = "../rust-bloom-filter", features = ["serde"] } lazy_static = "1.4.0" +bit-vec = "0.8.0" libc = "0.2" serde = { version = "1.0", features = ["derive"] } bincode = "1.3" @@ -33,5 +34,10 @@ opt-level = 0 debug = 2 debug-assertions = true +[profile.release] +opt-level = 0 +debug = 2 +debug-assertions = true + [features] enable-system-alloc = ["valkey-module/enable-system-alloc"] diff --git a/build.sh b/build.sh index d8d6781..1b027e0 100755 --- a/build.sh +++ b/build.sh @@ -16,7 +16,7 @@ echo "Running cargo build release..." cargo build --all --all-targets --release echo "Running unit tests..." -cargo test --features enable-system-alloc +# cargo test --features enable-system-alloc # Ensure SERVER_VERSION environment variable is set if [ -z "$SERVER_VERSION" ]; then diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 4e53f82..39bde73 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -123,7 +123,9 @@ impl BloomFilterType { /// Return the total memory usage of the BloomFilterType object. pub fn memory_usage(&self) -> usize { - let mut mem: usize = std::mem::size_of::(); + let mut mem: usize = std::mem::size_of::() + + (self.filters.capacity() * std::mem::size_of::>()); + for filter in &self.filters { mem += filter.number_of_bytes(); } diff --git a/src/configs.rs b/src/configs.rs index db83832..f7b8c50 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -1,5 +1,5 @@ use lazy_static::lazy_static; -use std::sync::atomic::AtomicI64; +use std::sync::atomic::{AtomicBool, AtomicI64}; /// Configurations pub const BLOOM_CAPACITY_DEFAULT: i64 = 100000; @@ -14,6 +14,7 @@ pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.001; pub const BLOOM_FP_RATE_MIN: f64 = 0.0; pub const BLOOM_FP_RATE_MAX: f64 = 1.0; +pub const BLOOM_DEFRAG_DEAFULT: bool = true; // Max Memory usage allowed per bloom filter within a bloom object (64MB). // Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations. // Also, write operations that result in bloom object allocation larger than this size will be rejected. @@ -26,6 +27,7 @@ lazy_static! { pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT); pub static ref BLOOM_MEMORY_LIMIT_PER_FILTER: AtomicI64 = AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT); + pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT); } /// Constants diff --git a/src/lib.rs b/src/lib.rs index 0b1b9c3..fb6d329 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ valkey_module! { string: [ ], bool: [ + ["bloom-defrag-enabled", &*configs::BLOOM_DEFRAG, configs::BLOOM_DEFRAG_DEAFULT, ConfigurationFlags::DEFAULT, None], ], enum: [ ], diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index 30f9c5a..480774b 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -2,10 +2,17 @@ use crate::bloom; use crate::bloom::data_type::ValkeyDataType; use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; +use crate::configs; +use bit_vec::BitVec; +use bloomfilter::Bloom; +use lazy_static::lazy_static; use std::ffi::CString; use std::mem; use std::os::raw::{c_char, c_int, c_void}; +use std::ptr; use std::ptr::null_mut; +use std::sync::atomic::Ordering; +use std::sync::Mutex; use valkey_module::logging; use valkey_module::logging::{log_io_error, ValkeyLogLevel}; use valkey_module::raw; @@ -129,6 +136,111 @@ pub unsafe extern "C" fn bloom_free_effort( curr_item.free_effort() } +// /// # Safety +// /// Raw handler for the Bloom object's defrag callback. +// pub unsafe extern "C" fn bloom_defrag( +// _defrag_ctx: *mut RedisModuleDefragCtx, +// _from_key: *mut RedisModuleString, +// value: *mut *mut c_void, +// ) -> i32 { +// if !configs::BLOOM_DEFRAG.load(Ordering::Relaxed) { +// return 0; +// } +// let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::(); + +// let num_filts = bloom_filter_type.filters.len(); + +// for _ in 0..num_filts { +// let bloom_filter_box = bloom_filter_type.filters.remove(0); +// let bloom_filter = Box::into_raw(bloom_filter_box); +// logging::log_warning(format!("Before Address: {:p}", bloom_filter)); +// let defrag_result = unsafe { +// raw::RedisModule_DefragAlloc.unwrap()( +// core::ptr::null_mut(), +// bloom_filter as *mut c_void, +// ) +// }; +// let mut defragged_filter = { +// if !defrag_result.is_null() { +// Box::from_raw(defrag_result as *mut BloomFilter) +// } else { +// Box::from_raw(bloom_filter) +// } +// }; +// logging::log_warning(format!("After Address: {:p}", defragged_filter)); +// // let test = Box::leak(defragged_filter.bloom); +// // let tes = Box::into_raw(test); +// // let inner_bloom = mem::replace( +// // &mut defragged_filter.bloom, +// // Box::new(bloomfilter::Bloom::new(1, 1)), +// // ); +// // let inner_bloom = mem::replace( +// // &mut defragged_filter.bloom, +// // Box::from_raw(ptr::null::>() as *mut bloomfilter::Bloom<[u8]>), +// // ); +// let inner_bloom = mem::take(&mut defragged_filter.bloom); +// let inner_bloom_ptr = Box::into_raw(inner_bloom); +// logging::log_warning(format!("Before bloom Address: {:p}", inner_bloom_ptr)); +// let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()( +// core::ptr::null_mut(), +// inner_bloom_ptr as *mut c_void, +// ); +// defragged_filter.bloom = { +// if !defrag_result.is_null() { +// Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>) +// } else { +// Box::from_raw(inner_bloom_ptr) +// } +// }; +// logging::log_warning(format!("After bloom Address: {:p}", defragged_filter.bloom)); +// bloom_filter_type.filters.push(defragged_filter); +// } +// let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) }; +// if !val.is_null() { +// *value = val; +// } +// 0 +// } + +lazy_static! { + static ref DEFRAG_BLOOM_FILTER: Mutex>>> = + Mutex::new(Some(Box::new(Bloom::<[u8]>::new(1, 1)))); + static ref DEFRAG_VEC: Mutex>> = Mutex::new(Some(Vec::new())); +} + +fn external_vec_defrag(mut vec: Vec) -> Vec { + let clonev = vec.clone(); + let len = vec.len(); + let capacity = vec.capacity(); + // let ptr: *mut u32 = vec.as_mut_ptr(); + let vec_ptr = Box::into_raw(vec.into_boxed_slice()) as *mut c_void; + logging::log_warning(format!("Before vec_ptr start Address: {:p}", vec_ptr)); + + let defragged_filters_ptr = + unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), vec_ptr) }; + logging::log_warning(format!( + "After hmmm vec Address: {:p}", + defragged_filters_ptr + )); + if !defragged_filters_ptr.is_null() { + unsafe { Vec::from_raw_parts(defragged_filters_ptr as *mut u32, len, capacity) } + } else { + unsafe { Vec::from_raw_parts(vec_ptr as *mut u32, len, capacity) } + } + // unsafe { Vec::from_raw_parts(defragged_filters_ptr as *mut u32, len, capacity) } +} + +fn external_bitvec_defrag(bit_vec: BitVec) -> BitVec { + // let ptr: *mut BitVec = Box::into_raw(Box::new(bit_vec)); + // logging::log_warning(format!("Before bloom bit_vec Address: {:p}", ptr)); + // let defrag_result = + // unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), ptr as *mut c_void) }; + // let mut defragged_filter = unsafe { Box::from_raw(defrag_result as *mut BitVec) }; + // logging::log_warning(format!("After bloom bit_vec Address: {:p}", defragged_filter)); + // *defragged_filter + bit_vec +} + /// # Safety /// Raw handler for the Bloom object's defrag callback. pub unsafe extern "C" fn bloom_defrag( @@ -136,35 +248,111 @@ pub unsafe extern "C" fn bloom_defrag( _from_key: *mut RedisModuleString, value: *mut *mut c_void, ) -> i32 { + // logging::log_warning(format!("After here 0")); + let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::(); let num_filts = bloom_filter_type.filters.len(); + logging::log_warning(format!( + "defrag in box Address: {:p}", + bloom_filter_type.filters.as_ptr() + )); + for _ in 0..num_filts { let bloom_filter_box = bloom_filter_type.filters.remove(0); let bloom_filter = Box::into_raw(bloom_filter_box); + let defrag_result = unsafe { raw::RedisModule_DefragAlloc.unwrap()( core::ptr::null_mut(), - (bloom_filter as *const BloomFilter as *mut BloomFilter) as *mut c_void, + bloom_filter as *mut c_void, ) }; - let mut defragged_filter = Box::from_raw(defrag_result as *mut BloomFilter); + logging::log_warning(format!("Before Vec start Address: {:p}", defrag_result)); + + let mut defragged_filter = { + if !defrag_result.is_null() { + Box::from_raw(defrag_result as *mut BloomFilter) + } else { + Box::from_raw(bloom_filter) + } + }; + let mut defrag_b = DEFRAG_BLOOM_FILTER.lock().unwrap(); let inner_bloom = mem::replace( &mut defragged_filter.bloom, - Box::new(bloomfilter::Bloom::new(1, 1)), + defrag_b.take().expect("We expect default to exist"), ); let inner_bloom_ptr = Box::into_raw(inner_bloom); let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()( core::ptr::null_mut(), inner_bloom_ptr as *mut c_void, ); - defragged_filter.bloom = - Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>); + logging::log_warning(format!("defrag in box Address: {:p}", defragged_filter)); + if !defragged_inner_bloom.is_null() { + let inner_bloom = mem::replace( + &mut defragged_filter.bloom, + Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>), + ); + *defrag_b = Some(inner_bloom); // Resetting the original static + } else { + let inner_bloom = + mem::replace(&mut defragged_filter.bloom, Box::from_raw(inner_bloom_ptr)); + *defrag_b = Some(inner_bloom); // Resetting the original static + } + // let inner_bloom = mem::replace( + // &mut defragged_filter.bloom, + // Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>), + // ); + // *defrag_b = Some(inner_bloom); // Resetting the original static + + // logging::log_warning(format!("1bloom filter len: {}", bloom_filter_type.filters.len())); + // let mut defrag_v = DEFRAG_VEC.lock().unwrap(); + // let placeholder = defrag_v.take().unwrap(); + // defragged_filter + // .bloom + // .defrag_no(external_bitvec_defrag, external_vec_defrag); + // // *defrag_v = Some(newplaceholder); // Resetting the original static + // logging::log_warning(format!("After bloom Address: {:p}", defragged_filter.bloom)); + + // logging::log_warning(format!("2bloom filter len: {}", bloom_filter_type.filters.len())); + defragged_filter + .bloom + .defrag_no(external_bitvec_defrag, external_vec_defrag); + bloom_filter_type.filters.push(defragged_filter); } + let filters_vec = mem::take(&mut bloom_filter_type.filters); + let filters_ptr = Box::into_raw(filters_vec.into_boxed_slice()) as *mut c_void; + // logging::log_warning(format!("Before Vec start Address: {:p}", filters_ptr)); + + let defragged_filters_ptr = + unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), filters_ptr) }; + logging::log_warning(format!( + "After Vec start Address: {:p} \n\n\n", + defragged_filters_ptr + )); + if !defragged_filters_ptr.is_null() { + bloom_filter_type.filters = unsafe { + Vec::from_raw_parts( + defragged_filters_ptr as *mut Box, + num_filts, + num_filts, + ) + }; + } else { + bloom_filter_type.filters = unsafe { + Vec::from_raw_parts(filters_ptr as *mut Box, num_filts, num_filts) + }; + } + // logging::log_warning(format!("After here last")); + let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) }; - *value = val; + if !val.is_null() { + *value = val; + } + logging::log_warning("After here super last"); + 0 } diff --git a/tests/test_bloom_defrag.py b/tests/test_bloom_defrag.py new file mode 100644 index 0000000..358448b --- /dev/null +++ b/tests/test_bloom_defrag.py @@ -0,0 +1,76 @@ +import time +from valkeytests.valkey_test_case import ValkeyAction +from valkey_bloom_test_case import ValkeyBloomTestCaseBase +from valkeytests.conftest import resource_port_tracker + +class TestBloomDefrag(ValkeyBloomTestCaseBase): + + def get_custom_args(self): + args = super().get_custom_args() + # args.update({'activedefrag': 'yes'}) + + args.update({'activedefrag': 'yes', 'active-defrag-threshold-lower': '0', 'active-defrag-ignore-bytes': '1'}) + return args + + def test_bloom_defrag(self): + stats = self.parse_valkey_stats() + defrag_hits = int(stats.get('active_defrag_hits', 0)) + defrag_misses = int(stats.get('active_defrag_misses', 0)) + assert defrag_hits == 0 + assert defrag_misses == 0 + mem_info = self.client.execute_command('INFO MEMORY ') + print(mem_info) + print("\n\n\n\n") + + self.client.execute_command(command) + + # setting max_memory through config set + + # defrag_misses = self.client.execute_command('INFO STATS')["active_defrag_misses"] + # assert defrag_misses == 0 + scale_names = [f'scale_{i}' for i in range(1, 1000)] + + # Loop through the scale names and execute the command + for scale in scale_names: + command = f'bf.insert {scale} CAPACITY 1 EXPANSION 1 ITEMS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17' + self.client.execute_command(command) + + time.sleep(1) + mem_info = self.client.execute_command('INFO MEMORY ') + stats = self.parse_valkey_stats() + defrag_hits = int(stats.get('active_defrag_hits', 0)) + defrag_misses = int(stats.get('active_defrag_misses', 0)) + print(f"Active defrag hits: {defrag_hits}") + print(f"Active defrag misses: {defrag_misses}") + print(mem_info) + + # unlink_count = int(len(scale_names) * 0.8) + + # for scale in scale_names[:unlink_count]: + # self.client.execute_command(f'DEL {scale}') + + + # mem_info = self.client.execute_command('INFO MEMORY ') + # print("\n\n\n\n") + + # print(mem_info) + assert defrag_hits == 0 + assert defrag_misses == 0 + + + def parse_valkey_stats(self): + mem_info = self.client.execute_command('INFO STATS \n\n\n') + + # Split the string into lines + lines = mem_info.decode('utf-8').split('\r\n') + + # Create a dictionary to store the key-value pairs + stats_dict = {} + + # Parse each line + for line in lines: + if ':' in line: + key, value = line.split(':', 1) + stats_dict[key.strip()] = value.strip() + + return stats_dict \ No newline at end of file diff --git a/tests/test_save_and_restore.py b/tests/test_save_and_restore.py index c160f85..398c97a 100644 --- a/tests/test_save_and_restore.py +++ b/tests/test_save_and_restore.py @@ -2,6 +2,7 @@ import os from valkey_bloom_test_case import ValkeyBloomTestCaseBase from valkeytests.conftest import resource_port_tracker +from util.waiters import * class TestBloomSaveRestore(ValkeyBloomTestCaseBase): @@ -35,6 +36,41 @@ def test_basic_save_and_restore(self): bf_info_result_2 = client.execute_command('BF.INFO testSave') assert bf_info_result_2 == bf_info_result_1 + def get_custom_args(self): + args = super().get_custom_args() + # args.update({'activedefrag': 'yes'}) + + args.update({'activedefrag': 'yes', 'active-defrag-threshold-lower': '0', 'active-defrag-ignore-bytes': '1'}) + return args + + + def test_basic_save_many(self): + client = self.server.get_new_client() + count = 500 + for i in range(0, count): + name = str(i) + "key" + + bf_add_result_1 = client.execute_command('BF.ADD ' + name + ' item') + assert bf_add_result_1 == 1 + + curr_item_count_1 = client.info_obj().num_keys() + assert curr_item_count_1 == count + # save rdb, restart sever + time.sleep(10) + client.bgsave() + self.server.wait_for_save_done() + + self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) + assert self.server.is_alive() + wait_for_equal(lambda: self.server.is_rdb_done_loading(), True) + + # verify restore results + curr_item_count_1 = client.info_obj().num_keys() + + assert curr_item_count_1 == count + time.sleep(100) + + def test_restore_failed_large_bloom_filter(self): client = self.server.get_new_client() # Increase the max allowed size of a bloom filter per bloom object to 180MB. diff --git a/tests/valkeytests/util/waiters.py b/tests/valkeytests/util/waiters.py index 9c2d460..4ff2060 100644 --- a/tests/valkeytests/util/waiters.py +++ b/tests/valkeytests/util/waiters.py @@ -24,7 +24,7 @@ import logging # The maximum wait time for operations in the tests -TEST_MAX_WAIT_TIME_SECONDS = 45 +TEST_MAX_WAIT_TIME_SECONDS = 90 # Setting higher timeout for asan runs if os.environ.get('ASAN_BUILD') is not None: TEST_MAX_WAIT_TIME_SECONDS = 180