Skip to content

Commit

Permalink
fix: convert huberror into p2p specific error
Browse files Browse the repository at this point in the history
  • Loading branch information
hhamud committed Aug 30, 2024
1 parent 73bbe93 commit 4275217
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/eth/src/id_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethers::{
use sqlx::Acquire;
use std::error::Error;
use std::sync::Arc;
use teleport_config::Config;
use teleport_protobuf::protobufs::generated::{
on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType,
};
Expand Down
18 changes: 5 additions & 13 deletions crates/hub/src/hub.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::{
key::Key,
};
use crate::key::Key;

use teleport_p2p::{
event_loop::Command,
gossip_node::{GossipNode, NodeOptions},
};
use libp2p::{futures::channel::mpsc, Multiaddr, PeerId};
use prost::Message;
use teleport_p2p::{
event_loop::Command,
gossip_node::{AddrInfo, GossipNode, NodeOptions},
};
use teleport_protobuf::protobufs::generated::*;
use teleport_storage::Store;

Expand Down Expand Up @@ -46,12 +44,6 @@ pub struct TestUser {
mnemonic: String,
}

#[derive(Debug, Clone)]
pub struct AddrInfo {
pub id: PeerId,
pub addrs: Vec<Multiaddr>,
}

#[derive(Debug, Clone)]
pub struct HubOptions {
pub network: FarcasterNetwork,
Expand Down
3 changes: 0 additions & 3 deletions crates/hub/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
mod errors;
mod hub;
mod key;
mod p2p;
mod sync;
mod validation;

// Re-export hub
pub use crate::hub::Hub;
2 changes: 1 addition & 1 deletion crates/hub/src/errors.rs → crates/p2p/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub enum UnavailableType {
}

#[derive(Error, Debug)]
pub enum HubError {
pub enum P2pError {
#[error("the request did not have valid authentication credentials {0:#?}")]
Unauthenticated(String),
#[error("the authenticated request did not have the authority to perform this action {0:#?}")]
Expand Down
14 changes: 7 additions & 7 deletions crates/p2p/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;

use super::gossip_behaviour::{GossipBehaviour, GossipBehaviourEvent};
use super::time::get_farcaster_time;
use crate::errors::{BadRequestType, HubError, UnavailableType};
use crate::errors::{BadRequestType, P2pError, UnavailableType};
use libp2p::futures::channel::oneshot;
use libp2p::futures::StreamExt;
use libp2p::gossipsub;
Expand Down Expand Up @@ -442,7 +442,7 @@ impl EventLoop {
.publish(topic, encoded_peer);
}

fn gossip_message(&mut self, message: generated::Message) -> Result<(), HubError> {
fn gossip_message(&mut self, message: generated::Message) -> Result<(), P2pError> {
let gossip_message = generated::GossipMessage {
topics: vec![self.state.primary_topic.to_string()],
peer_id: self.swarm.local_peer_id().to_bytes(),
Expand All @@ -457,7 +457,7 @@ impl EventLoop {
fn gossip_contact_info(
&mut self,
contact_info: generated::ContactInfoContent,
) -> Result<(), HubError> {
) -> Result<(), P2pError> {
let gossip_message = generated::GossipMessage {
topics: vec![self.state.contact_info_topic.to_string()],
peer_id: self.swarm.local_peer_id().to_bytes(),
Expand All @@ -471,7 +471,7 @@ impl EventLoop {
self.publish(gossip_message)
}

fn publish(&mut self, message: generated::GossipMessage) -> Result<(), HubError> {
fn publish(&mut self, message: generated::GossipMessage) -> Result<(), P2pError> {
let encode_result = message.encode_to_vec();

for topic_str in message.topics {
Expand All @@ -483,7 +483,7 @@ impl EventLoop {
.publish(topic, encode_result.clone());

if let Err(err) = publish_result {
return Err(HubError::BadRequest(
return Err(P2pError::BadRequest(
BadRequestType::Duplicate,
err.to_string(),
));
Expand All @@ -493,13 +493,13 @@ impl EventLoop {
Ok(())
}

fn dial_multi_addr(&mut self, multi_addr: Multiaddr) -> Result<(), HubError> {
fn dial_multi_addr(&mut self, multi_addr: Multiaddr) -> Result<(), P2pError> {
println!("dialing {:?}", multi_addr);
let res = self.swarm.dial(multi_addr);

match res {
Ok(_) => Ok(()),
Err(err) => Err(HubError::Unavailable(
Err(err) => Err(P2pError::Unavailable(
UnavailableType::Generic,
err.to_string(),
)),
Expand Down
29 changes: 17 additions & 12 deletions crates/p2p/src/gossip_node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{net::TcpListener, str::FromStr};

use crate::hub::AddrInfo;
use libp2p::{
core::upgrade,
futures::{
Expand All @@ -26,11 +25,17 @@ use super::{
gossip_behaviour::GossipBehaviour,
};

use crate::errors::{BadRequestType, HubError};
use crate::errors::{BadRequestType, P2pError};

const MULTI_ADDR_LOCAL_HOST: &str = "/ip4/127.0.0.1";
const MAX_MESSAGE_QUEUE_SIZE: usize = 100_000;

#[derive(Debug, Clone)]
pub struct AddrInfo {
pub id: PeerId,
pub addrs: Vec<Multiaddr>,
}

#[derive(Message)]
pub struct Peer {
#[prost(bytes, tag = "1")]
Expand Down Expand Up @@ -101,7 +106,7 @@ impl NodeOptions {
}
}

pub(crate) struct GossipNode {
pub struct GossipNode {
command_sender: mpsc::Sender<super::event_loop::Command>,
event_loop: Option<EventLoop>,
}
Expand All @@ -121,7 +126,7 @@ impl GossipNode {
}
}

pub async fn start(&mut self) -> Result<(), HubError> {
pub async fn start(&mut self) -> Result<(), P2pError> {
let listen_ip_multi_addr = MULTI_ADDR_LOCAL_HOST.to_string();
let listen_port = {
let tcp_listener = TcpListener::bind("127.0.0.1:0").unwrap();
Expand Down Expand Up @@ -154,7 +159,7 @@ impl GossipNode {
Ok(())
}

pub fn gossip_message(&mut self, message: generated::Message) -> Result<(), HubError> {
pub fn gossip_message(&mut self, message: generated::Message) -> Result<(), P2pError> {
self.command_sender
.start_send(super::event_loop::Command::GossipMessage { message })
.expect("Failed to send GossipMessage command");
Expand All @@ -164,14 +169,14 @@ impl GossipNode {
pub fn gossip_contact_info(
&mut self,
contact_info: ContactInfoContent,
) -> Result<(), HubError> {
) -> Result<(), P2pError> {
self.command_sender
.start_send(super::event_loop::Command::GossipContactInfo { contact_info })
.expect("Failed to send GossipContactInfo command");
Ok(())
}

pub async fn get_state(&mut self) -> Result<EventLoopState, HubError> {
pub async fn get_state(&mut self) -> Result<EventLoopState, P2pError> {
let (sender, receiver) = oneshot::channel::<EventLoopState>();
self.command_sender
.start_send(super::event_loop::Command::GetState { sender })
Expand All @@ -181,23 +186,23 @@ impl GossipNode {
}
}

pub fn decode_message(message: &[u8]) -> Result<GossipMessage, HubError> {
pub fn decode_message(message: &[u8]) -> Result<GossipMessage, P2pError> {
let gossip_message = GossipMessage::decode(message)
.map_err(|_| {
HubError::BadRequest(BadRequestType::ParseFailure, "Invalid failure".to_owned())
P2pError::BadRequest(BadRequestType::ParseFailure, "Invalid failure".to_owned())
})
.unwrap();

let supported_versions = vec![GossipVersion::V1, GossipVersion::V11];
if gossip_message.topics.len() == 0 || !supported_versions.contains(&gossip_message.version()) {
return Err(HubError::BadRequest(
return Err(P2pError::BadRequest(
BadRequestType::ParseFailure,
"Invalid failure".to_owned(),
));
}

PeerId::from_bytes(&gossip_message.peer_id).map_err(|_| {
HubError::BadRequest(BadRequestType::ParseFailure, "Invalid failure".to_owned())
P2pError::BadRequest(BadRequestType::ParseFailure, "Invalid failure".to_owned())
})?;

Ok(gossip_message)
Expand Down Expand Up @@ -241,7 +246,7 @@ pub fn default_message_id_fn(message: &GossipSubMessage) -> MessageId {
MessageId::from(source_string)
}

fn create_node(options: NodeOptions) -> Result<Swarm<GossipBehaviour>, HubError> {
fn create_node(options: NodeOptions) -> Result<Swarm<GossipBehaviour>, P2pError> {
let local_key = options
.keypair
.unwrap_or(identity::Keypair::generate_ed25519());
Expand Down
2 changes: 2 additions & 0 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod errors;
pub mod event_loop;
pub mod gossip_behaviour;
pub mod gossip_node;
pub mod time;
pub mod validation;
12 changes: 6 additions & 6 deletions crates/p2p/src/time.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::errors::*;
use crate::errors::{BadRequestType, P2pError};

const FARCASTER_EPOCH: i64 = 1609459200000;

pub fn get_farcaster_time() -> Result<u32, HubError> {
pub fn get_farcaster_time() -> Result<u32, P2pError> {
to_farcaster_time(
(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
Expand All @@ -13,17 +13,17 @@ pub fn get_farcaster_time() -> Result<u32, HubError> {
)
}

pub fn to_farcaster_time(time: i64) -> Result<u32, HubError> {
pub fn to_farcaster_time(time: i64) -> Result<u32, P2pError> {
println!("to_farcaster_time: {}", time);
if time < FARCASTER_EPOCH {
return Err(HubError::BadRequest(
return Err(P2pError::BadRequest(
BadRequestType::InvalidParam,
"time must be after Farcaster epoch (01/01/2021".to_string(),
));
}
let seconds_since_epoch = (time - FARCASTER_EPOCH) / 1000;
if seconds_since_epoch > 2i64.pow(32) - 1 {
return Err(HubError::BadRequest(
return Err(P2pError::BadRequest(
BadRequestType::InvalidParam,
"time too far in future".to_string(),
));
Expand All @@ -32,6 +32,6 @@ pub fn to_farcaster_time(time: i64) -> Result<u32, HubError> {
Ok(seconds_since_epoch.try_into().unwrap())
}

pub fn from_farcaster_time(time: u32) -> Result<i64, HubError> {
pub fn from_farcaster_time(time: u32) -> Result<i64, P2pError> {
Ok(time as i64 * 1000 + FARCASTER_EPOCH)
}
24 changes: 12 additions & 12 deletions crates/p2p/src/validation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::HubError;
use crate::errors::P2pError;
use chrono::{DateTime, Utc};
use prost::Message;
use teleport_crypto::{blake3, ed25519};
Expand All @@ -13,7 +13,7 @@ impl<'a> Validator<'a> {
Self { message }
}

fn validate_msg_body(&self) -> Result<(), HubError> {
fn validate_msg_body(&self) -> Result<(), P2pError> {
let msg_data = self.message.data.as_ref().unwrap();
let body = msg_data.body.as_ref().unwrap();
match body {
Expand Down Expand Up @@ -48,10 +48,10 @@ impl<'a> Validator<'a> {
Ok(())
}

fn validate_signature(&self) -> Result<(), HubError> {
fn validate_signature(&self) -> Result<(), P2pError> {
let signature_scheme = generated::SignatureScheme::from_i32(self.message.signature_scheme)
.ok_or_else(|| {
HubError::Unknown(format!(
P2pError::Unknown(format!(
"Unknown signature scheme: {:?}",
self.message.signature_scheme
))
Expand All @@ -76,22 +76,22 @@ impl<'a> Validator<'a> {
return Ok(());
}
Err(err) => {
return Err(HubError::Unknown(err.to_string()));
return Err(P2pError::Unknown(err.to_string()));
}
}
}
_ => {
return Err(HubError::Unknown(format!(
return Err(P2pError::Unknown(format!(
"Unknown signature scheme: {:?}",
self.message.signature_scheme
)));
}
}
}

fn validate_hash(&self) -> Result<(), HubError> {
fn validate_hash(&self) -> Result<(), P2pError> {
if self.message.hash_scheme != generated::HashScheme::Blake3 as i32 {
return Err(HubError::Unknown(format!(
return Err(P2pError::Unknown(format!(
"Unknown hash scheme: {:?}",
self.message.hash_scheme
)));
Expand All @@ -107,7 +107,7 @@ impl<'a> Validator<'a> {
}

if self.message.hash != msg_hash {
return Err(HubError::Unknown(format!(
return Err(P2pError::Unknown(format!(
"Invalid message hash: {:?}",
self.message.hash
)));
Expand All @@ -116,7 +116,7 @@ impl<'a> Validator<'a> {
Ok(())
}

pub fn validate(&self) -> Result<(), HubError> {
pub fn validate(&self) -> Result<(), P2pError> {
self.validate_hash()?;
// todo: check if the signer belongs to the user?
self.validate_signature()?;
Expand All @@ -125,11 +125,11 @@ impl<'a> Validator<'a> {
Ok(())
}

fn validate_timestamp(&self) -> Result<(), HubError> {
fn validate_timestamp(&self) -> Result<(), P2pError> {
let timestamp = self.message.data.as_ref().unwrap().timestamp as i64;
let msg_datetime = DateTime::from_timestamp(timestamp, 0).unwrap();
if (msg_datetime - Utc::now()).num_seconds() > 600 {
return Err(HubError::Unknown(format!(
return Err(P2pError::Unknown(format!(
"Invalid timestamp: {:?}",
msg_datetime
)));
Expand Down

0 comments on commit 4275217

Please sign in to comment.