blog - asynchronous I/O implementation
Brian Shih authored and committed on Oct 14, 2023
1 parent 651df88, commit 84b7932
Showing 16 changed files with 576 additions and 14 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,40 @@ | ||
# API | ||
|
||
Our goal here is to implement a set of internal APIs to make it easy to convert synchronous operations into asynchronous ones. | ||
|
||
Whether we’re dealing with sockets or files, converting a synchronous operation to an asynchronous one roughly follows these steps: | ||
|
||
- we need to set the file descriptor to non-blocking | ||
- we need to perform the non-blocking operation | ||
- we need to tell `io_uring` to monitor the file descriptor by submitting an `SQE` | ||
- since the operation is asynchronous, we need to store the poller’s `waker` and invoke `wake()` when the I/O operation completes. We know an operation has completed when its corresponding `CQE` is posted to the `io_uring` completion queue. | ||
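
The last step above, storing a waker and calling `wake()` on completion, can be sketched with the standard library alone. This is a minimal illustration rather than the code from this series: `Completion`, `WaitForCompletion`, and `complete` are hypothetical names, and `complete` stands in for whatever code observes the `CQE`.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

/// Hypothetical state shared between a pending operation and the code
/// that observes its completion (in the text: whoever sees the `CQE`).
#[derive(Default)]
pub struct Completion {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that resolves once `complete` has been called.
pub struct WaitForCompletion {
    pub state: Rc<RefCell<Completion>>,
}

impl Future for WaitForCompletion {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.state.borrow_mut();
        if state.done {
            Poll::Ready(())
        } else {
            // Not finished yet: remember which task to wake, then yield.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Marks the operation as done and wakes the task waiting on it, if any.
pub fn complete(state: &Rc<RefCell<Completion>>) {
    // Release the borrow before waking, in case the waker polls right away.
    let waker = {
        let mut s = state.borrow_mut();
        s.done = true;
        s.waker.take()
    };
    if let Some(w) = waker {
        w.wake();
    }
}
```

The first `poll` returns `Pending` and stores the waker; after `complete` runs, the waker fires and the next `poll` returns `Ready`.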
|
||
To make it easier to implement new asynchronous operations, we introduce `Async`, an adapter for I/O types inspired by the [async_io crate](https://docs.rs/async-io/latest/async_io/). `Async` abstracts away the steps listed above so that developers who build on top of `Async` don’t have to worry about things like `io_uring`, `Waker`, `O_NONBLOCK`, etc. | ||
|
||
Here is how you use the `Async` adapter to implement an asynchronous `TcpListener` with an asynchronous `accept` method: | ||
|
||
```rust | ||
impl Async<TcpListener> { | ||
    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> { | ||
        let addr = addr.into(); | ||
        let listener = TcpListener::bind(addr)?; | ||
        Async::new(listener) | ||
    } | ||
|
||
    pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> { | ||
        let (stream, addr) = self.read_with(|io| io.accept()).await?; | ||
        Ok((Async::new(stream)?, addr)) | ||
    } | ||
} | ||
``` | ||
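
The `read_with` helper used in `accept` is not shown here. A plausible shape, modeled loosely on the retry loop in the `async_io` crate's `read_with`, is to attempt the operation, and if it fails with `WouldBlock`, wait until the handle is readable and try again. The sketch below is synchronous so it can stand alone: `retry_until_ready` is a hypothetical name, and its `wait` closure is an assumed stand-in for awaiting readiness through the reactor.

```rust
use std::io;

/// Runs `op` until it returns something other than `WouldBlock`.
/// `wait` is a hypothetical stand-in for "suspend until the reactor
/// reports that the handle is ready".
pub fn retry_until_ready<T>(
    mut op: impl FnMut() -> io::Result<T>,
    mut wait: impl FnMut(),
) -> io::Result<T> {
    loop {
        match op() {
            // The operation would have blocked: wait, then retry.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => wait(),
            // Success or a real error: hand it back to the caller.
            other => return other,
        }
    }
}
```

In an asynchronous version, the `wait()` call would become an `.await` on a future that the reactor completes, so the task yields instead of spinning.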
|
||
Here is how you can use the `Async<TcpListener>` inside an executor to perform asynchronous I/O: | ||
|
||
```rust | ||
let local_ex = LocalExecutor::default(); | ||
let res = local_ex.run(async { | ||
    let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8080)).unwrap(); | ||
    let (stream, _) = listener.accept().await.unwrap(); | ||
    handle_connection(stream); | ||
}); | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# Core abstractions | ||
|
||
In general, we can break down how the executor performs asynchronous I/O into 3 steps: | ||
|
||
- setting the I/O handle to be non-blocking by setting the `O_NONBLOCK` flag on the file descriptor | ||
- performing the non-blocking operation and registering interest with `io_uring` by submitting an `SQE` to the `io_uring` instance's `submission_queue` | ||
- polling the `io_uring` completion queue for a corresponding `CQE`, which indicates that the I/O operation has completed. If it has, process it by resuming the blocked task. | ||
|
||
To accomplish these, we will introduce a few new abstractions: `Async`, `Source`, and the `Reactor`. | ||
|
||
### Async | ||
|
||
`Async` is a wrapper around the I/O handle (e.g. `TcpListener`). It contains helper methods that make it easier to convert blocking operations into asynchronous ones. | ||
|
||
Here is the `Async` struct: | ||
|
||
```rust | ||
pub struct Async<T> { | ||
    /// A source registered in the reactor. | ||
    source: Source, | ||
|
||
    /// The inner I/O handle. | ||
    io: Option<Box<T>>, | ||
} | ||
``` | ||
|
||
### Source | ||
|
||
The `Source` is a bridge between the executor and the I/O handle. It contains properties pertaining to the I/O handle that are relevant to the executor. For example, it contains tasks that are blocked by operations on the I/O handle. | ||
|
||
```rust | ||
pub struct Source { | ||
    pub(crate) inner: Pin<Rc<RefCell<InnerSource>>>, | ||
} | ||
|
||
/// A registered source of I/O events. | ||
pub(crate) struct InnerSource { | ||
    /// Raw file descriptor on Unix platforms. | ||
    pub(crate) raw: RawFd, | ||
|
||
    /// Tasks interested in events on this source. | ||
    pub(crate) wakers: Wakers, | ||
|
||
    pub(crate) source_type: SourceType, | ||
|
||
    ... | ||
} | ||
``` | ||
|
||
### Reactor | ||
|
||
Each executor has a `Reactor`. The `Reactor` is an abstraction around the `io_uring` instance. It provides simple APIs to interact with the `io_uring` instance. | ||
|
||
```rust | ||
pub(crate) struct Reactor { | ||
    // the main_ring contains an io_uring instance | ||
    main_ring: RefCell<SleepableRing>, | ||
    source_map: Rc<RefCell<SourceMap>>, | ||
} | ||
|
||
struct SleepableRing { | ||
    ring: iou::IoUring, | ||
    in_kernel: usize, | ||
    submission_queue: ReactorQueue, | ||
    name: &'static str, | ||
    source_map: Rc<RefCell<SourceMap>>, | ||
} | ||
|
||
struct SourceMap { | ||
    id: u64, | ||
    map: HashMap<u64, Pin<Rc<RefCell<InnerSource>>>>, | ||
} | ||
``` | ||
|
||
As we can see, the `Reactor` holds a `SleepableRing`, which is a wrapper around an `iou::IoUring` instance. Glommio uses the [`iou` crate](https://docs.rs/iou/latest/iou/) to interact with the Linux kernel’s `io_uring` interface. | ||
|
||
The `Reactor` also contains a `SourceMap`, which holds a `HashMap` that maps a unique ID to a `Source`. The same ID is used as the `SQE`'s `user_data`. This way, when a `CQE` is posted to the `io_uring` completion queue, we can tie it back to the corresponding `Source`. |
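
To make the ID round trip concrete, here is a simplified sketch of such a map. It is not the `SourceMap` from the code above: `FakeSource`, `SketchSourceMap`, `add_source`, and `consume_source` are hypothetical names, and the source is reduced to a bare file descriptor so the example stands on its own.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `InnerSource`: only the raw file descriptor.
#[derive(Debug, PartialEq)]
pub struct FakeSource {
    pub raw: i32,
}

/// Hypothetical map from a unique ID to the source that owns it.
#[derive(Default)]
pub struct SketchSourceMap {
    next_id: u64,
    map: HashMap<u64, FakeSource>,
}

impl SketchSourceMap {
    /// Stores the source and returns the ID to place in the SQE's `user_data`.
    pub fn add_source(&mut self, source: FakeSource) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.map.insert(id, source);
        id
    }

    /// Looks up and removes the source matching a completion's `user_data`.
    pub fn consume_source(&mut self, user_data: u64) -> Option<FakeSource> {
        self.map.remove(&user_data)
    }
}
```

When a completion arrives, its `user_data` is passed to `consume_source`, which returns the source whose waiting tasks should be woken.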
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Life of an Asynchronous I/O Operation |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Step 1 - Setting the O_NONBLOCK Flag | ||
|
||
The first step to asynchronous I/O is to change the I/O handle to be nonblocking by setting the `O_NONBLOCK` flag. | ||
|
||
### API | ||
|
||
`Async::new(handle)` is responsible for setting the I/O handle to be nonblocking. | ||
|
||
### Implementation | ||
|
||
`Async::new(handle)` is the constructor of the `Async` struct. For example, here is how you create an instance of `Async<TcpListener>`: | ||
|
||
```rust | ||
let listener = TcpListener::bind(addr)?; | ||
let listener = Async::new(listener)?; | ||
``` | ||
|
||
Here is the implementation of `Async::new`: | ||
|
||
```rust | ||
impl<T: AsRawFd> Async<T> { | ||
    pub fn new(io: T) -> io::Result<Async<T>> { | ||
        Ok(Async { | ||
            source: get_reactor().create_source(io.as_raw_fd()), | ||
            io: Some(Box::new(io)), | ||
        }) | ||
    } | ||
} | ||
``` | ||
|
||
The `get_reactor()` method retrieves the `Reactor` for the executor running on the current thread. The `create_source` method, as shown below, sets the `O_NONBLOCK` flag for the handle with [fcntl](https://man7.org/linux/man-pages/man2/fcntl.2.html). | ||
|
||
```rust | ||
impl Reactor { | ||
    ... | ||
    pub fn create_source(&self, raw: RawFd) -> Source { | ||
        fcntl(raw, FcntlArg::F_SETFL(OFlag::O_NONBLOCK)).unwrap(); | ||
        self.new_source(raw, SourceType::PollableFd) | ||
    } | ||
|
||
    fn new_source(&self, raw: RawFd, stype: SourceType) -> Source { | ||
        Source::new(raw, stype, None) | ||
    } | ||
} | ||
``` |
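
To see what the flag changes in practice, the sketch below uses only the standard library. `TcpListener::set_nonblocking(true)` puts the socket into non-blocking mode, after which `accept` returns a `WouldBlock` error instead of blocking when no connection is pending. The helper names `nonblocking_listener` and `try_accept` are hypothetical and are not part of the code above. Note also that `F_SETFL` sets the file status flags to exactly the value it is given, so a common alternative to the call above is to read the current flags with `F_GETFL` and OR in `O_NONBLOCK`.

```rust
use std::io;
use std::net::{TcpListener, TcpStream};

/// Binds to an ephemeral loopback port and switches the listener to
/// non-blocking mode.
pub fn nonblocking_listener() -> io::Result<TcpListener> {
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    listener.set_nonblocking(true)?;
    Ok(listener)
}

/// Tries to accept once. `Ok(None)` means "no connection yet, try later".
pub fn try_accept(listener: &TcpListener) -> io::Result<Option<TcpStream>> {
    match listener.accept() {
        Ok((stream, _addr)) => Ok(Some(stream)),
        // With O_NONBLOCK set, an empty accept queue surfaces as WouldBlock.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}
```

The `Ok(None)` case is the point at which an asynchronous runtime would register interest with its reactor and suspend the task instead of retrying in a loop.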