From a104cb48b94520607493aaf96646ce41cdc730f2 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Mon, 10 Apr 2023 13:05:05 +0200 Subject: [PATCH 01/10] chore: update dependencies --- Cargo.lock | 52 ++++++++++++++++++++++++++-------------------------- Cargo.toml | 2 +- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1ca1db..9862730 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,9 +94,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.14" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ "cfg-if", ] @@ -126,9 +126,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.139" +version = "0.2.141" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" [[package]] name = "log" @@ -191,27 +191,27 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.51" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.23" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ "proc-macro2", ] [[package]] name = "syn" -version = "1.0.107" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -220,9 +220,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.6" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" [[package]] name = "value-bag" @@ -350,9 +350,9 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" [[package]] name = "windows_aarch64_gnullvm" @@ -362,9 +362,9 @@ checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" [[package]] name = "windows_aarch64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" [[package]] name = "windows_aarch64_msvc" @@ -374,9 +374,9 @@ checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" [[package]] name = "windows_i686_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" [[package]] name = "windows_i686_gnu" @@ -386,9 +386,9 @@ checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" [[package]] name = "windows_i686_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" [[package]] name = "windows_i686_msvc" @@ -398,9 +398,9 @@ checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" [[package]] name = "windows_x86_64_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" [[package]] name = "windows_x86_64_gnu" @@ -410,9 +410,9 @@ checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" [[package]] name = "windows_x86_64_gnullvm" @@ -422,9 +422,9 @@ checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" [[package]] name = "windows_x86_64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" [[package]] name = "windows_x86_64_msvc" diff --git a/Cargo.toml b/Cargo.toml index 4d1796b..416ddaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "io-reactor" -version = "0.1.2" +version = "0.2.0" description = "Concurrent I/O resource management using reactor pattern" authors = [ "Dr. Maxim Orlovsky ", From 5acfeab86061e7a0d2ba447ecae32e840d82bd5e Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Mon, 10 Apr 2023 13:05:59 +0200 Subject: [PATCH 02/10] errors: remove i16 poll implementation details. Closes #14 --- src/reactor.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index 624d373..78dc704 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -57,16 +57,16 @@ pub enum Error { WriteLogicError(T::Id, Vec), /// transport {0} got disconnected during poll operation. - ListenerDisconnect(L::Id, L, i16), + ListenerDisconnect(L::Id, L), /// transport {0} got disconnected during poll operation. - TransportDisconnect(T::Id, T, i16), + TransportDisconnect(T::Id, T), /// poll on listener {0} has returned error. - ListenerPollError(L::Id, i16), + ListenerPollError(L::Id), /// poll on transport {0} has returned error. - TransportPollError(T::Id, i16), + TransportPollError(T::Id), /// polling multiple reactor has failed. Details: {0:?} Poll(io::Error), @@ -604,13 +604,13 @@ impl Runtime { let listener = self.listeners.remove(id).expect("resource disappeared"); unregister_queue.push(listener.as_raw_fd()); - self.service.handle_error(Error::ListenerDisconnect(*id, listener, flags)); + self.service.handle_error(Error::ListenerDisconnect(*id, listener)); } Err(IoFail::Os(flags)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})"); - self.service.handle_error(Error::ListenerPollError(*id, flags)); + self.service.handle_error(Error::ListenerPollError(*id)); } } } else if let Some(id) = self.transport_map.get(&fd) { @@ -632,14 +632,13 @@ impl Runtime { let transport = self.transports.remove(id).expect("resource disappeared"); unregister_queue.push(transport.as_raw_fd()); - self.service - .handle_error(Error::TransportDisconnect(*id, transport, flags)); + self.service.handle_error(Error::TransportDisconnect(*id, transport)); } Err(IoFail::Os(flags)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} errored (OS flags {flags:#b})"); - self.service.handle_error(Error::TransportPollError(*id, flags)); + self.service.handle_error(Error::TransportPollError(*id)); } } } else { From 160cb80713b375bd0b4f17e1ed3650d9865ffa2c Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Mon, 10 Apr 2023 13:31:54 +0200 Subject: [PATCH 03/10] make unregistering unknown transport/listener idempotent --- src/reactor.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index 78dc704..70af72e 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -44,12 +44,6 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); #[derive(Error, Display, From)] #[display(doc_comments)] pub enum Error { - /// unknown listener {0} - ListenerUnknown(L::Id), - - /// unknown transport {0} - TransportUnknown(T::Id), - /// unable to write to transport {0}. Details: {1:?} WriteFailure(T::Id, io::Error), @@ -700,7 +694,12 @@ impl Runtime { self.transport_map.insert(fd, id); } Action::UnregisterListener(id) => { - let listener = self.listeners.remove(&id).ok_or(Error::ListenerUnknown(id))?; + let Some(listener) = self.listeners.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); + + return Ok(()) + }; let fd = listener.as_raw_fd(); #[cfg(feature = "log")] @@ -713,7 +712,12 @@ impl Runtime { self.service.handover_listener(listener); } Action::UnregisterTransport(id) => { - let transport = self.transports.remove(&id).ok_or(Error::TransportUnknown(id))?; + let Some(transport) = self.transports.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); + + return Ok(()) + }; let fd = transport.as_raw_fd(); #[cfg(feature = "log")] @@ -729,12 +733,12 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len()); - let transport = self.transports.get_mut(&id).ok_or_else(|| { + let Some(transport) = self.transports.get_mut(&id) else { #[cfg(feature = "log")] log::error!(target: "reactor", "Transport {id} is not in the reactor"); - Error::TransportUnknown(id) - })?; + return Ok(()) + }; transport.write_atomic(&data).map_err(|err| match err { WriteError::NotReady => { #[cfg(feature = "log")] From 12d0c17eebe4b693dea55519b62ac1e9d4857fa8 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 11 May 2023 20:37:33 +0200 Subject: [PATCH 04/10] improve error messages --- Cargo.lock | 16 ++++++++-------- src/reactor.rs | 13 +++++++------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9862730..a61fc8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,7 +113,7 @@ dependencies = [ [[package]] name = "io-reactor" -version = "0.1.2" +version = "0.2.0" dependencies = [ "amplify", "crossbeam-channel", @@ -324,13 +324,13 @@ version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" dependencies = [ - "windows_aarch64_gnullvm 0.42.1", - "windows_aarch64_msvc 0.42.1", - "windows_i686_gnu 0.42.1", - "windows_i686_msvc 0.42.1", - "windows_x86_64_gnu 0.42.1", - "windows_x86_64_gnullvm 0.42.1", - "windows_x86_64_msvc 0.42.1", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] diff --git a/src/reactor.rs b/src/reactor.rs index 70af72e..0a62de4 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -42,7 +42,6 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); /// Reactor errors #[derive(Error, Display, From)] -#[display(doc_comments)] pub enum Error { /// unable to write to transport {0}. Details: {1:?} WriteFailure(T::Id, io::Error), @@ -59,7 +58,9 @@ pub enum Error { /// poll on listener {0} has returned error. ListenerPollError(L::Id), - /// poll on transport {0} has returned error. + // to transport disconnect + /// Poll request has returned [`IoFail::Os`] event for a specific resource. + #[display("transport {0} failed during I/O poll")] TransportPollError(T::Id), /// polling multiple reactor has failed. Details: {0:?} @@ -620,17 +621,17 @@ impl Runtime { } } } - Err(IoFail::Connectivity(flags)) => { + Err(IoFail::Connectivity(posix_events)) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Transport {id} hanged up (OS flags {flags:#b})"); + log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})"); let transport = self.transports.remove(id).expect("resource disappeared"); unregister_queue.push(transport.as_raw_fd()); self.service.handle_error(Error::TransportDisconnect(*id, transport)); } - Err(IoFail::Os(flags)) => { + Err(IoFail::Os(posix_events)) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Transport {id} errored (OS flags {flags:#b})"); + log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})"); self.service.handle_error(Error::TransportPollError(*id)); } From 8eee2c07aae900d61806381f331e141838229f6b Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 11 May 2023 20:46:19 +0200 Subject: [PATCH 05/10] errors: panic on writing to read-only or unready resources --- src/reactor.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index 0a62de4..e89fab6 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -46,9 +46,6 @@ pub enum Error { /// unable to write to transport {0}. Details: {1:?} WriteFailure(T::Id, io::Error), - /// writing to transport {0} before it is ready (business logic bug) - WriteLogicError(T::Id, Vec), - /// transport {0} got disconnected during poll operation. ListenerDisconnect(L::Id, L), @@ -666,6 +663,11 @@ impl Runtime { } } + /// # Safety + /// + /// Panics on `Action::Send` for read-only resources or resources which are not ready for a + /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`] + /// implementation. fn handle_action( &mut self, action: Action, @@ -745,7 +747,10 @@ impl Runtime { #[cfg(feature = "log")] log::error!(target: "reactor", internal = true; "An attempt to write to transport {id} before it got ready"); - Error::WriteLogicError(id, data) + panic!( + "application business logic error: write to transport {id} which is \ + read-only or not ready for a write operation" + ); } WriteError::Io(e) => { #[cfg(feature = "log")] From c454bf3da50fc7b7b1b31768eca0cd4b6bb403ce Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 11 May 2023 20:51:56 +0200 Subject: [PATCH 06/10] error: disconnect on resource write failure --- src/reactor.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index e89fab6..e54115c 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -43,9 +43,6 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); /// Reactor errors #[derive(Error, Display, From)] pub enum Error { - /// unable to write to transport {0}. Details: {1:?} - WriteFailure(T::Id, io::Error), - /// transport {0} got disconnected during poll operation. ListenerDisconnect(L::Id, L), From b680ac68048efb1241beb6c6101d0e9657783404 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 11 May 2023 21:25:03 +0200 Subject: [PATCH 07/10] error: disconnect on poller errors on a specific resource --- src/reactor.rs | 98 ++++++++++++++++++++++++++++---------------------- 1 file changed, 55 insertions(+), 43 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index e54115c..970a897 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -42,6 +42,7 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); /// Reactor errors #[derive(Error, Display, From)] +#[display(doc_comments)] pub enum Error { /// transport {0} got disconnected during poll operation. ListenerDisconnect(L::Id, L), @@ -49,14 +50,6 @@ pub enum Error { /// transport {0} got disconnected during poll operation. TransportDisconnect(T::Id, T), - /// poll on listener {0} has returned error. - ListenerPollError(L::Id), - - // to transport disconnect - /// Poll request has returned [`IoFail::Os`] event for a specific resource. - #[display("transport {0} failed during I/O poll")] - TransportPollError(T::Id), - /// polling multiple reactor has failed. Details: {0:?} Poll(io::Error), } @@ -561,7 +554,7 @@ impl Runtime { let mut awoken = false; let mut unregister_queue = vec![]; - for (fd, res) in &mut self.poller { + while let Some((fd, res)) = self.poller.next() { if fd == self.waker.as_raw_fd() { if let Err(err) = res { #[cfg(feature = "log")] @@ -598,8 +591,7 @@ impl Runtime { Err(IoFail::Os(flags)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})"); - - self.service.handle_error(Error::ListenerPollError(*id)); + self.unregister_listener(*id); } } } else if let Some(id) = self.transport_map.get(&fd) { @@ -626,8 +618,7 @@ impl Runtime { Err(IoFail::Os(posix_events)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})"); - - self.service.handle_error(Error::TransportPollError(*id)); + self.unregister_transport(*id); } } } else { @@ -694,39 +685,19 @@ impl Runtime { self.transport_map.insert(fd, id); } Action::UnregisterListener(id) => { - let Some(listener) = self.listeners.remove(&id) else { - #[cfg(feature = "log")] - log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); - + let Some(listener) = self.unregister_listener(id) else { return Ok(()) }; - let fd = listener.as_raw_fd(); - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})"); - - self.listener_map - .remove(&fd) - .expect("listener index content doesn't match registered listeners"); - self.poller.unregister(&listener); + log::debug!(target: "reactor", "Handling over listener {id}"); self.service.handover_listener(listener); } Action::UnregisterTransport(id) => { - let Some(transport) = self.transports.remove(&id) else { - #[cfg(feature = "log")] - log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); - + let Some(transport) = self.unregister_transport(id) else { return Ok(()) }; - let fd = transport.as_raw_fd(); - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Handling over transport {id} (fd={fd})"); - - self.transport_map - .remove(&fd) - .expect("transport index content doesn't match registered transports"); - self.poller.unregister(&transport); + log::debug!(target: "reactor", "Handling over transport {id}"); self.service.handover_transport(transport); } Action::Send(id, data) => { @@ -739,8 +710,8 @@ impl Runtime { return Ok(()) }; - transport.write_atomic(&data).map_err(|err| match err { - WriteError::NotReady => { + match transport.write_atomic(&data) { + Err(WriteError::NotReady) => { #[cfg(feature = "log")] log::error!(target: "reactor", internal = true; "An attempt to write to transport {id} before it got ready"); @@ -749,12 +720,13 @@ impl Runtime { read-only or not ready for a write operation" ); } - WriteError::Io(e) => { + Err(WriteError::Io(e)) => { #[cfg(feature = "log")] - log::error!(target: "reactor", "Error writing to transport {id}: {e:?}"); - Error::WriteFailure(id, e) + log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}"); + self.unregister_transport(id); } - })?; + Ok(_) => {} + } } Action::SetTimer(duration) => { #[cfg(feature = "log")] @@ -772,6 +744,46 @@ impl Runtime { // We just drop here? } + + fn unregister_listener(&mut self, id: ::Id) -> Option { + let Some(listener) = self.listeners.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); + return None + }; + + let fd = listener.as_raw_fd(); + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})"); + + self.listener_map + .remove(&fd) + .expect("listener index content doesn't match registered listeners"); + self.poller.unregister(&listener); + + Some(listener) + } + + fn unregister_transport(&mut self, id: ::Id) -> Option { + let Some(transport) = self.transports.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); + return None + }; + + let fd = transport.as_raw_fd(); + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})"); + + self.transport_map + .remove(&fd) + .expect("transport index content doesn't match registered transports"); + self.poller.unregister(&transport); + + Some(transport) + } } #[cfg(test)] From bc4e2e330a6504203685eb2caeb7e0eedfaa02ff Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Thu, 11 May 2023 21:34:46 +0200 Subject: [PATCH 08/10] errors: require WriteAtomic to handle EAGAIN, EINTER, EWOULDBLOCK --- src/resource.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/resource.rs b/src/resource.rs index 74bdbfa..0eea783 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -23,6 +23,7 @@ use std::fmt::{Debug, Display}; use std::hash::Hash; +use std::io::ErrorKind; use std::os::unix::io::AsRawFd; use std::os::unix::prelude::RawFd; use std::{io, net}; @@ -88,11 +89,27 @@ pub enum WriteError { pub trait WriteAtomic: io::Write { /// Atomic non-blocking I/O write operation, which must either write the whole buffer to a /// resource without blocking - or fail with [`WriteError::NotReady`] error. + /// + /// # Safety + /// + /// Panics on invalid [`WriteAtomic::write_or_buf`] implementation, i.e. if it doesn't handle + /// EGAGAIN, EINTER, EWOULDBLOCK I/O errors by buffering the data and returns them instead. fn write_atomic(&mut self, buf: &[u8]) -> Result<(), WriteError> { if !self.is_ready_to_write() { Err(WriteError::NotReady) } else { - self.write_or_buf(buf).map_err(WriteError::from) + // TODO: on EGAGAIN, EINTER, EWOULDBLOCK just keep the data buffered + self.write_or_buf(buf).map_err(|err| { + debug_assert!( + matches!( + err.kind(), + ErrorKind::WouldBlock | ErrorKind::Interrupted | ErrorKind::WriteZero + ), + "WriteAtomic::write_or_buf must handle EGAGAIN, EINTER, EWOULDBLOCK errors by \ + buffering the data" + ); + WriteError::from(err) + }) } } @@ -113,5 +130,11 @@ pub trait WriteAtomic: io::Write { /// with a system-level error. /// /// This method shouldn't be called directly; [`Self::write_atomic`] must be used instead. + /// + /// # Safety + /// + /// The method must handle EGAGAIN, EINTER, EWOULDBLOCK I/O errors and buffer the data in such + /// cases. Ig these errors are returned from this methods [`WriteAtomic::write_atomic`] will + /// panic. fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>; } From 9243db2190a2612e572bb11bcf75e56aa216764a Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Fri, 12 May 2023 00:12:19 +0200 Subject: [PATCH 09/10] error: signal TransportDisconnect on unregisterin resource on write failure --- src/reactor.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/reactor.rs b/src/reactor.rs index 970a897..e572dd6 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -723,7 +723,9 @@ impl Runtime { Err(WriteError::Io(e)) => { #[cfg(feature = "log")] log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}"); - self.unregister_transport(id); + if let Some(transport) = self.unregister_transport(id) { + return Err(Error::TransportDisconnect(id, transport)); + } } Ok(_) => {} } From bf4c53ccfd895379c99060888ca896fd8d520a09 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Fri, 12 May 2023 00:32:12 +0200 Subject: [PATCH 10/10] ci: bump MSRV b/c of if .. else --- .github/workflows/build.yml | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index de1e89f..d33a0c8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -81,7 +81,7 @@ jobs: strategy: fail-fast: false matrix: - toolchain: [ nightly, beta, stable, 1.60.0 ] + toolchain: [ nightly, beta, stable, 1.65.0 ] steps: - uses: actions/checkout@v2 - name: Install rust ${{ matrix.toolchain }} diff --git a/Cargo.toml b/Cargo.toml index 416ddaf..f0ce96a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["reactor", "networking", "patterns", "concurrency", "poll"] categories = ["concurrency", "asynchronous", "network-programming", "rust-patterns"] homepage = "https://github.com/rust-amplify" repository = "https://github.com/rust-amplify/io-reactor" -rust-version = "1.60" +rust-version = "1.65" # Due to if ... let clause edition = "2021" license = "Apache-2.0" readme = "README.md"