Skip to content

Commit

Permalink
feat: add stream helper types (paradigmxyz#10163)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Aug 7, 2024
1 parent 9ad7ebb commit 31b5548
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/payload/builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-ethereum-engine-primitives.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
futures-util.workspace = true
pin-project.workspace = true

# metrics
reth-metrics.workspace = true
Expand Down
77 changes: 76 additions & 1 deletion crates/payload/builder/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use reth_payload_primitives::PayloadTypes;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::broadcast;
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
Stream, StreamExt,
};
use tracing::debug;

/// Payload builder events.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -34,4 +39,74 @@ impl<Engine: PayloadTypes + 'static> PayloadEvents<Engine> {
let mut event_stream = self.into_stream();
event_stream.next().await
}

/// Returns a new stream that yields all built payloads.
pub fn into_built_payload_stream(self) -> BuiltPayloadStream<Engine> {
BuiltPayloadStream { st: self.into_stream() }
}

/// Returns a new stream that yields received payload attributes
pub fn into_attributes_stream(self) -> PayloadAttributeStream<Engine> {
PayloadAttributeStream { st: self.into_stream() }
}
}

/// A stream that yields built payloads.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct BuiltPayloadStream<T: PayloadTypes> {
/// The stream of events.
#[pin]
st: BroadcastStream<Events<T>>,
}

impl<T: PayloadTypes + 'static> Stream for BuiltPayloadStream<T> {
type Item = T::BuiltPayload;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)),
Some(Ok(Events::Attributes(_))) => {
// ignoring attributes
continue
}
Some(Err(err)) => {
debug!(%err, "payload event stream stream lagging behind");
continue
}
None => Poll::Ready(None),
}
}
}
}

/// A stream that yields received payload attributes
#[derive(Debug)]
#[pin_project::pin_project]
pub struct PayloadAttributeStream<T: PayloadTypes> {
/// The stream of events.
#[pin]
st: BroadcastStream<Events<T>>,
}

impl<T: PayloadTypes + 'static> Stream for PayloadAttributeStream<T> {
type Item = T::PayloadBuilderAttributes;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)),
Some(Ok(Events::BuiltPayload(_))) => {
// ignoring payloads
continue
}
Some(Err(err)) => {
debug!(%err, "payload event stream stream lagging behind");
continue
}
None => Poll::Ready(None),
}
}
}
}

0 comments on commit 31b5548

Please sign in to comment.