Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rework the event system of sc-network #14197

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
cb99f5c
Take notification configs
altonen Mar 14, 2023
471616a
Return `NonDefaultSetConfig` from `TransactionsHandlerPrototype::new()`
altonen Mar 7, 2023
c456edd
Make fields of `NonDefaultSetConfig` private
altonen Mar 14, 2023
5593a53
Introduce `NotificationService`
altonen Mar 14, 2023
22cd46c
Add implementation of `NotificationService`
altonen Mar 14, 2023
b42b901
Initialize command streams and protocol handles in `Notifications`
altonen Mar 16, 2023
9685625
Poll commands from protocols in `Notifications`
altonen Mar 16, 2023
b93cf90
Start using `tracing_unbounded` for `Notifications` -> protocols channel
altonen Mar 20, 2023
8cc85c6
Send `NotificationEvent`s from `Notifications`
altonen Mar 20, 2023
f4f9090
Start using `NotificationService` for transactions protocol
altonen Mar 20, 2023
796f826
Implement `clone()` for `NotificationService`
altonen Mar 22, 2023
00a093a
Fix `BEEFY`, `NetworkGossip` and `GRANDPA` tests
altonen Apr 11, 2023
a3746dc
Start using `NotificationService` for `SyncingEngine`
altonen Apr 24, 2023
15d093b
Validate inbound substream before emitting `NotificationStreamOpened`
altonen Apr 25, 2023
513ca87
Convert statement store to use `NotificationService`
altonen May 9, 2023
2e9da9c
Split `NotificationService` files logically
altonen May 16, 2023
1fd9a08
Add getter for peer's `NotificationsSink`
altonen May 23, 2023
94e1e79
Merge remote-tracking branch 'origin/master' into notification-service
altonen May 25, 2023
b19ccb8
Apply suggestions from code review
altonen May 25, 2023
37db9f6
Get peer count from `SyncingEngine` in `sc-network` tests
altonen May 26, 2023
4e2ad89
Apply review comments
altonen May 26, 2023
3441cfa
Merge remote-tracking branch 'origin/master' into notification-service
altonen May 26, 2023
182fdc7
Apply suggestions from code review
altonen May 26, 2023
440402c
Introduce `NotificationEvent::NotificationSinkReplaced`
altonen May 26, 2023
96d2250
Refactor substream acceptance in `Notifications`
altonen May 26, 2023
fd41d7d
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jun 5, 2023
ea56743
Rename `Peerset` functions
altonen Jun 5, 2023
1097738
Add metrics
altonen Jun 6, 2023
140b830
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jun 6, 2023
711761d
Introduce `MessageSink`
altonen Jun 14, 2023
294fd49
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jul 17, 2023
b9368f8
Merge remote-tracking branch 'origin/master' into notification-service
altonen Jul 24, 2023
258fd5a
Minor fixes
altonen Jul 24, 2023
1132424
Store peer role in `PeerStore`
altonen Jul 25, 2023
4b02a3d
Don't return `Result` for `send_sync_notification()`
altonen Jul 25, 2023
c3ff587
Do not pass Prometheus registry to `notification_service()`
altonen Jul 25, 2023
b8e2fcc
Fix metrics
altonen Aug 3, 2023
1b84a72
Merge remote-tracking branch 'origin/master' into notification-service
altonen Aug 3, 2023
aad966a
Rework peer role detection
altonen Aug 4, 2023
295a6a7
Remove rejected peers from `ProtocolController`
altonen Aug 4, 2023
fe8eeed
Fix BEEFY test
altonen Aug 7, 2023
ccb3beb
Remove dead code
altonen Aug 7, 2023
d38b53b
Start using `NotificationService` properly in `SyncingEngine`
altonen Aug 8, 2023
a84000f
Minor code cleanups
altonen Aug 9, 2023
92da646
Fix warnings
altonen Aug 9, 2023
18d948e
Fix documentation
altonen Aug 10, 2023
b7c3b68
Apply review comments
altonen Aug 10, 2023
10bdc28
Fix documentation
altonen Aug 10, 2023
e5e6af0
Apply suggestions from code review
altonen Aug 11, 2023
25c12b2
Apply review comments
altonen Aug 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
&config.chain_spec,
);
net_config.add_notification_protocol(sc_consensus_grandpa::grandpa_peers_set_config(
grandpa_protocol_name.clone(),
));
let (grandpa_protocol_config, grandpa_notification_service) =
sc_consensus_grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
net_config.add_notification_protocol(grandpa_protocol_config);

let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
Expand Down Expand Up @@ -315,6 +315,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
link: grandpa_link,
network,
sync: Arc::new(sync_service),
notification_service: grandpa_notification_service,
voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state: SharedVoterState::empty(),
Expand Down
28 changes: 15 additions & 13 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,19 +359,20 @@ pub fn new_full_base(
&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
&config.chain_spec,
);
net_config.add_notification_protocol(grandpa::grandpa_peers_set_config(
grandpa_protocol_name.clone(),
));

let statement_handler_proto = sc_network_statement::StatementHandlerPrototype::new(
client
.block_hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
config.chain_spec.fork_id(),
);
net_config.add_notification_protocol(statement_handler_proto.set_config());
let (grandpa_protocol_config, grandpa_notification_service) =
grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
net_config.add_notification_protocol(grandpa_protocol_config);

let (statement_handler_proto, statement_config) =
sc_network_statement::StatementHandlerPrototype::new(
client
.block_hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
config.chain_spec.fork_id(),
);
net_config.add_notification_protocol(statement_config);

let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
Expand Down Expand Up @@ -551,6 +552,7 @@ pub fn new_full_base(
link: grandpa_link,
network: network.clone(),
sync: Arc::new(sync_service.clone()),
notification_service: grandpa_notification_service,
telemetry: telemetry.as_ref().map(|x| x.handle()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
prometheus_registry: prometheus_registry.clone(),
Expand Down
12 changes: 9 additions & 3 deletions client/consensus/beefy/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,16 @@ pub(crate) mod beefy_protocol_name {
/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`].
pub fn beefy_peers_set_config(
gossip_protocol_name: sc_network::ProtocolName,
) -> sc_network::config::NonDefaultSetConfig {
let mut cfg = sc_network::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
) -> (sc_network::config::NonDefaultSetConfig, Box<dyn sc_network::NotificationService>) {
let (mut cfg, notification_service) = sc_network::config::NonDefaultSetConfig::new(
gossip_protocol_name,
Vec::new(),
1024 * 1024,
None,
Default::default(),
);
cfg.allow_non_reserved(25, 25);
cfg
(cfg, notification_service)
}

// cost scalars for reporting peers.
Expand Down
6 changes: 5 additions & 1 deletion client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::{NetworkRequest, ProtocolName};
use sc_network::{NetworkRequest, NotificationService, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
Expand Down Expand Up @@ -180,6 +180,8 @@ pub struct BeefyNetworkParams<B: Block, N, S> {
pub network: Arc<N>,
/// Syncing service implementing a sync oracle and an event stream for peers.
pub sync: Arc<S>,
/// Handle for receiving notification events.
pub notification_service: Box<dyn NotificationService>,
/// Chain specific BEEFY gossip protocol name. See
/// [`communication::beefy_protocol_name::gossip_protocol_name`].
pub gossip_protocol_name: ProtocolName,
Expand Down Expand Up @@ -245,6 +247,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let BeefyNetworkParams {
network,
sync,
notification_service,
gossip_protocol_name,
justifications_protocol_name,
..
Expand All @@ -259,6 +262,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
notification_service,
gossip_protocol_name,
gossip_validator.clone(),
None,
Expand Down
41 changes: 37 additions & 4 deletions client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use substrate_test_runtime_client::{BlockBuilderExt, ClientExt};
use tokio::time::Duration;

const GENESIS_HASH: H256 = H256::zero();
fn beefy_gossip_proto_name() -> ProtocolName {
pub(crate) fn beefy_gossip_proto_name() -> ProtocolName {
gossip_protocol_name(GENESIS_HASH, None)
}

Expand Down Expand Up @@ -370,6 +370,7 @@ async fn voter_init_setup(
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
"/beefy/whatever",
gossip_validator,
None,
Expand All @@ -391,6 +392,14 @@ where
{
let tasks = FuturesUnordered::new();

let mut notification_services = peers
.iter()
.map(|(peer_id, _, _)| {
let peer = &mut net.peers[*peer_id];
(*peer_id, peer.take_notification_service(&beefy_gossip_proto_name()).unwrap())
})
.collect::<std::collections::HashMap<_, _>>();

for (peer_id, key, api) in peers.into_iter() {
let peer = &net.peers[peer_id];

Expand All @@ -408,6 +417,7 @@ where
let network_params = crate::BeefyNetworkParams {
network: peer.network_service().clone(),
sync: peer.sync_service().clone(),
notification_service: notification_services.remove(&peer_id).unwrap(),
gossip_protocol_name: beefy_gossip_proto_name(),
justifications_protocol_name: on_demand_justif_handler.protocol_name(),
_phantom: PhantomData,
Expand Down Expand Up @@ -1031,7 +1041,25 @@ async fn should_initialize_voter_at_custom_genesis() {
net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
//
// NOTE: code from `voter_init_setup()` is moved here because the new network event system
// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
// first `GossipEngine`
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let (gossip_validator, _) = GossipValidator::new(known_peers);
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
"/beefy/whatever",
gossip_validator,
None,
);
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1).unwrap();

// Test initialization at session boundary.
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
Expand Down Expand Up @@ -1061,7 +1089,11 @@ async fn should_initialize_voter_at_custom_genesis() {

net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
// load persistent state - state preset in DB, but with different pallet genesis
let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let new_persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1).unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
Expand Down Expand Up @@ -1309,7 +1341,7 @@ async fn gossipped_finality_proofs() {
let api = Arc::new(TestApi::with_validator_set(&validator_set));
let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect();

let charlie = &net.peers[2];
let charlie = &mut net.peers[2];
let known_peers = Arc::new(Mutex::new(KnownPeers::<Block>::new()));
// Charlie will run just the gossip engine and not the full voter.
let (gossip_validator, _) = GossipValidator::new(known_peers);
Expand All @@ -1322,6 +1354,7 @@ async fn gossipped_finality_proofs() {
let mut charlie_gossip_engine = sc_network_gossip::GossipEngine::new(
charlie.network_service().clone(),
charlie.sync_service().clone(),
charlie.take_notification_service(&beefy_gossip_proto_name()).unwrap(),
beefy_gossip_proto_name(),
charlie_gossip_validator.clone(),
None,
Expand Down
4 changes: 4 additions & 0 deletions client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,12 +1125,16 @@ pub(crate) mod tests {
let api = Arc::new(TestApi::with_validator_set(&genesis_validator_set));
let network = peer.network_service().clone();
let sync = peer.sync_service().clone();
let notification_service = peer
.take_notification_service(&crate::tests::beefy_gossip_proto_name())
.unwrap();
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
notification_service,
"/beefy/1",
gossip_validator.clone(),
None,
Expand Down
4 changes: 3 additions & 1 deletion client/consensus/grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use finality_grandpa::{
Message::{Precommit, Prevote, PrimaryPropose},
};
use parity_scale_codec::{Decode, DecodeAll, Encode};
use sc_network::{NetworkBlock, NetworkSyncForkRequest, ReputationChange};
use sc_network::{NetworkBlock, NetworkSyncForkRequest, NotificationService, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sp_keystore::KeystorePtr;
Expand Down Expand Up @@ -247,6 +247,7 @@ impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
pub(crate) fn new(
service: N,
sync: S,
notification_service: Box<dyn NotificationService>,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
prometheus_registry: Option<&Registry>,
Expand All @@ -260,6 +261,7 @@ impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(),
sync.clone(),
notification_service,
protocol,
validator.clone(),
prometheus_registry,
Expand Down
Loading