From ceb7c485f45180f72e1a74417dbb7a9aacd56c15 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 4 Jul 2024 15:34:11 +0200 Subject: [PATCH] error: fix missed service notifications on all resource unregistrations --- src/poller/mod.rs | 4 ++-- src/reactor.rs | 46 ++++++++++++---------------------------------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/src/poller/mod.rs b/src/poller/mod.rs index 4dd3cbd..ab9d577 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -133,9 +133,9 @@ impl Display for IoType { #[derive(Copy, Clone, Debug, Display, Error)] #[display(doc_comments)] pub enum IoFail { - /// connection is absent (POSIX events {0:#b}) + /// hung up (POSIX events {0:#b}) Connectivity(i16), - /// OS-level error (POSIX events {0:#b}) + /// errored (POSIX events {0:#b}) Os(i16), } diff --git a/src/reactor.rs b/src/reactor.rs index ac7eebf..a7f8bb1 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -32,7 +32,7 @@ use std::{io, thread}; use crossbeam_channel as chan; -use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; +use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend}; use crate::resource::WriteError; use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic}; @@ -505,11 +505,10 @@ impl Runtime { /// # Returns /// - /// Whether it was awaken by a waker + /// Whether it was awakened by a waker fn handle_events(&mut self, time: Timestamp) -> bool { let mut awoken = false; - let mut unregister_queue = vec![]; while let Some((id, res)) = self.poller.next() { if id == ResourceId::WAKER { if let Err(err) = res { @@ -536,19 +535,13 @@ impl Runtime { } } } - Err(IoFail::Connectivity(flags)) => { + Err(err) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Listener {id} hung up (OS flags {flags:#b})"); - - let listener = self.listeners.remove(&id).expect("resource disappeared"); - unregister_queue.push(id); + log::trace!(target: "reactor", "Listener {id} {err}"); + let listener = + self.unregister_listener(id).expect("listener has disappeared"); self.service.handle_error(Error::ListenerDisconnect(id, listener)); } - Err(IoFail::Os(flags)) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})"); - self.unregister_listener(id); - } } } else if self.transports.contains_key(&id) { match res { @@ -563,19 +556,13 @@ impl Runtime { } } } - Err(IoFail::Connectivity(posix_events)) => { + Err(err) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})"); - - let transport = self.transports.remove(&id).expect("resource disappeared"); - unregister_queue.push(id); + log::trace!(target: "reactor", "Transport {id} {err}"); + let transport = + self.unregister_transport(id).expect("transport has disappeared"); self.service.handle_error(Error::TransportDisconnect(id, transport)); } - Err(IoFail::Os(posix_events)) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})"); - self.unregister_transport(id); - } } } else { panic!( @@ -584,11 +571,6 @@ impl Runtime { } } - // We need this b/c of borrow checker - for id in unregister_queue { - self.poller.unregister(id); - } - awoken } @@ -708,10 +690,8 @@ impl Runtime { return None; }; - let fd = listener.as_raw_fd(); - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})"); + log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd()); self.poller.unregister(id); @@ -725,10 +705,8 @@ impl Runtime { return None; }; - let fd = transport.as_raw_fd(); - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})"); + log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd()); self.poller.unregister(id);