Skip to content

Commit

Permalink
[bugfix] Fix possible inconsistent state in journal
Browse files Browse the repository at this point in the history
  • Loading branch information
lucassong-mh committed Jun 18, 2024
1 parent 9c5ae99 commit 89bea72
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 111 deletions.
76 changes: 46 additions & 30 deletions core/src/layers/3-log/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl ChunkAlloc {
}

/// Constructs a `ChunkAlloc` from its parts.
pub(super) fn from_parts(state: ChunkAllocState, tx_provider: Arc<TxProvider>) -> Self {
pub(super) fn from_parts(mut state: ChunkAllocState, tx_provider: Arc<TxProvider>) -> Self {
state.in_journal = false;
let new_self = Self {
state: Arc::new(Mutex::new(state)),
tx_provider,
Expand Down Expand Up @@ -147,6 +148,7 @@ impl ChunkAlloc {
}
}
}
ids.sort_unstable();
ids
};

Expand Down Expand Up @@ -215,7 +217,7 @@ impl Debug for ChunkAlloc {
let state = self.state.lock();
f.debug_struct("ChunkAlloc")
.field("bitmap_free_count", &state.free_count)
.field("bitmap_min_free", &state.min_free)
.field("bitmap_next_free", &state.next_free)
.finish()
}
}
Expand All @@ -232,9 +234,11 @@ pub struct ChunkAllocState {
alloc_map: BitMap,
// The number of free chunks.
free_count: usize,
// The minimum free chunk Id. Useful to narrow the scope of searching for
// free chunk IDs.
min_free: usize,
// The next free chunk Id. Used to narrow the scope of
// searching for free chunk IDs.
next_free: usize,
/// Whether the state is in the journal or not.
in_journal: bool,
}
// TODO: Separate persistent and volatile state of `ChunkAlloc`

Expand All @@ -245,26 +249,43 @@ impl ChunkAllocState {
Self {
alloc_map: BitMap::repeat(false, capacity),
free_count: capacity,
min_free: 0,
next_free: 0,
in_journal: false,
}
}

/// Creates a persistent state in the journal. The state in the journal and
/// the state that `RawLogStore` manages act differently on allocation and
/// edits' appliance.
pub fn new_in_journal(capacity: usize) -> Self {
Self {
alloc_map: BitMap::repeat(false, capacity),
free_count: capacity,
next_free: 0,
in_journal: true,
}
}

/// Allocates a chunk, returning its ID.
pub fn alloc(&mut self) -> Option<ChunkId> {
let min_free = self.min_free;
if min_free >= self.alloc_map.len() {
return None;
let mut next_free = self.next_free;
if next_free == self.alloc_map.len() {
next_free = 0;
}

let free_chunk_id = self
.alloc_map
.first_zero(min_free)
.expect("there must exists a zero");
let free_chunk_id = {
if let Some(chunk_id) = self.alloc_map.first_zero(next_free) {
chunk_id
} else {
self.alloc_map
.first_zero(0)
.expect("there must exists a zero")
}
};

self.alloc_map.set(free_chunk_id, true);
self.free_count -= 1;

// Keep the invariance that all free chunk IDs are no less than `min_free`
self.min_free = free_chunk_id + 1;
self.next_free = free_chunk_id + 1;

Some(free_chunk_id)
}
Expand All @@ -275,14 +296,9 @@ impl ChunkAllocState {
///
/// Deallocating a free chunk causes panic.
pub fn dealloc(&mut self, chunk_id: ChunkId) {
// debug_assert_eq!(self.alloc_map[chunk_id], true); // may fail in journal's commit
debug_assert_eq!(self.alloc_map[chunk_id], true);
self.alloc_map.set(chunk_id, false);
self.free_count += 1;

// Keep the invariance that all free chunk IDs are no less than min_free
if chunk_id < self.min_free {
self.min_free = chunk_id;
}
}

/// Returns the total number of chunks.
Expand All @@ -306,15 +322,15 @@ impl ChunkAllocState {
////////////////////////////////////////////////////////////////////////////////

/// A persistent edit to the state of a chunk allocator.
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChunkAllocEdit {
edit_table: HashMap<ChunkId, ChunkEdit>,
}

/// The smallest unit of a persistent edit to the
/// state of a chunk allocator, which is
/// a chunk being either allocated or deallocated.
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
enum ChunkEdit {
Alloc,
Dealloc,
Expand Down Expand Up @@ -379,23 +395,23 @@ impl ChunkAllocEdit {

impl Edit<ChunkAllocState> for ChunkAllocEdit {
fn apply_to(&self, state: &mut ChunkAllocState) {
let mut to_be_deallocated = Vec::new();
for (&chunk_id, chunk_edit) in &self.edit_table {
match chunk_edit {
ChunkEdit::Alloc => {
// Journal's state also needs to be updated
if !state.is_chunk_allocated(chunk_id) {
if state.in_journal {
let _allocated_id = state.alloc().unwrap();
// `_allocated_id` may not be equal to `chunk_id` due to concurrent TXs,
// but eventually the state will be consistent
}

// Except journal, nothing needs to be done
}
ChunkEdit::Dealloc => {
state.dealloc(chunk_id);
to_be_deallocated.push(chunk_id);
}
}
}
for chunk_id in to_be_deallocated {
state.dealloc(chunk_id);
}
}
}

Expand Down
54 changes: 27 additions & 27 deletions core/src/layers/3-log/raw_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl<D: BlockSet> RawLogStore<D> {
/// This method must be called within a TX. Otherwise, this method panics.
pub fn create_log(&self) -> Result<RawLog<D>> {
let mut state = self.state.lock();
let new_log_id = state.persistent.alloc_log_id();
let new_log_id = state.alloc_log_id();
state
.add_to_write_set(new_log_id)
.expect("created log can't appear in write set");
Expand Down Expand Up @@ -335,10 +335,7 @@ impl<D> Debug for RawLogStore<D> {
let state = self.state.lock();
f.debug_struct("RawLogStore")
.field("persistent_log_table", &state.persistent.log_table)
.field(
"persistent_next_free_log_id",
&state.persistent.next_free_log_id,
)
.field("next_free_log_id", &state.next_free_log_id)
.field("write_set", &state.write_set)
.field("chunk_alloc", &self.chunk_alloc)
.finish()
Expand Down Expand Up @@ -612,11 +609,12 @@ impl<'a> RawLogHeadRef<'a> {
debug_assert!(offset + nblocks <= self.entry.head.num_blocks as _);

let prepared_blocks = self.prepare_blocks(offset, nblocks);
debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
debug_assert_eq!(prepared_blocks.len(), nblocks);

// Batch read
// Note that `prepared_blocks` are not always sorted
let mut offset = 0;
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
let len = consecutive_blocks.len();
let first_bid = *consecutive_blocks.first().unwrap();
let buf_slice =
Expand Down Expand Up @@ -687,11 +685,12 @@ impl<'a> RawLogTailRef<'a> {
debug_assert!(offset + nblocks <= tail_nblocks);

let prepared_blocks = self.prepare_blocks(offset, nblocks);
debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
debug_assert_eq!(prepared_blocks.len(), nblocks);

// Batch read
// Note that `prepared_blocks` are not always sorted
let mut offset = 0;
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
let len = consecutive_blocks.len();
let first_bid = *consecutive_blocks.first().unwrap();
let buf_slice =
Expand All @@ -707,11 +706,12 @@ impl<'a> RawLogTailRef<'a> {
let nblocks = buf.nblocks();

let prepared_blocks = self.prepare_blocks(self.len() as _, nblocks);
debug_assert!(prepared_blocks.len() == nblocks && prepared_blocks.is_sorted());
debug_assert_eq!(prepared_blocks.len(), nblocks);

// Batch write
// Note that `prepared_blocks` are not always sorted
let mut offset = 0;
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2 - b1 == 1) {
for consecutive_blocks in prepared_blocks.group_by(|b1, b2| b2.saturating_sub(*b1) == 1) {
let len = consecutive_blocks.len();
let first_bid = *consecutive_blocks.first().unwrap();
let buf_slice = &buf.as_slice()[offset * BLOCK_SIZE..(offset + len) * BLOCK_SIZE];
Expand Down Expand Up @@ -781,6 +781,7 @@ impl<'a> RawLogTailRef<'a> {
/// The volatile and persistent state of a `RawLogStore`.
struct State {
persistent: RawLogStoreState,
next_free_log_id: u64,
write_set: HashSet<RawLogId>,
lazy_deletes: HashMap<RawLogId, Arc<LazyDelete<RawLogEntry>>>,
}
Expand All @@ -789,7 +790,6 @@ struct State {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RawLogStoreState {
log_table: HashMap<RawLogId, RawLogEntry>,
next_free_log_id: u64,
}

/// A log entry implies the persistent state of the raw log.
Expand All @@ -810,8 +810,14 @@ impl State {
persistent: RawLogStoreState,
lazy_deletes: HashMap<RawLogId, Arc<LazyDelete<RawLogEntry>>>,
) -> Self {
let next_free_log_id = if let Some(max_log_id) = lazy_deletes.keys().max() {
max_log_id + 1
} else {
0
};
Self {
persistent: persistent.clone(),
next_free_log_id,
write_set: HashSet::new(),
lazy_deletes,
}
Expand All @@ -821,6 +827,15 @@ impl State {
edit.apply_to(&mut self.persistent);
}

pub fn alloc_log_id(&mut self) -> u64 {
let new_log_id = self.next_free_log_id;
self.next_free_log_id = self
.next_free_log_id
.checked_add(1)
.expect("64-bit IDs won't be exhausted even though IDs are not recycled");
new_log_id
}

pub fn add_to_write_set(&mut self, log_id: RawLogId) -> Result<()> {
let not_exists = self.write_set.insert(log_id);
if !not_exists {
Expand All @@ -840,19 +855,9 @@ impl RawLogStoreState {
pub fn new() -> Self {
Self {
log_table: HashMap::new(),
next_free_log_id: 0,
}
}

pub fn alloc_log_id(&mut self) -> u64 {
let new_log_id = self.next_free_log_id;
self.next_free_log_id = self
.next_free_log_id
.checked_add(1)
.expect("64-bit IDs won't be exhausted even though IDs are not recycled");
new_log_id
}

pub fn create_log(&mut self, new_log_id: u64) {
let new_log_entry = RawLogEntry {
head: RawLogHead::new(),
Expand Down Expand Up @@ -1039,11 +1044,6 @@ impl Edit<RawLogStoreState> for RawLogStoreEdit {
let RawLogCreate { tail } = create;
state.create_log(log_id);
state.append_log(log_id, tail);

// Journal's state also needs to be updated
if state.next_free_log_id <= log_id {
let _ = state.alloc_log_id();
}
}
RawLogEdit::Append(append) => {
let RawLogAppend { tail } = append;
Expand Down
Loading

0 comments on commit 89bea72

Please sign in to comment.