Skip to content

Commit

Permalink
Draft: Extra debugging and more factors being defragged
Browse files Browse the repository at this point in the history
Signed-off-by: zackcam <zackcam@amazon.com>
  • Loading branch information
zackcam committed Nov 28, 2024
1 parent d490857 commit c2c1ce7
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 11 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
valkey-module = "0.1.2"
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "1.0.13", features = ["serde"] }
bloomfilter = { path = "../rust-bloom-filter", features = ["serde"] }
lazy_static = "1.4.0"
bit-vec = "0.8.0"
libc = "0.2"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3"
Expand All @@ -33,5 +34,10 @@ opt-level = 0
debug = 2
debug-assertions = true

[profile.release]
opt-level = 0
debug = 2
debug-assertions = true

[features]
enable-system-alloc = ["valkey-module/enable-system-alloc"]
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ echo "Running cargo build release..."
cargo build --all --all-targets --release

echo "Running unit tests..."
cargo test --features enable-system-alloc
# cargo test --features enable-system-alloc

# Ensure SERVER_VERSION environment variable is set
if [ -z "$SERVER_VERSION" ]; then
Expand Down
4 changes: 3 additions & 1 deletion src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ impl BloomFilterType {

/// Return the total memory usage of the BloomFilterType object.
pub fn memory_usage(&self) -> usize {
let mut mem: usize = std::mem::size_of::<BloomFilterType>();
let mut mem: usize = std::mem::size_of::<BloomFilterType>()
+ (self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>());

for filter in &self.filters {
mem += filter.number_of_bytes();
}
Expand Down
4 changes: 3 additions & 1 deletion src/configs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use lazy_static::lazy_static;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicBool, AtomicI64};

/// Configurations
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100000;
Expand All @@ -14,6 +14,7 @@ pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.001;
pub const BLOOM_FP_RATE_MIN: f64 = 0.0;
pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

pub const BLOOM_DEFRAG_DEAFULT: bool = true;
// Max Memory usage allowed per bloom filter within a bloom object (64MB).
// Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations.
// Also, write operations that result in bloom object allocation larger than this size will be rejected.
Expand All @@ -26,6 +27,7 @@ lazy_static! {
pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT);
pub static ref BLOOM_MEMORY_LIMIT_PER_FILTER: AtomicI64 =
AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT);
}

/// Constants
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ valkey_module! {
string: [
],
bool: [
["bloom-defrag-enabled", &*configs::BLOOM_DEFRAG, configs::BLOOM_DEFRAG_DEAFULT, ConfigurationFlags::DEFAULT, None],
],
enum: [
],
Expand Down
200 changes: 194 additions & 6 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use bit_vec::BitVec;
use bloomfilter::Bloom;
use lazy_static::lazy_static;
use std::ffi::CString;
use std::mem;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr;
use std::ptr::null_mut;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use valkey_module::logging;
use valkey_module::logging::{log_io_error, ValkeyLogLevel};
use valkey_module::raw;
Expand Down Expand Up @@ -129,42 +136,223 @@ pub unsafe extern "C" fn bloom_free_effort(
curr_item.free_effort()
}

// /// # Safety
// /// Raw handler for the Bloom object's defrag callback.
// pub unsafe extern "C" fn bloom_defrag(
// _defrag_ctx: *mut RedisModuleDefragCtx,
// _from_key: *mut RedisModuleString,
// value: *mut *mut c_void,
// ) -> i32 {
// if !configs::BLOOM_DEFRAG.load(Ordering::Relaxed) {
// return 0;
// }
// let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();

// let num_filts = bloom_filter_type.filters.len();

// for _ in 0..num_filts {
// let bloom_filter_box = bloom_filter_type.filters.remove(0);
// let bloom_filter = Box::into_raw(bloom_filter_box);
// logging::log_warning(format!("Before Address: {:p}", bloom_filter));
// let defrag_result = unsafe {
// raw::RedisModule_DefragAlloc.unwrap()(
// core::ptr::null_mut(),
// bloom_filter as *mut c_void,
// )
// };
// let mut defragged_filter = {
// if !defrag_result.is_null() {
// Box::from_raw(defrag_result as *mut BloomFilter)
// } else {
// Box::from_raw(bloom_filter)
// }
// };
// logging::log_warning(format!("After Address: {:p}", defragged_filter));
// // let test = Box::leak(defragged_filter.bloom);
// // let tes = Box::into_raw(test);
// // let inner_bloom = mem::replace(
// // &mut defragged_filter.bloom,
// // Box::new(bloomfilter::Bloom::new(1, 1)),
// // );
// // let inner_bloom = mem::replace(
// // &mut defragged_filter.bloom,
// // Box::from_raw(ptr::null::<bloomfilter::Bloom<[u8]>>() as *mut bloomfilter::Bloom<[u8]>),
// // );
// let inner_bloom = mem::take(&mut defragged_filter.bloom);
// let inner_bloom_ptr = Box::into_raw(inner_bloom);
// logging::log_warning(format!("Before bloom Address: {:p}", inner_bloom_ptr));
// let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()(
// core::ptr::null_mut(),
// inner_bloom_ptr as *mut c_void,
// );
// defragged_filter.bloom = {
// if !defrag_result.is_null() {
// Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>)
// } else {
// Box::from_raw(inner_bloom_ptr)
// }
// };
// logging::log_warning(format!("After bloom Address: {:p}", defragged_filter.bloom));
// bloom_filter_type.filters.push(defragged_filter);
// }
// let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) };
// if !val.is_null() {
// *value = val;
// }
// 0
// }

lazy_static! {
// Spare, minimally sized bloom object (`Bloom::new(1, 1)`) kept behind a mutex.
// `bloom_defrag` takes it out of the `Option` and swaps it into a filter with
// `mem::replace` while that filter's real inner bloom is handed to the defrag
// allocator, then stores it back as `Some(..)` so the next call can reuse it.
static ref DEFRAG_BLOOM_FILTER: Mutex<Option<Box<Bloom<[u8]>>>> =
Mutex::new(Some(Box::new(Bloom::<[u8]>::new(1, 1))));
// Spare empty vector held the same way. NOTE(review): in the code visible here it
// is only referenced from commented-out lines in `bloom_defrag` — confirm whether
// it is still needed.
static ref DEFRAG_VEC: Mutex<Option<Vec<u32>>> = Mutex::new(Some(Vec::new()));
}

/// Defrag callback for a `Vec<u32>`: offers the vector's heap buffer to the server's
/// defrag allocator and returns a vector that owns whichever buffer is live afterwards.
///
/// The returned vector always has the same elements as `vec`. For a non-empty input its
/// capacity is exactly its length, because the buffer is shrunk to fit before it is
/// handed to the allocator.
///
/// # Panics
/// Panics if `RedisModule_DefragAlloc` has not been resolved by the module loader.
fn external_vec_defrag(vec: Vec<u32>) -> Vec<u32> {
    // An empty vector turns into a boxed slice with a dangling (non-heap) pointer,
    // which must never be passed to the allocator. There is nothing to move anyway.
    if vec.is_empty() {
        return vec;
    }

    let len = vec.len();
    // `into_boxed_slice` drops any excess capacity, so from here on the allocation
    // holds exactly `len` elements. Rebuilding the vector with the old, larger
    // capacity would describe memory that is no longer part of the allocation.
    let vec_ptr = Box::into_raw(vec.into_boxed_slice()) as *mut c_void;
    logging::log_warning(format!("Before vec_ptr start Address: {:p}", vec_ptr));

    // SAFETY: `vec_ptr` is the start of a live heap allocation we exclusively own.
    // NOTE(review): this assumes Rust's global allocator is the server allocator
    // (i.e. the `enable-system-alloc` feature is off) — confirm.
    let defragged_filters_ptr =
        unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), vec_ptr) };
    logging::log_warning(format!("After hmmm vec Address: {:p}", defragged_filters_ptr));

    // A null result means the buffer was not moved and the original pointer is still
    // valid; otherwise the data now lives at the returned address.
    let live_ptr = if defragged_filters_ptr.is_null() {
        vec_ptr
    } else {
        defragged_filters_ptr
    };

    // SAFETY: `live_ptr` points to `len` initialised `u32`s in an allocation sized for
    // exactly `len` elements, and ownership of it is transferred to the new vector.
    unsafe { Vec::from_raw_parts(live_ptr as *mut u32, len, len) }
}

/// Defrag callback for a `BitVec`; currently a pass-through that returns `bit_vec`
/// unchanged. It is handed to `defrag_no` alongside `external_vec_defrag` in
/// `bloom_defrag`.
///
/// The commented-out draft below boxed the `BitVec` and ran that box through
/// `RedisModule_DefragAlloc`; it is disabled, so no memory is moved here.
fn external_bitvec_defrag(bit_vec: BitVec) -> BitVec {
// let ptr: *mut BitVec = Box::into_raw(Box::new(bit_vec));
// logging::log_warning(format!("Before bloom bit_vec Address: {:p}", ptr));
// let defrag_result =
// unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), ptr as *mut c_void) };
// let mut defragged_filter = unsafe { Box::from_raw(defrag_result as *mut BitVec) };
// logging::log_warning(format!("After bloom bit_vec Address: {:p}", defragged_filter));
// *defragged_filter
bit_vec
}

/// # Safety
/// Raw handler for the Bloom object's defrag callback.
pub unsafe extern "C" fn bloom_defrag(
_defrag_ctx: *mut RedisModuleDefragCtx,
_from_key: *mut RedisModuleString,
value: *mut *mut c_void,
) -> i32 {
// logging::log_warning(format!("After here 0"));

let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();

let num_filts = bloom_filter_type.filters.len();

logging::log_warning(format!(
"defrag in box Address: {:p}",
bloom_filter_type.filters.as_ptr()
));

for _ in 0..num_filts {
let bloom_filter_box = bloom_filter_type.filters.remove(0);
let bloom_filter = Box::into_raw(bloom_filter_box);

let defrag_result = unsafe {
raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
(bloom_filter as *const BloomFilter as *mut BloomFilter) as *mut c_void,
bloom_filter as *mut c_void,
)
};
let mut defragged_filter = Box::from_raw(defrag_result as *mut BloomFilter);

logging::log_warning(format!("Before Vec start Address: {:p}", defrag_result));

let mut defragged_filter = {
if !defrag_result.is_null() {
Box::from_raw(defrag_result as *mut BloomFilter)
} else {
Box::from_raw(bloom_filter)
}
};
let mut defrag_b = DEFRAG_BLOOM_FILTER.lock().unwrap();
let inner_bloom = mem::replace(
&mut defragged_filter.bloom,
Box::new(bloomfilter::Bloom::new(1, 1)),
defrag_b.take().expect("We expect default to exist"),
);
let inner_bloom_ptr = Box::into_raw(inner_bloom);
let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
inner_bloom_ptr as *mut c_void,
);
defragged_filter.bloom =
Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>);
logging::log_warning(format!("defrag in box Address: {:p}", defragged_filter));
if !defragged_inner_bloom.is_null() {
let inner_bloom = mem::replace(
&mut defragged_filter.bloom,
Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>),
);
*defrag_b = Some(inner_bloom); // Resetting the original static
} else {
let inner_bloom =
mem::replace(&mut defragged_filter.bloom, Box::from_raw(inner_bloom_ptr));
*defrag_b = Some(inner_bloom); // Resetting the original static
}
// let inner_bloom = mem::replace(
// &mut defragged_filter.bloom,
// Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>),
// );
// *defrag_b = Some(inner_bloom); // Resetting the original static

// logging::log_warning(format!("1bloom filter len: {}", bloom_filter_type.filters.len()));
// let mut defrag_v = DEFRAG_VEC.lock().unwrap();
// let placeholder = defrag_v.take().unwrap();
// defragged_filter
// .bloom
// .defrag_no(external_bitvec_defrag, external_vec_defrag);
// // *defrag_v = Some(newplaceholder); // Resetting the original static
// logging::log_warning(format!("After bloom Address: {:p}", defragged_filter.bloom));

// logging::log_warning(format!("2bloom filter len: {}", bloom_filter_type.filters.len()));
defragged_filter
.bloom
.defrag_no(external_bitvec_defrag, external_vec_defrag);

bloom_filter_type.filters.push(defragged_filter);
}
let filters_vec = mem::take(&mut bloom_filter_type.filters);
let filters_ptr = Box::into_raw(filters_vec.into_boxed_slice()) as *mut c_void;
// logging::log_warning(format!("Before Vec start Address: {:p}", filters_ptr));

let defragged_filters_ptr =
unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), filters_ptr) };
logging::log_warning(format!(
"After Vec start Address: {:p} \n\n\n",
defragged_filters_ptr
));
if !defragged_filters_ptr.is_null() {
bloom_filter_type.filters = unsafe {
Vec::from_raw_parts(
defragged_filters_ptr as *mut Box<BloomFilter>,
num_filts,
num_filts,
)
};
} else {
bloom_filter_type.filters = unsafe {
Vec::from_raw_parts(filters_ptr as *mut Box<BloomFilter>, num_filts, num_filts)
};
}
// logging::log_warning(format!("After here last"));

let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) };
*value = val;
if !val.is_null() {
*value = val;
}
logging::log_warning("After here super last");

0
}
76 changes: 76 additions & 0 deletions tests/test_bloom_defrag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import time
from valkeytests.valkey_test_case import ValkeyAction
from valkey_bloom_test_case import ValkeyBloomTestCaseBase
from valkeytests.conftest import resource_port_tracker

class TestBloomDefrag(ValkeyBloomTestCaseBase):
    """Starts a server with active defrag enabled and inserts many small bloom objects."""

    def get_custom_args(self):
        """Return the base server arguments plus the active-defrag settings for this test."""
        args = super().get_custom_args()
        args.update({'activedefrag': 'yes', 'active-defrag-threshold-lower': '0', 'active-defrag-ignore-bytes': '1'})
        return args

    def test_bloom_defrag(self):
        """Create ~1000 scaled-out bloom objects and report the active-defrag counters."""
        # Baseline: nothing has been defragged on a fresh server.
        stats = self.parse_valkey_stats()
        defrag_hits = int(stats.get('active_defrag_hits', 0))
        defrag_misses = int(stats.get('active_defrag_misses', 0))
        assert defrag_hits == 0
        assert defrag_misses == 0
        mem_info = self.client.execute_command('INFO MEMORY ')
        print(mem_info)
        print("\n\n\n\n")

        # CAPACITY 1 / EXPANSION 1 with 17 items makes every key scale out into
        # several tiny filters, i.e. many small allocations per object.
        scale_names = [f'scale_{i}' for i in range(1, 1000)]
        for scale in scale_names:
            command = f'bf.insert {scale} CAPACITY 1 EXPANSION 1 ITEMS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17'
            self.client.execute_command(command)

        # Give the active-defrag cycle a moment to run before sampling the counters.
        time.sleep(1)
        mem_info = self.client.execute_command('INFO MEMORY ')
        stats = self.parse_valkey_stats()
        defrag_hits = int(stats.get('active_defrag_hits', 0))
        defrag_misses = int(stats.get('active_defrag_misses', 0))
        print(f"Active defrag hits: {defrag_hits}")
        print(f"Active defrag misses: {defrag_misses}")
        print(mem_info)

        # NOTE(review): expecting zero hits/misses *after* the inserts looks like a
        # draft placeholder — confirm the intended expectation once defrag works.
        assert defrag_hits == 0
        assert defrag_misses == 0

    def parse_valkey_stats(self):
        """Run INFO STATS and return its `key:value` lines as a dict of stripped strings."""
        # NOTE(review): the reply is decoded as bytes below, so the client evidently
        # returns the raw reply for this exact command string — keep it unchanged.
        mem_info = self.client.execute_command('INFO STATS \n\n\n')

        stats_dict = {}
        for line in mem_info.decode('utf-8').split('\r\n'):
            # Section headers ("# Stats") and blank lines carry no colon; skip them.
            if ':' in line:
                key, value = line.split(':', 1)
                stats_dict[key.strip()] = value.strip()

        return stats_dict
Loading

0 comments on commit c2c1ce7

Please sign in to comment.