From a2e47a701ed3a2106719a8e933cc389e43ac33c9 Mon Sep 17 00:00:00 2001
From: Fischer <1809327837@qq.com>
Date: Mon, 23 Sep 2024 20:06:43 +0800
Subject: [PATCH] feat: use tx to migrate data

---
 core/benches/bench.rs        | 11 +++++-
 core/src/layers/5-disk/gc.rs | 65 +++++++++++++++++++++---------------
 2 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/core/benches/bench.rs b/core/benches/bench.rs
index 20294de..5fe5947 100644
--- a/core/benches/bench.rs
+++ b/core/benches/bench.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 fn main() {
+    util::init_logger();
     let total_bytes = 512 * MiB;
     // Specify all benchmarks
     let benches = vec![
@@ -250,7 +251,7 @@ mod benches {
             ),
             AeadKey::default(),
             None,
-            false,
+            true,
             None,
         )?),
 
@@ -666,6 +667,14 @@ mod util {
     use std::fmt::{self};
     use std::time::Duration;
 
+    pub fn init_logger() {
+        env_logger::builder()
+            .is_test(true)
+            .filter_level(log::LevelFilter::Debug)
+            .try_init()
+            .unwrap();
+    }
+
     /// Display the amount of data in the unit of GiB, MiB, KiB, or bytes.
     #[derive(Copy, Clone, Debug, PartialEq, Eq)]
     pub struct DisplayData(usize);
diff --git a/core/src/layers/5-disk/gc.rs b/core/src/layers/5-disk/gc.rs
index 5168ac8..338b890 100644
--- a/core/src/layers/5-disk/gc.rs
+++ b/core/src/layers/5-disk/gc.rs
@@ -6,6 +6,7 @@ use super::{
 };
 use crate::{
     layers::{disk::segment::SEGMENT_SIZE, lsm::TxLsmTree},
+    tx::TxProvider,
     BlockSet, Error,
 };
 use crate::{
@@ -28,9 +29,9 @@ use core::{
 use hashbrown::{HashMap, HashSet};
 use log::debug;
 // Default gc interval time is 30 seconds
-const DEFAULT_GC_INTERVAL_TIME: std::time::Duration = std::time::Duration::from_secs(3);
+const DEFAULT_GC_INTERVAL_TIME: std::time::Duration = std::time::Duration::from_secs(5);
 const GC_WATERMARK: usize = 16;
-const DEFAULT_GC_THRESHOLD: f64 = 0.5;
+const DEFAULT_GC_THRESHOLD: f64 = 0.2;
 
 // SharedState is used to synchronize background GC and foreground I/O requests and lsm compaction
 // 1. Background GC will stop the world, I/O requests and lsm compaction will be blocked
@@ -184,6 +185,7 @@ pub(super) struct GcWorker<D> {
     reverse_index_table: Arc<ReverseIndexTable>,
     block_validity_table: Arc<AllocTable>,
     tx_log_store: Arc<TxLogStore<D>>,
+    tx_provider: Arc<TxProvider>,
     user_data_disk: Arc<D>,
     shared_state: SharedStateRef,
 }
@@ -198,6 +200,7 @@ impl<D: BlockSet + 'static> GcWorker<D> {
         user_data_disk: Arc<D>,
         shared_state: SharedStateRef,
     ) -> Self {
+        let tx_provider = TxProvider::new();
        Self {
             victim_policy,
             logical_block_table,
@@ -206,6 +209,7 @@
             tx_log_store,
             user_data_disk,
             shared_state,
+            tx_provider,
         }
     }
 
@@ -335,35 +339,42 @@ impl<D: BlockSet + 'static> GcWorker<D> {
         let offset = victim_segment.segment_id() * SEGMENT_SIZE;
         self.user_data_disk.read(offset, victim_data.as_mut())?;
 
-        let target_hba_batches = free_hbas.group_by(|hba1, hba2| hba2.saturating_sub(*hba1) == 1);
-        let mut victim_hba_iter = valid_hbas.iter();
-        for target_hba_batch in target_hba_batches {
-            let batch_len = target_hba_batch.len();
-            let mut write_buf = Buf::alloc(batch_len)?;
+        let mut tx = self.tx_provider.new_tx();
+        let res: Result<()> = tx.context(|| {
+            let target_hba_batches =
+                free_hbas.group_by(|hba1, hba2| hba2.saturating_sub(*hba1) == 1);
+            let mut victim_hba_iter = valid_hbas.iter();
+            for target_hba_batch in target_hba_batches {
+                let batch_len = target_hba_batch.len();
+                let mut write_buf = Buf::alloc(batch_len)?;
+
+                // read enough blocks to fill the batch
+                for i in 0..batch_len {
+                    let Some(victim_hba) = victim_hba_iter.next() else {
+                        break;
+                    };
+                    let start = (victim_hba % SEGMENT_SIZE) * BLOCK_SIZE;
+                    let end = start + BLOCK_SIZE;
+
+                    let des_start = i * BLOCK_SIZE;
+                    let des_end = (i + 1) * BLOCK_SIZE;
+                    write_buf.as_mut_slice()[des_start..des_end]
+                        .copy_from_slice(&victim_data.as_slice()[start..end]);
+                }
 
-            // read enough blocks to fill the batch
-            for i in 0..batch_len {
-                let Some(victim_hba) = victim_hba_iter.next() else {
-                    break;
-                };
-                let start = (victim_hba % SEGMENT_SIZE) * BLOCK_SIZE;
-                let end = start + BLOCK_SIZE;
-
-                let des_start = i * BLOCK_SIZE;
-                let des_end = (i + 1) * BLOCK_SIZE;
-                write_buf.as_mut_slice()[des_start..des_end]
-                    .copy_from_slice(&victim_data.as_slice()[start..end]);
+                self.user_data_disk
+                    .write(*target_hba_batch.first().unwrap(), write_buf.as_ref())?;
             }
 
-            self.user_data_disk
-                .write(*target_hba_batch.first().unwrap(), write_buf.as_ref())?;
-        }
-
-        free_hbas
-            .iter()
-            .for_each(|hba| self.block_validity_table.set_allocated(*hba));
+            free_hbas
+                .iter()
+                .for_each(|hba| self.block_validity_table.set_allocated(*hba));
 
-        victim_segment.clear_segment();
+            victim_segment.clear_segment();
+            Ok(())
+        });
+        res?;
+        tx.commit()?;
 
         Ok((
             valid_hbas.into_iter().zip(free_hbas).collect(),
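Reviewer note (sketch, not part of the patch): with this change the whole migration loop runs inside a transaction context, so the block moves, the validity-table updates, and the segment clear are bracketed by `new_tx()` and `commit()`. One thing the diff leaves implicit is the failure path: `res?;` returns before `tx.commit()?`, so a failed context neither commits nor explicitly aborts. Below is a minimal sketch of the intended lifecycle, assuming the `Tx` returned by `new_tx()` also exposes an `abort()` method; that is an assumption, since only `new_tx`, `context`, and `commit` appear in this patch.

    // Sketch only: `Tx::abort()` is assumed, not confirmed by this diff;
    // only `new_tx`/`context`/`commit` are visible in the patch.
    let mut tx = self.tx_provider.new_tx();
    let res: Result<()> = tx.context(|| {
        // ... move valid blocks, mark free HBAs allocated, clear the segment ...
        Ok(())
    });
    if let Err(e) = res {
        tx.abort(); // roll back explicitly rather than dropping the Tx mid-flight
        return Err(e);
    }
    tx.commit()?;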