Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat(network): split transmitter data (#380)
Browse files Browse the repository at this point in the history
* change(network): convert to SessionProtocol

* refactor(network): move sender fn from connection to transmitter

* feat(network): split transmitter data

* fix(network): transmitter not check received data size

* change(network): increase transmitter chunk size to 4MB

* fix(network): transmitter data size can be equal or smaller than 9

Transmitter data size should always larger than 9.

* fix(network): router report snappy io error

Forget to update transmitter session seq

* refactor(network): transmitter InternalMessage encode and decode

* refactor(network): rename InternalMessage to SeqChunkMessage

* fix(network): compilation
  • Loading branch information
zeroqn authored Aug 12, 2020
1 parent 2974420 commit 0322cd6
Show file tree
Hide file tree
Showing 17 changed files with 836 additions and 448 deletions.
1 change: 1 addition & 0 deletions core/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio-util = { version = "0.2", features = ["codec"] }
hostname = "0.3"
lazy_static = "1.4"
bs58 = "0.3"
arc-swap = "0.4"

[dev-dependencies]
env_logger = "0.6"
Expand Down
190 changes: 28 additions & 162 deletions core/network/src/connection/control.rs
Original file line number Diff line number Diff line change
@@ -1,172 +1,38 @@
use std::marker::PhantomData;

use futures::channel::mpsc::UnboundedSender;
use log::error;
use tentacle::{
error::SendErrorKind,
secio::PeerId,
service::{ServiceControl, TargetSession},
SessionId,
};

use async_trait::async_trait;
use protocol::{traits::Priority, Bytes};

use crate::{
error::NetworkError,
event::PeerManagerEvent,
traits::{MessageSender, NetworkProtocol, SharedSessionBook},
};

pub struct ConnectionServiceControl<P: NetworkProtocol, B: SharedSessionBook> {
inner: ServiceControl,
mgr_srv: UnboundedSender<PeerManagerEvent>,
sessions: B,

// Indicate which protocol this connection service control
pin_protocol: PhantomData<fn() -> P>,
}

impl<P: NetworkProtocol, B: SharedSessionBook> ConnectionServiceControl<P, B> {
pub fn new(
control: ServiceControl,
mgr_srv: UnboundedSender<PeerManagerEvent>,
book: B,
) -> Self {
ConnectionServiceControl {
inner: control,
mgr_srv,
sessions: book,

pin_protocol: PhantomData,
}
}

pub fn filter_blocked(
&self,
tar: TargetSession,
) -> (Option<TargetSession>, Option<Vec<SessionId>>) {
self.sessions.refresh_blocked();

let all_blocked = self.sessions.all_blocked();
if all_blocked.is_empty() {
return (Some(tar), None);
}

match tar {
TargetSession::Single(sid) => {
if all_blocked.contains(&sid) {
(None, Some(vec![sid]))
} else {
(Some(TargetSession::Single(sid)), None)
}
}
TargetSession::Multi(sids) => {
let (sendable, blocked): (Vec<SessionId>, Vec<SessionId>) =
sids.into_iter().partition(|sid| !all_blocked.contains(sid));

if sendable.is_empty() && blocked.is_empty() {
unreachable!()
} else if sendable.is_empty() {
(None, Some(blocked))
} else if blocked.is_empty() {
(Some(TargetSession::Multi(sendable)), None)
} else {
(Some(TargetSession::Multi(sendable)), Some(blocked))
}
}
TargetSession::All => {
let sendable = self.sessions.all_sendable();

(Some(TargetSession::Multi(sendable)), Some(all_blocked))
}
}
}
use tentacle::error::SendErrorKind;
use tentacle::service::{ServiceControl, TargetSession};
use tentacle::ProtocolId;

use protocol::traits::Priority;
use protocol::Bytes;

pub struct ProtocolMessage {
pub protocol_id: ProtocolId,
pub target: TargetSession,
pub data: Bytes,
pub priority: Priority,
}

impl<P: NetworkProtocol, B: SharedSessionBook + Clone> Clone for ConnectionServiceControl<P, B> {
fn clone(&self) -> Self {
ConnectionServiceControl {
inner: self.inner.clone(),
mgr_srv: self.mgr_srv.clone(),
sessions: self.sessions.clone(),

pin_protocol: PhantomData,
}
}
#[derive(Clone)]
pub struct ConnectionServiceControl {
inner: ServiceControl,
}

#[async_trait]
impl<P, B> MessageSender for ConnectionServiceControl<P, B>
where
P: NetworkProtocol,
B: SharedSessionBook + Send + Sync + Unpin + 'static,
{
fn send(&self, tar: TargetSession, msg: Bytes, pri: Priority) -> Result<(), NetworkError> {
let proto_id = P::message_proto_id();

let (tar, opt_blocked) = match self.filter_blocked(tar) {
(None, None) => unreachable!(),
(None, blocked) => {
return Err(NetworkError::Send {
blocked,
other: None,
});
}
(Some(tar), opt_blocked) => (tar, opt_blocked),
};

let ret = match pri {
Priority::High => self.inner.quick_filter_broadcast(tar, proto_id, msg),
Priority::Normal => self.inner.filter_broadcast(tar, proto_id, msg),
};

let ret = ret.map_err(|err| match &err {
SendErrorKind::BrokenPipe => NetworkError::Shutdown,
SendErrorKind::WouldBlock => NetworkError::Busy,
});

if ret.is_err() || opt_blocked.is_some() {
let other = ret.err();
return Err(NetworkError::Send {
blocked: opt_blocked,
other: other.map(NetworkError::boxed),
});
}

Ok(())
impl ConnectionServiceControl {
pub fn new(control: ServiceControl) -> Self {
ConnectionServiceControl { inner: control }
}

async fn multisend(
&self,
peer_ids: Vec<PeerId>,
msg: Bytes,
pri: Priority,
) -> Result<(), NetworkError> {
let (connected, unconnected) = self.sessions.peers(peer_ids);
let send_ret = self.send(TargetSession::Multi(connected), msg, pri);
if unconnected.is_empty() {
return send_ret;
}

let connect_peers = PeerManagerEvent::ConnectPeersNow {
pids: unconnected.clone(),
};
if self.mgr_srv.unbounded_send(connect_peers).is_err() {
error!("network: peer manager service exit");
}

if send_ret.is_err() || !unconnected.is_empty() {
let other = send_ret.err().map(NetworkError::boxed);
let unconnected = if unconnected.is_empty() {
None
} else {
Some(unconnected)
};
pub fn send(&self, message: ProtocolMessage) -> Result<(), SendErrorKind> {
let ProtocolMessage {
target,
protocol_id,
data,
..
} = message;

return Err(NetworkError::MultiCast { unconnected, other });
match message.priority {
Priority::High => self.inner.quick_filter_broadcast(target, protocol_id, data),
Priority::Normal => self.inner.filter_broadcast(target, protocol_id, data),
}

Ok(())
}
}
55 changes: 23 additions & 32 deletions core/network/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
mod control;
mod keeper;
pub use control::ConnectionServiceControl;

pub use control::{ConnectionServiceControl, ProtocolMessage};
pub use keeper::ConnectionServiceKeeper;

use std::{
collections::VecDeque,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::{
channel::mpsc::UnboundedReceiver, channel::mpsc::UnboundedSender, pin_mut, stream::Stream,
};
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::channel::mpsc::UnboundedReceiver;
use futures::stream::Stream;
use log::debug;
use tentacle::{
builder::ServiceBuilder, error::SendErrorKind, multiaddr::Multiaddr, secio::SecioKeyPair,
service::Service,
};
use tentacle::builder::ServiceBuilder;
use tentacle::error::SendErrorKind;
use tentacle::multiaddr::Multiaddr;
use tentacle::secio::SecioKeyPair;
use tentacle::service::Service;

use crate::{
error::NetworkError,
event::{ConnectionEvent, PeerManagerEvent},
traits::{NetworkProtocol, SharedSessionBook},
};
use crate::error::NetworkError;
use crate::event::ConnectionEvent;
use crate::traits::NetworkProtocol;

pub struct ConnectionConfig {
/// Secio keypair for stream encryption and peer identity
Expand Down Expand Up @@ -114,14 +111,8 @@ impl<P: NetworkProtocol> ConnectionService<P> {
Ok(())
}

pub fn control<B: SharedSessionBook>(
&self,
mgr_tx: UnboundedSender<PeerManagerEvent>,
book: B,
) -> ConnectionServiceControl<P, B> {
let control_ref = self.inner.control();

ConnectionServiceControl::new(control_ref.clone(), mgr_tx, book)
pub fn control(&self) -> ConnectionServiceControl {
ConnectionServiceControl::new(self.inner.control().clone())
}

// BrokenPipe means service is closed.
Expand Down Expand Up @@ -205,7 +196,7 @@ impl<P: NetworkProtocol + Unpin> Future for ConnectionService<P> {
// No-empty means service is temporary unavailable, try later
while serv_mut.pending_events.is_empty() {
let event_rx = &mut serv_mut.event_rx;
pin_mut!(event_rx);
futures::pin_mut!(event_rx);

let event = crate::service_ready!("connection service", event_rx.poll_next(ctx));
debug!("network: event [{}]", event);
Expand All @@ -216,7 +207,7 @@ impl<P: NetworkProtocol + Unpin> Future for ConnectionService<P> {
// Advance service state
loop {
let inner = &mut serv_mut.inner;
pin_mut!(inner);
futures::pin_mut!(inner);

crate::service_ready!("connection service", inner.poll_next(ctx));
}
Expand Down
3 changes: 3 additions & 0 deletions core/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ pub enum ErrorKind {

#[display(fmt = "kind: untaggable {}", _0)]
Untaggable(String),

#[display(fmt = "kind: internal {}", _0)]
Internal(String),
}

impl Error for ErrorKind {}
Expand Down
8 changes: 0 additions & 8 deletions core/network/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@ use crate::{

use std::{collections::HashMap, str::FromStr};

#[derive(Constructor)]
#[non_exhaustive]
pub struct RawSessionMessage {
pub(crate) sid: SessionId,
pub(crate) pid: PeerId,
pub(crate) msg: Bytes,
}

pub struct Headers(HashMap<String, Vec<u8>>);

impl Default for Headers {
Expand Down
Loading

0 comments on commit 0322cd6

Please sign in to comment.