diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d33a0c8..71a7f5e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -58,10 +58,10 @@ jobs: fail-fast: false matrix: os: - - ubuntu-20.04 - ubuntu-22.04 - - macos-11 + - ubuntu-latest - macos-12 + - macos-latest # - windows-2019 # - windows-2022 steps: @@ -81,7 +81,7 @@ jobs: strategy: fail-fast: false matrix: - toolchain: [ nightly, beta, stable, 1.65.0 ] + toolchain: [ nightly, beta, stable, 1.66.0 ] steps: - uses: actions/checkout@v2 - name: Install rust ${{ matrix.toolchain }} diff --git a/Cargo.lock b/Cargo.lock index 7d79985..bb64f4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "amplify" -version = "4.0.0" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26966af46e0d200e8bf2b7f16230997c1c3f2d141bc27ccc091c012ed527b58" +checksum = "8629db306c0bbeb0a402e2918bdcf0026b5ddb24c46460f3bf5410b350d98710" dependencies = [ "amplify_derive", "amplify_num", @@ -16,31 +16,34 @@ dependencies = [ [[package]] name = "amplify_derive" -version = "3.0.0" +version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "580f12b79a9e10cfa8d2515128d83a53f387e290096a75904c92b8a2a4d542a6" +checksum = "759dcbfaf94d838367a86d493ec34ccc8aa6fe365cb7880d6bf89006de24d9c1" dependencies = [ "amplify_syn", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] name = "amplify_num" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddce3bc63e807ea02065e8d8b702695f3d302ae4158baddff8b0ce5c73947251" +checksum = "9681187211554ab98f138ba159e90861b136c20afc680dcff2ba82d020721e27" +dependencies = [ + "wasm-bindgen", +] [[package]] name = "amplify_syn" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29b08d74fda406d5a94abfdcdb91ba13bb06562ccf0a4581867fa924ca242b01" +checksum = "7736fb8d473c0d83098b5bac44df6a561e20470375cd8bcae30516dc889fd62a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -63,9 +66,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bumpalo" -version = "3.12.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "cfg-if" @@ -75,18 +78,18 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "concurrent-queue" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" dependencies = [ "crossbeam-utils", ] [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" dependencies = [ "cfg-if", "crossbeam-utils", @@ -94,23 +97,13 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.15" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ "cfg-if", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "io-reactor" version = "0.2.1" @@ -126,49 +119,48 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.141" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "log" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ - "cfg-if", "value-bag", ] [[package]] name = "mio" -version = "0.8.6" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "log", "wasi", - "windows-sys 0.45.0", + "windows-sys", ] [[package]] name = "once_cell" -version = "1.17.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "polling" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be1c66a6add46bff50935c313dae30a5030cf8385c5206e8a95e9e9def974aa" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" dependencies = [ "autocfg", "bitflags", @@ -177,7 +169,7 @@ dependencies = [ "libc", "log", "pin-project-lite", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -191,18 +183,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.56" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.26" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -219,26 +211,27 @@ dependencies = [ ] [[package]] -name = "unicode-ident" -version = "1.0.8" +name = "syn" +version = "2.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +checksum = "ee659fb5f3d355364e1f3e5bc10fb82068efbf824a1e9d1c9504244a6469ad53" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] [[package]] -name = "value-bag" -version = "1.0.0-alpha.9" +name = "unicode-ident" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" -dependencies = [ - "ctor", - "version_check", -] +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] -name = "version_check" -version = "0.9.4" +name = "value-bag" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "4a72e1902dde2bd6441347de2b70b7f5d59bf157c6c62f0c44572607a1d55bbe" [[package]] name = "wasi" @@ -248,9 +241,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -258,24 +251,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.43", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -283,31 +276,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.84" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.43", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" - -[[package]] -name = "windows-sys" -version = "0.45.0" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.1", -] +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "windows-sys" @@ -315,119 +299,62 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets 0.48.0", + "windows-targets", ] [[package]] name = "windows-targets" -version = "0.42.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows-targets" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" -dependencies = [ - "windows_aarch64_gnullvm 0.48.0", - "windows_aarch64_msvc 0.48.0", - "windows_i686_gnu 0.48.0", - "windows_i686_msvc 0.48.0", - "windows_x86_64_gnu 0.48.0", - "windows_x86_64_gnullvm 0.48.0", - "windows_x86_64_msvc 0.48.0", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - -[[package]] -name = "windows_i686_gnu" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" - -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnullvm" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/Cargo.toml b/Cargo.toml index 4d3ecf3..00d21b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["reactor", "networking", "patterns", "concurrency", "poll"] categories = ["concurrency", "asynchronous", "network-programming", "rust-patterns"] homepage = "https://github.com/rust-amplify" repository = "https://github.com/rust-amplify/io-reactor" -rust-version = "1.65" # Due to if ... let clause +rust-version = "1.66" # Due to amplify dependency edition = "2021" license = "Apache-2.0" readme = "README.md" diff --git a/MANIFEST.yml b/MANIFEST.yml index a6c4d4d..52261e7 100644 --- a/MANIFEST.yml +++ b/MANIFEST.yml @@ -3,7 +3,7 @@ Type: Library Kind: Free software License: Apache-2.0 Language: Rust -Compiler: 1.60 +Compiler: 1.66 Author: Maxim Orlovsky Maintained: UBIDECO Institute, Switzerland Maintainers: diff --git a/src/lib.rs b/src/lib.rs index 420ae01..cecd07f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,7 +76,7 @@ mod reactor; mod resource; mod timeouts; -pub use resource::{Io, Resource, ResourceId, WriteAtomic, WriteError}; +pub use resource::{Io, Resource, ResourceId, ResourceIdGenerator, WriteAtomic, WriteError}; pub use timeouts::{Timer, Timestamp}; pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime}; diff --git a/src/poller/mod.rs b/src/poller/mod.rs index 7d51d08..4dd3cbd 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -27,11 +27,12 @@ pub mod popol; use std::fmt::{self, Display, Formatter}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::time::Duration; use std::{io, ops}; use crate::resource::Io; +use crate::ResourceId; /// Information about I/O events which has happened for a resource. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] @@ -147,19 +148,21 @@ pub enum IoFail { /// To read I/O events from the engine please use its Iterator interface. pub trait Poll where - Self: Send + Iterator)>, - for<'a> &'a mut Self: Iterator)>, + Self: Send + Iterator)>, + for<'a> &'a mut Self: Iterator)>, { /// Waker type used by the poll provider. type Waker: Waker; + /// Registers a waker object. + fn register_waker(&mut self, fd: &impl AsRawFd); /// Registers a file-descriptor based resource for a poll. - fn register(&mut self, fd: &impl AsRawFd, interest: IoType); + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId; /// Unregisters a file-descriptor based resource from a poll. - fn unregister(&mut self, fd: &impl AsRawFd); + fn unregister(&mut self, id: ResourceId); /// Subscribes for a specific set of events for a given file descriptor-backed resource (see /// [`IoType`] for the details on event subscription). - fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool; + fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool; /// Runs single poll syscall over all registered resources with an optional timeout. /// diff --git a/src/poller/popol.rs b/src/poller/popol.rs index c699fbc..d791a06 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -30,12 +30,14 @@ use std::sync::Arc; use std::time::Duration; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; +use crate::{ResourceId, ResourceIdGenerator}; /// Manager for a set of reactor which are polled for an event loop by the /// re-actor by using [`popol`] library. pub struct Poller { - poll: popol::Sources, - events: VecDeque>, + poll: popol::Sources, + events: VecDeque>, + id_gen: ResourceIdGenerator, } impl Default for Poller { @@ -48,6 +50,7 @@ impl Poller { Self { poll: popol::Sources::new(), events: empty!(), + id_gen: ResourceIdGenerator::default(), } } @@ -57,6 +60,7 @@ impl Poller { Self { poll: popol::Sources::with_capacity(capacity), events: VecDeque::with_capacity(capacity), + id_gen: ResourceIdGenerator::default(), } } } @@ -64,26 +68,39 @@ impl Poller { impl Poll for Poller { type Waker = PopolWaker; - fn register(&mut self, fd: &impl AsRawFd, interest: IoType) { - #[cfg(feature = "log")] - log::trace!(target: "popol", "Registering {}", fd.as_raw_fd()); - self.poll.register(fd.as_raw_fd(), fd, interest.into()); + fn register_waker(&mut self, fd: &impl AsRawFd) { + let id = ResourceId::WAKER; + if self.poll.get(&id).is_some() { + #[cfg(feature = "log")] + log::error!(target: "popol", "Reactor waker is already registered, terminating"); + panic!("Reactor waker is already registered"); + } + + self.poll.register(id, fd, popol::interest::READ); } - fn unregister(&mut self, fd: &impl AsRawFd) { + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId { + let id = self.id_gen.next(); + #[cfg(feature = "log")] - log::trace!(target: "popol", "Unregistering {}", fd.as_raw_fd()); - self.poll.unregister(&fd.as_raw_fd()); + log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id); + + self.poll.register(id, fd, interest.into()); + id } - fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool { - let fd = fd.as_raw_fd(); + fn unregister(&mut self, id: ResourceId) { + #[cfg(feature = "log")] + log::trace!(target: "popol", "Unregistering {}", id); + self.poll.unregister(&id); + } + fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool { #[cfg(feature = "log")] - log::trace!(target: "popol", "Setting interest `{interest}` on {}", fd); + log::trace!(target: "popol", "Setting interest `{interest}` on {}", id); - self.poll.unset(&fd, (!interest).into()); - self.poll.set(&fd, interest.into()) + self.poll.unset(&id, (!interest).into()); + self.poll.set(&id, interest.into()) } fn poll(&mut self, timeout: Option) -> io::Result { @@ -115,21 +132,21 @@ impl Poll for Poller { } impl Iterator for Poller { - type Item = (RawFd, Result); + type Item = (ResourceId, Result); fn next(&mut self) -> Option { let event = self.events.pop_front()?; - let fd = event.key; + let id = event.key; let fired = event.raw_events(); let res = if event.is_hangup() { #[cfg(feature = "log")] - log::trace!(target: "popol", "Hangup on {fd}"); + log::trace!(target: "popol", "Hangup on {id}"); Err(IoFail::Connectivity(fired)) } else if event.is_error() || event.is_invalid() { #[cfg(feature = "log")] - log::trace!(target: "popol", "OS error on {fd} (fired events {fired:#b})"); + log::trace!(target: "popol", "OS error on {id} (fired events {fired:#b})"); Err(IoFail::Os(fired)) } else { @@ -139,11 +156,11 @@ impl Iterator for Poller { }; #[cfg(feature = "log")] - log::trace!(target: "popol", "I/O event on {fd}: {io}"); + log::trace!(target: "popol", "I/O event on {id}: {io}"); Ok(io) }; - Some((fd, res)) + Some((id, res)) } } diff --git a/src/reactor.rs b/src/reactor.rs index 7b8939d..48c1490 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -21,6 +21,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(unused_variables)] // because we need them for feature-gated logger + use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; use std::os::unix::io::{AsRawFd, RawFd}; @@ -32,7 +34,7 @@ use crossbeam_channel as chan; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; use crate::resource::WriteError; -use crate::{Resource, Timer, Timestamp, WriteAtomic}; +use crate::{Resource, ResourceId, Timer, Timestamp, WriteAtomic}; /// Maximum amount of time to wait for I/O. const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); @@ -42,10 +44,10 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); #[display(doc_comments)] pub enum Error { /// transport {0} got disconnected during poll operation. - ListenerDisconnect(L::Id, L), + ListenerDisconnect(ResourceId, L), /// transport {0} got disconnected during poll operation. - TransportDisconnect(T::Id, T), + TransportDisconnect(ResourceId, T), /// polling multiple reactor has failed. Details: {0:?} Poll(io::Error), @@ -81,7 +83,7 @@ pub enum Action { /// closed, listener is not unbound, connections are not closed etc. All these actions must be /// handled by the handler upon the handover event. #[display("unregister_listener")] - UnregisterListener(L::Id), + UnregisterListener(ResourceId), /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via /// [`Handler::handover_transport`]. @@ -90,11 +92,11 @@ pub enum Action { /// closed, listener is not unbound, connections are not closed etc. All these actions must be /// handled by the handler upon the handover event. #[display("unregister_transport")] - UnregisterTransport(T::Id), + UnregisterTransport(ResourceId), /// Write the data to one of the transport resources using [`io::Write`]. #[display("send_to({0})")] - Send(T::Id, Vec), + Send(ResourceId, Vec), /// Set a new timer for a given duration from this moment. /// @@ -143,7 +145,7 @@ pub trait Handler: Send + Iterator::Id, + id: ResourceId, event: ::Event, time: Timestamp, ); @@ -151,11 +153,19 @@ pub trait Handler: Send + Iterator::Id, + id: ResourceId, event: ::Event, time: Timestamp, ); + /// Method called by the reactor when a given resource was successfully registered and provided + /// with a resource id. + /// + /// The resource id will be used later in [`Self::handle_listener_event`], + /// [`Self::handle_transport_event`], [`Self::handover_listener`] and [`handover_transport`] + /// calls to the handler. + fn handle_registered(&mut self, fd: RawFd, id: ResourceId); + /// Method called by the reactor when a [`Self::Command`] is received for the [`Handler`]. /// /// The commands are sent via [`Controller`] from outside of the reactor, including other @@ -173,14 +183,14 @@ pub trait Handler: Send + Iterator Reactor { let thread = builder.spawn(move || { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - poller.register(&waker_reader, IoType::read_only()); + poller.register_waker(&waker_reader); let runtime = Runtime { service, @@ -280,8 +290,6 @@ impl Reactor { ctl_recv, listeners: empty!(), transports: empty!(), - listener_map: empty!(), - transport_map: empty!(), waker: waker_reader, timeouts: Timer::new(), }; @@ -390,10 +398,8 @@ pub struct Runtime { poller: P, controller: Controller::Send>, ctl_recv: chan::Receiver>, - listener_map: HashMap::Id>, - transport_map: HashMap::Id>, - listeners: HashMap<::Id, H::Listener>, - transports: HashMap<::Id, H::Transport>, + listeners: HashMap, + transports: HashMap, waker: ::Recv, timeouts: Timer, } @@ -401,11 +407,15 @@ pub struct Runtime { impl Runtime { /// Creates new reactor runtime using provided [`Poll`] engine and a service exposing /// [`Handler`] API to the reactor. - pub fn with(service: H, poller: P) -> io::Result { + pub fn with(service: H, mut poller: P) -> io::Result { let (ctl_send, ctl_recv) = chan::unbounded(); let (waker_writer, waker_reader) = P::Waker::pair()?; + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); + poller.register_waker(&waker_reader); + let controller = Controller { ctl_send, waker: waker_writer, @@ -418,8 +428,6 @@ impl Runtime { ctl_recv, listeners: empty!(), transports: empty!(), - listener_map: empty!(), - transport_map: empty!(), waker: waker_reader, timeouts: Timer::new(), }) @@ -438,11 +446,11 @@ impl Runtime { let before_poll = Timestamp::now(); let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT); - for res in self.listeners.values() { - self.poller.set_interest(res, res.interests()); + for (id, res) in &self.listeners { + self.poller.set_interest(*id, res.interests()); } - for res in self.transports.values() { - self.poller.set_interest(res, res.interests()); + for (id, res) in &self.transports { + self.poller.set_interest(*id, res.interests()); } // Blocking @@ -502,8 +510,8 @@ impl Runtime { let mut awoken = false; let mut unregister_queue = vec![]; - while let Some((fd, res)) = self.poller.next() { - if fd == self.waker.as_raw_fd() { + while let Some((id, res)) = self.poller.next() { + if id == ResourceId::WAKER { if let Err(err) = res { #[cfg(feature = "log")] log::error!(target: "reactor", "Polling waker has failed: {err}"); @@ -515,16 +523,16 @@ impl Runtime { self.waker.reset(); awoken = true; - } else if let Some(id) = self.listener_map.get(&fd) { + } else if self.listeners.contains_key(&id) { match res { Ok(io) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from listener {id} (fd={fd})"); + log::trace!(target: "reactor", "Got `{io}` event from listener {id}"); - let listener = self.listeners.get_mut(id).expect("resource disappeared"); + let listener = self.listeners.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = listener.handle_io(io) { - self.service.handle_listener_event(*id, event, time); + self.service.handle_listener_event(id, event, time); } } } @@ -532,26 +540,26 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} hung up (OS flags {flags:#b})"); - let listener = self.listeners.remove(id).expect("resource disappeared"); - unregister_queue.push(listener.as_raw_fd()); - self.service.handle_error(Error::ListenerDisconnect(*id, listener)); + let listener = self.listeners.remove(&id).expect("resource disappeared"); + unregister_queue.push(id); + self.service.handle_error(Error::ListenerDisconnect(id, listener)); } Err(IoFail::Os(flags)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})"); - self.unregister_listener(*id); + self.unregister_listener(id); } } - } else if let Some(id) = self.transport_map.get(&fd) { + } else if self.transports.contains_key(&id) { match res { Ok(io) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from transport {id} (fd={fd})"); + log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); - let transport = self.transports.get_mut(id).expect("resource disappeared"); + let transport = self.transports.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = transport.handle_io(io) { - self.service.handle_transport_event(*id, event, time); + self.service.handle_transport_event(id, event, time); } } } @@ -559,14 +567,14 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})"); - let transport = self.transports.remove(id).expect("resource disappeared"); - unregister_queue.push(transport.as_raw_fd()); - self.service.handle_error(Error::TransportDisconnect(*id, transport)); + let transport = self.transports.remove(&id).expect("resource disappeared"); + unregister_queue.push(id); + self.service.handle_error(Error::TransportDisconnect(id, transport)); } Err(IoFail::Os(posix_events)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})"); - self.unregister_transport(*id); + self.unregister_transport(id); } } } else { @@ -577,8 +585,8 @@ impl Runtime { } // We need this b/c of borrow checker - for fd in unregister_queue { - self.poller.unregister(&fd); + for id in unregister_queue { + self.poller.unregister(id); } awoken @@ -611,42 +619,40 @@ impl Runtime { ) -> Result<(), Error> { match action { Action::RegisterListener(listener) => { - let id = listener.id(); let fd = listener.as_raw_fd(); #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering listener on {id} (fd={fd})"); + log::debug!(target: "reactor", "Registering listener with fd={fd}"); - self.poller.register(&listener, IoType::read_only()); + let id = self.poller.register(&listener, IoType::read_only()); self.listeners.insert(id, listener); - self.listener_map.insert(fd, id); + self.service.handle_registered(fd, id); } Action::RegisterTransport(transport) => { - let id = transport.id(); let fd = transport.as_raw_fd(); #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering transport on {id} (fd={fd})"); + log::debug!(target: "reactor", "Registering transport with fd={fd}"); - self.poller.register(&transport, IoType::read_only()); + let id = self.poller.register(&transport, IoType::read_only()); self.transports.insert(id, transport); - self.transport_map.insert(fd, id); + self.service.handle_registered(fd, id); } Action::UnregisterListener(id) => { let Some(listener) = self.unregister_listener(id) else { - return Ok(()) + return Ok(()); }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id}"); - self.service.handover_listener(listener); + self.service.handover_listener(id, listener); } Action::UnregisterTransport(id) => { let Some(transport) = self.unregister_transport(id) else { - return Ok(()) + return Ok(()); }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over transport {id}"); - self.service.handover_transport(transport); + self.service.handover_transport(id, transport); } Action::Send(id, data) => { #[cfg(feature = "log")] @@ -656,7 +662,7 @@ impl Runtime { #[cfg(feature = "log")] log::error!(target: "reactor", "Transport {id} is not in the reactor"); - return Ok(()) + return Ok(()); }; match transport.write_atomic(&data) { Err(WriteError::NotReady) => { @@ -695,11 +701,11 @@ impl Runtime { // We just drop here? } - fn unregister_listener(&mut self, id: ::Id) -> Option { + fn unregister_listener(&mut self, id: ResourceId) -> Option { let Some(listener) = self.listeners.remove(&id) else { #[cfg(feature = "log")] log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); - return None + return None; }; let fd = listener.as_raw_fd(); @@ -707,19 +713,16 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})"); - self.listener_map - .remove(&fd) - .expect("listener index content doesn't match registered listeners"); - self.poller.unregister(&listener); + self.poller.unregister(id); Some(listener) } - fn unregister_transport(&mut self, id: ::Id) -> Option { + fn unregister_transport(&mut self, id: ResourceId) -> Option { let Some(transport) = self.transports.remove(&id) else { #[cfg(feature = "log")] log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); - return None + return None; }; let fd = transport.as_raw_fd(); @@ -727,10 +730,7 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})"); - self.transport_map - .remove(&fd) - .expect("transport index content doesn't match registered transports"); - self.poller.unregister(&transport); + self.poller.unregister(id); Some(transport) } @@ -761,9 +761,7 @@ mod test { fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) } } impl Resource for DumbRes { - type Id = RawFd; type Event = (); - fn id(&self) -> Self::Id { self.0.as_raw_fd() } fn interests(&self) -> IoType { IoType::read_write() } fn handle_io(&mut self, _io: Io) -> Option { None } } @@ -811,7 +809,7 @@ mod test { } fn handle_listener_event( &mut self, - _d: ::Id, + _d: ResourceId, _event: ::Event, _time: Timestamp, ) { @@ -819,12 +817,13 @@ mod test { } fn handle_transport_event( &mut self, - _id: ::Id, + _id: ResourceId, _event: ::Event, _time: Timestamp, ) { unreachable!() } + fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId) {} fn handle_command(&mut self, cmd: Self::Command) { match cmd { Cmd::Init => { @@ -839,8 +838,12 @@ mod test { fn handle_error(&mut self, err: Error) { panic!("{err}") } - fn handover_listener(&mut self, _listener: Self::Listener) { unreachable!() } - fn handover_transport(&mut self, _transport: Self::Transport) { unreachable!() } + fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) { + unreachable!() + } + fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) { + unreachable!() + } } let reactor = Reactor::new(DumbService::default(), poller::popol::Poller::new()).unwrap(); diff --git a/src/resource.rs b/src/resource.rs index 8d56c7f..e250577 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -21,12 +21,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::hash::Hash; -use std::io::ErrorKind; +use std::io::{self, ErrorKind}; use std::os::unix::io::AsRawFd; -use std::os::unix::prelude::RawFd; -use std::{io, net}; use crate::poller::IoType; @@ -40,19 +38,43 @@ pub enum Io { Write, } -/// Marker traits for types which can be used as a reactor-managed [`Resource`] identifiers. -pub trait ResourceId: Copy + Eq + Ord + Hash + Send + Debug + Display {} +/// Generator for the new [`ResourceId`]s which should be used by pollers implementing [`Poll`] +/// trait. +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] +#[display(inner)] +pub struct ResourceIdGenerator(u64); + +impl Default for ResourceIdGenerator { + fn default() -> Self { ResourceIdGenerator(1) } +} + +#[allow(dead_code)] // We need this before we've got non-popol implementations +impl ResourceIdGenerator { + /// Returns the next id for the resource. + pub fn next(&mut self) -> ResourceId { + let id = self.0; + self.0 += 1; + ResourceId(id) + } +} + +/// The resource identifier must be globally unique and non-reusable object. Because of this, +/// things like [`RawFd`] and socket addresses can't operate like resource identifiers. +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] +#[display(inner)] +pub struct ResourceId(u64); + +impl ResourceId { + /// Resource id for the waker (always zero). + pub const WAKER: ResourceId = ResourceId(0); +} /// A resource which can be managed by the reactor. pub trait Resource: AsRawFd + WriteAtomic + Send { - /// Resource identifier type. - type Id: ResourceId; /// Events which resource may generate upon receiving I/O from the reactor via /// [`Self::handle_io`]. These events are passed to the reactor [`crate::Handler`]. type Event; - /// Method returning the [`ResourceId`]. - fn id(&self) -> Self::Id; /// Method informing the reactor which types of events this resource is subscribed for. fn interests(&self) -> IoType; @@ -61,9 +83,6 @@ pub trait Resource: AsRawFd + WriteAtomic + Send { fn handle_io(&mut self, io: Io) -> Option; } -impl ResourceId for net::SocketAddr {} -impl ResourceId for RawFd {} - /// Error during write operation for a reactor-managed [`Resource`]. #[derive(Debug, Display, Error, From)] pub enum WriteError {