From bcce89e022f1bf4a3276aa82800492481f2e9514 Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Wed, 5 Jun 2024 13:48:14 +0200 Subject: [PATCH] Fixed unstable `gossipsub_broadcast_tx_with_accept` test (#1921) I just updated the test to wait for peers to subscribe before publishing the message. ### Before requesting review - [x] I have reviewed the code myself --- CHANGELOG.md | 1 + crates/services/p2p/src/p2p_service.rs | 122 ++++++++++++------ .../src/backward_compatibility.rs | 4 - 3 files changed, 81 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 86c8ac67e52..7237bb715c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [#1888](https://github.com/FuelLabs/fuel-core/pull/1888): Upgraded `fuel-vm` to `0.51.0`. See [release](https://github.com/FuelLabs/fuel-vm/releases/tag/v0.51.0) for more information. ### Fixed +- [#1921](https://github.com/FuelLabs/fuel-core/pull/1921): Fixed unstable `gossipsub_broadcast_tx_with_accept` test. - [#1915](https://github.com/FuelLabs/fuel-core/pull/1915): Fixed reconnection issue in the dev cluster with AWS cluster. - [#1914](https://github.com/FuelLabs/fuel-core/pull/1914): Fixed halting of the node during synchronization in PoA service. diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 71ac712a36c..41ef9fb8109 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -13,6 +13,7 @@ use crate::{ }, gossipsub::{ messages::{ + GossipTopicTag, GossipsubBroadcastRequest, GossipsubMessage as FuelGossipsubMessage, }, @@ -151,6 +152,10 @@ pub enum FuelP2PEvent { topic_hash: TopicHash, message: FuelGossipsubMessage, }, + NewSubscription { + peer_id: PeerId, + tag: GossipTopicTag, + }, InboundRequestMessage { request_id: InboundRequestId, request_message: RequestMessage, @@ -454,6 +459,18 @@ impl FuelP2PService { &self.peer_manager } + fn get_topic_tag(&self, topic_hash: &TopicHash) -> Option { + let topic = self + .network_metadata + .gossipsub_data + .topics + .get_gossipsub_tag(topic_hash); + if topic.is_none() { + warn!(target: "fuel-p2p", "GossipTopicTag does not exist for {:?}", &topic_hash); + } + topic + } + fn handle_behaviour_event( &mut self, event: FuelBehaviourEvent, @@ -474,27 +491,20 @@ impl FuelP2PService { &mut self, event: gossipsub::Event, ) -> Option { - if let gossipsub::Event::Message { - propagation_source, - message, - message_id, - } = event - { - if let Some(correct_topic) = self - .network_metadata - .gossipsub_data - .topics - .get_gossipsub_tag(&message.topic) - { + match event { + gossipsub::Event::Message { + propagation_source, + message, + message_id, + } => { + let correct_topic = self.get_topic_tag(&message.topic)?; match self.network_codec.decode(&message.data, correct_topic) { - Ok(decoded_message) => { - return Some(FuelP2PEvent::GossipsubMessage { - peer_id: propagation_source, - message_id, - topic_hash: message.topic, - message: decoded_message, - }) - } + Ok(decoded_message) => Some(FuelP2PEvent::GossipsubMessage { + peer_id: propagation_source, + message_id, + topic_hash: message.topic, + message: decoded_message, + }), Err(err) => { warn!(target: "fuel-p2p", "Failed to decode a message. ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err); @@ -503,13 +513,16 @@ impl FuelP2PService { propagation_source, MessageAcceptance::Reject, ); + None } } - } else { - warn!(target: "fuel-p2p", "GossipTopicTag does not exist for {:?}", &message.topic); } + gossipsub::Event::Subscribed { peer_id, topic } => { + let tag = self.get_topic_tag(&topic)?; + Some(FuelP2PEvent::NewSubscription { peer_id, tag }) + } + _ => None, } - None } fn handle_peer_report_event( @@ -1223,21 +1236,37 @@ mod tests { #[tokio::test] #[instrument] async fn gossipsub_broadcast_tx_with_accept() { - gossipsub_broadcast( - GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default_test_tx())), - GossipsubMessageAcceptance::Accept, - ) - .await; + for _ in 0..100 { + tokio::time::timeout( + Duration::from_secs(5), + gossipsub_broadcast( + GossipsubBroadcastRequest::NewTx(Arc::new( + Transaction::default_test_tx(), + )), + GossipsubMessageAcceptance::Accept, + ), + ) + .await + .unwrap(); + } } #[tokio::test] #[instrument] async fn gossipsub_broadcast_tx_with_reject() { - gossipsub_broadcast( - GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default_test_tx())), - GossipsubMessageAcceptance::Reject, - ) - .await; + for _ in 0..100 { + tokio::time::timeout( + Duration::from_secs(5), + gossipsub_broadcast( + GossipsubBroadcastRequest::NewTx(Arc::new( + Transaction::default_test_tx(), + )), + GossipsubMessageAcceptance::Reject, + ), + ) + .await + .unwrap(); + } } #[tokio::test] @@ -1395,23 +1424,32 @@ mod tests { .behaviour_mut() .block_peer(node_a.local_peer_id); + let mut a_connected_to_b = false; + let mut b_connected_to_c = false; loop { + // verifies that we've got at least a single peer address to send message to + if a_connected_to_b && b_connected_to_c && !message_sent { + message_sent = true; + let broadcast_request = broadcast_request.clone(); + node_a.publish_message(broadcast_request).unwrap(); + } + tokio::select! { node_a_event = node_a.next_event() => { - if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event { - if node_a.peer_manager.get_peer_info(&peer_id).is_some() { - // verifies that we've got at least a single peer address to send message to - if !message_sent { - message_sent = true; - let broadcast_request = broadcast_request.clone(); - let _ = node_a.publish_message(broadcast_request); - } + if let Some(FuelP2PEvent::NewSubscription { peer_id, .. }) = &node_a_event { + if peer_id == &node_b.local_peer_id { + a_connected_to_b = true; } } - tracing::info!("Node A Event: {:?}", node_a_event); }, node_b_event = node_b.next_event() => { + if let Some(FuelP2PEvent::NewSubscription { peer_id, .. }) = &node_b_event { + if peer_id == &node_c.local_peer_id { + b_connected_to_c = true; + } + } + if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, message_id, peer_id }) = node_b_event.clone() { // Message Validation must be reported // If it's `Accept`, Node B will propagate the message to Node C diff --git a/version-compatibility/forkless-upgrade/src/backward_compatibility.rs b/version-compatibility/forkless-upgrade/src/backward_compatibility.rs index 277e8b40afb..fc39bd79910 100644 --- a/version-compatibility/forkless-upgrade/src/backward_compatibility.rs +++ b/version-compatibility/forkless-upgrade/src/backward_compatibility.rs @@ -85,10 +85,6 @@ async fn latest_binary_is_backward_compatible_and_follows_blocks_created_by_gene genesis_multiaddr.as_str(), "--peering-port", "0", - // We need to set the native executor version to 1 to be - // sure it is not zero to force the usage of the WASM executor - "--native-executor-version", - "1", ]) .await .unwrap();