Skip to content

Commit

Permalink
writable source
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Shih authored and Brian Shih committed Oct 9, 2023
1 parent 67c278b commit 651df88
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
8 changes: 5 additions & 3 deletions mdbook/src/motivation.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ I've always wondered how asynchronous runtimes like [Node.js](https://nodejs.org

In this blog series, I will explore building a toy version of [Glommio](https://docs.rs/glommio/latest/glommio/), an `asynchronous` framework for building `thread-per-core` applications.

What is an Asynchronous Runtime? TODO

### What is Thread-Per-Core?

A complex application may have many tasks that it needs to execute. Some of these tasks can be performed in parallel to speed up the application. The ability of a system to execute multiple tasks concurrently is known as **multitasking**.
Expand All @@ -20,11 +22,11 @@ Thread-per-core is an architecture that eliminates threads from the picture. In

[Seastar](https://seastar.io/) (C++) and [Glommio](https://docs.rs/glommio/latest/glommio/) (Rust) are two frameworks that allow developers to write thread-per-core applications. Seastar is used in ScyllaDB and Redpanda, while Glommio is used by Datadog.

In this blog series, I will reimplement a light-weight version of Glommio by extracting bits and pieces from it. Throughout the blog, I will explain the different core abstractions that make up an asynchronous runtime.
In this blog series, I will reimplement a lightweight version of Glommio by extracting bits and pieces from it. Throughout the blog, I will explain the different core abstractions that make up an asynchronous runtime.

I’ve split up the blog series into four phases:

- **Phase 1**: In phase 1, we will cover Rust’s asynchronous primitives like `Future`, `Async/Await`, and `Waker` which will serve as building blocks for the asynchronous runtime. We will then build a simple, single-threaded, executor that can run and spawn tasks.
- **Phase 2**: In phase 2, we talk about `io_uring` and use it to add `asynchronous I/O` to our executor
- **Phase 3**: In phase 3, we will implement more advanced features such as thread parking, task yielding, and scheduling tasks based on priority.
- **Phase 4**: In phase 4, we will build abstractions that allow developers to create a pool of `LocalExecutor`s.
- **Phase 3** **[WIP]**: In phase 3, we will implement more advanced features such as thread parking, task yielding, and scheduling tasks based on priority.
- **Phase 4 [WIP]**: In phase 4, we will build abstractions that allow developers to create a pool of `LocalExecutor`s.
4 changes: 2 additions & 2 deletions src/pollable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct Async<T> {
impl<T: AsRawFd> Async<T> {
pub fn new(io: T) -> io::Result<Async<T>> {
Ok(Async {
source: get_reactor().insert_pollable_io(io.as_raw_fd()),
source: get_reactor().create_source(io.as_raw_fd()),
io: Some(Box::new(io)),
})
}
Expand All @@ -38,7 +38,7 @@ impl<T> Async<T> {
}
res => return res,
}
self.readable().await?;
self.source.readable().await?;
}
}
}
2 changes: 1 addition & 1 deletion src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Reactor {
Source::new(raw, stype, None)
}

pub fn insert_pollable_io(&self, raw: RawFd) -> Source {
pub fn create_source(&self, raw: RawFd) -> Source {
fcntl(raw, FcntlArg::F_SETFL(OFlag::O_NONBLOCK)).unwrap();
self.new_source(raw, SourceType::PollableFd)
}
Expand Down
14 changes: 14 additions & 0 deletions src/sys/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ impl Source {
.await
}

/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
future::poll_fn(|cx| {
if self.take_result().is_some() {
return Poll::Ready(Ok(()));
}

self.add_waiter(cx.waker().clone());
get_reactor().sys.interest(self, false, true);
Poll::Pending
})
.await
}

pub(crate) fn take_result(&self) -> Option<io::Result<usize>> {
self.inner.borrow_mut().wakers.result.take()
}
Expand Down

0 comments on commit 651df88

Please sign in to comment.