diff --git a/CHANGELOG.md b/CHANGELOG.md index 005dad2492f..60ed6b0081c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,12 @@ Description of the upcoming release here. ### Changed +- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p - [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor. +#### Breaking +- [#1573](https://github.com/FuelLabs/fuel-core/pull/1573): Remove nested p2p request/response encoding. Only breaks p2p networking compatibility with older fuel-core versions, but is otherwise fully internal. + ## [Version 0.22.0] ### Added @@ -26,7 +30,6 @@ Description of the upcoming release here. ### Changed -- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p - [#1517](https://github.com/FuelLabs/fuel-core/pull/1517): Changed default gossip heartbeat interval to 500ms. - [#1520](https://github.com/FuelLabs/fuel-core/pull/1520): Extract `executor` into `fuel-core-executor` crate. diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 2de9385e9d1..133e3286577 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -15,8 +15,8 @@ use crate::{ heartbeat, peer_report::PeerReportBehaviour, request_response::messages::{ - NetworkResponse, RequestMessage, + ResponseMessage, }, }; use fuel_core_types::fuel_types::BlockHeight; @@ -166,9 +166,9 @@ impl FuelBehaviour { pub fn send_response_msg( &mut self, - channel: ResponseChannel, - message: NetworkResponse, - ) -> Result<(), NetworkResponse> { + channel: ResponseChannel, + message: ResponseMessage, + ) -> Result<(), ResponseMessage> { self.request_response.send_response(channel, message) } diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index a3a186fdd84..02b2c0ba7c7 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -7,8 +7,6 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ - NetworkResponse, - OutboundResponse, RequestMessage, ResponseMessage, }, @@ -30,37 +28,14 @@ pub trait GossipsubCodec { ) -> Result; } -pub trait RequestResponseConverter { - /// Response that is ready to be converted into NetworkResponse - type OutboundResponse; - /// Response that is sent over the network - type NetworkResponse; - /// Final Response Message deserialized from IntermediateResponse - type ResponseMessage; - - fn convert_to_network_response( - &self, - res_msg: &Self::OutboundResponse, - ) -> Result; - - fn convert_to_response( - &self, - inter_msg: &Self::NetworkResponse, - ) -> Result; -} - /// Main Codec trait /// Needs to be implemented and provided to FuelBehaviour pub trait NetworkCodec: GossipsubCodec< RequestMessage = GossipsubBroadcastRequest, ResponseMessage = GossipsubMessage, - > + RequestResponseCodec - + RequestResponseConverter< - NetworkResponse = NetworkResponse, - OutboundResponse = OutboundResponse, - ResponseMessage = ResponseMessage, - > + Clone + > + RequestResponseCodec + + Clone + Send + 'static { diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 01cf2e361d7..1af88d08f18 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,7 +1,6 @@ use super::{ GossipsubCodec, NetworkCodec, - RequestResponseConverter, }; use crate::{ gossipsub::messages::{ @@ -10,8 +9,6 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ - NetworkResponse, - OutboundResponse, RequestMessage, ResponseMessage, REQUEST_RESPONSE_PROTOCOL_ID, @@ -30,6 +27,18 @@ use serde::{ }; use std::io; +/// Helper method for decoding data +/// Reusable across `RequestResponseCodec` and `GossipsubCodec` +fn deserialize<'a, R: Deserialize<'a>>(encoded_data: &'a [u8]) -> Result { + postcard::from_bytes(encoded_data) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) +} + +fn serialize(data: &D) -> Result, io::Error> { + postcard::to_stdvec(&data) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) +} + #[derive(Debug, Clone)] pub struct PostcardCodec { /// Used for `max_size` parameter when reading Response Message @@ -49,21 +58,6 @@ impl PostcardCodec { max_response_size: max_block_size, } } - - /// Helper method for decoding data - /// Reusable across `RequestResponseCodec` and `GossipsubCodec` - fn deserialize<'a, R: Deserialize<'a>>( - &self, - encoded_data: &'a [u8], - ) -> Result { - postcard::from_bytes(encoded_data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } - - fn serialize(&self, data: &D) -> Result, io::Error> { - postcard::to_stdvec(&data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } } /// Since Postcard does not support async reads or writes out of the box @@ -77,7 +71,7 @@ impl PostcardCodec { impl RequestResponseCodec for PostcardCodec { type Protocol = MessageExchangePostcardProtocol; type Request = RequestMessage; - type Response = NetworkResponse; + type Response = ResponseMessage; async fn read_request( &mut self, @@ -92,8 +86,7 @@ impl RequestResponseCodec for PostcardCodec { .take(self.max_response_size as u64) .read_to_end(&mut response) .await?; - - self.deserialize(&response) + deserialize(&response) } async fn read_response( @@ -110,7 +103,7 @@ impl RequestResponseCodec for PostcardCodec { .read_to_end(&mut response) .await?; - self.deserialize(&response) + deserialize(&response) } async fn write_request( @@ -122,14 +115,9 @@ impl RequestResponseCodec for PostcardCodec { where T: futures::AsyncWrite + Unpin + Send, { - match postcard::to_stdvec(&req) { - Ok(encoded_data) => { - socket.write_all(&encoded_data).await?; - - Ok(()) - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } + let encoded_data = serialize(&req)?; + socket.write_all(&encoded_data).await?; + Ok(()) } async fn write_response( @@ -141,14 +129,9 @@ impl RequestResponseCodec for PostcardCodec { where T: futures::AsyncWrite + Unpin + Send, { - match postcard::to_stdvec(&res) { - Ok(encoded_data) => { - socket.write_all(&encoded_data).await?; - - Ok(()) - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } + let encoded_data = serialize(&res)?; + socket.write_all(&encoded_data).await?; + Ok(()) } } @@ -170,87 +153,13 @@ impl GossipsubCodec for PostcardCodec { gossipsub_tag: GossipTopicTag, ) -> Result { let decoded_response = match gossipsub_tag { - GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(self.deserialize(encoded_data)?) - } + GossipTopicTag::NewTx => GossipsubMessage::NewTx(deserialize(encoded_data)?), }; Ok(decoded_response) } } -impl RequestResponseConverter for PostcardCodec { - type OutboundResponse = OutboundResponse; - type NetworkResponse = NetworkResponse; - type ResponseMessage = ResponseMessage; - - fn convert_to_network_response( - &self, - res_msg: &Self::OutboundResponse, - ) -> Result { - match res_msg { - OutboundResponse::Block(sealed_block) => { - let response = if let Some(sealed_block) = sealed_block { - Some(self.serialize(sealed_block.as_ref())?) - } else { - None - }; - - Ok(NetworkResponse::Block(response)) - } - OutboundResponse::Transactions(transactions) => { - let response = if let Some(transactions) = transactions { - Some(self.serialize(transactions.as_ref())?) - } else { - None - }; - - Ok(NetworkResponse::Transactions(response)) - } - OutboundResponse::SealedHeaders(maybe_headers) => { - let response = maybe_headers - .as_ref() - .map(|headers| self.serialize(&headers)) - .transpose()?; - Ok(NetworkResponse::Headers(response)) - } - } - } - - fn convert_to_response( - &self, - inter_msg: &Self::NetworkResponse, - ) -> Result { - match inter_msg { - NetworkResponse::Block(block_bytes) => { - let response = if let Some(block_bytes) = block_bytes { - Some(self.deserialize(block_bytes)?) - } else { - None - }; - - Ok(ResponseMessage::SealedBlock(Box::new(response))) - } - NetworkResponse::Transactions(tx_bytes) => { - let response = if let Some(tx_bytes) = tx_bytes { - Some(self.deserialize(tx_bytes)?) - } else { - None - }; - - Ok(ResponseMessage::Transactions(response)) - } - NetworkResponse::Headers(headers_bytes) => { - let response = headers_bytes - .as_ref() - .map(|bytes| self.deserialize(bytes)) - .transpose()?; - Ok(ResponseMessage::SealedHeaders(response)) - } - } - } -} - impl NetworkCodec for PostcardCodec { fn get_req_res_protocol(&self) -> ::Protocol { MessageExchangePostcardProtocol {} diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 68831be4df9..de80500632a 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -3,6 +3,10 @@ use crate::{ FuelBehaviour, FuelBehaviourEvent, }, + codecs::{ + postcard::PostcardCodec, + GossipsubCodec, + }, config::{ build_transport_function, Config, @@ -14,19 +18,18 @@ use crate::{ }, topics::GossipsubTopics, }, + heartbeat::HeartbeatEvent, peer_manager::{ PeerManager, Punisher, }, peer_report::PeerReportEvent, request_response::messages::{ - NetworkResponse, - OutboundResponse, RequestError, RequestMessage, ResponseChannelItem, - ResponseError, ResponseMessage, + ResponseSendError, }, TryPeerId, }; @@ -59,15 +62,6 @@ use libp2p::{ SwarmBuilder, }; use libp2p_gossipsub::PublishError; - -use crate::{ - codecs::{ - postcard::PostcardCodec, - GossipsubCodec, - RequestResponseConverter, - }, - heartbeat::HeartbeatEvent, -}; use rand::seq::IteratorRandom; use std::{ collections::HashMap, @@ -110,7 +104,7 @@ pub struct FuelP2PService { /// Holds the ResponseChannel(s) for the inbound requests from the p2p Network /// Once the Response is prepared by the NetworkOrchestrator /// It will send it to the specified Peer via its unique ResponseChannel - inbound_requests_table: HashMap>, + inbound_requests_table: HashMap>, /// NetworkCodec used as for encoding and decoding of Gossipsub messages network_codec: PostcardCodec, @@ -292,6 +286,7 @@ impl FuelP2PService { /// Sends RequestMessage to a peer /// If the peer is not defined it will pick one at random + /// Only returns error if no peers are connected pub fn send_request_msg( &mut self, peer_id: Option, @@ -328,31 +323,21 @@ impl FuelP2PService { pub fn send_response_msg( &mut self, request_id: InboundRequestId, - message: OutboundResponse, - ) -> Result<(), ResponseError> { - match ( - self.network_codec.convert_to_network_response(&message), - self.inbound_requests_table.remove(&request_id), - ) { - (Ok(message), Some(channel)) => { - if self - .swarm - .behaviour_mut() - .send_response_msg(channel, message) - .is_err() - { - debug!("Failed to send ResponseMessage for {:?}", request_id); - return Err(ResponseError::SendingResponseFailed) - } - } - (Ok(_), None) => { - debug!("ResponseChannel for {:?} does not exist!", request_id); - return Err(ResponseError::ResponseChannelDoesNotExist) - } - (Err(e), _) => { - debug!("Failed to convert to IntermediateResponse with {:?}", e); - return Err(ResponseError::ConversionToIntermediateFailed) - } + message: ResponseMessage, + ) -> Result<(), ResponseSendError> { + let Some(channel) = self.inbound_requests_table.remove(&request_id) else { + debug!("ResponseChannel for {:?} does not exist!", request_id); + return Err(ResponseSendError::ResponseChannelDoesNotExist) + }; + + if self + .swarm + .behaviour_mut() + .send_response_msg(channel, message) + .is_err() + { + debug!("Failed to send ResponseMessage for {:?}", request_id); + return Err(ResponseSendError::SendingResponseFailed) } Ok(()) @@ -553,7 +538,7 @@ impl FuelP2PService { fn handle_request_response_event( &mut self, - event: RequestResponseEvent, + event: RequestResponseEvent, ) -> Option { match event { RequestResponseEvent::Message { peer, message } => match message { @@ -573,54 +558,36 @@ impl FuelP2PService { request_id, response, } => { - match ( - self.outbound_requests_table.remove(&request_id), - self.network_codec.convert_to_response(&response), - ) { + let Some(channel) = self.outbound_requests_table.remove(&request_id) + else { + debug!("Send channel not found for {:?}", request_id); + return None; + }; + + let send_ok = match (channel, response) { ( - Some(ResponseChannelItem::Block(channel)), - Ok(ResponseMessage::SealedBlock(block)), - ) => { - if channel.send(*block).is_err() { - tracing::error!( - "Failed to send through the channel for {:?}", - request_id - ); - } - } + ResponseChannelItem::Block(channel), + ResponseMessage::Block(block), + ) => channel.send(block).is_ok(), ( - Some(ResponseChannelItem::Transactions(channel)), - Ok(ResponseMessage::Transactions(transactions)), - ) => { - if channel.send(transactions).is_err() { - tracing::error!( - "Failed to send through the channel for {:?}", - request_id - ); - } - } + ResponseChannelItem::Transactions(channel), + ResponseMessage::Transactions(transactions), + ) => channel.send(transactions).is_ok(), ( - Some(ResponseChannelItem::SealedHeaders(channel)), - Ok(ResponseMessage::SealedHeaders(headers)), - ) => { - if channel.send((peer, headers)).is_err() { - tracing::error!( - "Failed to send through the channel for {:?}", - request_id - ); - } - } + ResponseChannelItem::SealedHeaders(channel), + ResponseMessage::SealedHeaders(headers), + ) => channel.send((peer, headers)).is_ok(), - (Some(_), Err(e)) => { - tracing::error!("Failed to convert IntermediateResponse into a ResponseMessage {:?} with {:?}", response, e); - } - (None, Ok(_)) => { + (_, _) => { tracing::error!( - "Send channel not found for {:?}", - request_id + "Mismatching request and response channel types" ); + return None; } - _ => {} + }; + + if !send_ok { + debug!("Failed to send through the channel for {:?}", request_id); } } }, @@ -722,9 +689,9 @@ mod tests { p2p_service::FuelP2PEvent, peer_manager::PeerInfo, request_response::messages::{ - OutboundResponse, RequestMessage, ResponseChannelItem, + ResponseMessage, }, service::to_message_acceptance, }; @@ -1607,17 +1574,17 @@ mod tests { consensus: Consensus::PoA(PoAConsensus::new(Default::default())), }; - let _ = node_b.send_response_msg(*request_id, OutboundResponse::Block(Some(Arc::new(sealed_block)))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Block(Some(sealed_block))); } RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); - let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); } RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions)))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); } } } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 22564f7cecf..2d82ac42dd6 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -1,8 +1,3 @@ -use std::{ - ops::Range, - sync::Arc, -}; - use fuel_core_types::{ blockchain::{ SealedBlock, @@ -16,6 +11,7 @@ use serde::{ Deserialize, Serialize, }; +use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; @@ -25,16 +21,6 @@ pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; #[cfg(test)] pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); -// Peer receives a `RequestMessage`. -// It prepares a response in form of `OutboundResponse` -// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format. -// The Peer that requested the message receives the response over the wire in `NetworkResponse` format. -// It then unpacks it into `ResponseMessage`. -// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receiving channel. -// Client Peer: `RequestMessage` (send request) -// Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response) -// Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response) - #[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)] pub enum RequestMessage { Block(BlockHeight), @@ -42,14 +28,6 @@ pub enum RequestMessage { Transactions(Range), } -/// Final Response Message that p2p service sends to the Orchestrator -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum ResponseMessage { - SealedBlock(Box>), - SealedHeaders(Option>), - Transactions(Option>), -} - /// Holds oneshot channels for specific responses #[derive(Debug)] pub enum ResponseChannelItem { @@ -58,22 +36,11 @@ pub enum ResponseChannelItem { Transactions(oneshot::Sender>>), } -/// Response that is sent over the wire -/// and then additionally deserialized into `ResponseMessage` -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum NetworkResponse { - Block(Option>), - Headers(Option>), - Transactions(Option>), -} - -/// Initial state of the `ResponseMessage` prior to having its inner value serialized -/// and wrapped into `NetworkResponse` -#[derive(Debug, Clone)] -pub enum OutboundResponse { - Block(Option>), +#[derive(Debug, Serialize, Deserialize)] +pub enum ResponseMessage { + Block(Option), SealedHeaders(Option>), - Transactions(Option>>), + Transactions(Option>), } #[derive(Debug, Error)] @@ -82,8 +49,9 @@ pub enum RequestError { NoPeersConnected, } +/// Errors than can occur when attempting to send a response #[derive(Debug, Eq, PartialEq, Error)] -pub enum ResponseError { +pub enum ResponseSendError { #[error("Response channel does not exist")] ResponseChannelDoesNotExist, #[error("Failed to send response")] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 2a0f6df6592..7b2eac2ab45 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -15,9 +15,9 @@ use crate::{ P2pDb, }, request_response::messages::{ - OutboundResponse, RequestMessage, ResponseChannelItem, + ResponseMessage, }, }; use anyhow::anyhow; @@ -163,6 +163,7 @@ pub trait TaskP2PService: Send { &mut self, message: GossipsubBroadcastRequest, ) -> anyhow::Result<()>; + fn send_request_msg( &mut self, peer_id: Option, @@ -173,8 +174,9 @@ pub trait TaskP2PService: Send { fn send_response_msg( &mut self, request_id: InboundRequestId, - message: OutboundResponse, + message: ResponseMessage, ) -> anyhow::Result<()>; + fn report_message( &mut self, message: GossipsubMessageInfo, @@ -229,7 +231,7 @@ impl TaskP2PService for FuelP2PService { fn send_response_msg( &mut self, request_id: InboundRequestId, - message: OutboundResponse, + message: ResponseMessage, ) -> anyhow::Result<()> { self.send_response_msg(request_id, message)?; Ok(()) @@ -494,7 +496,10 @@ where let request_msg = RequestMessage::Block(height); let channel_item = ResponseChannelItem::Block(channel); let peer = self.p2p_service.get_peer_id_with_height(&height); - let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); + let found_peers = self.p2p_service.send_request_msg(peer, request_msg, channel_item).is_ok(); + if !found_peers { + tracing::debug!("No peers found for block at height {:?}", height); + } } Some(TaskRequest::GetSealedHeaders { block_height_range, channel: response}) => { let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); @@ -505,12 +510,16 @@ where let block_height = BlockHeight::from(block_height_range.end.saturating_sub(1)); let peer = self.p2p_service .get_peer_id_with_height(&block_height); - let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); + let found_peers = self.p2p_service.send_request_msg(peer, request_msg, channel_item).is_ok(); + if !found_peers { + tracing::debug!("No peers found for block at height {:?}", block_height); + } } Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { let request_msg = RequestMessage::Transactions(block_height_range); let channel_item = ResponseChannelItem::Transactions(channel); - let _ = self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item); + self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item) + .expect("We always a peer here, so send has a target"); } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { // report_message(&mut self.p2p_service, message, acceptance); @@ -557,28 +566,26 @@ where match request_message { RequestMessage::Block(block_height) => { match self.db.get_sealed_block(&block_height) { - Ok(maybe_block) => { - let response = maybe_block.map(Arc::new); - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Block(response)); + Ok(response) => { + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Block(response)); }, Err(e) => { tracing::error!("Failed to get block at height {:?}: {:?}", block_height, e); let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Block(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Block(response)); return Err(e.into()) } } } RequestMessage::Transactions(range) => { match self.db.get_transactions(range.clone()) { - Ok(maybe_transactions) => { - let response = maybe_transactions.map(Arc::new); - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(response)); + Ok(response) => { + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); }, Err(e) => { tracing::error!("Failed to get transactions for range {:?}: {:?}", range, e); let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); return Err(e.into()) } } @@ -589,17 +596,17 @@ where tracing::error!("Requested range of sealed headers is too big. Requested length: {:?}, Max length: {:?}", range.len(), max_len); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); } else { match self.db.get_sealed_headers(range.clone()) { Ok(headers) => { let response = Some(headers); - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); }, Err(e) => { tracing::error!("Failed to get sealed headers for range {:?}: {:?}", range, &e); let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); return Err(e.into()) } } @@ -964,7 +971,7 @@ pub mod tests { fn send_response_msg( &mut self, _request_id: InboundRequestId, - _message: OutboundResponse, + _message: ResponseMessage, ) -> anyhow::Result<()> { todo!() }