Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rework the event system of sc-network #14197

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
cb99f5c
Take notification configs
altonen Mar 14, 2023
471616a
Return `NonDefaultSetConfig` from `TransactionsHandlerPrototype::new()`
altonen Mar 7, 2023
c456edd
Make fields of `NonDefaultSetConfig` private
altonen Mar 14, 2023
5593a53
Introduce `NotificationService`
altonen Mar 14, 2023
22cd46c
Add implementation of `NotificationService`
altonen Mar 14, 2023
b42b901
Initialize command streams and protocol handles in `Notifications`
altonen Mar 16, 2023
9685625
Poll commands from protocols in `Notifications`
altonen Mar 16, 2023
b93cf90
Start using `tracing_unbounded` for `Notifications` -> protocols channel
altonen Mar 20, 2023
8cc85c6
Send `NotificationEvent`s from `Notifications`
altonen Mar 20, 2023
f4f9090
Start using `NotificationService` for transactions protocol
altonen Mar 20, 2023
796f826
Implement `clone()` for `NotificationService`
altonen Mar 22, 2023
00a093a
Fix `BEEFY`, `NetworkGossip` and `GRANDPA` tests
altonen Apr 11, 2023
a3746dc
Start using `NotificationService` for `SyncingEngine`
altonen Apr 24, 2023
15d093b
Validate inbound substream before emitting `NotificationStreamOpened`
altonen Apr 25, 2023
513ca87
Convert statement store to use `NotificationService`
altonen May 9, 2023
2e9da9c
Split `NotificationService` files logically
altonen May 16, 2023
1fd9a08
Add getter for peer's `NotificationsSink`
altonen May 23, 2023
94e1e79
Merge remote-tracking branch 'origin/master' into notification-service
altonen May 25, 2023
b19ccb8
Apply suggestions from code review
altonen May 25, 2023
37db9f6
Get peer count from `SyncingEngine` in `sc-network` tests
altonen May 26, 2023
4e2ad89
Apply review comments
altonen May 26, 2023
3441cfa
Merge remote-tracking branch 'origin/master' into notification-service
altonen May 26, 2023
182fdc7
Apply suggestions from code review
altonen May 26, 2023
440402c
Introduce `NotificationEvent::NotificationSinkReplaced`
altonen May 26, 2023
96d2250
Refactor substream acceptance in `Notifications`
altonen May 26, 2023
fd41d7d
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jun 5, 2023
ea56743
Rename `Peerset` functions
altonen Jun 5, 2023
1097738
Add metrics
altonen Jun 6, 2023
140b830
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jun 6, 2023
711761d
Introduce `MessageSink`
altonen Jun 14, 2023
294fd49
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jul 17, 2023
b9368f8
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jul 24, 2023
258fd5a
Minor fixes
altonen Jul 24, 2023
1132424
Store peer role in `PeerStore`
altonen Jul 25, 2023
4b02a3d
Don't return `Result` for `send_sync_notification()`
altonen Jul 25, 2023
c3ff587
Do not pass Prometheus registry to `notification_service()`
altonen Jul 25, 2023
b8e2fcc
Fix metrics
altonen Aug 3, 2023
1b84a72
Merge remote-tracking branch 'origin/master' into notification-service
altonen Aug 3, 2023
aad966a
Rework peer role detection
altonen Aug 4, 2023
295a6a7
Remove rejected peers from `ProtocolController`
altonen Aug 4, 2023
fe8eeed
Fix BEEFY test
altonen Aug 7, 2023
ccb3beb
Remove dead code
altonen Aug 7, 2023
d38b53b
Start using `NotificationService` properly in `SyncingEngine`
altonen Aug 8, 2023
a84000f
Minor code cleanups
altonen Aug 9, 2023
92da646
Fix warnings
altonen Aug 9, 2023
18d948e
Fix documentation
altonen Aug 10, 2023
b7c3b68
Apply review comments
altonen Aug 10, 2023
10bdc28
Fix documentation
altonen Aug 10, 2023
e5e6af0
Apply suggestions from code review
altonen Aug 11, 2023
25c12b2
Apply review comments
altonen Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions client/consensus/grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use parity_scale_codec::{DecodeAll, Encode};
use sc_network::{
config::{MultiaddrWithPeerId, Role},
event::Event as NetworkEvent,
service::traits::{MessageSink, NotificationEvent, NotificationService},
service::traits::{Direction, MessageSink, NotificationEvent, NotificationService},
types::ProtocolName,
Multiaddr, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NetworkSyncForkRequest, NotificationSenderError, NotificationSenderT as NotificationSender,
Expand Down Expand Up @@ -239,7 +239,6 @@ impl NotificationService for TestNotificationService {

/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, _peer: &PeerId, _notification: Vec<u8>) {
// TODO: this needs to be implemented
unimplemented!();
}

Expand All @@ -253,7 +252,7 @@ impl NotificationService for TestNotificationService {
}

/// Set handshake for the notification protocol replacing the old handshake.
async fn set_hanshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!();
}

Expand Down Expand Up @@ -472,6 +471,7 @@ fn good_commit_leads_to_relay() {
let _ = tester.notification_tx.unbounded_send(
NotificationEvent::NotificationStreamOpened {
peer: sender_id,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: Roles::FULL.encode(),
},
Expand All @@ -488,6 +488,7 @@ fn good_commit_leads_to_relay() {
let _ = tester.notification_tx.unbounded_send(
NotificationEvent::NotificationStreamOpened {
peer: receiver_id,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: Roles::FULL.encode(),
},
Expand Down Expand Up @@ -621,6 +622,7 @@ fn bad_commit_leads_to_report() {
let _ = tester.notification_tx.unbounded_send(
NotificationEvent::NotificationStreamOpened {
peer: sender_id,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: Roles::FULL.encode(),
},
Expand Down
21 changes: 4 additions & 17 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,11 @@ impl<B: BlockT> Future for GossipEngine<B> {
handshake,
result_tx,
} => {
log::debug!(
target: "gossip",
"accepting inbound substream from {peer}, handshake {handshake:?}"
);
let _ = result_tx.send(ValidationResult::Accept);
},
NotificationEvent::NotificationStreamOpened {
peer, handshake, ..
} => {
log::debug!(target: "gossip", "handshake {handshake:?}");
let Some(role) = this.network.peer_role(peer, handshake) else {
log::debug!(target: "gossip", "role for {peer} couldn't be determined");
continue
Expand Down Expand Up @@ -344,7 +339,7 @@ mod tests {
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sc_network::{
config::MultiaddrWithPeerId,
service::traits::{MessageSink, NotificationEvent},
service::traits::{Direction, MessageSink, NotificationEvent},
Event, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NotificationSenderError, NotificationSenderT as NotificationSender, NotificationService,
NotificationsSink, Roles,
Expand Down Expand Up @@ -515,28 +510,20 @@ mod tests {
rx: UnboundedReceiver<NotificationEvent>,
}

// TODO: provide implementation
#[async_trait::async_trait]
impl sc_network::service::traits::NotificationService for TestNotificationService {
/// Instruct `Notifications` to open a new substream for `peer`.
///
/// `dial_if_disconnected` informs `Notifications` whether to dial
// the peer if there is currently no active connection to it.
async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!();
}

/// Instruct `Notifications` to close substream for `peer`.
async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!();
}

/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, _peer: &PeerId, _notification: Vec<u8>) {
unimplemented!();
}

/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
async fn send_async_notification(
&self,
_peer: &PeerId,
Expand All @@ -545,12 +532,10 @@ mod tests {
unimplemented!();
}

/// Set handshake for the notification protocol replacing the old handshake.
async fn set_hanshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!();
}

/// Get next event from the `Notifications` event stream.
async fn next_event(&mut self) -> Option<NotificationEvent> {
self.rx.next().await
}
Expand Down Expand Up @@ -635,6 +620,7 @@ mod tests {
// Register the remote peer.
tx.send(NotificationEvent::NotificationStreamOpened {
peer: remote_peer,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: Roles::FULL.encode(),
})
Expand Down Expand Up @@ -797,6 +783,7 @@ mod tests {
// Register the remote peer.
tx.start_send(NotificationEvent::NotificationStreamOpened {
peer: remote_peer,
direction: Direction::Inbound,
negotiated_fallback: None,
handshake: Roles::FULL.encode(),
})
Expand Down
3 changes: 1 addition & 2 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ impl NonDefaultSetConfig {
}

/// Take `ProtocolHandlePair` from `NonDefaultSetConfig`
pub(crate) fn take_protocol_handle(self) -> ProtocolHandlePair {
pub fn take_protocol_handle(self) -> ProtocolHandlePair {
self.protocol_handle_pair
}

Expand Down Expand Up @@ -743,7 +743,6 @@ pub struct Params<Block: BlockT> {
/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,

// TODO(aaro): remove maybe?
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,
}
Expand Down
4 changes: 2 additions & 2 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ pub use service::{
NotificationSender as NotificationSenderT, NotificationSenderError,
NotificationSenderReady, NotificationService,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationCommand, NotificationSender,
NotificationsSink, OutboundFailure, ProtocolHandle, PublicKey,
};
pub use types::ProtocolName;

Expand Down
3 changes: 2 additions & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ use std::{
use notifications::{Notifications, NotificationsOut};

pub use notifications::{
notification_service, NotificationsSink, NotifsHandlerError, ProtocolHandlePair, Ready,
notification_service, NotificationCommand, NotificationsSink, NotifsHandlerError,
ProtocolHandle, ProtocolHandlePair, Ready,
};

mod notifications;
Expand Down
1 change: 1 addition & 0 deletions client/network/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use sc_network_common::message::RequestId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

/// Type alias for using the message type using block type parameters.
#[allow(unused)]
pub type Message<B> = generic::Message<
<B as BlockT>::Header,
<B as BlockT>::Hash,
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
pub use self::{
behaviour::{Notifications, NotificationsOut, ProtocolConfig},
handler::{NotificationsSink, NotifsHandlerError, Ready},
service::{notification_service, ProtocolHandlePair},
service::{notification_service, NotificationCommand, ProtocolHandle, ProtocolHandlePair},
};

mod behaviour;
Expand Down
Loading