Skip to content

Commit

Permalink
Merge pull request #21 from rust-amplify/waker
Browse files Browse the repository at this point in the history
Refactor poller to provide custom wakers
  • Loading branch information
dr-orlovsky authored May 19, 2023
2 parents fbf573b + a7f48fd commit 3b2f10a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 81 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name = "reactor"
[dependencies]
amplify = { version = "4.0.0", features = ["hex"] }
crossbeam-channel = "0.5.8"
popol = { version = "2.2.0", optional = true }
popol = { version = "3.0.0", optional = true }
polling = { version = "2.7.0", optional = true }
# epoll = { version = "4.3.1", optional = true } - NB: epoll not supported on MacOS
mio = { version = "0.8.6", optional = true }
Expand Down
26 changes: 26 additions & 0 deletions src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ where
Self: Send + Iterator<Item = (RawFd, Result<IoType, IoFail>)>,
for<'a> &'a mut Self: Iterator<Item = (RawFd, Result<IoType, IoFail>)>,
{
/// Waker type used by the poll provider.
type Waker: Waker;

/// Registers a file-descriptor based resource for a poll.
fn register(&mut self, fd: &impl AsRawFd, interest: IoType);
/// Unregisters a file-descriptor based resource from a poll.
Expand All @@ -165,3 +168,26 @@ where
/// Number of generated events.
fn poll(&mut self, timeout: Option<Duration>) -> io::Result<usize>;
}

/// Waker object provided by the poller.
pub trait Waker {
/// Data type for sending wake signals to the poller.
type Send: WakerSend;
/// Data type for receiving wake signals inside the poller.
type Recv: WakerRecv;

/// Constructs pair of waker receiver and sender objects.
fn pair() -> Result<(Self::Send, Self::Recv), io::Error>;
}

/// Sending part of the waker.
pub trait WakerSend: Send + Sync + Clone {
/// Awakes the poller to read events.
fn wake(&self) -> io::Result<()>;
}

/// Receiver part of the waker.
pub trait WakerRecv: AsRawFd + Send + io::Read {
/// Resets the waker reader.
fn reset(&self);
}
48 changes: 46 additions & 2 deletions src/poller/popol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
//! Poll engine provided by the [`popol`] crate.
use std::collections::VecDeque;
use std::io;
use std::io::{self, Error};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::time::Duration;

use crate::poller::{IoFail, IoType, Poll};
use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};

/// Manager for a set of reactor which are polled for an event loop by the
/// re-actor by using [`popol`] library.
Expand Down Expand Up @@ -61,6 +62,8 @@ impl Poller {
}

impl Poll for Poller {
type Waker = PopolWaker;

fn register(&mut self, fd: &impl AsRawFd, interest: IoType) {
#[cfg(feature = "log")]
log::trace!(target: "popol", "Registering {}", fd.as_raw_fd());
Expand Down Expand Up @@ -156,3 +159,44 @@ impl From<IoType> for popol::Interest {
e
}
}

/// Wrapper type around the waker provided by `popol` crate.
#[derive(Clone)]
pub struct PopolWaker(Arc<popol::Waker>);

impl Waker for PopolWaker {
type Send = Self;
type Recv = Self;

fn pair() -> Result<(Self::Send, Self::Recv), Error> {
let waker = Arc::new(popol::Waker::new()?);
Ok((PopolWaker(waker.clone()), PopolWaker(waker)))
}
}

impl io::Read for PopolWaker {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
self.reset();
// Waker reads only when there is something which was sent.
// That's why we just return here.
Ok(0)
}
}

impl AsRawFd for PopolWaker {
fn as_raw_fd(&self) -> RawFd { self.0.as_ref().as_raw_fd() }
}

impl WakerRecv for PopolWaker {
fn reset(&self) {
if let Err(e) = popol::Waker::reset(self.0.as_ref()) {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");
panic!("unable to reset waker queue. Details: {e}");
}
}
}

impl WakerSend for PopolWaker {
fn wake(&self) -> io::Result<()> { self.0.wake() }
}
100 changes: 24 additions & 76 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@

use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::io::Write;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use std::{io, thread};

use crossbeam_channel as chan;

use crate::poller::{IoFail, IoType, Poll};
use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::resource::WriteError;
use crate::{Resource, Timer, Timestamp, WriteAtomic};

Expand Down Expand Up @@ -191,12 +188,12 @@ pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport
///
/// Apps running the [`Reactor`] can interface it and a [`Handler`] via use of the [`Controller`]
/// API.
pub struct Reactor<C> {
pub struct Reactor<C, P: Poll> {
thread: JoinHandle<()>,
controller: Controller<C>,
controller: Controller<C, <P::Waker as Waker>::Send>,
}

impl<C> Reactor<C> {
impl<C, P: Poll> Reactor<C, P> {
/// Creates new reactor using provided [`Poll`] engine and a service exposing [`Handler`] API to
/// the reactor.
///
Expand All @@ -206,7 +203,7 @@ impl<C> Reactor<C> {
/// # Error
///
/// Errors with a system/OS error if it was impossible to spawn a thread.
pub fn new<P: Poll, H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
pub fn new<H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
where
H: 'static,
P: 'static,
Expand All @@ -225,7 +222,7 @@ impl<C> Reactor<C> {
/// # Error
///
/// Errors with a system/OS error if it was impossible to spawn a thread.
pub fn named<P: Poll, H: Handler<Command = C>>(
pub fn named<H: Handler<Command = C>>(
service: H,
poller: P,
thread_name: String,
Expand All @@ -248,7 +245,7 @@ impl<C> Reactor<C> {
/// # Error
///
/// Errors with a system/OS error if it was impossible to spawn a thread.
pub fn with<P: Poll, H: Handler<Command = C>>(
pub fn with<H: Handler<Command = C>>(
service: H,
mut poller: P,
builder: thread::Builder,
Expand All @@ -260,13 +257,11 @@ impl<C> Reactor<C> {
{
let (ctl_send, ctl_recv) = chan::unbounded();

let (waker_writer, waker_reader) = UnixStream::pair()?;
waker_reader.set_nonblocking(true)?;
waker_writer.set_nonblocking(true)?;
let (waker_writer, waker_reader) = P::Waker::pair()?;

let controller = Controller {
ctl_send,
waker: Arc::new(waker_writer),
waker: waker_writer,
};

#[cfg(feature = "log")]
Expand Down Expand Up @@ -306,7 +301,7 @@ impl<C> Reactor<C> {
/// running inside of its thread.
///
/// See [`Handler::Command`] for the details.
pub fn controller(&self) -> Controller<C> { self.controller.clone() }
pub fn controller(&self) -> Controller<C, <P::Waker as Waker>::Send> { self.controller.clone() }

/// Joins the reactor thread.
pub fn join(self) -> thread::Result<()> { self.thread.join() }
Expand All @@ -323,12 +318,12 @@ enum Ctl<C> {
/// API to the reactor itself for receiving reactor-generated events. This API is used by the
/// reactor to inform the service about incoming commands, sent via this [`Controller`] API (see
/// [`Handler::Command`] for the details).
pub struct Controller<C> {
pub struct Controller<C, W: WakerSend> {
ctl_send: chan::Sender<Ctl<C>>,
waker: Arc<UnixStream>,
waker: W,
}

impl<C> Clone for Controller<C> {
impl<C, W: WakerSend> Clone for Controller<C, W> {
fn clone(&self) -> Self {
Controller {
ctl_send: self.ctl_send.clone(),
Expand All @@ -337,7 +332,7 @@ impl<C> Clone for Controller<C> {
}
}

impl<C> Controller<C> {
impl<C, W: WakerSend> Controller<C, W> {
/// Send a command to the service inside a [`Reactor`] or a reactor [`Runtime`].
#[allow(unused_mut)] // because of the `log` feature gate
pub fn cmd(&self, mut command: C) -> Result<(), io::Error>
Expand Down Expand Up @@ -377,56 +372,9 @@ impl<C> Controller<C> {
}

fn wake(&self) -> io::Result<()> {
use io::ErrorKind::*;

#[cfg(feature = "log")]
log::trace!(target: "reactor-controller", "Wakening the reactor");

loop {
let mut waker = self.waker.as_ref();
match (&mut waker).write_all(&[0x1]) {
Ok(_) => return Ok(()),
Err(e) if e.kind() == WouldBlock => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker write queue got overfilled, resetting and repeating...");
reset_fd(&self.waker.as_raw_fd())?;
}
Err(e) if e.kind() == Interrupted => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker failure, repeating...");
}
Err(e) => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Waker error: {e}");

return Err(e);
}
}
}
}
}

fn reset_fd(fd: &impl AsRawFd) -> io::Result<()> {
let mut buf = [0u8; 4096];

loop {
// We use a low-level "read" here because the alternative is to create a `UnixStream`
// from the `RawFd`, which has "drop" semantics which we want to avoid.
match unsafe {
libc::read(fd.as_raw_fd(), buf.as_mut_ptr() as *mut libc::c_void, buf.len())
} {
-1 => match io::Error::last_os_error() {
e if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
e => {
#[cfg(feature = "log")]
log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");

return Err(e);
}
},
0 => return Ok(()),
_ => continue,
}
self.waker.wake()
}
}

Expand All @@ -440,13 +388,13 @@ fn reset_fd(fd: &impl AsRawFd) -> io::Result<()> {
pub struct Runtime<H: Handler, P: Poll> {
service: H,
poller: P,
controller: Controller<H::Command>,
controller: Controller<H::Command, <P::Waker as Waker>::Send>,
ctl_recv: chan::Receiver<Ctl<H::Command>>,
listener_map: HashMap<RawFd, <H::Listener as Resource>::Id>,
transport_map: HashMap<RawFd, <H::Transport as Resource>::Id>,
listeners: HashMap<<H::Listener as Resource>::Id, H::Listener>,
transports: HashMap<<H::Transport as Resource>::Id, H::Transport>,
waker: UnixStream,
waker: <P::Waker as Waker>::Recv,
timeouts: Timer,
}

Expand All @@ -456,13 +404,11 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
pub fn with(service: H, poller: P) -> io::Result<Self> {
let (ctl_send, ctl_recv) = chan::unbounded();

let (waker_writer, waker_reader) = UnixStream::pair()?;
waker_reader.set_nonblocking(true)?;
waker_writer.set_nonblocking(true)?;
let (waker_writer, waker_reader) = P::Waker::pair()?;

let controller = Controller {
ctl_send,
waker: Arc::new(waker_writer),
waker: waker_writer,
};

Ok(Runtime {
Expand All @@ -483,7 +429,9 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
/// running inside of its thread.
///
/// See [`Handler::Command`] for the details.
pub fn controller(&self) -> Controller<H::Command> { self.controller.clone() }
pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
self.controller.clone()
}

fn run(mut self) {
loop {
Expand Down Expand Up @@ -565,7 +513,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Awoken by the controller");

reset_fd(&self.waker).expect("waker failure");
self.waker.reset();
awoken = true;
} else if let Some(id) = self.listener_map.get(&fd) {
match res {
Expand Down Expand Up @@ -803,7 +751,7 @@ mod test {
impl AsRawFd for DumbRes {
fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() }
}
impl Write for DumbRes {
impl io::Write for DumbRes {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { Ok(buf.len()) }
fn flush(&mut self) -> io::Result<()> { Ok(()) }
}
Expand Down

0 comments on commit 3b2f10a

Please sign in to comment.