Skip to content

Commit

Permalink
feat: use tx to migrate data
Browse files Browse the repository at this point in the history
  • Loading branch information
Fischer0522 committed Sep 23, 2024
1 parent 16bc7c0 commit a2e47a7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 28 deletions.
11 changes: 10 additions & 1 deletion core/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Instant;

fn main() {
util::init_logger();
let total_bytes = 512 * MiB;
// Specify all benchmarks
let benches = vec![
Expand Down Expand Up @@ -250,7 +251,7 @@ mod benches {
),
AeadKey::default(),
None,
false,
true,
None,
)?),

Expand Down Expand Up @@ -666,6 +667,14 @@ mod util {
use std::fmt::{self};
use std::time::Duration;

/// Installs an `env_logger` logger for the benchmark binary.
///
/// The builder is configured with `is_test(true)` and a filter level of
/// `Debug`, then installed via `try_init`.
///
/// # Panics
///
/// Panics if `try_init` returns an error (presumably because a logger has
/// already been installed for this process — confirm against the
/// `env_logger` documentation).
pub fn init_logger() {
    let mut logger_builder = env_logger::builder();
    logger_builder.is_test(true);
    logger_builder.filter_level(log::LevelFilter::Debug);

    // Installation failure is treated as fatal for the benchmark run.
    let install_result = logger_builder.try_init();
    install_result.unwrap();
}

/// Display the amount of data in the unit of GiB, MiB, KiB, or bytes.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct DisplayData(usize);
Expand Down
65 changes: 38 additions & 27 deletions core/src/layers/5-disk/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::{
};
use crate::{
layers::{disk::segment::SEGMENT_SIZE, lsm::TxLsmTree},
tx::TxProvider,
BlockSet, Error,
};
use crate::{
Expand All @@ -28,9 +29,9 @@ use core::{
use hashbrown::{HashMap, HashSet};
use log::debug;
// Default GC interval time is 5 seconds (previously 3 seconds)
const DEFAULT_GC_INTERVAL_TIME: std::time::Duration = std::time::Duration::from_secs(3);
const DEFAULT_GC_INTERVAL_TIME: std::time::Duration = std::time::Duration::from_secs(5);
const GC_WATERMARK: usize = 16;
const DEFAULT_GC_THRESHOLD: f64 = 0.5;
const DEFAULT_GC_THRESHOLD: f64 = 0.2;

// SharedState synchronizes background GC with foreground I/O requests and LSM compaction.
// 1. Background GC stops the world: I/O requests and LSM compaction are blocked while it runs.
Expand Down Expand Up @@ -184,6 +185,7 @@ pub(super) struct GcWorker<D> {
reverse_index_table: Arc<ReverseIndexTable>,
block_validity_table: Arc<AllocTable>,
tx_log_store: Arc<TxLogStore<D>>,
tx_provider: Arc<TxProvider>,
user_data_disk: Arc<D>,
shared_state: SharedStateRef,
}
Expand All @@ -198,6 +200,7 @@ impl<D: BlockSet + 'static> GcWorker<D> {
user_data_disk: Arc<D>,
shared_state: SharedStateRef,
) -> Self {
let tx_provider = TxProvider::new();
Self {
victim_policy,
logical_block_table,
Expand All @@ -206,6 +209,7 @@ impl<D: BlockSet + 'static> GcWorker<D> {
tx_log_store,
user_data_disk,
shared_state,
tx_provider,
}
}

Expand Down Expand Up @@ -335,35 +339,42 @@ impl<D: BlockSet + 'static> GcWorker<D> {
let offset = victim_segment.segment_id() * SEGMENT_SIZE;
self.user_data_disk.read(offset, victim_data.as_mut())?;

let target_hba_batches = free_hbas.group_by(|hba1, hba2| hba2.saturating_sub(*hba1) == 1);
let mut victim_hba_iter = valid_hbas.iter();
for target_hba_batch in target_hba_batches {
let batch_len = target_hba_batch.len();
let mut write_buf = Buf::alloc(batch_len)?;
let mut tx = self.tx_provider.new_tx();
let res: Result<()> = tx.context(|| {
let target_hba_batches =
free_hbas.group_by(|hba1, hba2| hba2.saturating_sub(*hba1) == 1);
let mut victim_hba_iter = valid_hbas.iter();
for target_hba_batch in target_hba_batches {
let batch_len = target_hba_batch.len();
let mut write_buf = Buf::alloc(batch_len)?;

// read enough blocks to fill the batch
for i in 0..batch_len {
let Some(victim_hba) = victim_hba_iter.next() else {
break;
};
let start = (victim_hba % SEGMENT_SIZE) * BLOCK_SIZE;
let end = start + BLOCK_SIZE;

let des_start = i * BLOCK_SIZE;
let des_end = (i + 1) * BLOCK_SIZE;
write_buf.as_mut_slice()[des_start..des_end]
.copy_from_slice(&victim_data.as_slice()[start..end]);
}

// read enough blocks to fill the batch
for i in 0..batch_len {
let Some(victim_hba) = victim_hba_iter.next() else {
break;
};
let start = (victim_hba % SEGMENT_SIZE) * BLOCK_SIZE;
let end = start + BLOCK_SIZE;

let des_start = i * BLOCK_SIZE;
let des_end = (i + 1) * BLOCK_SIZE;
write_buf.as_mut_slice()[des_start..des_end]
.copy_from_slice(&victim_data.as_slice()[start..end]);
self.user_data_disk
.write(*target_hba_batch.first().unwrap(), write_buf.as_ref())?;
}

self.user_data_disk
.write(*target_hba_batch.first().unwrap(), write_buf.as_ref())?;
}

free_hbas
.iter()
.for_each(|hba| self.block_validity_table.set_allocated(*hba));
free_hbas
.iter()
.for_each(|hba| self.block_validity_table.set_allocated(*hba));

victim_segment.clear_segment();
victim_segment.clear_segment();
Ok(())
});
res?;
tx.commit()?;

Ok((
valid_hbas.into_iter().zip(free_hbas).collect(),
Expand Down

0 comments on commit a2e47a7

Please sign in to comment.