Skip to content

Commit

Permalink
Fixed unstable gossipsub_broadcast_tx_with_accept test (#1921)
Browse files Browse the repository at this point in the history
I just updated the test to wait for peers to subscribe before publishing
the message.

### Before requesting review
- [x] I have reviewed the code myself
  • Loading branch information
xgreenx authored Jun 5, 2024
1 parent 5536f5e commit bcce89e
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [#1888](https://github.com/FuelLabs/fuel-core/pull/1888): Upgraded `fuel-vm` to `0.51.0`. See [release](https://github.com/FuelLabs/fuel-vm/releases/tag/v0.51.0) for more information.

### Fixed
- [#1921](https://github.com/FuelLabs/fuel-core/pull/1921): Fixed unstable `gossipsub_broadcast_tx_with_accept` test.
- [#1915](https://github.com/FuelLabs/fuel-core/pull/1915): Fixed reconnection issue in the dev cluster with AWS cluster.
- [#1914](https://github.com/FuelLabs/fuel-core/pull/1914): Fixed halting of the node during synchronization in PoA service.

Expand Down
122 changes: 80 additions & 42 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
},
gossipsub::{
messages::{
GossipTopicTag,
GossipsubBroadcastRequest,
GossipsubMessage as FuelGossipsubMessage,
},
Expand Down Expand Up @@ -151,6 +152,10 @@ pub enum FuelP2PEvent {
topic_hash: TopicHash,
message: FuelGossipsubMessage,
},
NewSubscription {
peer_id: PeerId,
tag: GossipTopicTag,
},
InboundRequestMessage {
request_id: InboundRequestId,
request_message: RequestMessage,
Expand Down Expand Up @@ -454,6 +459,18 @@ impl FuelP2PService {
&self.peer_manager
}

fn get_topic_tag(&self, topic_hash: &TopicHash) -> Option<GossipTopicTag> {
let topic = self
.network_metadata
.gossipsub_data
.topics
.get_gossipsub_tag(topic_hash);
if topic.is_none() {
warn!(target: "fuel-p2p", "GossipTopicTag does not exist for {:?}", &topic_hash);
}
topic
}

fn handle_behaviour_event(
&mut self,
event: FuelBehaviourEvent,
Expand All @@ -474,27 +491,20 @@ impl FuelP2PService {
&mut self,
event: gossipsub::Event,
) -> Option<FuelP2PEvent> {
if let gossipsub::Event::Message {
propagation_source,
message,
message_id,
} = event
{
if let Some(correct_topic) = self
.network_metadata
.gossipsub_data
.topics
.get_gossipsub_tag(&message.topic)
{
match event {
gossipsub::Event::Message {
propagation_source,
message,
message_id,
} => {
let correct_topic = self.get_topic_tag(&message.topic)?;
match self.network_codec.decode(&message.data, correct_topic) {
Ok(decoded_message) => {
return Some(FuelP2PEvent::GossipsubMessage {
peer_id: propagation_source,
message_id,
topic_hash: message.topic,
message: decoded_message,
})
}
Ok(decoded_message) => Some(FuelP2PEvent::GossipsubMessage {
peer_id: propagation_source,
message_id,
topic_hash: message.topic,
message: decoded_message,
}),
Err(err) => {
warn!(target: "fuel-p2p", "Failed to decode a message. ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err);

Expand All @@ -503,13 +513,16 @@ impl FuelP2PService {
propagation_source,
MessageAcceptance::Reject,
);
None
}
}
} else {
warn!(target: "fuel-p2p", "GossipTopicTag does not exist for {:?}", &message.topic);
}
gossipsub::Event::Subscribed { peer_id, topic } => {
let tag = self.get_topic_tag(&topic)?;
Some(FuelP2PEvent::NewSubscription { peer_id, tag })
}
_ => None,
}
None
}

fn handle_peer_report_event(
Expand Down Expand Up @@ -1223,21 +1236,37 @@ mod tests {
#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_tx_with_accept() {
gossipsub_broadcast(
GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default_test_tx())),
GossipsubMessageAcceptance::Accept,
)
.await;
for _ in 0..100 {
tokio::time::timeout(
Duration::from_secs(5),
gossipsub_broadcast(
GossipsubBroadcastRequest::NewTx(Arc::new(
Transaction::default_test_tx(),
)),
GossipsubMessageAcceptance::Accept,
),
)
.await
.unwrap();
}
}

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_tx_with_reject() {
gossipsub_broadcast(
GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default_test_tx())),
GossipsubMessageAcceptance::Reject,
)
.await;
for _ in 0..100 {
tokio::time::timeout(
Duration::from_secs(5),
gossipsub_broadcast(
GossipsubBroadcastRequest::NewTx(Arc::new(
Transaction::default_test_tx(),
)),
GossipsubMessageAcceptance::Reject,
),
)
.await
.unwrap();
}
}

#[tokio::test]
Expand Down Expand Up @@ -1395,23 +1424,32 @@ mod tests {
.behaviour_mut()
.block_peer(node_a.local_peer_id);

let mut a_connected_to_b = false;
let mut b_connected_to_c = false;
loop {
// verifies that we've got at least a single peer address to send message to
if a_connected_to_b && b_connected_to_c && !message_sent {
message_sent = true;
let broadcast_request = broadcast_request.clone();
node_a.publish_message(broadcast_request).unwrap();
}

tokio::select! {
node_a_event = node_a.next_event() => {
if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event {
if node_a.peer_manager.get_peer_info(&peer_id).is_some() {
// verifies that we've got at least a single peer address to send message to
if !message_sent {
message_sent = true;
let broadcast_request = broadcast_request.clone();
let _ = node_a.publish_message(broadcast_request);
}
if let Some(FuelP2PEvent::NewSubscription { peer_id, .. }) = &node_a_event {
if peer_id == &node_b.local_peer_id {
a_connected_to_b = true;
}
}

tracing::info!("Node A Event: {:?}", node_a_event);
},
node_b_event = node_b.next_event() => {
if let Some(FuelP2PEvent::NewSubscription { peer_id, .. }) = &node_b_event {
if peer_id == &node_c.local_peer_id {
b_connected_to_c = true;
}
}

if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, message_id, peer_id }) = node_b_event.clone() {
// Message Validation must be reported
// If it's `Accept`, Node B will propagate the message to Node C
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ async fn latest_binary_is_backward_compatible_and_follows_blocks_created_by_gene
genesis_multiaddr.as_str(),
"--peering-port",
"0",
// We need to set the native executor version to 1 to be
// sure it is not zero to force the usage of the WASM executor
"--native-executor-version",
"1",
])
.await
.unwrap();
Expand Down

0 comments on commit bcce89e

Please sign in to comment.