-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(consensus): handler for StreamMessages #823
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #823 +/- ##
===========================================
- Coverage 74.18% 61.67% -12.52%
===========================================
Files 359 244 -115
Lines 36240 31628 -4612
Branches 36240 31628 -4612
===========================================
- Hits 26886 19506 -7380
- Misses 7220 10468 +3248
+ Partials 2134 1654 -480
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Benchmark movements: |
Benchmark movements: full_committer_flow performance regressed! |
Benchmark movements: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r4, 1 of 4 files at r8.
Reviewable status: 2 of 7 files reviewed, 10 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
crates/papyrus_protobuf/Cargo.toml
line 24 at r10 (raw file):
papyrus_test_utils = { workspace = true, optional = true } thiserror.workspace = true tokio = { workspace = true, features = ["full"] }
Is this intentional?
Code quote:
tokio = { workspace = true, features = ["full"] }
crates/sequencing/papyrus_consensus/Cargo.toml
line 15 at r10 (raw file):
futures.workspace = true lazy_static.workspace = true log.workspace = true
Please use tracing instead
Code quote:
log.workspace = true
crates/sequencing/papyrus_consensus/src/lib.rs
line 14 at r10 (raw file):
#[allow(missing_docs)] pub mod state_machine; #[allow(missing_docs)]
Please remove and document instead
Code quote:
#[allow(missing_docs)]
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 14 at r10 (raw file):
pub struct StreamHandlerConfig { pub timeout_seconds: Option<u64>, pub timeout_millis: Option<u64>,
Why both?
Code quote:
pub timeout_seconds: Option<u64>,
pub timeout_millis: Option<u64>,
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 28 at r10 (raw file):
} } }
Seems like all fields get the default value for the type. Consider removing and deriving default
instead. (#[derive(Default)]
)
Code quote:
impl Default for StreamHandlerConfig {
fn default() -> Self {
StreamHandlerConfig {
timeout_seconds: None,
timeout_millis: None,
max_buffer_size: None,
max_num_streams: None,
}
}
}
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 38 at r10 (raw file):
pub receiver: mpsc::Receiver<StreamMessage<T>>, // these dictionaries are keyed on the stream_id
-
Try and write complete sentences in comments (Capital T, a period at the end of the sentence)
-
IMO dictionary is more python-oriented, consider renaming to mapping.
Code quote:
these dictionaries are keyed on the stream_id
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 73 at r10 (raw file):
let t0 = std::time::Instant::now(); loop { log::debug!("Listening for messages for {} milliseconds", t0.elapsed().as_millis());
This is not accurate, right?
Code quote:
log::debug!("Listening for messages for {} milliseconds", t0.elapsed().as_millis());
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 77 at r10 (raw file):
if let Some(timeout) = self.config.timeout_seconds { if t0.elapsed().as_secs() > timeout { break;
What happens then? Who is responsible for keep polling?
Code quote:
break;
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 88 at r10 (raw file):
if let Ok(message) = self.receiver.try_next() { if let None = message {
No need for pattern matching, consider
Suggestion:
if message.is_none() {
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 93 at r10 (raw file):
} let message = message.unwrap(); // code above handles case where message is None
- Please refrain from
unwrap
outside of tests. - You can instead do something like
let message = match message {
Some(message) => message,
None => break,
};
Code quote:
let message = message.unwrap();
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 125 at r10 (raw file):
let num_streams = self.num_buffered.len() as u64; if num_streams > max_streams { panic!("Max number of streams reached! {}", max_streams);
This seems like an attack vector, right?
Code quote:
panic!("Max number of streams reached! {}", max_streams);
Benchmark movements: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 4 files at r11, 2 of 3 files at r12, 1 of 2 files at r13, all commit messages.
Reviewable status: 7 of 8 files reviewed, 30 unresolved discussions (waiting on @dan-starkware and @guy-starkware)
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 123 at r13 (raw file):
let block_receiver = context.validate_proposal(self.height, p2p_messages_receiver).await; // (ProposalContentId, ProposalFin)
Is this a debug leftover?
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 125 at r10 (raw file):
Previously, dan-starkware wrote…
This seems like an attack vector, right?
+1
IMO you can just remove max_streams for now and add a TODO to solve the issue where an attacker can cause your memory to be inflated
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 15 at r13 (raw file):
#[derive(Default)] pub struct StreamHandlerConfig { /// The maximum buffer size for each stream (if None, will have not limit).
not -> no. Same below
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 23 at r13 (raw file):
/// A StreamHandler is responsible for buffering and sending messages in order. pub struct StreamHandler<
BTreeMap should only be used in cases where the map needs to be sorted. Otherwise, use HashMap.
Try to find out which case suits which map here. Haven't deep-dived into it
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 27 at r13 (raw file):
> { /// Configuration for the StreamHandler (things like max buffer size, etc.). pub config: StreamHandlerConfig,
Why are all these fields public? a public field is a field that is part of the api. All these fields look implementation specific
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 36 at r13 (raw file):
// All these maps are keyed on the stream_id. /// The next chunk_id that is expected for each stream. pub next_chunk_ids: BTreeMap<u64, u64>,
From where did the word chunk arrive? IMO this should be message_id
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 36 at r13 (raw file):
// All these maps are keyed on the stream_id. /// The next chunk_id that is expected for each stream. pub next_chunk_ids: BTreeMap<u64, u64>,
Please add type aliases for StreamId and MessageId and use them instead of u64 so it's easier to understand what is what.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 47 at r13 (raw file):
/// Each nested map is keyed by the chunk_id of the first message it contains. /// The messages in each nested map are stored in a contiguous sequence of messages (as a Vec). pub message_buffers: BTreeMap<u64, BTreeMap<u64, Vec<StreamMessage<T>>>>,
Use VecDeque. It's much more efficient for the actions you're doing here
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 47 at r13 (raw file):
/// Each nested map is keyed by the chunk_id of the first message it contains. /// The messages in each nested map are stored in a contiguous sequence of messages (as a Vec). pub message_buffers: BTreeMap<u64, BTreeMap<u64, Vec<StreamMessage<T>>>>,
try to think of a name for the value of this map, and create a type alias for it
Or even better, change it to BTreeMap<(u64, u64), Vec<StreamMessage>>
Now that I've looked into the usage, It looks like you can just have BTreeMap<(u64, u64), StreamMessage> and in the drain code just loop until you find a hole
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 74 at r13 (raw file):
/// Guarntees that messages are sent in order. pub async fn listen(&mut self) { let mut still_open = true;
I like the style of loop and break instead of defining a boolean variable. I think it's more rustic.
Leaving unblocking since it's a matter of taste
same below
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 80 at r13 (raw file):
} else { // Err comes when the channel is open but no message was received. tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
call next and await instead
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 86 at r13 (raw file):
#[cfg(test)] pub async fn listen_with_timeout(&mut self, timeout_millis: u128) {
Erase this method and wrap the call to the first method with tokio::time::timeout
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 107 at r13 (raw file):
Some(message) => message, None => { // Message is none in case the channel was closed!
You can panic here. This channel should never close
This allows you to change the return type, the argument to StreamMessage and the while loop above to loop
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 131 at r13 (raw file):
.or_insert(chunk_id); // Check if this there are too many streams:
try to fix the grammar here
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 142 at r13 (raw file):
if let Some(max_chunk_id) = self.max_chunk_id.get(&stream_id) { if *max_chunk_id > chunk_id { panic!(
This shouldn't be panic. you should mark this stream as failed/abandoned/dropped, dropping any future messages from it, and report the relevant peer
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 143 at r13 (raw file):
if *max_chunk_id > chunk_id { panic!( "Received fin message with chunk_id {} that is smaller than the \
This message contains a lot of implementation details. try to rephrase it in words that the user can quickly understand without looking at the code, and move the numbers to the end. Also, add the stream id:
Received fin message for stream with pending non-fin messages. stream_id: {}. fin message_id: {}, largest message_id in stream: {}
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 155 at r13 (raw file):
if let Some(fin_chunk_id) = self.fin_chunk_id.get(&stream_id) { if chunk_id > *fin_chunk_id { panic!(
Same here, don't panic
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 156 at r13 (raw file):
if chunk_id > *fin_chunk_id { panic!( "Received message with chunk_id {} that is bigger than the fin_chunk_id {}",
Same here, try to improve the message
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 164 at r13 (raw file):
// This means we can just send the message without buffering it. if chunk_id == *next_chunk_id { self.sender.try_send(message).expect("Send should succeed");
Don't panic here. Instead, drop the stream. But unlike the other cases above, don't report the peer
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 172 at r13 (raw file):
self.store(message); } else { panic!(
Same as before, don't panic
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 173 at r13 (raw file):
} else { panic!( "Received message with chunk_id {} that is smaller than next_chunk_id {}",
Same as before, improve the message (something along the lines of "Received multiple messages with the same stream_id and message_id")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 4 of 11 files reviewed, 28 unresolved discussions (waiting on @dan-starkware and @ShahakShama)
crates/papyrus_protobuf/Cargo.toml
line 24 at r10 (raw file):
Previously, dan-starkware wrote…
Is this intentional?
No, I don't think it is...
crates/sequencing/papyrus_consensus/Cargo.toml
line 15 at r10 (raw file):
Previously, dan-starkware wrote…
Please use tracing instead
got it.
crates/sequencing/papyrus_consensus/src/lib.rs
line 14 at r10 (raw file):
Previously, dan-starkware wrote…
Please remove and document instead
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 14 at r10 (raw file):
Previously, dan-starkware wrote…
Why both?
I needed millis to make the tests run faster. I'm going to get rid of these timeouts altogether.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 28 at r10 (raw file):
Previously, dan-starkware wrote…
Seems like all fields get the default value for the type. Consider removing and deriving
default
instead. (#[derive(Default)]
)
good idea.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 88 at r10 (raw file):
Previously, dan-starkware wrote…
No need for pattern matching, consider
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 93 at r10 (raw file):
Previously, dan-starkware wrote…
- Please refrain from
unwrap
outside of tests.- You can instead do something like
let message = match message { Some(message) => message, None => break, };
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 125 at r10 (raw file):
Previously, dan-starkware wrote…
This seems like an attack vector, right?
There are many ways to make this thing fail. I think we need to decide what happens in that case (I used panic but it could be other things).
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 15 at r13 (raw file):
Previously, ShahakShama wrote…
not -> no. Same below
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 23 at r13 (raw file):
Previously, ShahakShama wrote…
BTreeMap should only be used in cases where the map needs to be sorted. Otherwise, use HashMap.
Try to find out which case suits which map here. Haven't deep-dived into it
I do use the fact that they are sorted. Also, I think the performance is better for short keys, but I'm not sure if that makes a big difference.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 27 at r13 (raw file):
Previously, ShahakShama wrote…
Why are all these fields public? a public field is a field that is part of the api. All these fields look implementation specific
I thought you'd want to plug into it the channels you have from whatever code is using the handler, so it should be open to modification. I'm leaving the config, sender and receiver as public.
The other fields, I agree, we can make them private.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 36 at r13 (raw file):
Previously, ShahakShama wrote…
From where did the word chunk arrive? IMO this should be message_id
ok
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 36 at r13 (raw file):
Previously, ShahakShama wrote…
Please add type aliases for StreamId and MessageId and use them instead of u64 so it's easier to understand what is what.
good idea!
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 47 at r13 (raw file):
Previously, ShahakShama wrote…
Use VecDeque. It's much more efficient for the actions you're doing here
I'm adding messages to the end, and pulling the messages out one by one from the beginning, I think there's no advantage to a vecdeque here.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 47 at r13 (raw file):
Previously, ShahakShama wrote…
try to think of a name for the value of this map, and create a type alias for it
Or even better, change it to BTreeMap<(u64, u64), Vec<StreamMessage>>Now that I've looked into the usage, It looks like you can just have BTreeMap<(u64, u64), StreamMessage> and in the drain code just loop until you find a hole
The way I was thinking about it, is that if most of the time you get messages in the right order, it is much more efficient to fill up a vector and drain it in a row, instead of pushing messages into a map one-at-a-time.
As for the nested maps, it does make it easier to reason about the data, for example when a stream is done, I can just remove one nested map, instead of having to loop through and find all the ones that have a first index == stream_id.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 74 at r13 (raw file):
Previously, ShahakShama wrote…
I like the style of loop and break instead of defining a boolean variable. I think it's more rustic.
Leaving unblocking since it's a matter of tastesame below
I feel very uncomfortable with "loop" but I will have to start getting used to it :)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 80 at r13 (raw file):
Previously, ShahakShama wrote…
call next and await instead
I'm not sure I understand what you mean...?
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 86 at r13 (raw file):
Previously, ShahakShama wrote…
Erase this method and wrap the call to the first method with tokio::time::timeout
I need to successfully finish the loop (with the internal timeout) so I can inspect the StreamHandler object. This is only used for tests.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 107 at r13 (raw file):
Previously, ShahakShama wrote…
You can panic here. This channel should never close
This allows you to change the return type, the argument to StreamMessage and the while loop above toloop
I don't know, how do we know this channel will never get closed? The channel code allows this to happen, IMO we should take advantage of this option and allow shutting down gracefully.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 131 at r13 (raw file):
Previously, ShahakShama wrote…
try to fix the grammar here
I am going to fix all of these panics (and badly worded messages) as soon as we agree what we should do in all these cases. We'll discuss it tomorrow and then I will fix it.
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 123 at r13 (raw file):
Previously, ShahakShama wrote…
Is this a debug leftover?
This is from starting to work on the integration into consensus, but I'll take it out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 2 of 11 files reviewed, 27 unresolved discussions (waiting on @dan-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 142 at r13 (raw file):
Previously, ShahakShama wrote…
This shouldn't be panic. you should mark this stream as failed/abandoned/dropped, dropping any future messages from it, and report the relevant peer
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 143 at r13 (raw file):
Previously, ShahakShama wrote…
This message contains a lot of implementation details. try to rephrase it in words that the user can quickly understand without looking at the code, and move the numbers to the end. Also, add the stream id:
Received fin message for stream with pending non-fin messages. stream_id: {}. fin message_id: {}, largest message_id in stream: {}
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 155 at r13 (raw file):
Previously, ShahakShama wrote…
Same here, don't panic
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 156 at r13 (raw file):
Previously, ShahakShama wrote…
Same here, try to improve the message
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 164 at r13 (raw file):
Previously, ShahakShama wrote…
Don't panic here. Instead, drop the stream. But unlike the other cases above, don't report the peer
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 172 at r13 (raw file):
Previously, ShahakShama wrote…
Same as before, don't panic
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 173 at r13 (raw file):
Previously, ShahakShama wrote…
Same as before, improve the message (something along the lines of "Received multiple messages with the same stream_id and message_id")
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 7 files at r14, 2 of 2 files at r16.
Reviewable status: 5 of 11 files reviewed, 26 unresolved discussions (waiting on @ShahakShama)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r12.
Reviewable status: 5 of 11 files reviewed, 24 unresolved discussions (waiting on @ShahakShama)
crates/papyrus_protobuf/src/converters/consensus.rs
line 124 at r16 (raw file):
message: T::try_from(value.message)?, stream_id: value.stream_id, message_id: value.message_id,
Please move this to a preliminary PR and rebase.
Code quote:
message_id: value.message_id,
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 26 at r16 (raw file):
#[derive(Default)] struct StreamStats { // the next message_id that is expected
Suggestion:
The next message_id that is expected.
commit-id:116b21cb
commit-id:d0448e57
commit-id:266e57fb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 20 files reviewed, 22 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 17 at r21 (raw file):
Previously, guy-starkware wrote…
My thinking was that no bounds is the simplest thing to do. Do you have a number that makes sense for the maximum length?
Agree, but it might be error-prone. The bound depends on the context, this is why it is configured.
Consider removing these altogether and we will add them later on - this will make the entire process (dev, test, CR..) easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 20 files reviewed, 27 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
a discussion (no related file):
Please refrain from panic, trace as warn instead.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 35 at r23 (raw file):
max_message_id: MessageId, // The number of messages that are currently buffered. num_buffered: u64,
This seems redundant, why not using len
instead?
Code quote:
// The number of messages that are currently buffered.
num_buffered: u64,
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 62 at r23 (raw file):
// An end of a channel used to send out the messages in the correct order. sender: mpsc::Sender<StreamMessage<T>>,
Shouldn't this should send T
?
Code quote:
sender: mpsc::Sender<StreamMessage<T>>,
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 95 at r23 (raw file):
// Handle the message, return true if the channel is still open. fn handle_message(&mut self, message: StreamMessage<T>) -> bool {
Do we need the return value here?
Code quote:
fn handle_message(&mut self, message: StreamMessage<T>) -> bool
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 163 at r23 (raw file):
// Go over each vector in the buffer, push to the end of it if the message_id is contiguous. // If no vector has a contiguous message_id, start a new vector.
Please fix or remove the comment.
Code quote:
// Go over each vector in the buffer, push to the end of it if the message_id is contiguous.
// If no vector has a contiguous message_id, start a new vector.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 192 at r23 (raw file):
// Tries to drain as many messages as possible from the buffer (in order), // DOES NOT guarantee that the buffer will be empty after calling this function. fn drain_buffer(&mut self, stream_id: u64) {
As the comment suggests, the name has a connotation of leaving something empty. Consider renaming, maybe process_buffer
.
In general, IMO, it's better if the code is self-explanatory and use comments where needed/sparsely.
Code quote:
// DOES NOT guarantee that the buffer will be empty after calling this function.
fn drain_buffer(&mut self, stream_id: u64) {
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 193 at r23 (raw file):
// DOES NOT guarantee that the buffer will be empty after calling this function. fn drain_buffer(&mut self, stream_id: u64) { let data = self.stream_data.get_mut(&stream_id).unwrap();
Please refrain from unwrapping
Code quote:
.unwrap();
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 195 at r23 (raw file):
let data = self.stream_data.get_mut(&stream_id).unwrap(); while let Some(message) = data.message_buffer.remove(&data.next_message_id) { self.sender.try_send(message).expect("Send should succeed");
Consider unifying with the previous send. For example by always storing and processing the buffer.
Add a TODO to reconsider the expect
here.
Code quote:
self.sender.try_send(message).expect("Send should succeed");
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 20 files reviewed, 26 unresolved discussions (waiting on @dan-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 17 at r21 (raw file):
Previously, dan-starkware wrote…
Agree, but it might be error-prone. The bound depends on the context, this is why it is configured.
Consider removing these altogether and we will add them later on - this will make the entire process (dev, test, CR..) easier.
ok
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 100 at r21 (raw file):
Previously, guy-starkware wrote…
Good idea. But I'm leaving the
if
so I can put the check for too many streams into this conditional (seeing the next comment below).
Removed if
in favor of entry
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 35 at r23 (raw file):
Previously, dan-starkware wrote…
This seems redundant, why not using
len
instead?
You are right. I think this is left over from when I was using a map of vectors.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 62 at r23 (raw file):
Previously, dan-starkware wrote…
Shouldn't this should send
T
?
Yes that is right!
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 95 at r23 (raw file):
Previously, dan-starkware wrote…
Do we need the return value here?
No, we can take it out.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 193 at r23 (raw file):
Previously, dan-starkware wrote…
Please refrain from unwrapping
Done.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 195 at r23 (raw file):
Previously, dan-starkware wrote…
Consider unifying with the previous send. For example by always storing and processing the buffer.
Add a TODO to reconsider the
expect
here.
Just to make clear: you are suggesting to store every message in the buffer and then process the buffer to see if we can send that message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 12 of 20 files reviewed, 18 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 195 at r23 (raw file):
Previously, guy-starkware wrote…
Just to make clear: you are suggesting to store every message in the buffer and then process the buffer to see if we can send that message?
Yes, or having a function for the send. It relates to the second point (Add a TODO to reconsider the expect
here). I assume there might be reasons for failing where we should'n unwrap (maybe retry or shifting buffer utilization)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 2 files at r25, all commit messages.
Reviewable status: 14 of 20 files reviewed, 18 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 20 files reviewed, 18 unresolved discussions (waiting on @dan-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 195 at r23 (raw file):
Previously, dan-starkware wrote…
Yes, or having a function for the send. It relates to the second point (Add a TODO to reconsider the
expect
here). I assume there might be reasons for failing where we should'n unwrap (maybe retry or shifting buffer utilization)
Got it. Moved it into a function so we only have to address the issue of sending in one place.
9b8c854
to
6077854
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r28, all commit messages.
Reviewable status: 15 of 20 files reviewed, 4 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 48 at r28 (raw file):
> { // An end of a channel used to send out the messages in the correct order. sender: mpsc::Sender<T>,
Sender of a Receiver of T.
Tests should check that the corresponding channel is closed after fin. I'll explain F2F
Code quote:
sender: mpsc::Sender<T>,
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 11 at r28 (raw file):
use super::*; fn make_random_message(
This is not random, right?
Code quote:
make_random_message
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 25 at r28 (raw file):
// check if two vectors are the same // ref: https://stackoverflow.com/a/58175659
IMO we can check equality without the stackoverflow reference.
Code quote:
// check if two vectors are the same
// ref: https://stackoverflow.com/a/58175659
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 26 at r28 (raw file):
// check if two vectors are the same // ref: https://stackoverflow.com/a/58175659 fn do_vecs_match<T: PartialEq>(a: &Vec<T>, b: &Vec<T>) -> bool {
Please use slice instead
Code quote:
&Vec<T>, b: &Vec<T>
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 44 at r28 (raw file):
#[tokio::test] async fn test_stream_handler_in_order() {
Please remove the test_
prefix.
Code quote:
test_stream_handler_in_order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 15 of 20 files reviewed, 3 unresolved discussions (waiting on @dan-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 48 at r28 (raw file):
Previously, dan-starkware wrote…
Sender of a Receiver of T.
Tests should check that the corresponding channel is closed after fin. I'll explain F2F
Done.
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 26 at r28 (raw file):
Previously, dan-starkware wrote…
Please use slice instead
I can do that but there are a lot of collect()
statements in the tests that use this function, and when I turn the inputs to do_vecs_match
to be slices, they no longer know into what to collect the arrays. Maybe there's a shortcut I am not aware of, but it looks like I would need to explicitely cast all those cases into Vectors and then to slices. I think it is not worth the added code.
Please move to a const with a TODO Code quote: (100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 20 files reviewed, 4 unresolved discussions (waiting on @guy-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 47 at r17 (raw file):
#[tokio::test] async fn test_stream_handler_in_order() { let (mut h, mut tx_input, mut rx_output) = setup_test();
Please rename, everywhere
Suggestion:
stream_handler
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 78 at r17 (raw file):
#[tokio::test] async fn test_stream_handler_in_reverse() {
Please remove prefix, in all tests
Code quote:
test_stream_handler_in_reverse
Why not Code quote: .try_next() |
Previously, guy-starkware wrote…
Can you use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 20 files reviewed, 4 unresolved discussions (waiting on @dan-starkware and @ShahakShama)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 101 at r29 (raw file):
Previously, dan-starkware wrote…
Please move to a const with a TODO
Done.
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 66 at r17 (raw file):
Previously, dan-starkware wrote…
Why not
next
?
This kind of receiver does not have next()
. Only try_next()
. But I did convert the expect
into unwrap
to make things a little bit shorter.
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 78 at r17 (raw file):
Previously, dan-starkware wrote…
Please remove prefix, in all tests
Yes, of course I meant to do that...
Previously, guy-starkware wrote…
You can use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 2 files at r30, all commit messages.
Reviewable status: 16 of 20 files reviewed, 2 unresolved discussions (waiting on @guy-starkware)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 15 of 20 files reviewed, all discussions resolved (waiting on @dan-starkware)
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 66 at r17 (raw file):
Previously, dan-starkware wrote…
You can use
futures_util::stream::stream::StreamExt
Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 4 of 4 files at r31, all commit messages.
Reviewable status: 19 of 20 files reviewed, all discussions resolved (waiting on @guy-starkware)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 4 files at r20.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @guy-starkware)
Create an object called StreamHandler that can buffer StreamMessages that arrive in any order, and output them in order as soon as any messages are in contiguous order.
This change is