From e952875811e6a5b117c5eb22e74c1dd517e3ef2f Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Mon, 19 Aug 2024 11:21:51 +0200 Subject: [PATCH] deps: add umio as contrib package --- Cargo.toml | 1 + contrib/umio/Cargo.toml | 19 ++++ contrib/umio/README.md | 23 +++++ contrib/umio/src/buffer.rs | 87 ++++++++++++++++++ contrib/umio/src/dispatcher.rs | 135 ++++++++++++++++++++++++++++ contrib/umio/src/eloop.rs | 125 ++++++++++++++++++++++++++ contrib/umio/src/external.rs | 1 + contrib/umio/src/lib.rs | 17 ++++ contrib/umio/src/provider.rs | 72 +++++++++++++++ contrib/umio/tests/common/mod.rs | 69 ++++++++++++++ contrib/umio/tests/test_incoming.rs | 40 +++++++++ contrib/umio/tests/test_notify.rs | 31 +++++++ contrib/umio/tests/test_outgoing.rs | 39 ++++++++ contrib/umio/tests/test_shutdown.rs | 26 ++++++ contrib/umio/tests/test_timeout.rs | 34 +++++++ packages/utracker/Cargo.toml | 3 +- 16 files changed, 721 insertions(+), 1 deletion(-) create mode 100644 contrib/umio/Cargo.toml create mode 100644 contrib/umio/README.md create mode 100644 contrib/umio/src/buffer.rs create mode 100644 contrib/umio/src/dispatcher.rs create mode 100644 contrib/umio/src/eloop.rs create mode 100644 contrib/umio/src/external.rs create mode 100644 contrib/umio/src/lib.rs create mode 100644 contrib/umio/src/provider.rs create mode 100644 contrib/umio/tests/common/mod.rs create mode 100644 contrib/umio/tests/test_incoming.rs create mode 100644 contrib/umio/tests/test_notify.rs create mode 100644 contrib/umio/tests/test_outgoing.rs create mode 100644 contrib/umio/tests/test_shutdown.rs create mode 100644 contrib/umio/tests/test_timeout.rs diff --git a/Cargo.toml b/Cargo.toml index e8fd1d4f8..a2305115e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "contrib/umio", "examples/get_metadata", "examples/simple_torrent", "packages/bencode", diff --git a/contrib/umio/Cargo.toml b/contrib/umio/Cargo.toml new file mode 100644 index 000000000..7307f6b44 --- /dev/null +++ b/contrib/umio/Cargo.toml @@ -0,0 +1,19 @@ +[package] +authors = ["Andrew "] +description = "Message Based Readiness API In Rust" +keywords = ["message", "mio", "readyness"] +name = "umio" +readme = "README.md" + +categories.workspace = true +documentation.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +publish.workspace = true + +repository.workspace = true +version.workspace = true + +[dependencies] +mio = "0.5" diff --git a/contrib/umio/README.md b/contrib/umio/README.md new file mode 100644 index 000000000..0fe5411cc --- /dev/null +++ b/contrib/umio/README.md @@ -0,0 +1,23 @@ +umio-rs +======= +Message Based Readiness API In Rust. + +Thin layer over mio for working with a single udp socket while retaining access to timers and event loop channels. + + +License +------- + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +Contribution +------------ + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any +additional terms or conditions. diff --git a/contrib/umio/src/buffer.rs b/contrib/umio/src/buffer.rs new file mode 100644 index 000000000..c6b21b72f --- /dev/null +++ b/contrib/umio/src/buffer.rs @@ -0,0 +1,87 @@ +#[allow(clippy::module_name_repetitions)] +pub struct BufferPool { + // Use Stack For Temporal Locality + buffers: Vec, + buffer_size: usize, +} + +impl BufferPool { + pub fn new(buffer_size: usize) -> BufferPool { + let buffers = Vec::new(); + + BufferPool { buffers, buffer_size } + } + + pub fn pop(&mut self) -> Buffer { + self.buffers.pop().unwrap_or(Buffer::new(self.buffer_size)) + } + + pub fn push(&mut self, mut buffer: Buffer) { + buffer.reset_position(); + + self.buffers.push(buffer); + } +} + +//----------------------------------------------------------------------------// + +/// Reusable region of memory for incoming and outgoing messages. +pub struct Buffer { + buffer: Vec, + bytes_written: usize, +} + +impl Buffer { + fn new(len: usize) -> Buffer { + Buffer { + buffer: vec![0u8; len], + bytes_written: 0, + } + } + + fn reset_position(&mut self) { + self.set_written(0); + } + + /// Update the number of bytes written to the buffer. + pub fn set_written(&mut self, bytes: usize) { + self.bytes_written = bytes; + } +} + +impl AsRef<[u8]> for Buffer { + fn as_ref(&self) -> &[u8] { + &self.buffer[..self.bytes_written] + } +} + +impl AsMut<[u8]> for Buffer { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.buffer[self.bytes_written..] + } +} + +#[cfg(test)] +mod tests { + use super::{Buffer, BufferPool}; + + const DEFAULT_BUFFER_SIZE: usize = 1500; + + #[test] + fn positive_buffer_pool_buffer_len() { + let mut buffers = BufferPool::new(DEFAULT_BUFFER_SIZE); + let mut buffer = buffers.pop(); + + assert_eq!(buffer.as_mut().len(), DEFAULT_BUFFER_SIZE); + assert_eq!(buffer.as_ref().len(), 0); + } + + #[test] + fn positive_buffer_len_update() { + let mut buffer = Buffer::new(DEFAULT_BUFFER_SIZE); + buffer.set_written(DEFAULT_BUFFER_SIZE - 1); + + assert_eq!(buffer.as_mut().len(), 1); + assert_eq!(buffer.as_ref().len(), DEFAULT_BUFFER_SIZE - 1); + } +} diff --git a/contrib/umio/src/dispatcher.rs b/contrib/umio/src/dispatcher.rs new file mode 100644 index 000000000..a62fd5979 --- /dev/null +++ b/contrib/umio/src/dispatcher.rs @@ -0,0 +1,135 @@ +use std::collections::VecDeque; +use std::net::SocketAddr; + +use mio::udp::UdpSocket; +use mio::{EventLoop, EventSet, Handler, PollOpt, Token}; + +use crate::buffer::{Buffer, BufferPool}; +use crate::{provider, Provider}; + +/// Handles events occurring within the event loop. +pub trait Dispatcher: Sized { + type Timeout; + type Message: Send; + + /// Process an incoming message from the given address. + #[allow(unused)] + fn incoming(&mut self, provider: Provider<'_, Self>, message: &[u8], addr: SocketAddr) {} + + /// Process a message sent via the event loop channel. + #[allow(unused)] + fn notify(&mut self, provider: Provider<'_, Self>, message: Self::Message) {} + + /// Process a timeout that has been triggered. + #[allow(unused)] + fn timeout(&mut self, provider: Provider<'_, Self>, timeout: Self::Timeout) {} +} + +//----------------------------------------------------------------------------// + +const UDP_SOCKET_TOKEN: Token = Token(2); + +pub struct DispatchHandler { + dispatch: D, + out_queue: VecDeque<(Buffer, SocketAddr)>, + udp_socket: UdpSocket, + buffer_pool: BufferPool, + current_set: EventSet, +} + +impl DispatchHandler { + pub fn new( + udp_socket: UdpSocket, + buffer_size: usize, + dispatch: D, + event_loop: &mut EventLoop>, + ) -> DispatchHandler { + let buffer_pool = BufferPool::new(buffer_size); + let out_queue = VecDeque::new(); + + event_loop + .register(&udp_socket, UDP_SOCKET_TOKEN, EventSet::readable(), PollOpt::edge()) + .unwrap(); + + DispatchHandler { + dispatch, + out_queue, + udp_socket, + buffer_pool, + current_set: EventSet::readable(), + } + } + + pub fn handle_write(&mut self) { + if let Some((buffer, addr)) = self.out_queue.pop_front() { + self.udp_socket.send_to(buffer.as_ref(), &addr).unwrap(); + + self.buffer_pool.push(buffer); + }; + } + + pub fn handle_read(&mut self) -> Option<(Buffer, SocketAddr)> { + let mut buffer = self.buffer_pool.pop(); + + if let Ok(Some((bytes, addr))) = self.udp_socket.recv_from(buffer.as_mut()) { + buffer.set_written(bytes); + + Some((buffer, addr)) + } else { + None + } + } +} + +impl Handler for DispatchHandler { + type Timeout = D::Timeout; + type Message = D::Message; + + fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { + if token != UDP_SOCKET_TOKEN { + return; + } + + if events.is_writable() { + self.handle_write(); + } + + if events.is_readable() { + let Some((buffer, addr)) = self.handle_read() else { + return; + }; + + { + let provider = provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop); + + self.dispatch.incoming(provider, buffer.as_ref(), addr); + } + + self.buffer_pool.push(buffer); + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { + let provider = provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop); + + self.dispatch.notify(provider, msg); + } + + fn timeout(&mut self, event_loop: &mut EventLoop, timeout: Self::Timeout) { + let provider = provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop); + + self.dispatch.timeout(provider, timeout); + } + + fn tick(&mut self, event_loop: &mut EventLoop) { + self.current_set = if self.out_queue.is_empty() { + EventSet::readable() + } else { + EventSet::readable() | EventSet::writable() + }; + + event_loop + .reregister(&self.udp_socket, UDP_SOCKET_TOKEN, self.current_set, PollOpt::edge()) + .unwrap(); + } +} diff --git a/contrib/umio/src/eloop.rs b/contrib/umio/src/eloop.rs new file mode 100644 index 000000000..cedf030bf --- /dev/null +++ b/contrib/umio/src/eloop.rs @@ -0,0 +1,125 @@ +use std::io::Result; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +use mio::udp::UdpSocket; +use mio::{EventLoop, EventLoopConfig, Sender}; + +use crate::dispatcher::{DispatchHandler, Dispatcher}; + +const DEFAULT_BUFFER_SIZE: usize = 1500; +const DEFAULT_CHANNEL_CAPACITY: usize = 4096; +const DEFAULT_TIMER_CAPACITY: usize = 65536; + +/// Builder for specifying attributes of an event loop. +pub struct ELoopBuilder { + channel_capacity: usize, + timer_capacity: usize, + buffer_size: usize, + bind_address: SocketAddr, +} + +impl ELoopBuilder { + /// Create a new event loop builder. + #[must_use] + pub fn new() -> ELoopBuilder { + Self::default() + } + + /// Manually set the maximum channel message capacity. + #[must_use] + pub fn channel_capacity(mut self, capacity: usize) -> ELoopBuilder { + self.channel_capacity = capacity; + + self + } + + /// Manually set the maximum timer capacity. + #[must_use] + pub fn timer_capacity(mut self, capacity: usize) -> ELoopBuilder { + self.timer_capacity = capacity; + + self + } + + /// Manually set the bind address for the udp socket in the event loop. + #[must_use] + pub fn bind_address(mut self, address: SocketAddr) -> ELoopBuilder { + self.bind_address = address; + + self + } + + /// Manually set the length of buffers provided by the event loop. + #[must_use] + pub fn buffer_length(mut self, length: usize) -> ELoopBuilder { + self.buffer_size = length; + + self + } + + /// Build the event loop with the current builder. + /// + /// # Errors + /// + /// It would error when the builder config has an problem. + pub fn build(self) -> Result> { + ELoop::from_builder(&self) + } +} + +impl Default for ELoopBuilder { + fn default() -> Self { + let default_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)); + + ELoopBuilder { + channel_capacity: DEFAULT_CHANNEL_CAPACITY, + timer_capacity: DEFAULT_TIMER_CAPACITY, + buffer_size: DEFAULT_BUFFER_SIZE, + bind_address: default_addr, + } + } +} + +//----------------------------------------------------------------------------// + +/// Wrapper around the main application event loop. +pub struct ELoop { + buffer_size: usize, + socket_addr: SocketAddr, + event_loop: EventLoop>, +} + +impl ELoop { + fn from_builder(builder: &ELoopBuilder) -> Result> { + let mut event_loop_config = EventLoopConfig::new(); + event_loop_config + .notify_capacity(builder.channel_capacity) + .timer_capacity(builder.timer_capacity); + + let event_loop = EventLoop::configured(event_loop_config)?; + + Ok(ELoop { + buffer_size: builder.buffer_size, + socket_addr: builder.bind_address, + event_loop, + }) + } + + /// Grab a channel to send messages to the event loop. + #[must_use] + pub fn channel(&self) -> Sender { + self.event_loop.channel() + } + + /// Run the event loop with the given dispatcher until a shutdown occurs. + /// + /// # Errors + /// + /// It would error if unable to bind to the socket. + pub fn run(&mut self, dispatcher: D) -> Result<()> { + let udp_socket = UdpSocket::bound(&self.socket_addr)?; + let mut dispatch_handler = DispatchHandler::new(udp_socket, self.buffer_size, dispatcher, &mut self.event_loop); + + self.event_loop.run(&mut dispatch_handler) + } +} diff --git a/contrib/umio/src/external.rs b/contrib/umio/src/external.rs new file mode 100644 index 000000000..2ef9d04f8 --- /dev/null +++ b/contrib/umio/src/external.rs @@ -0,0 +1 @@ +pub use mio::{Sender, Timeout, TimerError, TimerResult}; diff --git a/contrib/umio/src/lib.rs b/contrib/umio/src/lib.rs new file mode 100644 index 000000000..c246f2dfe --- /dev/null +++ b/contrib/umio/src/lib.rs @@ -0,0 +1,17 @@ +//! Message Based Readiness API +//! +//! This library is a thin wrapper around mio for clients who wish to +//! use a single udp socket in conjunction with message passing and +//! timeouts. + +mod buffer; +mod dispatcher; +mod eloop; +mod provider; + +/// Exports of bare mio types. +pub mod external; + +pub use dispatcher::Dispatcher; +pub use eloop::{ELoop, ELoopBuilder}; +pub use provider::Provider; diff --git a/contrib/umio/src/provider.rs b/contrib/umio/src/provider.rs new file mode 100644 index 000000000..b9961292f --- /dev/null +++ b/contrib/umio/src/provider.rs @@ -0,0 +1,72 @@ +use std::collections::VecDeque; +use std::net::SocketAddr; + +use mio::{EventLoop, Sender, Timeout, TimerResult}; + +use crate::buffer::{Buffer, BufferPool}; +use crate::dispatcher::{DispatchHandler, Dispatcher}; + +/// Provides services to dispatcher clients. +pub struct Provider<'a, D: Dispatcher> { + buffer_pool: &'a mut BufferPool, + out_queue: &'a mut VecDeque<(Buffer, SocketAddr)>, + event_loop: &'a mut EventLoop>, +} + +pub fn new<'a, D: Dispatcher>( + buffer_pool: &'a mut BufferPool, + out_queue: &'a mut VecDeque<(Buffer, SocketAddr)>, + event_loop: &'a mut EventLoop>, +) -> Provider<'a, D> { + Provider { + buffer_pool, + out_queue, + event_loop, + } +} + +impl<'a, D: Dispatcher> Provider<'a, D> { + /// Grab a channel to send messages to the event loop. + #[must_use] + pub fn channel(&self) -> Sender { + self.event_loop.channel() + } + + /// Execute a closure with a buffer and send the buffer contents to the + /// destination address or reclaim the buffer and do not send anything. + pub fn outgoing(&mut self, out: F) + where + F: FnOnce(&mut [u8]) -> Option<(usize, SocketAddr)>, + { + let mut buffer = self.buffer_pool.pop(); + let opt_send_to = out(buffer.as_mut()); + + match opt_send_to { + None => self.buffer_pool.push(buffer), + Some((bytes, addr)) => { + buffer.set_written(bytes); + + self.out_queue.push_back((buffer, addr)); + } + } + } + + /// Set a timeout with the given delay and token. + /// + /// # Errors + /// + /// It would error when the timeout returns in a error. + pub fn set_timeout(&mut self, token: D::Timeout, delay: u64) -> TimerResult { + self.event_loop.timeout_ms(token, delay) + } + + /// Clear a timeout using the provided timeout identifier. + pub fn clear_timeout(&mut self, timeout: Timeout) -> bool { + self.event_loop.clear_timeout(timeout) + } + + /// Shutdown the event loop. + pub fn shutdown(&mut self) { + self.event_loop.shutdown(); + } +} diff --git a/contrib/umio/tests/common/mod.rs b/contrib/umio/tests/common/mod.rs new file mode 100644 index 000000000..97102082b --- /dev/null +++ b/contrib/umio/tests/common/mod.rs @@ -0,0 +1,69 @@ +use std::net::SocketAddr; +use std::sync::mpsc::{self}; + +use umio::{Dispatcher, Provider}; + +pub struct MockDispatcher { + send: mpsc::Sender, +} + +#[allow(dead_code)] +#[derive(Debug)] +pub enum MockMessage { + MessageReceived(Vec, SocketAddr), + TimeoutReceived(u32), + NotifyReceived, + + SendNotify, + SendMessage(Vec, SocketAddr), + SendTimeout(u32, u64), + + Shutdown, +} + +impl MockDispatcher { + pub fn new() -> (MockDispatcher, mpsc::Receiver) { + let (send, recv) = mpsc::channel(); + + (MockDispatcher { send }, recv) + } +} + +impl Dispatcher for MockDispatcher { + type Timeout = u32; + type Message = MockMessage; + + fn incoming(&mut self, _: Provider<'_, Self>, message: &[u8], addr: SocketAddr) { + let owned_message = message.to_vec(); + + self.send.send(MockMessage::MessageReceived(owned_message, addr)).unwrap(); + } + + fn notify(&mut self, mut provider: Provider<'_, Self>, msg: Self::Message) { + match msg { + MockMessage::SendMessage(message, addr) => { + provider.outgoing(|buffer| { + for (src, dst) in message.iter().zip(buffer.as_mut().iter_mut()) { + *dst = *src; + } + + Some((message.len(), addr)) + }); + } + MockMessage::SendTimeout(token, delay) => { + provider.set_timeout(token, delay).unwrap(); + } + MockMessage::SendNotify => { + self.send.send(MockMessage::NotifyReceived).unwrap(); + } + MockMessage::Shutdown => { + provider.shutdown(); + } + _ => panic!("Invalid Message To Send To Dispatcher: {msg:?}"), + } + } + + fn timeout(&mut self, _: Provider<'_, Self>, token: Self::Timeout) { + self.send.send(MockMessage::TimeoutReceived(token)).unwrap(); + } +} diff --git a/contrib/umio/tests/test_incoming.rs b/contrib/umio/tests/test_incoming.rs new file mode 100644 index 000000000..85bfb1545 --- /dev/null +++ b/contrib/umio/tests/test_incoming.rs @@ -0,0 +1,40 @@ +use std::net::UdpSocket; +use std::thread::{self}; +use std::time::Duration; + +use common::{MockDispatcher, MockMessage}; +use umio::ELoopBuilder; + +mod common; + +#[test] +fn positive_receive_incoming_message() { + let eloop_addr = "127.0.0.1:5050".parse().unwrap(); + let mut eloop = ELoopBuilder::new().bind_address(eloop_addr).build().unwrap(); + + let (dispatcher, dispatch_recv) = MockDispatcher::new(); + let dispatch_send = eloop.channel(); + + thread::spawn(move || { + eloop.run(dispatcher).unwrap(); + }); + thread::sleep(Duration::from_millis(50)); + + let socket_addr = "127.0.0.1:5051".parse().unwrap(); + let socket = UdpSocket::bind(socket_addr).unwrap(); + let message = b"This Is A Test Message"; + + socket.send_to(&message[..], eloop_addr).unwrap(); + thread::sleep(Duration::from_millis(50)); + + match dispatch_recv.try_recv() { + Ok(MockMessage::MessageReceived(msg, addr)) => { + assert_eq!(&msg[..], &message[..]); + + assert_eq!(addr, socket_addr); + } + _ => panic!("ELoop Failed To Receive Incoming Message"), + } + + dispatch_send.send(MockMessage::Shutdown).unwrap(); +} diff --git a/contrib/umio/tests/test_notify.rs b/contrib/umio/tests/test_notify.rs new file mode 100644 index 000000000..5b624dee6 --- /dev/null +++ b/contrib/umio/tests/test_notify.rs @@ -0,0 +1,31 @@ +use std::thread::{self}; +use std::time::Duration; + +use common::{MockDispatcher, MockMessage}; +use umio::ELoopBuilder; + +mod common; + +#[test] +fn positive_send_notify() { + let eloop_addr = "127.0.0.1:0".parse().unwrap(); + let mut eloop = ELoopBuilder::new().bind_address(eloop_addr).build().unwrap(); + + let (dispatcher, dispatch_recv) = MockDispatcher::new(); + let dispatch_send = eloop.channel(); + + thread::spawn(move || { + eloop.run(dispatcher).unwrap(); + }); + thread::sleep(Duration::from_millis(50)); + + dispatch_send.send(MockMessage::SendNotify).unwrap(); + thread::sleep(Duration::from_millis(50)); + + match dispatch_recv.try_recv() { + Ok(MockMessage::NotifyReceived) => (), + _ => panic!("ELoop Failed To Receive Incoming Message"), + } + + dispatch_send.send(MockMessage::Shutdown).unwrap(); +} diff --git a/contrib/umio/tests/test_outgoing.rs b/contrib/umio/tests/test_outgoing.rs new file mode 100644 index 000000000..3db172f75 --- /dev/null +++ b/contrib/umio/tests/test_outgoing.rs @@ -0,0 +1,39 @@ +use std::net::UdpSocket; +use std::thread::{self}; +use std::time::Duration; + +use common::{MockDispatcher, MockMessage}; +use umio::ELoopBuilder; + +mod common; + +#[test] +fn positive_send_outgoing_message() { + let eloop_addr = "127.0.0.1:5052".parse().unwrap(); + let mut eloop = ELoopBuilder::new().bind_address(eloop_addr).build().unwrap(); + + let (dispatcher, _) = MockDispatcher::new(); + let dispatch_send = eloop.channel(); + + thread::spawn(move || { + eloop.run(dispatcher).unwrap(); + }); + thread::sleep(Duration::from_millis(50)); + + let message = b"This Is A Test Message"; + let mut message_recv = [0u8; 22]; + let socket_addr = "127.0.0.1:5053".parse().unwrap(); + let socket = UdpSocket::bind(socket_addr).unwrap(); + dispatch_send + .send(MockMessage::SendMessage(message.to_vec(), socket_addr)) + .unwrap(); + thread::sleep(Duration::from_millis(50)); + + let (bytes, addr) = socket.recv_from(&mut message_recv).unwrap(); + + assert_eq!(bytes, message.len()); + assert_eq!(&message[..], &message_recv[..]); + assert_eq!(addr, eloop_addr); + + dispatch_send.send(MockMessage::Shutdown).unwrap(); +} diff --git a/contrib/umio/tests/test_shutdown.rs b/contrib/umio/tests/test_shutdown.rs new file mode 100644 index 000000000..dd7023f2f --- /dev/null +++ b/contrib/umio/tests/test_shutdown.rs @@ -0,0 +1,26 @@ +use std::thread::{self}; +use std::time::Duration; + +use common::{MockDispatcher, MockMessage}; +use umio::ELoopBuilder; + +mod common; + +#[test] +fn positive_execute_shutdown() { + let eloop_addr = "127.0.0.1:0".parse().unwrap(); + let mut eloop = ELoopBuilder::new().bind_address(eloop_addr).build().unwrap(); + + let (dispatcher, _) = MockDispatcher::new(); + let dispatch_send = eloop.channel(); + + thread::spawn(move || { + eloop.run(dispatcher).unwrap(); + }); + thread::sleep(Duration::from_millis(50)); + + dispatch_send.send(MockMessage::Shutdown).unwrap(); + thread::sleep(Duration::from_millis(50)); + + assert!(dispatch_send.send(MockMessage::SendNotify).is_err()); +} diff --git a/contrib/umio/tests/test_timeout.rs b/contrib/umio/tests/test_timeout.rs new file mode 100644 index 000000000..60f537016 --- /dev/null +++ b/contrib/umio/tests/test_timeout.rs @@ -0,0 +1,34 @@ +use std::thread::{self}; +use std::time::Duration; + +use common::{MockDispatcher, MockMessage}; +use umio::ELoopBuilder; + +mod common; + +#[test] +fn positive_send_notify() { + let eloop_addr = "127.0.0.1:0".parse().unwrap(); + let mut eloop = ELoopBuilder::new().bind_address(eloop_addr).build().unwrap(); + + let (dispatcher, dispatch_recv) = MockDispatcher::new(); + let dispatch_send = eloop.channel(); + + thread::spawn(move || { + eloop.run(dispatcher).unwrap(); + }); + thread::sleep(Duration::from_millis(50)); + + let token = 5; + dispatch_send.send(MockMessage::SendTimeout(token, 50)).unwrap(); + thread::sleep(Duration::from_millis(300)); + + match dispatch_recv.try_recv() { + Ok(MockMessage::TimeoutReceived(tkn)) => { + assert_eq!(tkn, token); + } + _ => panic!("ELoop Failed To Receive Timeout"), + } + + dispatch_send.send(MockMessage::Shutdown).unwrap(); +} diff --git a/packages/utracker/Cargo.toml b/packages/utracker/Cargo.toml index 29dce63e8..7c38d5269 100644 --- a/packages/utracker/Cargo.toml +++ b/packages/utracker/Cargo.toml @@ -19,6 +19,8 @@ version.workspace = true handshake = { path = "../handshake" } util = { path = "../util" } +umio = { path = "../../contrib/umio" } + byteorder = "1" chrono = "0" futures = "0" @@ -26,7 +28,6 @@ nom = "7" rand = "0" thiserror = "1" tracing = "0" -umio = "0" [dev-dependencies] tokio = { version = "1", features = ["full"] }