From 0d308454c913054512b117456e93873d0135f7fa Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Mon, 4 Dec 2023 15:32:11 +0100 Subject: [PATCH 01/11] fix non-registered waker in Runtime::with method --- src/reactor.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/reactor.rs b/src/reactor.rs index 7b8939d..e17ee3e 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -401,11 +401,15 @@ pub struct Runtime { impl Runtime { /// Creates new reactor runtime using provided [`Poll`] engine and a service exposing /// [`Handler`] API to the reactor. - pub fn with(service: H, poller: P) -> io::Result { + pub fn with(service: H, mut poller: P) -> io::Result { let (ctl_send, ctl_recv) = chan::unbounded(); let (waker_writer, waker_reader) = P::Waker::pair()?; + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); + let waker_id = poller.register(&waker_reader, IoType::read_only()); + let controller = Controller { ctl_send, waker: waker_writer, From c4015910cd5ed4a4629993868b9c721ec3abb4f3 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Mon, 4 Dec 2023 15:35:44 +0100 Subject: [PATCH 02/11] refactor all resources ids to be sequential numbers --- src/poller/mod.rs | 13 ++--- src/poller/popol.rs | 47 ++++++++++------- src/reactor.rs | 124 ++++++++++++++++++++------------------------ src/resource.rs | 18 ++----- 4 files changed, 94 insertions(+), 108 deletions(-) diff --git a/src/poller/mod.rs b/src/poller/mod.rs index 7d51d08..cd7b2e2 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -27,11 +27,12 @@ pub mod popol; use std::fmt::{self, Display, Formatter}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::time::Duration; use std::{io, ops}; use crate::resource::Io; +use crate::ResourceId; /// Information about I/O events which has happened for a resource. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] @@ -147,19 +148,19 @@ pub enum IoFail { /// To read I/O events from the engine please use its Iterator interface. pub trait Poll where - Self: Send + Iterator)>, - for<'a> &'a mut Self: Iterator)>, + Self: Send + Iterator)>, + for<'a> &'a mut Self: Iterator)>, { /// Waker type used by the poll provider. type Waker: Waker; /// Registers a file-descriptor based resource for a poll. - fn register(&mut self, fd: &impl AsRawFd, interest: IoType); + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId; /// Unregisters a file-descriptor based resource from a poll. - fn unregister(&mut self, fd: &impl AsRawFd); + fn unregister(&mut self, id: ResourceId); /// Subscribes for a specific set of events for a given file descriptor-backed resource (see /// [`IoType`] for the details on event subscription). - fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool; + fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool; /// Runs single poll syscall over all registered resources with an optional timeout. /// diff --git a/src/poller/popol.rs b/src/poller/popol.rs index c699fbc..ee28168 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -30,12 +30,14 @@ use std::sync::Arc; use std::time::Duration; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; +use crate::ResourceId; /// Manager for a set of reactor which are polled for an event loop by the /// re-actor by using [`popol`] library. pub struct Poller { - poll: popol::Sources, - events: VecDeque>, + poll: popol::Sources, + events: VecDeque>, + id_top: ResourceId, } impl Default for Poller { @@ -48,6 +50,7 @@ impl Poller { Self { poll: popol::Sources::new(), events: empty!(), + id_top: 0, } } @@ -57,6 +60,7 @@ impl Poller { Self { poll: popol::Sources::with_capacity(capacity), events: VecDeque::with_capacity(capacity), + id_top: 0, } } } @@ -64,26 +68,29 @@ impl Poller { impl Poll for Poller { type Waker = PopolWaker; - fn register(&mut self, fd: &impl AsRawFd, interest: IoType) { + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId { + let id = self.id_top; + self.id_top += 1; + #[cfg(feature = "log")] - log::trace!(target: "popol", "Registering {}", fd.as_raw_fd()); - self.poll.register(fd.as_raw_fd(), fd, interest.into()); + log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id); + + self.poll.register(id, fd, interest.into()); + id } - fn unregister(&mut self, fd: &impl AsRawFd) { + fn unregister(&mut self, id: ResourceId) { #[cfg(feature = "log")] - log::trace!(target: "popol", "Unregistering {}", fd.as_raw_fd()); - self.poll.unregister(&fd.as_raw_fd()); + log::trace!(target: "popol", "Unregistering {}", id); + self.poll.unregister(&id); } - fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool { - let fd = fd.as_raw_fd(); - + fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool { #[cfg(feature = "log")] - log::trace!(target: "popol", "Setting interest `{interest}` on {}", fd); + log::trace!(target: "popol", "Setting interest `{interest}` on {}", id); - self.poll.unset(&fd, (!interest).into()); - self.poll.set(&fd, interest.into()) + self.poll.unset(&id, (!interest).into()); + self.poll.set(&id, interest.into()) } fn poll(&mut self, timeout: Option) -> io::Result { @@ -115,21 +122,21 @@ impl Poll for Poller { } impl Iterator for Poller { - type Item = (RawFd, Result); + type Item = (ResourceId, Result); fn next(&mut self) -> Option { let event = self.events.pop_front()?; - let fd = event.key; + let id = event.key; let fired = event.raw_events(); let res = if event.is_hangup() { #[cfg(feature = "log")] - log::trace!(target: "popol", "Hangup on {fd}"); + log::trace!(target: "popol", "Hangup on {id}"); Err(IoFail::Connectivity(fired)) } else if event.is_error() || event.is_invalid() { #[cfg(feature = "log")] - log::trace!(target: "popol", "OS error on {fd} (fired events {fired:#b})"); + log::trace!(target: "popol", "OS error on {id} (fired events {fired:#b})"); Err(IoFail::Os(fired)) } else { @@ -139,11 +146,11 @@ impl Iterator for Poller { }; #[cfg(feature = "log")] - log::trace!(target: "popol", "I/O event on {fd}: {io}"); + log::trace!(target: "popol", "I/O event on {id}: {io}"); Ok(io) }; - Some((fd, res)) + Some((id, res)) } } diff --git a/src/reactor.rs b/src/reactor.rs index e17ee3e..cd40d6b 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::thread::JoinHandle; use std::time::Duration; use std::{io, thread}; @@ -32,7 +32,7 @@ use crossbeam_channel as chan; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; use crate::resource::WriteError; -use crate::{Resource, Timer, Timestamp, WriteAtomic}; +use crate::{Resource, ResourceId, Timer, Timestamp, WriteAtomic}; /// Maximum amount of time to wait for I/O. const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); @@ -42,10 +42,10 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); #[display(doc_comments)] pub enum Error { /// transport {0} got disconnected during poll operation. - ListenerDisconnect(L::Id, L), + ListenerDisconnect(ResourceId, L), /// transport {0} got disconnected during poll operation. - TransportDisconnect(T::Id, T), + TransportDisconnect(ResourceId, T), /// polling multiple reactor has failed. Details: {0:?} Poll(io::Error), @@ -81,7 +81,7 @@ pub enum Action { /// closed, listener is not unbound, connections are not closed etc. All these actions must be /// handled by the handler upon the handover event. #[display("unregister_listener")] - UnregisterListener(L::Id), + UnregisterListener(ResourceId), /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via /// [`Handler::handover_transport`]. @@ -90,11 +90,11 @@ pub enum Action { /// closed, listener is not unbound, connections are not closed etc. All these actions must be /// handled by the handler upon the handover event. #[display("unregister_transport")] - UnregisterTransport(T::Id), + UnregisterTransport(ResourceId), /// Write the data to one of the transport resources using [`io::Write`]. #[display("send_to({0})")] - Send(T::Id, Vec), + Send(ResourceId, Vec), /// Set a new timer for a given duration from this moment. /// @@ -143,7 +143,7 @@ pub trait Handler: Send + Iterator::Id, + id: ResourceId, event: ::Event, time: Timestamp, ); @@ -151,7 +151,7 @@ pub trait Handler: Send + Iterator::Id, + id: ResourceId, event: ::Event, time: Timestamp, ); @@ -271,7 +271,7 @@ impl Reactor { let thread = builder.spawn(move || { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - poller.register(&waker_reader, IoType::read_only()); + let waker_id = poller.register(&waker_reader, IoType::read_only()); let runtime = Runtime { service, @@ -280,9 +280,8 @@ impl Reactor { ctl_recv, listeners: empty!(), transports: empty!(), - listener_map: empty!(), - transport_map: empty!(), waker: waker_reader, + waker_id, timeouts: Timer::new(), }; @@ -390,11 +389,10 @@ pub struct Runtime { poller: P, controller: Controller::Send>, ctl_recv: chan::Receiver>, - listener_map: HashMap::Id>, - transport_map: HashMap::Id>, - listeners: HashMap<::Id, H::Listener>, - transports: HashMap<::Id, H::Transport>, + listeners: HashMap, + transports: HashMap, waker: ::Recv, + waker_id: ResourceId, timeouts: Timer, } @@ -422,9 +420,8 @@ impl Runtime { ctl_recv, listeners: empty!(), transports: empty!(), - listener_map: empty!(), - transport_map: empty!(), waker: waker_reader, + waker_id, timeouts: Timer::new(), }) } @@ -442,11 +439,11 @@ impl Runtime { let before_poll = Timestamp::now(); let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT); - for res in self.listeners.values() { - self.poller.set_interest(res, res.interests()); + for (id, res) in &self.listeners { + self.poller.set_interest(*id, res.interests()); } - for res in self.transports.values() { - self.poller.set_interest(res, res.interests()); + for (id, res) in &self.transports { + self.poller.set_interest(*id, res.interests()); } // Blocking @@ -506,8 +503,8 @@ impl Runtime { let mut awoken = false; let mut unregister_queue = vec![]; - while let Some((fd, res)) = self.poller.next() { - if fd == self.waker.as_raw_fd() { + while let Some((id, res)) = self.poller.next() { + if id == self.waker_id { if let Err(err) = res { #[cfg(feature = "log")] log::error!(target: "reactor", "Polling waker has failed: {err}"); @@ -519,16 +516,16 @@ impl Runtime { self.waker.reset(); awoken = true; - } else if let Some(id) = self.listener_map.get(&fd) { + } else if self.listeners.contains_key(&id) { match res { Ok(io) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from listener {id} (fd={fd})"); + log::trace!(target: "reactor", "Got `{io}` event from listener {id}"); - let listener = self.listeners.get_mut(id).expect("resource disappeared"); + let listener = self.listeners.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = listener.handle_io(io) { - self.service.handle_listener_event(*id, event, time); + self.service.handle_listener_event(id, event, time); } } } @@ -536,26 +533,26 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} hung up (OS flags {flags:#b})"); - let listener = self.listeners.remove(id).expect("resource disappeared"); - unregister_queue.push(listener.as_raw_fd()); - self.service.handle_error(Error::ListenerDisconnect(*id, listener)); + let listener = self.listeners.remove(&id).expect("resource disappeared"); + unregister_queue.push(id); + self.service.handle_error(Error::ListenerDisconnect(id, listener)); } Err(IoFail::Os(flags)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})"); - self.unregister_listener(*id); + self.unregister_listener(id); } } - } else if let Some(id) = self.transport_map.get(&fd) { + } else if self.transports.contains_key(&id) { match res { Ok(io) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from transport {id} (fd={fd})"); + log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); - let transport = self.transports.get_mut(id).expect("resource disappeared"); + let transport = self.transports.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = transport.handle_io(io) { - self.service.handle_transport_event(*id, event, time); + self.service.handle_transport_event(id, event, time); } } } @@ -563,14 +560,14 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})"); - let transport = self.transports.remove(id).expect("resource disappeared"); - unregister_queue.push(transport.as_raw_fd()); - self.service.handle_error(Error::TransportDisconnect(*id, transport)); + let transport = self.transports.remove(&id).expect("resource disappeared"); + unregister_queue.push(id); + self.service.handle_error(Error::TransportDisconnect(id, transport)); } Err(IoFail::Os(posix_events)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})"); - self.unregister_transport(*id); + self.unregister_transport(id); } } } else { @@ -581,8 +578,8 @@ impl Runtime { } // We need this b/c of borrow checker - for fd in unregister_queue { - self.poller.unregister(&fd); + for id in unregister_queue { + self.poller.unregister(id); } awoken @@ -615,30 +612,26 @@ impl Runtime { ) -> Result<(), Error> { match action { Action::RegisterListener(listener) => { - let id = listener.id(); let fd = listener.as_raw_fd(); #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering listener on {id} (fd={fd})"); + log::debug!(target: "reactor", "Registering listener with fd={fd}"); - self.poller.register(&listener, IoType::read_only()); + let id = self.poller.register(&listener, IoType::read_only()); self.listeners.insert(id, listener); - self.listener_map.insert(fd, id); } Action::RegisterTransport(transport) => { - let id = transport.id(); let fd = transport.as_raw_fd(); #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering transport on {id} (fd={fd})"); + log::debug!(target: "reactor", "Registering transport with fd={fd}"); - self.poller.register(&transport, IoType::read_only()); + let id = self.poller.register(&transport, IoType::read_only()); self.transports.insert(id, transport); - self.transport_map.insert(fd, id); } Action::UnregisterListener(id) => { let Some(listener) = self.unregister_listener(id) else { - return Ok(()) + return Ok(()); }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id}"); @@ -646,7 +639,7 @@ impl Runtime { } Action::UnregisterTransport(id) => { let Some(transport) = self.unregister_transport(id) else { - return Ok(()) + return Ok(()); }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over transport {id}"); @@ -660,7 +653,7 @@ impl Runtime { #[cfg(feature = "log")] log::error!(target: "reactor", "Transport {id} is not in the reactor"); - return Ok(()) + return Ok(()); }; match transport.write_atomic(&data) { Err(WriteError::NotReady) => { @@ -699,11 +692,11 @@ impl Runtime { // We just drop here? } - fn unregister_listener(&mut self, id: ::Id) -> Option { + fn unregister_listener(&mut self, id: ResourceId) -> Option { let Some(listener) = self.listeners.remove(&id) else { #[cfg(feature = "log")] log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); - return None + return None; }; let fd = listener.as_raw_fd(); @@ -711,19 +704,16 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})"); - self.listener_map - .remove(&fd) - .expect("listener index content doesn't match registered listeners"); - self.poller.unregister(&listener); + self.poller.unregister(id); Some(listener) } - fn unregister_transport(&mut self, id: ::Id) -> Option { + fn unregister_transport(&mut self, id: ResourceId) -> Option { let Some(transport) = self.transports.remove(&id) else { #[cfg(feature = "log")] log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); - return None + return None; }; let fd = transport.as_raw_fd(); @@ -731,10 +721,7 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})"); - self.transport_map - .remove(&fd) - .expect("transport index content doesn't match registered transports"); - self.poller.unregister(&transport); + self.poller.unregister(id); Some(transport) } @@ -743,6 +730,7 @@ impl Runtime { #[cfg(test)] mod test { use std::io::stdout; + use std::os::fd::RawFd; use std::thread::sleep; use super::*; @@ -765,9 +753,7 @@ mod test { fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) } } impl Resource for DumbRes { - type Id = RawFd; type Event = (); - fn id(&self) -> Self::Id { self.0.as_raw_fd() } fn interests(&self) -> IoType { IoType::read_write() } fn handle_io(&mut self, _io: Io) -> Option { None } } @@ -815,7 +801,7 @@ mod test { } fn handle_listener_event( &mut self, - _d: ::Id, + _d: ResourceId, _event: ::Event, _time: Timestamp, ) { @@ -823,7 +809,7 @@ mod test { } fn handle_transport_event( &mut self, - _id: ::Id, + _id: ResourceId, _event: ::Event, _time: Timestamp, ) { diff --git a/src/resource.rs b/src/resource.rs index 8d56c7f..476f368 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -21,12 +21,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::hash::Hash; -use std::io::ErrorKind; +use std::io::{self, ErrorKind}; use std::os::unix::io::AsRawFd; -use std::os::unix::prelude::RawFd; -use std::{io, net}; use crate::poller::IoType; @@ -40,19 +38,16 @@ pub enum Io { Write, } -/// Marker traits for types which can be used as a reactor-managed [`Resource`] identifiers. -pub trait ResourceId: Copy + Eq + Ord + Hash + Send + Debug + Display {} +/// The resource identifier must be globally unique and non-reusable object. Because of this, +/// things like [`RawFd`] and socket addresses can't operate like resource identifiers. +pub type ResourceId = u64; /// A resource which can be managed by the reactor. pub trait Resource: AsRawFd + WriteAtomic + Send { - /// Resource identifier type. - type Id: ResourceId; /// Events which resource may generate upon receiving I/O from the reactor via /// [`Self::handle_io`]. These events are passed to the reactor [`crate::Handler`]. type Event; - /// Method returning the [`ResourceId`]. - fn id(&self) -> Self::Id; /// Method informing the reactor which types of events this resource is subscribed for. fn interests(&self) -> IoType; @@ -61,9 +56,6 @@ pub trait Resource: AsRawFd + WriteAtomic + Send { fn handle_io(&mut self, io: Io) -> Option; } -impl ResourceId for net::SocketAddr {} -impl ResourceId for RawFd {} - /// Error during write operation for a reactor-managed [`Resource`]. #[derive(Debug, Display, Error, From)] pub enum WriteError { From dda46f2863509cb4ee606a17bbdbcbb1af634bb7 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Wed, 20 Dec 2023 14:23:11 +0100 Subject: [PATCH 03/11] add Handle::handle_registered method and provide resource ids on handovers --- src/reactor.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index cd40d6b..55e78ac 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use std::thread::JoinHandle; use std::time::Duration; use std::{io, thread}; @@ -156,6 +156,14 @@ pub trait Handler: Send + Iterator Runtime { let id = self.poller.register(&listener, IoType::read_only()); self.listeners.insert(id, listener); + self.service.handle_registered(fd, id); } Action::RegisterTransport(transport) => { let fd = transport.as_raw_fd(); @@ -628,6 +637,7 @@ impl Runtime { let id = self.poller.register(&transport, IoType::read_only()); self.transports.insert(id, transport); + self.service.handle_registered(fd, id); } Action::UnregisterListener(id) => { let Some(listener) = self.unregister_listener(id) else { @@ -635,7 +645,7 @@ impl Runtime { }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id}"); - self.service.handover_listener(listener); + self.service.handover_listener(id, listener); } Action::UnregisterTransport(id) => { let Some(transport) = self.unregister_transport(id) else { @@ -643,7 +653,7 @@ impl Runtime { }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over transport {id}"); - self.service.handover_transport(transport); + self.service.handover_transport(id, transport); } Action::Send(id, data) => { #[cfg(feature = "log")] @@ -815,6 +825,7 @@ mod test { ) { unreachable!() } + fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId) {} fn handle_command(&mut self, cmd: Self::Command) { match cmd { Cmd::Init => { @@ -829,8 +840,12 @@ mod test { fn handle_error(&mut self, err: Error) { panic!("{err}") } - fn handover_listener(&mut self, _listener: Self::Listener) { unreachable!() } - fn handover_transport(&mut self, _transport: Self::Transport) { unreachable!() } + fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) { + unreachable!() + } + fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) { + unreachable!() + } } let reactor = Reactor::new(DumbService::default(), poller::popol::Poller::new()).unwrap(); From 8244a415235f9411635ddebeb5f461b70058df41 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Wed, 20 Dec 2023 14:26:37 +0100 Subject: [PATCH 04/11] make ResourceId newtype instead of typedef --- src/poller/popol.rs | 6 +++--- src/resource.rs | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/poller/popol.rs b/src/poller/popol.rs index ee28168..a01224b 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -50,7 +50,7 @@ impl Poller { Self { poll: popol::Sources::new(), events: empty!(), - id_top: 0, + id_top: ResourceId::ZERO, } } @@ -60,7 +60,7 @@ impl Poller { Self { poll: popol::Sources::with_capacity(capacity), events: VecDeque::with_capacity(capacity), - id_top: 0, + id_top: ResourceId::ZERO, } } } @@ -70,7 +70,7 @@ impl Poll for Poller { fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId { let id = self.id_top; - self.id_top += 1; + self.id_top.inc(); #[cfg(feature = "log")] log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id); diff --git a/src/resource.rs b/src/resource.rs index 476f368..bca5a72 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -40,7 +40,15 @@ pub enum Io { /// The resource identifier must be globally unique and non-reusable object. Because of this, /// things like [`RawFd`] and socket addresses can't operate like resource identifiers. -pub type ResourceId = u64; +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] +#[display(inner)] +pub struct ResourceId(u64); + +impl ResourceId { + pub(crate) const ZERO: ResourceId = ResourceId(0); + + pub(crate) fn inc(&mut self) { self.0 += 1 } +} /// A resource which can be managed by the reactor. pub trait Resource: AsRawFd + WriteAtomic + Send { From 4299b8468dc5228c7ba4e6b36a8fe7884aad9865 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Tue, 26 Dec 2023 10:06:54 +0100 Subject: [PATCH 05/11] chore: update deps to fix nightly build --- Cargo.lock | 251 +++++++++++++++++++---------------------------------- 1 file changed, 89 insertions(+), 162 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d79985..bb64f4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "amplify" -version = "4.0.0" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26966af46e0d200e8bf2b7f16230997c1c3f2d141bc27ccc091c012ed527b58" +checksum = "8629db306c0bbeb0a402e2918bdcf0026b5ddb24c46460f3bf5410b350d98710" dependencies = [ "amplify_derive", "amplify_num", @@ -16,31 +16,34 @@ dependencies = [ [[package]] name = "amplify_derive" -version = "3.0.0" +version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "580f12b79a9e10cfa8d2515128d83a53f387e290096a75904c92b8a2a4d542a6" +checksum = "759dcbfaf94d838367a86d493ec34ccc8aa6fe365cb7880d6bf89006de24d9c1" dependencies = [ "amplify_syn", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] name = "amplify_num" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddce3bc63e807ea02065e8d8b702695f3d302ae4158baddff8b0ce5c73947251" +checksum = "9681187211554ab98f138ba159e90861b136c20afc680dcff2ba82d020721e27" +dependencies = [ + "wasm-bindgen", +] [[package]] name = "amplify_syn" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29b08d74fda406d5a94abfdcdb91ba13bb06562ccf0a4581867fa924ca242b01" +checksum = "7736fb8d473c0d83098b5bac44df6a561e20470375cd8bcae30516dc889fd62a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -63,9 +66,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bumpalo" -version = "3.12.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "cfg-if" @@ -75,18 +78,18 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "concurrent-queue" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" dependencies = [ "crossbeam-utils", ] [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" dependencies = [ "cfg-if", "crossbeam-utils", @@ -94,23 +97,13 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.15" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ "cfg-if", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "io-reactor" version = "0.2.1" @@ -126,49 +119,48 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.141" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "log" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ - "cfg-if", "value-bag", ] [[package]] name = "mio" -version = "0.8.6" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "log", "wasi", - "windows-sys 0.45.0", + "windows-sys", ] [[package]] name = "once_cell" -version = "1.17.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "polling" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be1c66a6add46bff50935c313dae30a5030cf8385c5206e8a95e9e9def974aa" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" dependencies = [ "autocfg", "bitflags", @@ -177,7 +169,7 @@ dependencies = [ "libc", "log", "pin-project-lite", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -191,18 +183,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.56" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.26" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -219,26 +211,27 @@ dependencies = [ ] [[package]] -name = "unicode-ident" -version = "1.0.8" +name = "syn" +version = "2.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +checksum = "ee659fb5f3d355364e1f3e5bc10fb82068efbf824a1e9d1c9504244a6469ad53" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] [[package]] -name = "value-bag" -version = "1.0.0-alpha.9" +name = "unicode-ident" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" -dependencies = [ - "ctor", - "version_check", -] +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] -name = "version_check" -version = "0.9.4" +name = "value-bag" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "4a72e1902dde2bd6441347de2b70b7f5d59bf157c6c62f0c44572607a1d55bbe" [[package]] name = "wasi" @@ -248,9 +241,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -258,24 +251,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.43", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -283,31 +276,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.43", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" - -[[package]] -name = "windows-sys" -version = "0.45.0" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.1", -] +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "windows-sys" @@ -315,119 +299,62 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets 0.48.0", + "windows-targets", ] [[package]] name = "windows-targets" -version = "0.42.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows-targets" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" -dependencies = [ - "windows_aarch64_gnullvm 0.48.0", - "windows_aarch64_msvc 0.48.0", - "windows_i686_gnu 0.48.0", - "windows_i686_msvc 0.48.0", - "windows_x86_64_gnu 0.48.0", - "windows_x86_64_gnullvm 0.48.0", - "windows_x86_64_msvc 0.48.0", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - -[[package]] -name = "windows_i686_gnu" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" - -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnullvm" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" From ade2a86dcb97985b6850ae70bb7551a847a5a2b3 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Tue, 26 Dec 2023 10:08:02 +0100 Subject: [PATCH 06/11] ci: fix MSRV-breaking import --- src/reactor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/reactor.rs b/src/reactor.rs index 55e78ac..1457187 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -740,7 +740,6 @@ impl Runtime { #[cfg(test)] mod test { use std::io::stdout; - use std::os::fd::RawFd; use std::thread::sleep; use super::*; From 135612ff3367eaeca4994d7e5d43f35cce3a5566 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Tue, 26 Dec 2023 10:09:30 +0100 Subject: [PATCH 07/11] ci: fix feature-based builds --- src/resource.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/resource.rs b/src/resource.rs index bca5a72..2977c63 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -44,6 +44,7 @@ pub enum Io { #[display(inner)] pub struct ResourceId(u64); +#[allow(dead_code)] // We need this before we've got non-popol implementations impl ResourceId { pub(crate) const ZERO: ResourceId = ResourceId(0); From 93b289fdc3478365219a93a5f0b79156595faa22 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Tue, 26 Dec 2023 10:13:28 +0100 Subject: [PATCH 08/11] ci: increase MSRV required by amplify dependency --- .github/workflows/build.yml | 6 +++--- Cargo.toml | 2 +- MANIFEST.yml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d33a0c8..71a7f5e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -58,10 +58,10 @@ jobs: fail-fast: false matrix: os: - - ubuntu-20.04 - ubuntu-22.04 - - macos-11 + - ubuntu-latest - macos-12 + - macos-latest # - windows-2019 # - windows-2022 steps: @@ -81,7 +81,7 @@ jobs: strategy: fail-fast: false matrix: - toolchain: [ nightly, beta, stable, 1.65.0 ] + toolchain: [ nightly, beta, stable, 1.66.0 ] steps: - uses: actions/checkout@v2 - name: Install rust ${{ matrix.toolchain }} diff --git a/Cargo.toml b/Cargo.toml index 4d3ecf3..00d21b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["reactor", "networking", "patterns", "concurrency", "poll"] categories = ["concurrency", "asynchronous", "network-programming", "rust-patterns"] homepage = "https://github.com/rust-amplify" repository = "https://github.com/rust-amplify/io-reactor" -rust-version = "1.65" # Due to if ... let clause +rust-version = "1.66" # Due to amplify dependency edition = "2021" license = "Apache-2.0" readme = "README.md" diff --git a/MANIFEST.yml b/MANIFEST.yml index a6c4d4d..52261e7 100644 --- a/MANIFEST.yml +++ b/MANIFEST.yml @@ -3,7 +3,7 @@ Type: Library Kind: Free software License: Apache-2.0 Language: Rust -Compiler: 1.60 +Compiler: 1.66 Author: Maxim Orlovsky Maintained: UBIDECO Institute, Switzerland Maintainers: From e3374754f16ca74376a98dd41bff7d5fdb3c0daa Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 4 Jan 2024 16:32:52 +0100 Subject: [PATCH 09/11] new mechanism for resource id generation --- src/lib.rs | 2 +- src/poller/mod.rs | 2 ++ src/poller/popol.rs | 22 ++++++++++++++++------ src/reactor.rs | 9 +++------ src/resource.rs | 22 ++++++++++++++++++---- 5 files changed, 40 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 420ae01..cecd07f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,7 +76,7 @@ mod reactor; mod resource; mod timeouts; -pub use resource::{Io, Resource, ResourceId, WriteAtomic, WriteError}; +pub use resource::{Io, Resource, ResourceId, ResourceIdGenerator, WriteAtomic, WriteError}; pub use timeouts::{Timer, Timestamp}; pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime}; diff --git a/src/poller/mod.rs b/src/poller/mod.rs index cd7b2e2..4dd3cbd 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -154,6 +154,8 @@ where /// Waker type used by the poll provider. type Waker: Waker; + /// Registers a waker object. + fn register_waker(&mut self, fd: &impl AsRawFd); /// Registers a file-descriptor based resource for a poll. fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId; /// Unregisters a file-descriptor based resource from a poll. diff --git a/src/poller/popol.rs b/src/poller/popol.rs index a01224b..d791a06 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -30,14 +30,14 @@ use std::sync::Arc; use std::time::Duration; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; -use crate::ResourceId; +use crate::{ResourceId, ResourceIdGenerator}; /// Manager for a set of reactor which are polled for an event loop by the /// re-actor by using [`popol`] library. pub struct Poller { poll: popol::Sources, events: VecDeque>, - id_top: ResourceId, + id_gen: ResourceIdGenerator, } impl Default for Poller { @@ -50,7 +50,7 @@ impl Poller { Self { poll: popol::Sources::new(), events: empty!(), - id_top: ResourceId::ZERO, + id_gen: ResourceIdGenerator::default(), } } @@ -60,7 +60,7 @@ impl Poller { Self { poll: popol::Sources::with_capacity(capacity), events: VecDeque::with_capacity(capacity), - id_top: ResourceId::ZERO, + id_gen: ResourceIdGenerator::default(), } } } @@ -68,9 +68,19 @@ impl Poller { impl Poll for Poller { type Waker = PopolWaker; + fn register_waker(&mut self, fd: &impl AsRawFd) { + let id = ResourceId::WAKER; + if self.poll.get(&id).is_some() { + #[cfg(feature = "log")] + log::error!(target: "popol", "Reactor waker is already registered, terminating"); + panic!("Reactor waker is already registered"); + } + + self.poll.register(id, fd, popol::interest::READ); + } + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId { - let id = self.id_top; - self.id_top.inc(); + let id = self.id_gen.next(); #[cfg(feature = "log")] log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id); diff --git a/src/reactor.rs b/src/reactor.rs index 1457187..61df204 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -279,7 +279,7 @@ impl Reactor { let thread = builder.spawn(move || { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - let waker_id = poller.register(&waker_reader, IoType::read_only()); + poller.register_waker(&waker_reader); let runtime = Runtime { service, @@ -289,7 +289,6 @@ impl Reactor { listeners: empty!(), transports: empty!(), waker: waker_reader, - waker_id, timeouts: Timer::new(), }; @@ -400,7 +399,6 @@ pub struct Runtime { listeners: HashMap, transports: HashMap, waker: ::Recv, - waker_id: ResourceId, timeouts: Timer, } @@ -414,7 +412,7 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - let waker_id = poller.register(&waker_reader, IoType::read_only()); + poller.register_waker(&waker_reader); let controller = Controller { ctl_send, @@ -429,7 +427,6 @@ impl Runtime { listeners: empty!(), transports: empty!(), waker: waker_reader, - waker_id, timeouts: Timer::new(), }) } @@ -512,7 +509,7 @@ impl Runtime { let mut unregister_queue = vec![]; while let Some((id, res)) = self.poller.next() { - if id == self.waker_id { + if id == ResourceId::WAKER { if let Err(err) = res { #[cfg(feature = "log")] log::error!(target: "reactor", "Polling waker has failed: {err}"); diff --git a/src/resource.rs b/src/resource.rs index 2977c63..d94d8a4 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -38,17 +38,31 @@ pub enum Io { Write, } +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] +#[display(inner)] +pub struct ResourceIdGenerator(u64); + +impl Default for ResourceIdGenerator { + fn default() -> Self { ResourceIdGenerator(1) } +} + +#[allow(dead_code)] // We need this before we've got non-popol implementations +impl ResourceIdGenerator { + pub fn next(&mut self) -> ResourceId { + let id = self.0; + self.0 += 1; + ResourceId(id) + } +} + /// The resource identifier must be globally unique and non-reusable object. Because of this, /// things like [`RawFd`] and socket addresses can't operate like resource identifiers. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] #[display(inner)] pub struct ResourceId(u64); -#[allow(dead_code)] // We need this before we've got non-popol implementations impl ResourceId { - pub(crate) const ZERO: ResourceId = ResourceId(0); - - pub(crate) fn inc(&mut self) { self.0 += 1 } + pub const WAKER: ResourceId = ResourceId(0); } /// A resource which can be managed by the reactor. From 5fbdfeb5d2a6beabba1ba9faed54bde50a3a53d3 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 4 Jan 2024 16:36:44 +0100 Subject: [PATCH 10/11] doc: document new resource id generator --- src/resource.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/resource.rs b/src/resource.rs index d94d8a4..e250577 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -38,6 +38,8 @@ pub enum Io { Write, } +/// Generator for the new [`ResourceId`]s which should be used by pollers implementing [`Poll`] +/// trait. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] #[display(inner)] pub struct ResourceIdGenerator(u64); @@ -48,6 +50,7 @@ impl Default for ResourceIdGenerator { #[allow(dead_code)] // We need this before we've got non-popol implementations impl ResourceIdGenerator { + /// Returns the next id for the resource. pub fn next(&mut self) -> ResourceId { let id = self.0; self.0 += 1; @@ -62,6 +65,7 @@ impl ResourceIdGenerator { pub struct ResourceId(u64); impl ResourceId { + /// Resource id for the waker (always zero). pub const WAKER: ResourceId = ResourceId(0); } From 51f7c65d285998631b5d397390c0272c1be6f9b5 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 4 Jan 2024 16:37:47 +0100 Subject: [PATCH 11/11] chore: fix linter warnings --- src/reactor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/reactor.rs b/src/reactor.rs index 61df204..48c1490 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -21,6 +21,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(unused_variables)] // because we need them for feature-gated logger + use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; use std::os::unix::io::{AsRawFd, RawFd};