Skip to content

Commit

Permalink
Merge pull request #155 from marvin-hansen/main
Browse files Browse the repository at this point in the history
improvements to the codebase:
  • Loading branch information
marvin-hansen authored Nov 27, 2024
2 parents bc2e50d + 9e248f6 commit a281918
Show file tree
Hide file tree
Showing 37 changed files with 511 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
msrv = "1.65"
msrv = "1.80"
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::time::Duration;

const BUFFER_SIZE: usize = 65536;

const BATCH_SIZES: [u64; 3] = [1, 10, 100];

struct Checker;

impl EventHandler<i64> for Checker {
Expand Down Expand Up @@ -61,7 +63,7 @@ fn criterion_benchmark(c: &mut Criterion) {
group.throughput(Throughput::Elements(N));
group.warm_up_time(Duration::from_secs(10));
group.sampling_mode(SamplingMode::Flat);
for batch_size in [1, 10, 50, 100] {
for batch_size in BATCH_SIZES {
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
&batch_size,
Expand All @@ -80,7 +82,7 @@ fn criterion_benchmark(c: &mut Criterion) {
group.throughput(Throughput::Elements(N));
group.warm_up_time(Duration::from_secs(10));
group.sampling_mode(SamplingMode::Flat);
for batch_size in [10, 50, 100, 1000] {
for batch_size in BATCH_SIZES {
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
&batch_size,
Expand All @@ -99,7 +101,7 @@ fn criterion_benchmark(c: &mut Criterion) {
group.throughput(Throughput::Elements(N));
group.warm_up_time(Duration::from_secs(10));
group.sampling_mode(SamplingMode::Flat);
for batch_size in [1, 10, 50, 100, 1000] {
for batch_size in BATCH_SIZES {
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
&batch_size,
Expand All @@ -118,7 +120,7 @@ fn criterion_benchmark(c: &mut Criterion) {
group.throughput(Throughput::Elements(N));
group.warm_up_time(Duration::from_secs(10));
group.sampling_mode(SamplingMode::Flat);
for batch_size in [10, 50, 100, 1000] {
for batch_size in BATCH_SIZES {
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
&batch_size,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use criterion::{black_box, criterion_group, Criterion};
use dcl_data_structures::ring_buffer::sequence::atomic_sequence::AtomicSequence;
use dcl_data_structures::ring_buffer::prelude::AtomicSequence;
use dcl_data_structures::ring_buffer::sequence::atomic_sequence_ordered::AtomicSequenceOrdered;

fn sequence_benchmark(c: &mut Criterion) {
let sequence = AtomicSequence::default();
let sequence = AtomicSequenceOrdered::default();

// Benchmark get operation
c.bench_function("sequence_get", |b| {
Expand All @@ -22,7 +23,7 @@ fn sequence_benchmark(c: &mut Criterion) {
c.bench_function("sequence_compare_exchange_success", |b| {
sequence.set(0);
b.iter(|| {
black_box(sequence.compare_exchange(0, 1));
black_box(sequence.compare_and_swap(0, 1));
sequence.set(0); // Reset for next iteration
})
});
Expand All @@ -31,14 +32,14 @@ fn sequence_benchmark(c: &mut Criterion) {
c.bench_function("sequence_compare_exchange_failure", |b| {
sequence.set(1);
b.iter(|| {
black_box(sequence.compare_exchange(0, 2));
black_box(sequence.compare_and_swap(0, 2));
})
});

// Benchmark sequence creation from value
c.bench_function("sequence_from_value", |b| {
b.iter(|| {
black_box(AtomicSequence::from(black_box(42)));
black_box(AtomicSequenceOrdered::from(black_box(42)));
})
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::sync::{
///
pub struct ProcessingSequenceBarrier<W: WaitStrategy> {
/// A vector of atomic sequences that this barrier depends on
gating_sequences: Vec<Arc<AtomicSequence>>,
gating_sequences: Vec<Arc<AtomicSequenceOrdered>>,
/// The strategy used for waiting when sequences are not yet available
wait_strategy: Arc<W>,
/// A flag indicating whether the barrier has been alerted (typically for shutdown)
Expand All @@ -59,7 +59,7 @@ impl<W: WaitStrategy> ProcessingSequenceBarrier<W> {
/// Returns a new instance of `ProcessingSequenceBarrier`
pub fn new(
wait_strategy: Arc<W>,
gating_sequences: Vec<Arc<AtomicSequence>>,
gating_sequences: Vec<Arc<AtomicSequenceOrdered>>,
is_alerted: Arc<AtomicBool>,
) -> Self {
ProcessingSequenceBarrier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ struct Processor<E, T> {
/// The event handler implementation
handler: E,
/// Cursor tracking the current position in the event sequence
cursor: Arc<AtomicSequence>,
cursor: Arc<AtomicSequenceOrdered>,
/// Phantom data to handle type parameters
_marker: PhantomData<T>,
}
Expand All @@ -158,7 +158,7 @@ struct ProcessorMut<E, T> {
/// The mutable event handler implementation
handler: E,
/// Cursor tracking the current position in the event sequence
cursor: Arc<AtomicSequence>,
cursor: Arc<AtomicSequenceOrdered>,
/// Phantom data to handle type parameters
_marker: PhantomData<T>,
}
Expand Down Expand Up @@ -218,7 +218,7 @@ where
})
}

fn get_cursor(&self) -> Arc<AtomicSequence> {
fn get_cursor(&self) -> Arc<AtomicSequenceOrdered> {
self.cursor.clone()
}
}
Expand All @@ -240,7 +240,7 @@ where
})
}

fn get_cursor(&self) -> Arc<AtomicSequence> {
fn get_cursor(&self) -> Arc<AtomicSequenceOrdered> {
self.cursor.clone()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ where
pub struct BarrierScope<'a, S: Sequencer, D: DataProvider<T>, T> {
sequencer: S,
data_provider: Arc<D>,
gating_sequences: Vec<Arc<AtomicSequence>>,
cursors: Vec<Arc<AtomicSequence>>,
gating_sequences: Vec<Arc<AtomicSequenceOrdered>>,
cursors: Vec<Arc<AtomicSequenceOrdered>>,
event_handlers: Vec<Box<dyn Runnable + 'a>>,
_element: PhantomData<T>,
}
Expand All @@ -144,7 +144,7 @@ where
{
with_sequencer: WithSequencer<S, W, D, T>,
event_handlers: Vec<Box<dyn Runnable + 'a>>,
gating_sequences: Vec<Arc<AtomicSequence>>,
gating_sequences: Vec<Arc<AtomicSequenceOrdered>>,
}

impl RustDisruptorBuilder {
Expand Down
4 changes: 3 additions & 1 deletion dcl_data_structures/src/ring_buffer/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ pub use crate::ring_buffer::executor::*;
pub use crate::ring_buffer::producer::multi_producer::*;
pub use crate::ring_buffer::producer::single_producer::*;
pub use crate::ring_buffer::ringbuffer::const_array_ring_buffer::*;
pub use crate::ring_buffer::sequence::atomic_sequence::*;
pub use crate::ring_buffer::sequence::atomic_sequence_ordered::*;
pub use crate::ring_buffer::sequence::atomic_sequence_relaxed::*;
pub use crate::ring_buffer::sequence::Sequence;
pub use crate::ring_buffer::traits::*;
pub use crate::ring_buffer::utils::*;
pub use crate::ring_buffer::wait_strategy::*;
36 changes: 20 additions & 16 deletions dcl_data_structures/src/ring_buffer/producer/multi_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@ use std::sync::{
/// * `W` - The wait strategy used for coordinating between producers and consumers
pub struct MultiProducerSequencer<W: WaitStrategy> {
/// The current cursor position in the ring buffer
cursor: Arc<AtomicSequence>,
cursor: Arc<AtomicSequenceOrdered>,
/// The strategy used for waiting when the buffer is full
wait_strategy: Arc<W>,
/// Sequences that this producer must wait for before overwriting slots
gating_sequences: Vec<Arc<AtomicSequence>>,
gating_sequences: Vec<Arc<AtomicSequenceOrdered>>,
/// Size of the ring buffer
buffer_size: usize,
/// Tracks the highest claimed sequence
high_watermark: AtomicSequence,
high_watermark: AtomicSequenceOrdered,
/// Bitmap tracking which sequences are ready for publishing
ready_sequences: BitMap,
/// Flag indicating if the sequencer has been drained
is_done: Arc<AtomicBool>,
low_watermark: AtomicSequence,
low_watermark: AtomicSequenceOrdered,
}

/// Manual implementation of Clone for MultiProducerSequencer
Expand All @@ -98,10 +98,10 @@ impl<W: WaitStrategy> Clone for MultiProducerSequencer<W> {
wait_strategy: self.wait_strategy.clone(),
gating_sequences: self.gating_sequences.clone(),
buffer_size: self.buffer_size,
high_watermark: AtomicSequence::default(),
high_watermark: AtomicSequenceOrdered::default(),
ready_sequences: BitMap::new(NonZeroUsize::try_from(self.buffer_size).unwrap()),
is_done: self.is_done.clone(),
low_watermark: AtomicSequence::default(),
low_watermark: AtomicSequenceOrdered::default(),
}
}
}
Expand All @@ -115,14 +115,14 @@ impl<W: WaitStrategy> MultiProducerSequencer<W> {
/// * `wait_strategy` - The strategy to use when waiting for available slots
pub fn new(buffer_size: usize, wait_strategy: W) -> Self {
MultiProducerSequencer {
cursor: Arc::new(AtomicSequence::default()),
cursor: Arc::new(AtomicSequenceOrdered::default()),
wait_strategy: Arc::new(wait_strategy),
gating_sequences: Vec::new(),
buffer_size,
high_watermark: AtomicSequence::default(),
high_watermark: AtomicSequenceOrdered::default(),
ready_sequences: BitMap::new(NonZeroUsize::try_from(buffer_size).unwrap()),
is_done: Default::default(),
low_watermark: AtomicSequence::default(),
low_watermark: AtomicSequenceOrdered::default(),
}
}

Expand All @@ -138,7 +138,10 @@ impl<W: WaitStrategy> MultiProducerSequencer<W> {
/// `true` if there is enough space in the buffer, `false` otherwise
fn has_capacity(&self, high_watermark: Sequence, count: usize) -> bool {
self.buffer_size
> (high_watermark - min_cursor_sequence(&self.gating_sequences)) as usize + count
> (high_watermark
- get_min_cursor_sequence::<_, AtomicSequenceOrdered>(&self.gating_sequences))
as usize
+ count
}
}

Expand All @@ -164,7 +167,7 @@ impl<W: WaitStrategy> Sequencer for MultiProducerSequencer<W> {
let high_watermark = self.high_watermark.get();
if self.has_capacity(high_watermark, count) {
let end = high_watermark + count as Sequence;
if self.high_watermark.compare_exchange(high_watermark, end) {
if self.high_watermark.compare_and_swap(high_watermark, end) {
return (high_watermark + 1, end);
}
}
Expand Down Expand Up @@ -199,7 +202,7 @@ impl<W: WaitStrategy> Sequencer for MultiProducerSequencer<W> {
}

let mut current = low_watermark;
while !self.cursor.compare_exchange(current, good_to_release) {
while !self.cursor.compare_and_swap(current, good_to_release) {
current = self.cursor.get();
if current > good_to_release {
break;
Expand All @@ -222,7 +225,7 @@ impl<W: WaitStrategy> Sequencer for MultiProducerSequencer<W> {
/// A new processing sequence barrier
fn create_barrier(
&mut self,
gating_sequences: &[Arc<AtomicSequence>],
gating_sequences: &[Arc<AtomicSequenceOrdered>],
) -> ProcessingSequenceBarrier<W> {
ProcessingSequenceBarrier::new(
self.wait_strategy.clone(),
Expand All @@ -236,7 +239,7 @@ impl<W: WaitStrategy> Sequencer for MultiProducerSequencer<W> {
/// # Arguments
///
/// * `gating_sequence` - The sequence to add
fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>) {
fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequenceOrdered>) {
self.gating_sequences.push(gating_sequence.clone());
}

Expand All @@ -245,14 +248,15 @@ impl<W: WaitStrategy> Sequencer for MultiProducerSequencer<W> {
/// # Returns
///
/// The current cursor as an atomic sequence
fn get_cursor(&self) -> Arc<AtomicSequence> {
fn get_cursor(&self) -> Arc<AtomicSequenceOrdered> {
self.cursor.clone()
}

/// Drains the sequencer, preventing further event production.
fn drain(self) {
let current = self.cursor.get();
while min_cursor_sequence(&self.gating_sequences) < current {
while get_min_cursor_sequence::<_, AtomicSequenceOrdered>(&self.gating_sequences) < current
{
self.wait_strategy.signal();
}
self.is_done.store(true, Ordering::SeqCst);
Expand Down
18 changes: 10 additions & 8 deletions dcl_data_structures/src/ring_buffer/producer/single_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ pub struct Producer<D: DataProvider<T>, T, S: Sequencer> {
/// * `W` - The wait strategy used for coordinating with consumers
pub struct SingleProducerSequencer<W: WaitStrategy> {
/// The current cursor position in the ring buffer
cursor: Arc<AtomicSequence>,
cursor: Arc<AtomicSequenceOrdered>,
/// The next sequence to write to
next_write_sequence: Cell<Sequence>,
/// Cached sequence value to reduce consumer queries
cached_available_sequence: Cell<Sequence>,
/// The strategy used for waiting when the buffer is full
wait_strategy: Arc<W>,
/// Sequences that this producer must wait for before overwriting slots
gating_sequences: Vec<Arc<AtomicSequence>>,
gating_sequences: Vec<Arc<AtomicSequenceOrdered>>,
/// Size of the ring buffer
buffer_size: usize,
/// Flag indicating if the sequencer has been drained
Expand All @@ -105,7 +105,7 @@ impl<W: WaitStrategy> SingleProducerSequencer<W> {
/// * `wait_strategy` - The strategy to use when waiting for available slots
pub fn new(buffer_size: usize, wait_strategy: W) -> Self {
SingleProducerSequencer {
cursor: Arc::new(AtomicSequence::default()),
cursor: Arc::new(AtomicSequenceOrdered::default()),
next_write_sequence: Cell::new(0),
cached_available_sequence: Cell::new(Sequence::default()),
wait_strategy: Arc::new(wait_strategy),
Expand Down Expand Up @@ -139,7 +139,8 @@ impl<W: WaitStrategy> Sequencer for SingleProducerSequencer<W> {
let (start, end) = (next, next + (count - 1) as Sequence);

while min_sequence + (self.buffer_size as Sequence) < end {
min_sequence = min_cursor_sequence(&self.gating_sequences);
min_sequence =
get_min_cursor_sequence::<_, AtomicSequenceOrdered>(&self.gating_sequences);
}

self.cached_available_sequence.set(min_sequence);
Expand Down Expand Up @@ -171,7 +172,7 @@ impl<W: WaitStrategy> Sequencer for SingleProducerSequencer<W> {
/// A new processing sequence barrier
fn create_barrier(
&mut self,
gating_sequences: &[Arc<AtomicSequence>],
gating_sequences: &[Arc<AtomicSequenceOrdered>],
) -> ProcessingSequenceBarrier<W> {
ProcessingSequenceBarrier::new(
self.wait_strategy.clone(),
Expand All @@ -185,7 +186,7 @@ impl<W: WaitStrategy> Sequencer for SingleProducerSequencer<W> {
/// # Arguments
///
/// * `gating_sequence` - The sequence to add
fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>) {
fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequenceOrdered>) {
self.gating_sequences.push(gating_sequence.clone());
}

Expand All @@ -194,14 +195,15 @@ impl<W: WaitStrategy> Sequencer for SingleProducerSequencer<W> {
/// # Returns
///
/// The current cursor as an atomic sequence
fn get_cursor(&self) -> Arc<AtomicSequence> {
fn get_cursor(&self) -> Arc<AtomicSequenceOrdered> {
self.cursor.clone()
}

/// Drains the sequencer, preventing further event production.
fn drain(self) {
let current = self.next_write_sequence.take() - 1;
while min_cursor_sequence(&self.gating_sequences) < current {
while get_min_cursor_sequence::<_, AtomicSequenceOrdered>(&self.gating_sequences) < current
{
self.wait_strategy.signal();
}
self.is_done.store(true, Ordering::SeqCst);
Expand Down
Loading

0 comments on commit a281918

Please sign in to comment.