Skip to content

Commit

Permalink
Add a test of gossip message buffer limiting in PeerManager
Browse files Browse the repository at this point in the history
This adds a simple test that the gossip message buffer in
`PeerManager` is limited, including the new behavior of bypassing
the limit when the broadcast comes from the
`ChannelMessageHandler`.
  • Loading branch information
TheBlueMatt committed Oct 1, 2024
1 parent 1ce72fd commit e483e53
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 19 deletions.
80 changes: 78 additions & 2 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2725,20 +2725,21 @@ fn is_gossip_msg(type_id: u16) -> bool {

#[cfg(test)]
mod tests {
use super::*;

use crate::sign::{NodeSigner, Recipient};
use crate::events;
use crate::io;
use crate::ln::types::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
use crate::ln::{msgs, wire};
use crate::ln::msgs::{Init, LightningError, SocketAddress};
use crate::util::test_utils;

use bitcoin::Network;
use bitcoin::constants::ChainHash;
use bitcoin::secp256k1::{PublicKey, SecretKey};
use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};

use crate::sync::{Arc, Mutex};
use core::convert::Infallible;
Expand Down Expand Up @@ -3196,6 +3197,8 @@ mod tests {
let cfgs = create_peermgr_cfgs(2);
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
let peers = create_network(2, &cfgs);

// By calling establish_connect, we trigger do_attempt_write_data between
Expand Down Expand Up @@ -3359,6 +3362,79 @@ mod tests {
assert_eq!(peer_b.peers.read().unwrap().len(), 0);
}

#[test]
fn test_gossip_flood_pause() {
use crate::routing::test_utils::channel_announcement;
use lightning_types::features::ChannelFeatures;

// Simple test which connects two nodes to a PeerManager and checks that if we run out of
// socket buffer space we'll stop forwarding gossip but still push our own gossip.
let cfgs = create_peermgr_cfgs(2);
let peers = create_network(2, &cfgs);
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

macro_rules! drain_queues { () => {
loop {
peers[0].process_events();
peers[1].process_events();

let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
if !msg.is_empty() {
assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
continue;
}
let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
if !msg.is_empty() {
assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
continue;
}
break;
}
} }

// First, make sure all pending messages have been processed and queues drained.
drain_queues!();

let secp_ctx = Secp256k1::new();
let key = SecretKey::from_slice(&[1; 32]).unwrap();
let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
msg,
update_msg: None,
};

fd_a.hang_writes.store(true, Ordering::Relaxed);

// Now push an arbitrarily large number of messages and check that only
// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
peers[0].process_events();
}

assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);

// Check that if a broadcast message comes in from the channel handler (i.e. it is an
// announcement for our own channel), it gets queued anyway.
cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
peers[0].process_events();
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);

// Finally, deliver all the messages and make sure we got the right count. Note that there
// was an extra message that had already moved from the broadcast queue to the encrypted
// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
fd_a.hang_writes.store(false, Ordering::Relaxed);
cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
peers[0].write_buffer_space_avail(&mut fd_a).unwrap();

drain_queues!();
assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
}

#[test]
fn test_filter_addresses(){
// Tests the filter_addresses function.
Expand Down
28 changes: 18 additions & 10 deletions lightning/src/routing/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ use crate::sync::{self, Arc};

use crate::routing::gossip::NodeId;

// Using the same keys for LN and BTC ids
pub(crate) fn add_channel(
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
) {
let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
let node_id_1 = NodeId::from_pubkey(&node_1_pubkey);
pub(crate) fn channel_announcement(
node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures,
short_channel_id: u64, secp_ctx: &Secp256k1<All>,
) -> ChannelAnnouncement {
let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey));
let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey));

let unsigned_announcement = UnsignedChannelAnnouncement {
Expand All @@ -48,13 +46,23 @@ pub(crate) fn add_channel(
};

let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
let valid_announcement = ChannelAnnouncement {
ChannelAnnouncement {
node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
contents: unsigned_announcement.clone(),
};
}
}

// Using the same keys for LN and BTC ids
pub(crate) fn add_channel(
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
) {
let valid_announcement =
channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx);
let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) {
Ok(res) => assert!(res),
_ => panic!()
Expand Down Expand Up @@ -108,7 +116,7 @@ pub(crate) fn update_channel(

pub(super) fn get_nodes(secp_ctx: &Secp256k1<All>) -> (SecretKey, PublicKey, Vec<SecretKey>, Vec<PublicKey>) {
let privkeys: Vec<SecretKey> = (2..22).map(|i| {
SecretKey::from_slice(&<Vec<u8>>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap()
SecretKey::from_slice(&[i; 32]).unwrap()
}).collect();

let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect();
Expand Down
20 changes: 13 additions & 7 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,7 @@ pub struct TestRoutingMessageHandler {
pub chan_anns_recvd: AtomicUsize,
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
pub request_full_sync: AtomicBool,
pub announcement_available_for_sync: AtomicBool,
}

impl TestRoutingMessageHandler {
Expand All @@ -995,27 +996,32 @@ impl TestRoutingMessageHandler {
chan_anns_recvd: AtomicUsize::new(0),
pending_events: Mutex::new(vec![]),
request_full_sync: AtomicBool::new(false),
announcement_available_for_sync: AtomicBool::new(false),
}
}
}
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
fn handle_node_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
Ok(true)
}
fn handle_channel_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
Ok(true)
}
fn handle_channel_update(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
Ok(true)
}
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
let chan_upd_1 = get_dummy_channel_update(starting_point);
let chan_upd_2 = get_dummy_channel_update(starting_point);
let chan_ann = get_dummy_channel_announcement(starting_point);
if self.announcement_available_for_sync.load(Ordering::Acquire) {
let chan_upd_1 = get_dummy_channel_update(starting_point);
let chan_upd_2 = get_dummy_channel_update(starting_point);
let chan_ann = get_dummy_channel_announcement(starting_point);

Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
} else {
None
}
}

fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> {
Expand Down

0 comments on commit e483e53

Please sign in to comment.