Skip to content

Commit

Permalink
Background compaction in a thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Sep 5, 2024
1 parent eb1c530 commit 263d385
Show file tree
Hide file tree
Showing 15 changed files with 757 additions and 434 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
run: cargo build
- name: Run tests
run: cargo test --release --verbose -- --nocapture
run: cargo test --release -- --nocapture
- name: Run simple example
run: cargo run --example simple
- name: Run multithreaded example
Expand Down
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ uuid = { version = "1.10.0" }
rand = "0.8.5"
fslock = "0.2.1"
libc = "0.2.158"
crossbeam-channel = "0.5.13"

[features]
whitebox_testing = []
Expand Down
43 changes: 15 additions & 28 deletions candy-crasher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@ const CONFIG: Config = Config {
truncate_up: true,
clear_on_unsupported_version: true,
mlock_headers: false,
num_compaction_threads: 4,
};

fn child_inserts() -> Result<()> {
// our job is to create 1M entries while being killed by our evil parent

let store = CandyStore::open("dbdir", CONFIG)?;
let highest_bytes = store.get("highest")?.unwrap_or(vec![0, 0, 0, 0]);
let highest = u32::from_le_bytes([
highest_bytes[0],
highest_bytes[1],
highest_bytes[2],
highest_bytes[3],
]);
let highest = u32::from_le_bytes(highest_bytes.try_into().unwrap());

if highest == TARGET - 1 {
println!("child finished (already at {highest})");
Expand All @@ -50,12 +46,7 @@ fn child_removals() -> Result<()> {

let store = CandyStore::open("dbdir", CONFIG)?;
let lowest_bytes = store.get("lowest")?.unwrap_or(vec![0, 0, 0, 0]);
let lowest = u32::from_le_bytes([
lowest_bytes[0],
lowest_bytes[1],
lowest_bytes[2],
lowest_bytes[3],
]);
let lowest = u32::from_le_bytes(lowest_bytes.try_into().unwrap());

if lowest == TARGET - 1 {
println!("child finished (already at {lowest})");
Expand All @@ -79,12 +70,7 @@ fn child_list_inserts() -> Result<()> {
let store = CandyStore::open("dbdir", CONFIG)?;

let highest_bytes = store.get("list_highest")?.unwrap_or(vec![0, 0, 0, 0]);
let highest = u32::from_le_bytes([
highest_bytes[0],
highest_bytes[1],
highest_bytes[2],
highest_bytes[3],
]);
let highest = u32::from_le_bytes(highest_bytes.try_into().unwrap());

if highest == TARGET - 1 {
println!("child finished (already at {highest})");
Expand All @@ -108,12 +94,7 @@ fn child_list_removals() -> Result<()> {
let store = CandyStore::open("dbdir", CONFIG)?;

let lowest_bytes = store.get("list_lowest")?.unwrap_or(vec![0, 0, 0, 0]);
let lowest = u32::from_le_bytes([
lowest_bytes[0],
lowest_bytes[1],
lowest_bytes[2],
lowest_bytes[3],
]);
let lowest = u32::from_le_bytes(lowest_bytes.try_into().unwrap());

if lowest == TARGET - 1 {
println!("child finished (already at {lowest})");
Expand Down Expand Up @@ -258,6 +239,7 @@ fn main() -> Result<()> {
// "dbdir",
// Config {
// expected_number_of_keys: 1_000_000,
// clear_on_unsupported_version: true,
// ..Default::default()
// },
// )?;
Expand All @@ -277,7 +259,7 @@ fn main() -> Result<()> {
for res in store.iter() {
let (k, v) = res?;
assert_eq!(v, b"i am a key");
let k = u32::from_le_bytes([k[0], k[1], k[2], k[3]]);
let k = u32::from_le_bytes(k.try_into().unwrap());
assert!(k < TARGET);
count += 1;
}
Expand All @@ -296,7 +278,12 @@ fn main() -> Result<()> {
store.remove("lowest")?,
Some((TARGET - 1).to_le_bytes().to_vec())
);
assert_eq!(store.iter().count(), 0);
assert_eq!(
store.iter().count(),
0,
"{:?}",
store.iter().collect::<Vec<_>>()
);

println!("DB validated successfully");
}
Expand All @@ -314,14 +301,14 @@ fn main() -> Result<()> {

for (i, res) in store.iter_list("xxx").enumerate() {
let (k, v) = res?;
assert_eq!(k, (i as u32).to_le_bytes());
assert_eq!(u32::from_le_bytes(k.try_into().unwrap()), i as u32);
assert_eq!(v, b"yyy");
}

println!("DB validated successfully");
}

parent_run(shared_stuff, child_list_removals, 10..30)?;
parent_run(shared_stuff, child_list_removals, 10..80)?;

{
println!("Parent starts validating the DB...");
Expand Down
10 changes: 8 additions & 2 deletions candy-longliving/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ fn main() -> Result<()> {
let num_iters: usize = args[2].parse().expect("num_iters not a number");
let tail_length: usize = args[3].parse().expect("tail_length not a number");

let db = Arc::new(CandyStore::open("dbdir", Config::default())?);
let db = Arc::new(CandyStore::open(
"dbdir",
Config {
min_compaction_threashold: 1024 * 1024,
..Default::default()
},
)?);
db.clear()?;

let mut handles = vec![];
Expand All @@ -35,7 +41,7 @@ fn main() -> Result<()> {
if i % 10000 == 0 {
let t1 = Instant::now();
println!(
"thread {thd} at {i} {:?} rate={}us",
"thread {thd} at {i} {} rate={}us",
db.stats(),
t1.duration_since(t0).as_micros() / 10_000,
);
Expand Down
5 changes: 3 additions & 2 deletions candy-perf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ fn test_concurrency_without_contention(num_threads: u32, num_keys: u32) -> Resul
{
let t0 = Instant::now();
for i in thd * num_keys..(thd + 1) * num_keys {
db.set(&i.to_le_bytes(), &thd.to_le_bytes())?;
let status = db.set(&i.to_le_bytes(), &thd.to_le_bytes())?;
debug_assert!(status.was_created());
}
insert_time_ns.fetch_add(
Instant::now().duration_since(t0).as_nanos() as u64,
Expand All @@ -257,7 +258,7 @@ fn test_concurrency_without_contention(num_threads: u32, num_keys: u32) -> Resul
let t0 = Instant::now();
for i in thd * num_keys..(thd + 1) * num_keys {
let val = db.get(&i.to_le_bytes())?;
debug_assert_eq!(val, Some(thd.to_le_bytes().to_vec()));
debug_assert_eq!(val, Some(thd.to_le_bytes().to_vec()), "thd={thd} i={i}");
black_box(val.unwrap());
}
get_time_ns.fetch_add(
Expand Down
1 change: 1 addition & 0 deletions src/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl PartedHash {
self.0 as u32
}

#[allow(dead_code)]
pub fn as_u64(&self) -> u64 {
self.0
}
Expand Down
7 changes: 3 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,13 @@ pub enum CandyError {
KeyTooLong(usize),
ValueTooLong(usize),
EntryCannotFitInShard(usize, usize),
KeyAlreadyExists(Vec<u8>, u64),
}

impl Display for CandyError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Self::KeyTooLong(sz) => write!(f, "key too long {sz}"),
Self::ValueTooLong(sz) => write!(f, "value too long {sz}"),
Self::KeyAlreadyExists(key, ph) => {
write!(f, "key {key:?} already exists (0x{ph:016x})")
}
Self::EntryCannotFitInShard(sz, max) => {
write!(f, "entry too big ({sz}) for a single shard file ({max})")
}
Expand Down Expand Up @@ -111,6 +107,8 @@ pub struct Config {
pub clear_on_unsupported_version: bool,
/// whether or not to mlock the shard headers to RAM (POSIX only)
pub mlock_headers: bool,
/// number of background compaction threads
pub num_compaction_threads: usize,
/// optionally delay modifying operations before for the given duration before flushing data to disk,
/// to ensure reboot consistency
#[cfg(feature = "flush_aggregation")]
Expand All @@ -128,6 +126,7 @@ impl Default for Config {
truncate_up: true,
clear_on_unsupported_version: false,
mlock_headers: false,
num_compaction_threads: 4,
#[cfg(feature = "flush_aggregation")]
flush_aggregation_delay: None,
}
Expand Down
1 change: 0 additions & 1 deletion src/lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ impl CandyStore {
// if the item already exists, it's already part of the list. just update it and preserve the index
if let Some(mut existing_val) = self.get_raw(&item_key)? {
match mode {
InsertMode::MustCreate => unreachable!(),
InsertMode::GetOrCreate => {
existing_val.truncate(existing_val.len() - size_of::<u64>());
return Ok(InsertToListStatus::ExistingValue(existing_val));
Expand Down
Loading

0 comments on commit 263d385

Please sign in to comment.