Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): handler for StreamMessages #823

Merged
merged 34 commits into from
Sep 26, 2024
Merged

Conversation

guy-starkware
Copy link
Contributor

@guy-starkware guy-starkware commented Sep 16, 2024

Create an object called StreamHandler that can buffer StreamMessages that arrive in any order, and output them in order as soon as any messages are in contiguous order.


This change is Reviewable

@guy-starkware guy-starkware changed the title Handler for StreamMessages Feat(consensus): Handler for StreamMessages Sep 17, 2024
Copy link

codecov bot commented Sep 17, 2024

Codecov Report

Attention: Patch coverage is 92.85714% with 3 lines in your changes missing coverage. Please review.

Project coverage is 61.67%. Comparing base (b0cfe82) to head (3ee7a86).
Report is 99 commits behind head on main.

Files with missing lines Patch % Lines
...sequencing/papyrus_consensus/src/stream_handler.rs 92.85% 2 Missing and 1 partial ⚠️

❗ There is a different number of reports uploaded between BASE (b0cfe82) and HEAD (3ee7a86). Click for more details.

HEAD has 2 uploads less than BASE
Flag BASE (b0cfe82) HEAD (3ee7a86)
3 1
Additional details and impacted files
@@             Coverage Diff             @@
##             main     #823       +/-   ##
===========================================
- Coverage   74.18%   61.67%   -12.52%     
===========================================
  Files         359      244      -115     
  Lines       36240    31628     -4612     
  Branches    36240    31628     -4612     
===========================================
- Hits        26886    19506     -7380     
- Misses       7220    10468     +3248     
+ Partials     2134     1654      -480     
Flag Coverage Δ
61.67% <92.85%> (-12.52%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link

Benchmark movements:
tree_computation_flow performance improved 😺
tree_computation_flow time: [66.196 ms 66.367 ms 66.619 ms]
change: [-8.1790% -4.8341% -1.9513%] (p = 0.00 < 0.05)
Performance has improved.
Found 11 outliers among 100 measurements (11.00%)
6 (6.00%) high mild
5 (5.00%) high severe

Copy link

Benchmark movements:
tree_computation_flow performance improved 😺
tree_computation_flow time: [67.819 ms 67.908 ms 68.011 ms]
change: [-8.2338% -4.7933% -1.8521%] (p = 0.00 < 0.05)
Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
4 (4.00%) high mild
2 (2.00%) high severe

full_committer_flow performance regressed!
full_committer_flow time: [49.092 ms 49.160 ms 49.231 ms]
change: [+1.4215% +2.2385% +2.8188%] (p = 0.00 < 0.05)
Performance has regressed.
Found 4 outliers among 100 measurements (4.00%)
3 (3.00%) high mild
1 (1.00%) high severe

@guy-starkware guy-starkware changed the title Feat(consensus): Handler for StreamMessages feat(consensus): Handler for StreamMessages Sep 17, 2024
Copy link

Benchmark movements:
tree_computation_flow performance improved 😺
tree_computation_flow time: [66.893 ms 67.045 ms 67.219 ms]
change: [-8.5317% -5.2171% -2.2755%] (p = 0.00 < 0.05)
Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
2 (2.00%) high mild
4 (4.00%) high severe

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 3 files at r4, 1 of 4 files at r8.
Reviewable status: 2 of 7 files reviewed, 10 unresolved discussions (waiting on @guy-starkware and @ShahakShama)


crates/papyrus_protobuf/Cargo.toml line 24 at r10 (raw file):

papyrus_test_utils = { workspace = true, optional = true }
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }

Is this intentional?

Code quote:

tokio = { workspace = true, features = ["full"] }

crates/sequencing/papyrus_consensus/Cargo.toml line 15 at r10 (raw file):

futures.workspace = true
lazy_static.workspace = true
log.workspace = true

Please use tracing instead

Code quote:

log.workspace = true

crates/sequencing/papyrus_consensus/src/lib.rs line 14 at r10 (raw file):

#[allow(missing_docs)]
pub mod state_machine;
#[allow(missing_docs)]

Please remove and document instead

Code quote:

#[allow(missing_docs)]

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 14 at r10 (raw file):

pub struct StreamHandlerConfig {
    pub timeout_seconds: Option<u64>,
    pub timeout_millis: Option<u64>,

Why both?

Code quote:

    pub timeout_seconds: Option<u64>,
    pub timeout_millis: Option<u64>,

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 28 at r10 (raw file):

        }
    }
}

Seems like all fields get the default value for the type. Consider removing and deriving default instead. (#[derive(Default)])

Code quote:

impl Default for StreamHandlerConfig {
    fn default() -> Self {
        StreamHandlerConfig {
            timeout_seconds: None,
            timeout_millis: None,
            max_buffer_size: None,
            max_num_streams: None,
        }
    }
}

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 38 at r10 (raw file):

    pub receiver: mpsc::Receiver<StreamMessage<T>>,

    // these dictionaries are keyed on the stream_id
  1. Try and write complete sentences in comments (Capital T, a period at the end of the sentence)

  2. IMO dictionary is more python-oriented, consider renaming to mapping.

Code quote:

these dictionaries are keyed on the stream_id

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 73 at r10 (raw file):

        let t0 = std::time::Instant::now();
        loop {
            log::debug!("Listening for messages for {} milliseconds", t0.elapsed().as_millis());

This is not accurate, right?

Code quote:

log::debug!("Listening for messages for {} milliseconds", t0.elapsed().as_millis());

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 77 at r10 (raw file):

            if let Some(timeout) = self.config.timeout_seconds {
                if t0.elapsed().as_secs() > timeout {
                    break;

What happens then? Who is responsible for keep polling?

Code quote:

 break;

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 88 at r10 (raw file):

            if let Ok(message) = self.receiver.try_next() {
                if let None = message {

No need for pattern matching, consider

Suggestion:

if message.is_none() {

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 93 at r10 (raw file):

                }

                let message = message.unwrap(); // code above handles case where message is None
  1. Please refrain from unwrap outside of tests.
  2. You can instead do something like
let message = match message {
    Some(message) => message,
    None => break,
};

Code quote:

 let message = message.unwrap();

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 125 at r10 (raw file):

                    let num_streams = self.num_buffered.len() as u64;
                    if num_streams > max_streams {
                        panic!("Max number of streams reached! {}", max_streams);

This seems like an attack vector, right?

Code quote:

panic!("Max number of streams reached! {}", max_streams);

Copy link

Benchmark movements:
tree_computation_flow performance improved 😺
tree_computation_flow time: [65.817 ms 65.921 ms 66.054 ms]
change: [-8.2551% -4.9069% -2.0853%] (p = 0.00 < 0.05)
Performance has improved.
Found 10 outliers among 100 measurements (10.00%)
5 (5.00%) high mild
5 (5.00%) high severe

@guy-starkware guy-starkware changed the title feat(consensus): Handler for StreamMessages feat(consensus): handler for StreamMessages Sep 18, 2024
@guy-starkware guy-starkware changed the title feat(consensus): handler for StreamMessages feat(consensus): handler for StreamMessages Sep 18, 2024
Copy link
Contributor

@ShahakShama ShahakShama left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 4 files at r11, 2 of 3 files at r12, 1 of 2 files at r13, all commit messages.
Reviewable status: 7 of 8 files reviewed, 30 unresolved discussions (waiting on @dan-starkware and @guy-starkware)


crates/sequencing/papyrus_consensus/src/single_height_consensus.rs line 123 at r13 (raw file):

        let block_receiver = context.validate_proposal(self.height, p2p_messages_receiver).await;
        // (ProposalContentId, ProposalFin)

Is this a debug leftover?


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 125 at r10 (raw file):

Previously, dan-starkware wrote…

This seems like an attack vector, right?

+1
IMO you can just remove max_streams for now and add a TODO to solve the issue where an attacker can cause your memory to be inflated


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 15 at r13 (raw file):

#[derive(Default)]
pub struct StreamHandlerConfig {
    /// The maximum buffer size for each stream (if None, will have not limit).

not -> no. Same below


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 23 at r13 (raw file):

/// A StreamHandler is responsible for buffering and sending messages in order.
pub struct StreamHandler<

BTreeMap should only be used in cases where the map needs to be sorted. Otherwise, use HashMap.
Try to find out which case suits which map here. Haven't deep-dived into it


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 27 at r13 (raw file):

> {
    /// Configuration for the StreamHandler (things like max buffer size, etc.).
    pub config: StreamHandlerConfig,

Why are all these fields public? a public field is a field that is part of the api. All these fields look implementation specific


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 36 at r13 (raw file):

    // All these maps are keyed on the stream_id.
    /// The next chunk_id that is expected for each stream.
    pub next_chunk_ids: BTreeMap<u64, u64>,

From where did the word chunk arrive? IMO this should be message_id


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 36 at r13 (raw file):

    // All these maps are keyed on the stream_id.
    /// The next chunk_id that is expected for each stream.
    pub next_chunk_ids: BTreeMap<u64, u64>,

Please add type aliases for StreamId and MessageId and use them instead of u64 so it's easier to understand what is what.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 47 at r13 (raw file):

    /// Each nested map is keyed by the chunk_id of the first message it contains.
    /// The messages in each nested map are stored in a contiguous sequence of messages (as a Vec).
    pub message_buffers: BTreeMap<u64, BTreeMap<u64, Vec<StreamMessage<T>>>>,

Use VecDeque. It's much more efficient for the actions you're doing here


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 47 at r13 (raw file):

    /// Each nested map is keyed by the chunk_id of the first message it contains.
    /// The messages in each nested map are stored in a contiguous sequence of messages (as a Vec).
    pub message_buffers: BTreeMap<u64, BTreeMap<u64, Vec<StreamMessage<T>>>>,

try to think of a name for the value of this map, and create a type alias for it
Or even better, change it to BTreeMap<(u64, u64), Vec<StreamMessage>>

Now that I've looked into the usage, It looks like you can just have BTreeMap<(u64, u64), StreamMessage> and in the drain code just loop until you find a hole


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 74 at r13 (raw file):

    /// Guarntees that messages are sent in order.
    pub async fn listen(&mut self) {
        let mut still_open = true;

I like the style of loop and break instead of defining a boolean variable. I think it's more rustic.
Leaving unblocking since it's a matter of taste

same below


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 80 at r13 (raw file):

            } else {
                // Err comes when the channel is open but no message was received.
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

call next and await instead


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 86 at r13 (raw file):

    #[cfg(test)]
    pub async fn listen_with_timeout(&mut self, timeout_millis: u128) {

Erase this method and wrap the call to the first method with tokio::time::timeout


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 107 at r13 (raw file):

            Some(message) => message,
            None => {
                // Message is none in case the channel was closed!

You can panic here. This channel should never close
This allows you to change the return type, the argument to StreamMessage and the while loop above to loop


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 131 at r13 (raw file):

            .or_insert(chunk_id);

        // Check if this there are too many streams:

try to fix the grammar here


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 142 at r13 (raw file):

            if let Some(max_chunk_id) = self.max_chunk_id.get(&stream_id) {
                if *max_chunk_id > chunk_id {
                    panic!(

This shouldn't be panic. you should mark this stream as failed/abandoned/dropped, dropping any future messages from it, and report the relevant peer


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 143 at r13 (raw file):

                if *max_chunk_id > chunk_id {
                    panic!(
                        "Received fin message with chunk_id {} that is smaller than the \

This message contains a lot of implementation details. try to rephrase it in words that the user can quickly understand without looking at the code, and move the numbers to the end. Also, add the stream id:
Received fin message for stream with pending non-fin messages. stream_id: {}. fin message_id: {}, largest message_id in stream: {}


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 155 at r13 (raw file):

        if let Some(fin_chunk_id) = self.fin_chunk_id.get(&stream_id) {
            if chunk_id > *fin_chunk_id {
                panic!(

Same here, don't panic


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 156 at r13 (raw file):

            if chunk_id > *fin_chunk_id {
                panic!(
                    "Received message with chunk_id {} that is bigger than the fin_chunk_id {}",

Same here, try to improve the message


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 164 at r13 (raw file):

        // This means we can just send the message without buffering it.
        if chunk_id == *next_chunk_id {
            self.sender.try_send(message).expect("Send should succeed");

Don't panic here. Instead, drop the stream. But unlike the other cases above, don't report the peer


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 172 at r13 (raw file):

            self.store(message);
        } else {
            panic!(

Same as before, don't panic


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 173 at r13 (raw file):

        } else {
            panic!(
                "Received message with chunk_id {} that is smaller than next_chunk_id {}",

Same as before, improve the message (something along the lines of "Received multiple messages with the same stream_id and message_id")

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 4 of 11 files reviewed, 28 unresolved discussions (waiting on @dan-starkware and @ShahakShama)


crates/papyrus_protobuf/Cargo.toml line 24 at r10 (raw file):

Previously, dan-starkware wrote…

Is this intentional?

No, I don't think it is...


crates/sequencing/papyrus_consensus/Cargo.toml line 15 at r10 (raw file):

Previously, dan-starkware wrote…

Please use tracing instead

got it.


crates/sequencing/papyrus_consensus/src/lib.rs line 14 at r10 (raw file):

Previously, dan-starkware wrote…

Please remove and document instead

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 14 at r10 (raw file):

Previously, dan-starkware wrote…

Why both?

I needed millis to make the tests run faster. I'm going to get rid of these timeouts altogether.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 28 at r10 (raw file):

Previously, dan-starkware wrote…

Seems like all fields get the default value for the type. Consider removing and deriving default instead. (#[derive(Default)])

good idea.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 88 at r10 (raw file):

Previously, dan-starkware wrote…

No need for pattern matching, consider

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 93 at r10 (raw file):

Previously, dan-starkware wrote…
  1. Please refrain from unwrap outside of tests.
  2. You can instead do something like
let message = match message {
    Some(message) => message,
    None => break,
};

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 125 at r10 (raw file):

Previously, dan-starkware wrote…

This seems like an attack vector, right?

There are many ways to make this thing fail. I think we need to decide what happens in that case (I used panic but it could be other things).


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 15 at r13 (raw file):

Previously, ShahakShama wrote…

not -> no. Same below

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 23 at r13 (raw file):

Previously, ShahakShama wrote…

BTreeMap should only be used in cases where the map needs to be sorted. Otherwise, use HashMap.
Try to find out which case suits which map here. Haven't deep-dived into it

I do use the fact that they are sorted. Also, I think the performance is better for short keys, but I'm not sure if that makes a big difference.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 27 at r13 (raw file):

Previously, ShahakShama wrote…

Why are all these fields public? a public field is a field that is part of the api. All these fields look implementation specific

I thought you'd want to plug into it the channels you have from whatever code is using the handler, so it should be open to modification. I'm leaving the config, sender and receiver as public.

The other fields, I agree, we can make them private.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 36 at r13 (raw file):

Previously, ShahakShama wrote…

From where did the word chunk arrive? IMO this should be message_id

ok


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 36 at r13 (raw file):

Previously, ShahakShama wrote…

Please add type aliases for StreamId and MessageId and use them instead of u64 so it's easier to understand what is what.

good idea!


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 47 at r13 (raw file):

Previously, ShahakShama wrote…

Use VecDeque. It's much more efficient for the actions you're doing here

I'm adding messages to the end, and pulling the messages out one by one from the beginning, I think there's no advantage to a vecdeque here.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 47 at r13 (raw file):

Previously, ShahakShama wrote…

try to think of a name for the value of this map, and create a type alias for it
Or even better, change it to BTreeMap<(u64, u64), Vec<StreamMessage>>

Now that I've looked into the usage, It looks like you can just have BTreeMap<(u64, u64), StreamMessage> and in the drain code just loop until you find a hole

The way I was thinking about it, is that if most of the time you get messages in the right order, it is much more efficient to fill up a vector and drain it in a row, instead of pushing messages into a map one-at-a-time.

As for the nested maps, it does make it easier to reason about the data, for example when a stream is done, I can just remove one nested map, instead of having to loop through and find all the ones that have a first index == stream_id.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 74 at r13 (raw file):

Previously, ShahakShama wrote…

I like the style of loop and break instead of defining a boolean variable. I think it's more rustic.
Leaving unblocking since it's a matter of taste

same below

I feel very uncomfortable with "loop" but I will have to start getting used to it :)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 80 at r13 (raw file):

Previously, ShahakShama wrote…

call next and await instead

I'm not sure I understand what you mean...?


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 86 at r13 (raw file):

Previously, ShahakShama wrote…

Erase this method and wrap the call to the first method with tokio::time::timeout

I need to successfully finish the loop (with the internal timeout) so I can inspect the StreamHandler object. This is only used for tests.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 107 at r13 (raw file):

Previously, ShahakShama wrote…

You can panic here. This channel should never close
This allows you to change the return type, the argument to StreamMessage and the while loop above to loop

I don't know, how do we know this channel will never get closed? The channel code allows this to happen, IMO we should take advantage of this option and allow shutting down gracefully.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 131 at r13 (raw file):

Previously, ShahakShama wrote…

try to fix the grammar here

I am going to fix all of these panics (and badly worded messages) as soon as we agree what we should do in all these cases. We'll discuss it tomorrow and then I will fix it.


crates/sequencing/papyrus_consensus/src/single_height_consensus.rs line 123 at r13 (raw file):

Previously, ShahakShama wrote…

Is this a debug leftover?

This is from starting to work on the integration into consensus, but I'll take it out.

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 2 of 11 files reviewed, 27 unresolved discussions (waiting on @dan-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 142 at r13 (raw file):

Previously, ShahakShama wrote…

This shouldn't be panic. you should mark this stream as failed/abandoned/dropped, dropping any future messages from it, and report the relevant peer

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 143 at r13 (raw file):

Previously, ShahakShama wrote…

This message contains a lot of implementation details. try to rephrase it in words that the user can quickly understand without looking at the code, and move the numbers to the end. Also, add the stream id:
Received fin message for stream with pending non-fin messages. stream_id: {}. fin message_id: {}, largest message_id in stream: {}

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 155 at r13 (raw file):

Previously, ShahakShama wrote…

Same here, don't panic

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 156 at r13 (raw file):

Previously, ShahakShama wrote…

Same here, try to improve the message

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 164 at r13 (raw file):

Previously, ShahakShama wrote…

Don't panic here. Instead, drop the stream. But unlike the other cases above, don't report the peer

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 172 at r13 (raw file):

Previously, ShahakShama wrote…

Same as before, don't panic

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 173 at r13 (raw file):

Previously, ShahakShama wrote…

Same as before, improve the message (something along the lines of "Received multiple messages with the same stream_id and message_id")

Done.

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 7 files at r14, 2 of 2 files at r16.
Reviewable status: 5 of 11 files reviewed, 26 unresolved discussions (waiting on @ShahakShama)

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 3 files at r12.
Reviewable status: 5 of 11 files reviewed, 24 unresolved discussions (waiting on @ShahakShama)


crates/papyrus_protobuf/src/converters/consensus.rs line 124 at r16 (raw file):

            message: T::try_from(value.message)?,
            stream_id: value.stream_id,
            message_id: value.message_id,

Please move this to a preliminary PR and rebase.

Code quote:

  message_id: value.message_id,

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 26 at r16 (raw file):

#[derive(Default)]
struct StreamStats {
    // the next message_id that is expected

Suggestion:

The next message_id that is expected.

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 20 files reviewed, 22 unresolved discussions (waiting on @guy-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 17 at r21 (raw file):

Previously, guy-starkware wrote…

My thinking was that no bounds is the simplest thing to do. Do you have a number that makes sense for the maximum length?

Agree, but it might be error-prone. The bound depends on the context, this is why it is configured.
Consider removing these altogether and we will add them later on - this will make the entire process (dev, test, CR..) easier.

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 20 files reviewed, 27 unresolved discussions (waiting on @guy-starkware and @ShahakShama)

a discussion (no related file):
Please refrain from panic, trace as warn instead.



crates/sequencing/papyrus_consensus/src/stream_handler.rs line 35 at r23 (raw file):

    max_message_id: MessageId,
    // The number of messages that are currently buffered.
    num_buffered: u64,

This seems redundant, why not using len instead?

Code quote:

    // The number of messages that are currently buffered.
    num_buffered: u64,

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 62 at r23 (raw file):

    // An end of a channel used to send out the messages in the correct order.
    sender: mpsc::Sender<StreamMessage<T>>,

Shouldn't this should send T?

Code quote:

sender: mpsc::Sender<StreamMessage<T>>,

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 95 at r23 (raw file):

    // Handle the message, return true if the channel is still open.
    fn handle_message(&mut self, message: StreamMessage<T>) -> bool {

Do we need the return value here?

Code quote:

 fn handle_message(&mut self, message: StreamMessage<T>) -> bool 

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 163 at r23 (raw file):

    // Go over each vector in the buffer, push to the end of it if the message_id is contiguous.
    // If no vector has a contiguous message_id, start a new vector.

Please fix or remove the comment.

Code quote:

    // Go over each vector in the buffer, push to the end of it if the message_id is contiguous.
    // If no vector has a contiguous message_id, start a new vector.

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 192 at r23 (raw file):

    // Tries to drain as many messages as possible from the buffer (in order),
    // DOES NOT guarantee that the buffer will be empty after calling this function.
    fn drain_buffer(&mut self, stream_id: u64) {

As the comment suggests, the name has a connotation of leaving something empty. Consider renaming, maybe process_buffer.

In general, IMO, it's better if the code is self-explanatory and use comments where needed/sparsely.

Code quote:

    // DOES NOT guarantee that the buffer will be empty after calling this function.
    fn drain_buffer(&mut self, stream_id: u64) {

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 193 at r23 (raw file):

    // DOES NOT guarantee that the buffer will be empty after calling this function.
    fn drain_buffer(&mut self, stream_id: u64) {
        let data = self.stream_data.get_mut(&stream_id).unwrap();

Please refrain from unwrapping

Code quote:

.unwrap();

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 195 at r23 (raw file):

        let data = self.stream_data.get_mut(&stream_id).unwrap();
        while let Some(message) = data.message_buffer.remove(&data.next_message_id) {
            self.sender.try_send(message).expect("Send should succeed");

Consider unifying with the previous send. For example by always storing and processing the buffer.

Add a TODO to reconsider the expect here.

Code quote:

self.sender.try_send(message).expect("Send should succeed");

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 20 files reviewed, 26 unresolved discussions (waiting on @dan-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 17 at r21 (raw file):

Previously, dan-starkware wrote…

Agree, but it might be error-prone. The bound depends on the context, this is why it is configured.
Consider removing these altogether and we will add them later on - this will make the entire process (dev, test, CR..) easier.

ok


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 100 at r21 (raw file):

Previously, guy-starkware wrote…

Good idea. But I'm leaving the if so I can put the check for too many streams into this conditional (seeing the next comment below).

Removed if in favor of entry


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 35 at r23 (raw file):

Previously, dan-starkware wrote…

This seems redundant, why not using len instead?

You are right. I think this is left over from when I was using a map of vectors.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 62 at r23 (raw file):

Previously, dan-starkware wrote…

Shouldn't this should send T?

Yes that is right!


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 95 at r23 (raw file):

Previously, dan-starkware wrote…

Do we need the return value here?

No, we can take it out.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 193 at r23 (raw file):

Previously, dan-starkware wrote…

Please refrain from unwrapping

Done.


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 195 at r23 (raw file):

Previously, dan-starkware wrote…

Consider unifying with the previous send. For example by always storing and processing the buffer.

Add a TODO to reconsider the expect here.

Just to make clear: you are suggesting to store every message in the buffer and then process the buffer to see if we can send that message?

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 12 of 20 files reviewed, 18 unresolved discussions (waiting on @guy-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 195 at r23 (raw file):

Previously, guy-starkware wrote…

Just to make clear: you are suggesting to store every message in the buffer and then process the buffer to see if we can send that message?

Yes, or having a function for the send. It relates to the second point (Add a TODO to reconsider the expect here). I assume there might be reasons for failing where we should'n unwrap (maybe retry or shifting buffer utilization)

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r25, all commit messages.
Reviewable status: 14 of 20 files reviewed, 18 unresolved discussions (waiting on @guy-starkware and @ShahakShama)

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 20 files reviewed, 18 unresolved discussions (waiting on @dan-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 195 at r23 (raw file):

Previously, dan-starkware wrote…

Yes, or having a function for the send. It relates to the second point (Add a TODO to reconsider the expect here). I assume there might be reasons for failing where we should'n unwrap (maybe retry or shifting buffer utilization)

Got it. Moved it into a function so we only have to address the issue of sending in one place.

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r28, all commit messages.
Reviewable status: 15 of 20 files reviewed, 4 unresolved discussions (waiting on @guy-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 48 at r28 (raw file):

> {
    // An end of a channel used to send out the messages in the correct order.
    sender: mpsc::Sender<T>,

Sender of a Receiver of T.
Tests should check that the corresponding channel is closed after fin. I'll explain F2F

Code quote:

sender: mpsc::Sender<T>,

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 11 at r28 (raw file):

    use super::*;

    fn make_random_message(

This is not random, right?

Code quote:

make_random_message

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 25 at r28 (raw file):

    // check if two vectors are the same
    // ref: https://stackoverflow.com/a/58175659

IMO we can check equality without the stackoverflow reference.

Code quote:

    // check if two vectors are the same
    // ref: https://stackoverflow.com/a/58175659

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 26 at r28 (raw file):

    // check if two vectors are the same
    // ref: https://stackoverflow.com/a/58175659
    fn do_vecs_match<T: PartialEq>(a: &Vec<T>, b: &Vec<T>) -> bool {

Please use slice instead

Code quote:

 &Vec<T>, b: &Vec<T>

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 44 at r28 (raw file):

    #[tokio::test]
    async fn test_stream_handler_in_order() {

Please remove the test_ prefix.

Code quote:

test_stream_handler_in_order

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 15 of 20 files reviewed, 3 unresolved discussions (waiting on @dan-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 48 at r28 (raw file):

Previously, dan-starkware wrote…

Sender of a Receiver of T.
Tests should check that the corresponding channel is closed after fin. I'll explain F2F

Done.


crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 26 at r28 (raw file):

Previously, dan-starkware wrote…

Please use slice instead

I can do that but there are a lot of collect() statements in the tests that use this function, and when I turn the inputs to do_vecs_match to be slices, they no longer know into what to collect the arrays. Maybe there's a shortcut I am not aware of, but it looks like I would need to explicitely cast all those cases into Vectors and then to slices. I think it is not worth the added code.

@dan-starkware
Copy link
Collaborator

crates/sequencing/papyrus_consensus/src/stream_handler.rs line 101 at r29 (raw file):

                // If we received a message for a stream that we have not seen before,
                // we need to create a new receiver for it.
                let (sender, receiver) = mpsc::channel(100);

Please move to a const with a TODO

Code quote:

(100

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 20 files reviewed, 4 unresolved discussions (waiting on @guy-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 47 at r17 (raw file):

    #[tokio::test]
    async fn test_stream_handler_in_order() {
        let (mut h, mut tx_input, mut rx_output) = setup_test();

Please rename, everywhere

Suggestion:

stream_handler

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 78 at r17 (raw file):

    #[tokio::test]
    async fn test_stream_handler_in_reverse() {

Please remove prefix, in all tests

Code quote:

 test_stream_handler_in_reverse

@dan-starkware
Copy link
Collaborator

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 66 at r17 (raw file):

        for i in 0..10 {
            let message = rx_output
                .try_next()

Why not next?

Code quote:

.try_next()

@dan-starkware
Copy link
Collaborator

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 26 at r28 (raw file):

Previously, guy-starkware wrote…

I can do that but there are a lot of collect() statements in the tests that use this function, and when I turn the inputs to do_vecs_match to be slices, they no longer know into what to collect the arrays. Maybe there's a shortcut I am not aware of, but it looks like I would need to explicitely cast all those cases into Vectors and then to slices. I think it is not worth the added code.

Can you use turbofish? See collect's documentation

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 20 files reviewed, 4 unresolved discussions (waiting on @dan-starkware and @ShahakShama)


crates/sequencing/papyrus_consensus/src/stream_handler.rs line 101 at r29 (raw file):

Previously, dan-starkware wrote…

Please move to a const with a TODO

Done.


crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 66 at r17 (raw file):

Previously, dan-starkware wrote…

Why not next?

This kind of receiver does not have next(). Only try_next(). But I did convert the expect into unwrap to make things a little bit shorter.


crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 78 at r17 (raw file):

Previously, dan-starkware wrote…

Please remove prefix, in all tests

Yes, of course I meant to do that...

@dan-starkware
Copy link
Collaborator

crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 66 at r17 (raw file):

Previously, guy-starkware wrote…

This kind of receiver does not have next(). Only try_next(). But I did convert the expect into unwrap to make things a little bit shorter.

You can use futures_util::stream::stream::StreamExt

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r30, all commit messages.
Reviewable status: 16 of 20 files reviewed, 2 unresolved discussions (waiting on @guy-starkware)

Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 15 of 20 files reviewed, all discussions resolved (waiting on @dan-starkware)


crates/sequencing/papyrus_consensus/src/stream_handler_test.rs line 66 at r17 (raw file):

Previously, dan-starkware wrote…

You can use futures_util::stream::stream::StreamExt

Got it.

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 4 of 4 files at r31, all commit messages.
Reviewable status: 19 of 20 files reviewed, all discussions resolved (waiting on @guy-starkware)

Copy link
Collaborator

@dan-starkware dan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 4 files at r20.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @guy-starkware)

@guy-starkware guy-starkware merged commit 43c320b into main Sep 26, 2024
19 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Oct 4, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants