Skip to content

Commit

Permalink
Simplify p2p request/response message serialization (#1573)
Browse files Browse the repository at this point in the history
For some reason we had a two layers of serialization for
request/response messages. This doesn't seem useful at all, and
complicates e.g. error handling. This PR removes the extra layer,
substantially simplifying that logic. One major upside of this is that
#1345 and #1350 can now be solved in a single follow-up PR.

~~Hopefully this doesn't conflict too much with the ongoing libp2p
update PR #1379.~~

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
Dentosal and xgreenx authored Jan 6, 2024
1 parent df821a0 commit ca7d210
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 286 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ Description of the upcoming release here.

### Changed

- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p
- [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor.

#### Breaking
- [#1573](https://github.com/FuelLabs/fuel-core/pull/1573): Remove nested p2p request/response encoding. Only breaks p2p networking compatibility with older fuel-core versions, but is otherwise fully internal.

## [Version 0.22.0]

### Added
Expand All @@ -26,7 +30,6 @@ Description of the upcoming release here.

### Changed

- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p
- [#1517](https://github.com/FuelLabs/fuel-core/pull/1517): Changed default gossip heartbeat interval to 500ms.
- [#1520](https://github.com/FuelLabs/fuel-core/pull/1520): Extract `executor` into `fuel-core-executor` crate.

Expand Down
8 changes: 4 additions & 4 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::{
heartbeat,
peer_report::PeerReportBehaviour,
request_response::messages::{
NetworkResponse,
RequestMessage,
ResponseMessage,
},
};
use fuel_core_types::fuel_types::BlockHeight;
Expand Down Expand Up @@ -166,9 +166,9 @@ impl FuelBehaviour {

pub fn send_response_msg(
&mut self,
channel: ResponseChannel<NetworkResponse>,
message: NetworkResponse,
) -> Result<(), NetworkResponse> {
channel: ResponseChannel<ResponseMessage>,
message: ResponseMessage,
) -> Result<(), ResponseMessage> {
self.request_response.send_response(channel, message)
}

Expand Down
29 changes: 2 additions & 27 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
NetworkResponse,
OutboundResponse,
RequestMessage,
ResponseMessage,
},
Expand All @@ -30,37 +28,14 @@ pub trait GossipsubCodec {
) -> Result<Self::ResponseMessage, io::Error>;
}

pub trait RequestResponseConverter {
/// Response that is ready to be converted into NetworkResponse
type OutboundResponse;
/// Response that is sent over the network
type NetworkResponse;
/// Final Response Message deserialized from IntermediateResponse
type ResponseMessage;

fn convert_to_network_response(
&self,
res_msg: &Self::OutboundResponse,
) -> Result<Self::NetworkResponse, io::Error>;

fn convert_to_response(
&self,
inter_msg: &Self::NetworkResponse,
) -> Result<Self::ResponseMessage, io::Error>;
}

/// Main Codec trait
/// Needs to be implemented and provided to FuelBehaviour
pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + RequestResponseCodec<Request = RequestMessage, Response = NetworkResponse>
+ RequestResponseConverter<
NetworkResponse = NetworkResponse,
OutboundResponse = OutboundResponse,
ResponseMessage = ResponseMessage,
> + Clone
> + RequestResponseCodec<Request = RequestMessage, Response = ResponseMessage>
+ Clone
+ Send
+ 'static
{
Expand Down
135 changes: 22 additions & 113 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::{
GossipsubCodec,
NetworkCodec,
RequestResponseConverter,
};
use crate::{
gossipsub::messages::{
Expand All @@ -10,8 +9,6 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
NetworkResponse,
OutboundResponse,
RequestMessage,
ResponseMessage,
REQUEST_RESPONSE_PROTOCOL_ID,
Expand All @@ -30,6 +27,18 @@ use serde::{
};
use std::io;

/// Helper method for decoding data
/// Reusable across `RequestResponseCodec` and `GossipsubCodec`
fn deserialize<'a, R: Deserialize<'a>>(encoded_data: &'a [u8]) -> Result<R, io::Error> {
postcard::from_bytes(encoded_data)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
}

fn serialize<D: Serialize>(data: &D) -> Result<Vec<u8>, io::Error> {
postcard::to_stdvec(&data)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
}

#[derive(Debug, Clone)]
pub struct PostcardCodec {
/// Used for `max_size` parameter when reading Response Message
Expand All @@ -49,21 +58,6 @@ impl PostcardCodec {
max_response_size: max_block_size,
}
}

/// Helper method for decoding data
/// Reusable across `RequestResponseCodec` and `GossipsubCodec`
fn deserialize<'a, R: Deserialize<'a>>(
&self,
encoded_data: &'a [u8],
) -> Result<R, io::Error> {
postcard::from_bytes(encoded_data)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
}

fn serialize<D: Serialize>(&self, data: &D) -> Result<Vec<u8>, io::Error> {
postcard::to_stdvec(&data)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
}
}

/// Since Postcard does not support async reads or writes out of the box
Expand All @@ -77,7 +71,7 @@ impl PostcardCodec {
impl RequestResponseCodec for PostcardCodec {
type Protocol = MessageExchangePostcardProtocol;
type Request = RequestMessage;
type Response = NetworkResponse;
type Response = ResponseMessage;

async fn read_request<T>(
&mut self,
Expand All @@ -92,8 +86,7 @@ impl RequestResponseCodec for PostcardCodec {
.take(self.max_response_size as u64)
.read_to_end(&mut response)
.await?;

self.deserialize(&response)
deserialize(&response)
}

async fn read_response<T>(
Expand All @@ -110,7 +103,7 @@ impl RequestResponseCodec for PostcardCodec {
.read_to_end(&mut response)
.await?;

self.deserialize(&response)
deserialize(&response)
}

async fn write_request<T>(
Expand All @@ -122,14 +115,9 @@ impl RequestResponseCodec for PostcardCodec {
where
T: futures::AsyncWrite + Unpin + Send,
{
match postcard::to_stdvec(&req) {
Ok(encoded_data) => {
socket.write_all(&encoded_data).await?;

Ok(())
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
let encoded_data = serialize(&req)?;
socket.write_all(&encoded_data).await?;
Ok(())
}

async fn write_response<T>(
Expand All @@ -141,14 +129,9 @@ impl RequestResponseCodec for PostcardCodec {
where
T: futures::AsyncWrite + Unpin + Send,
{
match postcard::to_stdvec(&res) {
Ok(encoded_data) => {
socket.write_all(&encoded_data).await?;

Ok(())
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
let encoded_data = serialize(&res)?;
socket.write_all(&encoded_data).await?;
Ok(())
}
}

Expand All @@ -170,87 +153,13 @@ impl GossipsubCodec for PostcardCodec {
gossipsub_tag: GossipTopicTag,
) -> Result<Self::ResponseMessage, io::Error> {
let decoded_response = match gossipsub_tag {
GossipTopicTag::NewTx => {
GossipsubMessage::NewTx(self.deserialize(encoded_data)?)
}
GossipTopicTag::NewTx => GossipsubMessage::NewTx(deserialize(encoded_data)?),
};

Ok(decoded_response)
}
}

impl RequestResponseConverter for PostcardCodec {
type OutboundResponse = OutboundResponse;
type NetworkResponse = NetworkResponse;
type ResponseMessage = ResponseMessage;

fn convert_to_network_response(
&self,
res_msg: &Self::OutboundResponse,
) -> Result<Self::NetworkResponse, io::Error> {
match res_msg {
OutboundResponse::Block(sealed_block) => {
let response = if let Some(sealed_block) = sealed_block {
Some(self.serialize(sealed_block.as_ref())?)
} else {
None
};

Ok(NetworkResponse::Block(response))
}
OutboundResponse::Transactions(transactions) => {
let response = if let Some(transactions) = transactions {
Some(self.serialize(transactions.as_ref())?)
} else {
None
};

Ok(NetworkResponse::Transactions(response))
}
OutboundResponse::SealedHeaders(maybe_headers) => {
let response = maybe_headers
.as_ref()
.map(|headers| self.serialize(&headers))
.transpose()?;
Ok(NetworkResponse::Headers(response))
}
}
}

fn convert_to_response(
&self,
inter_msg: &Self::NetworkResponse,
) -> Result<Self::ResponseMessage, io::Error> {
match inter_msg {
NetworkResponse::Block(block_bytes) => {
let response = if let Some(block_bytes) = block_bytes {
Some(self.deserialize(block_bytes)?)
} else {
None
};

Ok(ResponseMessage::SealedBlock(Box::new(response)))
}
NetworkResponse::Transactions(tx_bytes) => {
let response = if let Some(tx_bytes) = tx_bytes {
Some(self.deserialize(tx_bytes)?)
} else {
None
};

Ok(ResponseMessage::Transactions(response))
}
NetworkResponse::Headers(headers_bytes) => {
let response = headers_bytes
.as_ref()
.map(|bytes| self.deserialize(bytes))
.transpose()?;
Ok(ResponseMessage::SealedHeaders(response))
}
}
}
}

impl NetworkCodec for PostcardCodec {
fn get_req_res_protocol(&self) -> <Self as RequestResponseCodec>::Protocol {
MessageExchangePostcardProtocol {}
Expand Down
Loading

0 comments on commit ca7d210

Please sign in to comment.