Skip to content

Commit

Permalink
Consider both logical and physical compaction when closing trace
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Jul 19, 2023
1 parent 74f3900 commit 746950b
Showing 1 changed file with 64 additions and 28 deletions.
92 changes: 64 additions & 28 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,9 @@ where
}
#[inline]
fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
// If the logical frontier is empty, we are not obliged to be able to represent any time,
// since no time is in advance of `[]`. In the case, we can drop all contents.
//
// We need to call `close` before advancing `logical_frontier` because otherwise we cannot
// insert anymore.
if frontier.is_empty() {
self.drop_batches();
self.close();
}

self.logical_frontier.clear();
self.logical_frontier.extend(frontier.iter().cloned());
self.consider_closing();
}
#[inline]
fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
Expand All @@ -233,6 +224,7 @@ where
debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
self.physical_frontier.clear();
self.physical_frontier.extend(frontier.iter().cloned());
self.consider_closing();
self.consider_merges();
}
#[inline]
Expand Down Expand Up @@ -303,9 +295,8 @@ where
// merging the batch. This means it is a good time to perform amortized work proportional
// to the size of batch.
fn insert(&mut self, batch: Self::Batch) {
// If the logical frontier is empty, we are not obliged to be able to represent any time,
// since no time is in advance of `[]`. In the case, we can drop the batch.
if self.logical_frontier.is_empty() {
// Once the upper frontier is empty, we don't expect any more data.
if self.upper.is_empty() {
return
}

Expand All @@ -315,7 +306,7 @@ where
length: batch.len()
}));

assert!(batch.lower() != batch.upper());
assert_ne!(batch.lower(), batch.upper());
assert_eq!(batch.lower(), &self.upper);

self.upper.clone_from(batch.upper());
Expand Down Expand Up @@ -355,7 +346,7 @@ where
B::Time: Lattice+Ord,
B::R: Semigroup,
{
/// Drops and logs batches. Used in `set_logical_compaction` and drop.
/// Drops and logs batches. Used in `consider_closing` and drop.
fn drop_batches(&mut self) {
let merging = std::mem::take(&mut self.merging);
let pending = std::mem::take(&mut self.pending);
Expand Down Expand Up @@ -399,7 +390,7 @@ where

impl<B> Spine<B>
where
B: Batch,
B: Batch+Clone+'static,
B::Key: Ord+Clone,
B::Val: Ord+Clone,
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
Expand Down Expand Up @@ -779,6 +770,20 @@ where
}
}
}

/// Consider closing the trace
///
/// Used by `set_logical_compaction` and `set_physical_compaction`.
///
/// If both logical and physical compaction are empty, drop all batches and close the trace.
fn consider_closing(&mut self ) {
// If both logical and physical compaction are `[]`, we cannot represent any times
// anymore and are free to discard our contents.
if self.physical_frontier.is_empty() && self.logical_frontier.is_empty() {
self.drop_batches();
self.close();
}
}
}


Expand Down Expand Up @@ -948,15 +953,36 @@ mod test {
use trace::{BatchReader, TraceReader};

#[test]
fn test_empty_logical_compaction() {
fn test_empty_batch() {
timely::execute_directly(|worker| {
let (input, trace, probe) = worker.dataflow::<isize, _, _>(|scope| {
let (input, collection) = scope.new_collection::<(usize, usize), isize>();
let arranged = collection.arrange_by_key();
(input, arranged.trace, arranged.stream.probe())
});

drop(input);

worker.step_while(|| !probe.done());

// Check that there is a single empty batch with empty upper
let (mut len, mut count) = (0, 0);
trace.map_batches(|batch| {
len += batch.len();
count += 1;
assert!(batch.description().upper().is_empty());
});
assert_eq!(len, 0);
assert_eq!(count, 1);
});
}

#[test]
fn test_empty_compaction() {
timely::execute_directly(|worker| {
let (mut input, mut trace, probe) = worker.dataflow(|scope| {
let (input, collection) = scope.new_collection();

let arranged = collection.arrange_by_key();

(input, arranged.trace, arranged.stream.probe())
});

Expand All @@ -968,7 +994,7 @@ mod test {
input.advance_to(1);
input.flush();

worker.step_while(|| probe.less_than(&1));
worker.step_while(|| probe.less_than(input.time()));

// Check that the data is in the trace and that the upper has advanced to 1.
let mut len = 0;
Expand All @@ -978,36 +1004,46 @@ mod test {
trace.read_upper(&mut upper);
assert_eq!(upper, Antichain::from_elem(1));

// Permit logical compaction to [] to trigger cleanup.
// Permit logical compaction to []
trace.set_logical_compaction(Antichain::new().borrow());
input.advance_to(2);
input.flush();
worker.step_while(|| probe.less_than(&2));
worker.step_while(|| probe.less_than(input.time()));

// We still expect data because physical compaction hasn't been advanced
let mut len = 0;
trace.map_batches(|batch| len += batch.len());
assert_eq!(len, 2);

// Permit physical compaction to [] to trigger cleanup.
trace.set_physical_compaction(Antichain::new().borrow());
input.advance_to(3);
input.flush();
worker.step_while(|| probe.less_than(input.time()));

// Assert no data, single empty batch with empty upper frontier.
let mut len = 0;
trace.map_batches(|batch| len += batch.len());
assert_eq!(len, 0);
trace.map_batches(|batch| assert!(batch.description().upper().is_empty()));
trace.read_upper(&mut upper);
assert!(upper.is_empty());
assert_eq!(upper, Antichain::new());

// Insert more data and check that the trace stays empty.
input.insert((3, 'c'));
input.advance_to(3);
input.advance_to(4);
input.flush();

worker.step_while(|| probe.less_than(&3));
worker.step_while(|| probe.less_than(input.time()));

let (mut len, mut count) = (0, 0);
trace.map_batches(|batch| {
len += batch.len();
count += 1;
assert!(batch.description().upper().is_empty());
});
assert_eq!(len, 0);
assert_eq!(count, 1);
trace.map_batches(|batch| assert!(batch.description().upper().is_empty()));
trace.read_upper(&mut upper);
assert!(upper.is_empty());
});
}
}

0 comments on commit 746950b

Please sign in to comment.