From 89bea728fa8291bbf9b8c5be2d7dc1bc0597a7a0 Mon Sep 17 00:00:00 2001 From: Shaowei Song Date: Tue, 18 Jun 2024 16:58:42 +0800 Subject: [PATCH] [bugfix] Fix possible inconsistent state in journal --- core/src/layers/3-log/chunk.rs | 76 +++++++++++-------- core/src/layers/3-log/raw_log.rs | 54 +++++++------- core/src/layers/3-log/tx_log.rs | 124 +++++++++++++++++-------------- 3 files changed, 143 insertions(+), 111 deletions(-) diff --git a/core/src/layers/3-log/chunk.rs b/core/src/layers/3-log/chunk.rs index 15b1069..db73b1f 100644 --- a/core/src/layers/3-log/chunk.rs +++ b/core/src/layers/3-log/chunk.rs @@ -68,7 +68,8 @@ impl ChunkAlloc { } /// Constructs a `ChunkAlloc` from its parts. - pub(super) fn from_parts(state: ChunkAllocState, tx_provider: Arc) -> Self { + pub(super) fn from_parts(mut state: ChunkAllocState, tx_provider: Arc) -> Self { + state.in_journal = false; let new_self = Self { state: Arc::new(Mutex::new(state)), tx_provider, @@ -147,6 +148,7 @@ impl ChunkAlloc { } } } + ids.sort_unstable(); ids }; @@ -215,7 +217,7 @@ impl Debug for ChunkAlloc { let state = self.state.lock(); f.debug_struct("ChunkAlloc") .field("bitmap_free_count", &state.free_count) - .field("bitmap_min_free", &state.min_free) + .field("bitmap_next_free", &state.next_free) .finish() } } @@ -232,9 +234,11 @@ pub struct ChunkAllocState { alloc_map: BitMap, // The number of free chunks. free_count: usize, - // The minimum free chunk Id. Useful to narrow the scope of searching for - // free chunk IDs. - min_free: usize, + // The next free chunk Id. Used to narrow the scope of + // searching for free chunk IDs. + next_free: usize, + /// Whether the state is in the journal or not. + in_journal: bool, } // TODO: Separate persistent and volatile state of `ChunkAlloc` @@ -245,26 +249,43 @@ impl ChunkAllocState { Self { alloc_map: BitMap::repeat(false, capacity), free_count: capacity, - min_free: 0, + next_free: 0, + in_journal: false, + } + } + + /// Creates a persistent state in the journal. The state in the journal and + /// the state that `RawLogStore` manages act differently on allocation and + /// edits' appliance. + pub fn new_in_journal(capacity: usize) -> Self { + Self { + alloc_map: BitMap::repeat(false, capacity), + free_count: capacity, + next_free: 0, + in_journal: true, } } /// Allocates a chunk, returning its ID. pub fn alloc(&mut self) -> Option { - let min_free = self.min_free; - if min_free >= self.alloc_map.len() { - return None; + let mut next_free = self.next_free; + if next_free == self.alloc_map.len() { + next_free = 0; } - let free_chunk_id = self - .alloc_map - .first_zero(min_free) - .expect("there must exists a zero"); + let free_chunk_id = { + if let Some(chunk_id) = self.alloc_map.first_zero(next_free) { + chunk_id + } else { + self.alloc_map + .first_zero(0) + .expect("there must exists a zero") + } + }; + self.alloc_map.set(free_chunk_id, true); self.free_count -= 1; - - // Keep the invariance that all free chunk IDs are no less than `min_free` - self.min_free = free_chunk_id + 1; + self.next_free = free_chunk_id + 1; Some(free_chunk_id) } @@ -275,14 +296,9 @@ impl ChunkAllocState { /// /// Deallocating a free chunk causes panic. 
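A note on the allocation strategy above: the old `min_free` was a lower bound on all free IDs, while the new `next_free` is only a search hint, so a lookup that finds nothing at or after the cursor wraps around to index 0. Below is a minimal, self-contained sketch of that cursor-with-wrap-around scheme; it uses a plain `Vec<bool>` in place of the crate's `BitMap`, and the names are illustrative, not the crate's API.

// Cursor-based bitmap allocation with wrap-around, sketched from the patch.
struct CursorBitmap {
    bits: Vec<bool>,   // true = allocated
    free_count: usize, // number of `false` bits
    next_free: usize,  // search hint; may equal bits.len()
}

impl CursorBitmap {
    fn new(capacity: usize) -> Self {
        Self { bits: vec![false; capacity], free_count: capacity, next_free: 0 }
    }

    fn first_zero(&self, from: usize) -> Option<usize> {
        (from..self.bits.len()).find(|&i| !self.bits[i])
    }

    fn alloc(&mut self) -> Option<usize> {
        if self.free_count == 0 {
            return None;
        }
        let mut start = self.next_free;
        if start == self.bits.len() {
            start = 0;
        }
        // Search from the hint first, then wrap around to the beginning.
        let id = self
            .first_zero(start)
            .or_else(|| self.first_zero(0))
            .expect("free_count > 0 implies a zero bit exists");
        self.bits[id] = true;
        self.free_count -= 1;
        self.next_free = id + 1;
        Some(id)
    }

    fn dealloc(&mut self, id: usize) {
        debug_assert!(self.bits[id], "deallocating a free slot");
        self.bits[id] = false;
        self.free_count += 1;
        // Unlike the old `min_free` scheme, the cursor is not moved back here;
        // the wrap-around in `alloc` makes freed low IDs reachable again.
    }
}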
pub fn dealloc(&mut self, chunk_id: ChunkId) { - // debug_assert_eq!(self.alloc_map[chunk_id], true); // may fail in journal's commit + debug_assert_eq!(self.alloc_map[chunk_id], true); self.alloc_map.set(chunk_id, false); self.free_count += 1; - - // Keep the invariance that all free chunk IDs are no less than min_free - if chunk_id < self.min_free { - self.min_free = chunk_id; - } } /// Returns the total number of chunks. @@ -306,7 +322,7 @@ impl ChunkAllocState { //////////////////////////////////////////////////////////////////////////////// /// A persistent edit to the state of a chunk allocator. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChunkAllocEdit { edit_table: HashMap, } @@ -314,7 +330,7 @@ pub struct ChunkAllocEdit { /// The smallest unit of a persistent edit to the /// state of a chunk allocator, which is /// a chunk being either allocated or deallocated. -#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] enum ChunkEdit { Alloc, Dealloc, @@ -379,23 +395,23 @@ impl ChunkAllocEdit { impl Edit for ChunkAllocEdit { fn apply_to(&self, state: &mut ChunkAllocState) { + let mut to_be_deallocated = Vec::new(); for (&chunk_id, chunk_edit) in &self.edit_table { match chunk_edit { ChunkEdit::Alloc => { - // Journal's state also needs to be updated - if !state.is_chunk_allocated(chunk_id) { + if state.in_journal { let _allocated_id = state.alloc().unwrap(); - // `_allocated_id` may not be equal to `chunk_id` due to concurrent TXs, - // but eventually the state will be consistent } - // Except journal, nothing needs to be done } ChunkEdit::Dealloc => { - state.dealloc(chunk_id); + to_be_deallocated.push(chunk_id); } } } + for chunk_id in to_be_deallocated { + state.dealloc(chunk_id); + } } } diff --git a/core/src/layers/3-log/raw_log.rs b/core/src/layers/3-log/raw_log.rs index 50f5a27..edd8730 100644 --- a/core/src/layers/3-log/raw_log.rs +++ b/core/src/layers/3-log/raw_log.rs @@ -235,7 +235,7 @@ impl RawLogStore { /// This method must be called within a TX. Otherwise, this method panics. 
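The reworked `apply_to` above makes replay independent of the `HashMap` iteration order: allocations are replayed only against the journal's own copy of the state (guarded by the new `in_journal` flag, since the in-memory state was already updated when the TX ran), and deallocations are collected and applied only after every allocation has been handled. A simplified sketch of this two-phase replay, with stand-in types rather than the real `ChunkAllocState`/`ChunkAllocEdit`:

use std::collections::HashMap;

// Stand-ins for ChunkEdit and ChunkAllocState; only the replay logic matters here.
#[derive(Clone, Copy)]
enum SlotEdit {
    Alloc,
    Dealloc,
}

struct SlotState {
    allocated: Vec<bool>,
    in_journal: bool, // true only for the state kept by the journal
}

impl SlotState {
    fn alloc(&mut self) -> Option<usize> {
        let id = self.allocated.iter().position(|&a| !a)?;
        self.allocated[id] = true;
        Some(id)
    }

    fn dealloc(&mut self, id: usize) {
        debug_assert!(self.allocated[id], "deallocating a free slot");
        self.allocated[id] = false;
    }

    fn apply_edit(&mut self, edit_table: &HashMap<usize, SlotEdit>) {
        let mut to_be_deallocated = Vec::new();
        for (&id, edit) in edit_table {
            match edit {
                // The in-memory state was already updated when the TX allocated,
                // so the allocation is replayed only in the journal's state.
                SlotEdit::Alloc => {
                    if self.in_journal {
                        let _ = self.alloc().expect("journal state must have a free slot");
                    }
                }
                // Deallocations are deferred so they apply after every allocation,
                // regardless of the HashMap's iteration order.
                SlotEdit::Dealloc => to_be_deallocated.push(id),
            }
        }
        for id in to_be_deallocated {
            self.dealloc(id);
        }
    }
}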
pub fn create_log(&self) -> Result> { let mut state = self.state.lock(); - let new_log_id = state.persistent.alloc_log_id(); + let new_log_id = state.alloc_log_id(); state .add_to_write_set(new_log_id) .expect("created log can't appear in write set"); @@ -335,10 +335,7 @@ impl Debug for RawLogStore { let state = self.state.lock(); f.debug_struct("RawLogStore") .field("persistent_log_table", &state.persistent.log_table) - .field( - "persistent_next_free_log_id", - &state.persistent.next_free_log_id, - ) + .field("next_free_log_id", &state.next_free_log_id) .field("write_set", &state.write_set) .field("chunk_alloc", &self.chunk_alloc) .finish() @@ -612,11 +609,12 @@ impl<'a> RawLogHeadRef<'a> { debug_assert!(offset + nblocks <= self.entry.head.num_blocks as _); let prepared_blocks = self.prepare_blocks(offset, nblocks); - debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted()); + debug_assert_eq!(prepared_blocks.len(), nblocks); // Batch read + // Note that `prepared_blocks` are not always sorted let mut offset = 0; - for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) { + for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) { let len = consecutive_blocks.len(); let first_bid = *consecutive_blocks.first().unwrap(); let buf_slice = @@ -687,11 +685,12 @@ impl<'a> RawLogTailRef<'a> { debug_assert!(offset + nblocks <= tail_nblocks); let prepared_blocks = self.prepare_blocks(offset, nblocks); - debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted()); + debug_assert_eq!(prepared_blocks.len(), nblocks); // Batch read + // Note that `prepared_blocks` are not always sorted let mut offset = 0; - for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) { + for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) { let len = consecutive_blocks.len(); let first_bid = *consecutive_blocks.first().unwrap(); let buf_slice = @@ -707,11 +706,12 @@ impl<'a> RawLogTailRef<'a> { let nblocks = buf.nblocks(); let prepared_blocks = self.prepare_blocks(self.len() as _, nblocks); - debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted()); + debug_assert_eq!(prepared_blocks.len(), nblocks); // Batch write + // Note that `prepared_blocks` are not always sorted let mut offset = 0; - for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) { + for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) { let len = consecutive_blocks.len(); let first_bid = *consecutive_blocks.first().unwrap(); let buf_slice = &buf.as_slice()[offset * BLOCK_SIZE..(offset + len) * BLOCK_SIZE]; @@ -781,6 +781,7 @@ impl<'a> RawLogTailRef<'a> { /// The volatile and persistent state of a `RawLogStore`. struct State { persistent: RawLogStoreState, + next_free_log_id: u64, write_set: HashSet, lazy_deletes: HashMap>>, } @@ -789,7 +790,6 @@ struct State { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RawLogStoreState { log_table: HashMap, - next_free_log_id: u64, } /// A log entry implies the persistent state of the raw log. 
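The three batched-I/O sites above drop the `is_sorted` assertion because the prepared block IDs are no longer guaranteed to be sorted; `saturating_sub` keeps the grouping predicate from underflowing on a decreasing pair, which simply starts a new run. A small sketch of the consecutive-run grouping, written against `slice::chunk_by` (the stabilized name of the unstable `group_by` API used in the crate) on plain `u64` IDs:

fn consecutive_runs(block_ids: &[u64]) -> Vec<(u64, usize)> {
    // One (first_block_id, run_length) pair per maximal run of consecutive IDs.
    block_ids
        .chunk_by(|b1, b2| b2.saturating_sub(*b1) == 1)
        .map(|run| (run[0], run.len()))
        .collect()
}

fn main() {
    // 7, 8, 9 still form one run even though they follow a jump back from 21,
    // and the unsigned subtraction never panics on the decreasing pair (21, 7).
    let blocks = [3, 4, 5, 20, 21, 7, 8, 9];
    assert_eq!(consecutive_runs(&blocks), vec![(3, 3), (20, 2), (7, 3)]);
}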
@@ -810,8 +810,14 @@ impl State { persistent: RawLogStoreState, lazy_deletes: HashMap>>, ) -> Self { + let next_free_log_id = if let Some(max_log_id) = lazy_deletes.keys().max() { + max_log_id + 1 + } else { + 0 + }; Self { persistent: persistent.clone(), + next_free_log_id, write_set: HashSet::new(), lazy_deletes, } @@ -821,6 +827,15 @@ impl State { edit.apply_to(&mut self.persistent); } + pub fn alloc_log_id(&mut self) -> u64 { + let new_log_id = self.next_free_log_id; + self.next_free_log_id = self + .next_free_log_id + .checked_add(1) + .expect("64-bit IDs won't be exhausted even though IDs are not recycled"); + new_log_id + } + pub fn add_to_write_set(&mut self, log_id: RawLogId) -> Result<()> { let not_exists = self.write_set.insert(log_id); if !not_exists { @@ -840,19 +855,9 @@ impl RawLogStoreState { pub fn new() -> Self { Self { log_table: HashMap::new(), - next_free_log_id: 0, } } - pub fn alloc_log_id(&mut self) -> u64 { - let new_log_id = self.next_free_log_id; - self.next_free_log_id = self - .next_free_log_id - .checked_add(1) - .expect("64-bit IDs won't be exhausted even though IDs are not recycled"); - new_log_id - } - pub fn create_log(&mut self, new_log_id: u64) { let new_log_entry = RawLogEntry { head: RawLogHead::new(), @@ -1039,11 +1044,6 @@ impl Edit for RawLogStoreEdit { let RawLogCreate { tail } = create; state.create_log(log_id); state.append_log(log_id, tail); - - // Journal's state also needs to be updated - if state.next_free_log_id <= log_id { - let _ = state.alloc_log_id(); - } } RawLogEdit::Append(append) => { let RawLogAppend { tail } = append; diff --git a/core/src/layers/3-log/tx_log.rs b/core/src/layers/3-log/tx_log.rs index acf29d9..ae3ce97 100644 --- a/core/src/layers/3-log/tx_log.rs +++ b/core/src/layers/3-log/tx_log.rs @@ -117,22 +117,19 @@ impl TxLogStore { let total_nblocks = disk.nblocks(); let (log_store_nblocks, journal_nblocks) = Self::calc_store_and_journal_nblocks(total_nblocks); + let nchunks = log_store_nblocks / CHUNK_NBLOCKS; + let log_store_area = disk.subset(1..1 + log_store_nblocks)?; let journal_area = disk.subset(1 + log_store_nblocks..1 + log_store_nblocks + journal_nblocks)?; let tx_provider = TxProvider::new(); - let nchunks = log_store_nblocks / CHUNK_NBLOCKS; - let chunk_alloc = ChunkAlloc::new(nchunks, tx_provider.clone()); - let raw_log_store = RawLogStore::new(log_store_area, tx_provider.clone(), chunk_alloc); - let tx_log_store_state = TxLogStoreState::new(); - let journal = { let all_state = AllState { - chunk_alloc: ChunkAllocState::new(nchunks), + chunk_alloc: ChunkAllocState::new_in_journal(nchunks), raw_log_store: RawLogStoreState::new(), - tx_log_store: tx_log_store_state.clone(), + tx_log_store: TxLogStoreState::new(), }; Arc::new(Mutex::new(Journal::format( journal_area, @@ -141,6 +138,11 @@ impl TxLogStore { JournalCompactPolicy {}, )?)) }; + Self::register_commit_handler_for_journal(&journal, &tx_provider); + + let chunk_alloc = ChunkAlloc::new(nchunks, tx_provider.clone()); + let raw_log_store = RawLogStore::new(log_store_area, tx_provider.clone(), chunk_alloc); + let tx_log_store_state = TxLogStoreState::new(); let superblock = Superblock { journal_area_meta: journal.lock().meta(), @@ -171,6 +173,37 @@ impl TxLogStore { (log_store_nblocks, journal_nblocks) // TBD } + fn register_commit_handler_for_journal( + journal: &Arc>>, + tx_provider: &Arc, + ) { + let journal = journal.clone(); + tx_provider.register_commit_handler({ + move |current: CurrentTx<'_>| { + let mut journal = journal.lock(); + 
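+                // Runs once per committed TX: for each layer (TX logs, raw logs,
+                // chunk allocator), wrap any non-empty pending edit as an `AllEdit`
+                // and append it to the journal, then commit the journal once.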
current.data_with(|tx_log_edit: &TxLogStoreEdit| { + if tx_log_edit.is_empty() { + return; + } + journal.add(AllEdit::from_tx_log_edit(tx_log_edit)); + }); + current.data_with(|raw_log_edit: &RawLogStoreEdit| { + if raw_log_edit.is_empty() { + return; + } + journal.add(AllEdit::from_raw_log_edit(raw_log_edit)); + }); + current.data_with(|chunk_edit: &ChunkAllocEdit| { + if chunk_edit.is_empty() { + return; + } + journal.add(AllEdit::from_chunk_edit(chunk_edit)); + }); + journal.commit(); + } + }); + } + /// Recovers an existing `TxLogStore` from a disk using the given key. pub fn recover(disk: D, root_key: Key) -> Result { let superblock = Superblock::open(&disk.subset(0..1)?, &root_key)?; @@ -186,23 +219,28 @@ impl TxLogStore { 1 + superblock.chunk_area_nblocks ..1 + superblock.chunk_area_nblocks + journal_area_meta.total_nblocks(), )?; - Journal::recover(journal_area, &journal_area_meta, JournalCompactPolicy {})? + Arc::new(Mutex::new(Journal::recover( + journal_area, + &journal_area_meta, + JournalCompactPolicy {}, + )?)) }; - let all_state = journal.state(); + Self::register_commit_handler_for_journal(&journal, &tx_provider); - let chunk_alloc = - ChunkAlloc::from_parts(all_state.chunk_alloc.clone(), tx_provider.clone()); - let chunk_area = disk.subset(1..1 + superblock.chunk_area_nblocks)?; - let raw_log_store = RawLogStore::from_parts( - all_state.raw_log_store.clone(), - chunk_area, + let AllState { chunk_alloc, - tx_provider.clone(), - ); + raw_log_store, + tx_log_store, + } = journal.lock().state().clone(); + + let chunk_alloc = ChunkAlloc::from_parts(chunk_alloc, tx_provider.clone()); + let chunk_area = disk.subset(1..1 + superblock.chunk_area_nblocks)?; + let raw_log_store = + RawLogStore::from_parts(raw_log_store, chunk_area, chunk_alloc, tx_provider.clone()); let tx_log_store = TxLogStore::from_parts( - all_state.tx_log_store.clone(), + tx_log_store, raw_log_store, - Arc::new(Mutex::new(journal)), + journal, superblock, root_key, disk, @@ -258,33 +296,6 @@ impl TxLogStore { } }); - // Commit handler for journal - let journal = journal.clone(); - tx_provider.register_commit_handler({ - move |current: CurrentTx<'_>| { - let mut journal = journal.lock(); - current.data_with(|chunk_edit: &ChunkAllocEdit| { - if chunk_edit.is_empty() { - return; - } - journal.add(AllEdit::from_chunk_edit(chunk_edit)); - }); - current.data_with(|raw_log_edit: &RawLogStoreEdit| { - if raw_log_edit.is_empty() { - return; - } - journal.add(AllEdit::from_raw_log_edit(raw_log_edit)); - }); - current.data_with(|tx_log_edit: &TxLogStoreEdit| { - if tx_log_edit.is_empty() { - return; - } - journal.add(AllEdit::from_tx_log_edit(tx_log_edit)); - }); - journal.commit(); - } - }); - // Commit handler for log store tx_provider.register_commit_handler({ let state = new_self.state.clone(); @@ -679,10 +690,9 @@ impl TxLogStore { /// Syncs all the data managed by `TxLogStore` for persistence. 
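Earlier in the patch, `next_free_log_id` moves out of the persistent `RawLogStoreState` into the volatile `State`, where recovery rebuilds it as one past the largest recovered log ID (taken from the keys of the `lazy_deletes` map) and `alloc_log_id` then only counts up. A compact sketch of that rebuild-then-monotonic scheme; the type and names are illustrative stand-ins:

use std::collections::HashMap;

// Volatile ID allocator rebuilt from recovered persistent entries; `recovered`
// stands in for the map whose keys are the log IDs that already exist on disk.
struct IdAllocator {
    next_free: u64,
}

impl IdAllocator {
    fn from_recovered<V>(recovered: &HashMap<u64, V>) -> Self {
        let next_free = recovered.keys().max().map_or(0, |max_id| *max_id + 1);
        Self { next_free }
    }

    fn alloc(&mut self) -> u64 {
        let new_id = self.next_free;
        self.next_free = self
            .next_free
            .checked_add(1)
            .expect("64-bit IDs won't be exhausted even though IDs are not recycled");
        new_id
    }
}

fn main() {
    // IDs 0, 3 and 7 survived recovery, so allocation resumes at 8.
    let recovered: HashMap<u64, ()> = [(0, ()), (3, ()), (7, ())].into_iter().collect();
    let mut ids = IdAllocator::from_recovered(&recovered);
    assert_eq!(ids.alloc(), 8);
    assert_eq!(ids.alloc(), 9);
}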
pub fn sync(&self) -> Result<()> { self.raw_log_store.sync().unwrap(); - self.journal.lock().flush()?; + self.journal.lock().flush().unwrap(); - self.raw_disk.flush()?; - Ok(()) + self.raw_disk.flush() } } @@ -1290,14 +1300,14 @@ mod journaling { pub type Journal = EditJournal; pub type JournalCompactPolicy = NeverCompactPolicy; - #[derive(Clone, Serialize, Deserialize)] + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AllState { pub chunk_alloc: ChunkAllocState, pub raw_log_store: RawLogStoreState, pub tx_log_store: TxLogStoreState, } - #[derive(Serialize, Deserialize)] + #[derive(Debug, Serialize, Deserialize)] pub struct AllEdit { pub chunk_edit: ChunkAllocEdit, pub raw_log_edit: RawLogStoreEdit, @@ -1306,9 +1316,15 @@ mod journaling { impl Edit for AllEdit { fn apply_to(&self, state: &mut AllState) { - self.chunk_edit.apply_to(&mut state.chunk_alloc); - self.raw_log_edit.apply_to(&mut state.raw_log_store); - self.tx_log_edit.apply_to(&mut state.tx_log_store); + if !self.tx_log_edit.is_empty() { + self.tx_log_edit.apply_to(&mut state.tx_log_store); + } + if !self.raw_log_edit.is_empty() { + self.raw_log_edit.apply_to(&mut state.raw_log_store); + } + if !self.chunk_edit.is_empty() { + self.chunk_edit.apply_to(&mut state.chunk_alloc); + } } }
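To close, `AllEdit::apply_to` now skips empty sub-edits and applies the non-empty ones in a fixed order (TX-log, then raw-log, then chunk allocator), matching the order in which the commit handler appends them. A toy sketch of this bundled-replay shape, using simplified stand-in types rather than the crate's real state and edit types:

// Stand-ins for the journaling module's per-layer state/edit pairs.
struct LayerState {
    entries: Vec<String>,
}

struct LayerEdit {
    added: Vec<String>,
}

impl LayerEdit {
    fn is_empty(&self) -> bool {
        self.added.is_empty()
    }

    fn apply_to(&self, state: &mut LayerState) {
        state.entries.extend(self.added.iter().cloned());
    }
}

// The journal state bundles every layer's state; each record bundles every
// layer's edit, and replay touches only the layers that actually changed.
struct AllState {
    tx_log: LayerState,
    raw_log: LayerState,
    chunk: LayerState,
}

struct AllEdit {
    tx_log_edit: LayerEdit,
    raw_log_edit: LayerEdit,
    chunk_edit: LayerEdit,
}

impl AllEdit {
    fn apply_to(&self, state: &mut AllState) {
        // Empty sub-edits are skipped; non-empty ones apply in a fixed order.
        if !self.tx_log_edit.is_empty() {
            self.tx_log_edit.apply_to(&mut state.tx_log);
        }
        if !self.raw_log_edit.is_empty() {
            self.raw_log_edit.apply_to(&mut state.raw_log);
        }
        if !self.chunk_edit.is_empty() {
            self.chunk_edit.apply_to(&mut state.chunk);
        }
    }
}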