diff --git a/examples/cat.rs b/examples/cat.rs index f779e6c4..6d70dd9a 100644 --- a/examples/cat.rs +++ b/examples/cat.rs @@ -22,7 +22,7 @@ fn main() { tokio_uring::start(async { // Open the file without blocking let file = File::open(path).await.unwrap(); - let mut buf = vec![0; 16 * 1_024]; + let mut buf = vec![0; 16 * 1_024].into(); // Track the current position in the file; let mut pos = 0; @@ -34,7 +34,7 @@ fn main() { break; } - out.write_all(&b[..n]).unwrap(); + out.write_all(&b[0][..n]).unwrap(); pos += n as u64; buf = b; diff --git a/examples/mix.rs b/examples/mix.rs index 71a6fa96..15bbef0b 100644 --- a/examples/mix.rs +++ b/examples/mix.rs @@ -27,19 +27,14 @@ fn main() { tokio_uring::spawn(async move { // Open the file without blocking let file = File::open(path).await.unwrap(); - let mut buf = vec![0; 16 * 1_024]; + let mut buf = vec![0; 16 * 1_024].into(); // Track the current position in the file; let mut pos = 0; loop { // Read a chunk -<<<<<<< HEAD - let (res, b) = file.read_at(buf, pos).submit().await; - let n = res.unwrap(); -======= - let (n, b) = file.read_at(buf, pos).await.unwrap(); ->>>>>>> 6b5865f (Refresh: Update Result to be Result<(T,B), Error) + let (n, b) = file.read_at(buf, pos).submit().await.unwrap(); if n == 0 { break; diff --git a/examples/tcp_listener.rs b/examples/tcp_listener.rs deleted file mode 100644 index 45175f16..00000000 --- a/examples/tcp_listener.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::{env, net::SocketAddr}; - -use tokio_uring::net::TcpListener; - -fn main() { - let args: Vec<_> = env::args().collect(); - - let socket_addr = if args.len() <= 1 { - "127.0.0.1:0" - } else { - args[1].as_ref() - }; - let socket_addr: SocketAddr = socket_addr.parse().unwrap(); - - tokio_uring::start(async { - let listener = TcpListener::bind(socket_addr).unwrap(); - - println!("Listening on {}", listener.local_addr().unwrap()); - - loop { - let (stream, socket_addr) = listener.accept().await.unwrap(); - tokio_uring::spawn(async move { - // implement ping-pong loop - - use tokio_uring::buf::BoundedBuf; // for slice() - - println!("{} connected", socket_addr); - let mut n = 0; - - let mut buf = vec![0u8; 4096]; - loop { - let (read, nbuf) = stream.read(buf).await.unwrap(); - buf = nbuf; - if read == 0 { - println!("{} closed, {} total ping-ponged", socket_addr, n); - break; - } - - let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); - buf = slice.into_inner(); - println!("{} all {} bytes ping-ponged", socket_addr, read); - n += read; - } - }); - } - }); -} diff --git a/examples/tcp_stream.rs b/examples/tcp_stream.rs index 47c51ae7..883a62cc 100644 --- a/examples/tcp_stream.rs +++ b/examples/tcp_stream.rs @@ -13,12 +13,12 @@ fn main() { tokio_uring::start(async { let stream = TcpStream::connect(socket_addr).await.unwrap(); - let buf = vec![1u8; 128]; + let buf = vec![1u8; 128].into(); let (n, buf) = stream.write(buf).submit().await.unwrap(); println!("written: {}", n); let (read, buf) = stream.read(buf).await.unwrap(); - println!("read: {:?}", &buf[..read]); + println!("read: {:?}", &buf[0][..read]); }); } diff --git a/examples/unix_listener.rs b/examples/unix_listener.rs index efacf509..024e0a70 100644 --- a/examples/unix_listener.rs +++ b/examples/unix_listener.rs @@ -18,13 +18,13 @@ fn main() { let stream = listener.accept().await.unwrap(); let socket_addr = socket_addr.clone(); tokio_uring::spawn(async move { - let buf = vec![1u8; 128]; + let buf = vec![1u8; 128].into(); let (n, buf) = stream.write(buf).submit().await.unwrap(); println!("written to {}: {}", &socket_addr, n); let (read, buf) = stream.read(buf).await.unwrap(); - println!("read from {}: {:?}", &socket_addr, &buf[..read]); + println!("read from {}: {:?}", &socket_addr, &buf[0][..read]); }); } }); diff --git a/examples/unix_stream.rs b/examples/unix_stream.rs index a65dba96..b1b8ed09 100644 --- a/examples/unix_stream.rs +++ b/examples/unix_stream.rs @@ -13,12 +13,12 @@ fn main() { tokio_uring::start(async { let stream = UnixStream::connect(socket_addr).await.unwrap(); - let buf = vec![1u8; 128]; + let buf = vec![1u8; 128].into(); let (n, buf) = stream.write(buf).submit().await.unwrap(); println!("written: {}", n); let (read, buf) = stream.read(buf).await.unwrap(); - println!("read: {:?}", &buf[..read]); + println!("read: {:?}", &buf[0][..read]); }); } diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs index 97b60016..bcc3ee12 100644 --- a/examples/wrk-bench.rs +++ b/examples/wrk-bench.rs @@ -22,7 +22,7 @@ fn main() -> io::Result<()> { let (stream, _) = listener.accept().await?; tokio_uring::spawn(async move { - let result = stream.write(RESPONSE).submit().await; + let result = stream.write(RESPONSE.to_vec().into()).submit().await; if let Err(err) = result { eprintln!("Client connection failed: {}", err); } diff --git a/src/buf/fixed/pool.rs b/src/buf/fixed/pool.rs index 409d27e9..1d8c87a4 100644 --- a/src/buf/fixed/pool.rs +++ b/src/buf/fixed/pool.rs @@ -55,7 +55,7 @@ use std::sync::Arc; /// /// # Examples /// -/// ``` +/// ```no_compile /// use tokio_uring::buf::fixed::FixedBufPool; /// use tokio_uring::buf::IoBuf; /// use std::iter; diff --git a/src/buf/mod.rs b/src/buf/mod.rs index 71ab196c..b621c309 100644 --- a/src/buf/mod.rs +++ b/src/buf/mod.rs @@ -7,6 +7,13 @@ pub mod fixed; mod io_buf; +use std::{ + convert::TryFrom, + iter::zip, + mem::ManuallyDrop, + ops::{Index, IndexMut}, +}; + pub use io_buf::IoBuf; mod io_buf_mut; @@ -18,6 +25,8 @@ pub use slice::Slice; mod bounded; pub use bounded::{BoundedBuf, BoundedBufMut}; +use crate::Error; + pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] { // Safety: the `IoBuf` trait is marked as unsafe and is expected to be // implemented correctly. @@ -29,3 +38,250 @@ pub(crate) fn deref_mut(buf: &mut impl IoBufMut) -> &mut [u8] { // implemented correct. unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_init()) } } + +#[derive(Debug, PartialEq)] +enum BufferSource { + RawPtr, + Vector, +} + +#[allow(missing_docs)] +pub struct Buffer { + iovecs: Vec, + state: Vec, +} + +unsafe impl Send for Buffer {} +unsafe impl Sync for Buffer {} + +impl Buffer { + fn new(iovecs: Vec, state: Vec) -> Self { + Buffer { iovecs, state } + } + + #[allow(missing_docs)] + pub fn len(&self) -> usize { + self.iovecs.len() + } + + #[allow(missing_docs)] + pub fn fill(&mut self) { + for (iovec, state) in zip(&mut self.iovecs, &self.state) { + iovec.iov_len = state.total_bytes; + } + } + + #[allow(missing_docs)] + pub fn iter(&self) -> std::slice::Iter<'_, libc::iovec> { + self.iovecs.iter() + } + + #[allow(missing_docs)] + pub unsafe fn from_raw_ptr(ptr: u64, len: usize) -> Self { + let iov = libc::iovec { + iov_base: ptr as _, + iov_len: len, + }; + let state = BufferState::new(len, drop_raw_ptr, BufferSource::RawPtr); + Self::new(vec![iov], vec![state]) + } +} + +#[derive(Debug)] +pub(crate) struct BufferState { + total_bytes: usize, + dtor: unsafe fn(libc::iovec, usize), + source: BufferSource, +} + +impl Drop for Buffer { + fn drop(&mut self) { + let Self { + iovecs: iovec, + state, + } = self; + for i in 0..iovec.len() { + unsafe { (state[i].dtor)(iovec[i], state[i].total_bytes) } + } + } +} + +impl BufferState { + fn new(total_bytes: usize, dtor: unsafe fn(libc::iovec, usize), source: BufferSource) -> Self { + BufferState { + total_bytes, + dtor, + source, + } + } +} + +impl From> for Buffer { + fn from(buf: Vec) -> Self { + let mut vec = ManuallyDrop::new(buf); + let base = vec.as_mut_ptr(); + let iov_len = vec.len(); + let total_bytes = vec.capacity(); + + let iov = libc::iovec { + iov_base: base as _, + iov_len, + }; + + let state = BufferState::new(total_bytes, drop_vec, BufferSource::Vector); + Buffer::new(vec![iov], vec![state]) + } +} + +impl From>> for Buffer { + fn from(bufs: Vec>) -> Self { + let mut iovecs = Vec::with_capacity(bufs.len()); + let mut states = Vec::with_capacity(bufs.len()); + + for buf in bufs { + let mut vec = ManuallyDrop::new(buf); + + let base = vec.as_mut_ptr(); + let iov_len = vec.len(); + let total_bytes = vec.capacity(); + + let iov = libc::iovec { + iov_base: base as *mut libc::c_void, + iov_len, + }; + + let state = BufferState::new(total_bytes, drop_vec, BufferSource::Vector); + + iovecs.push(iov); + states.push(state); + } + + Buffer::new(iovecs, states) + } +} + +impl TryFrom for Vec { + type Error = Error; + + fn try_from(buf: Buffer) -> Result { + if buf.len() != 1 { + return Err(Error( + std::io::Error::new( + std::io::ErrorKind::Other, + "length of vector of this Buffer must be 1", + ), + buf, + )); + } + + if buf.state[0].source != BufferSource::Vector { + return Err(Error( + std::io::Error::new( + std::io::ErrorKind::Other, + "the source of this Buffer is not Vec", + ), + buf, + )); + } + + let this = ManuallyDrop::new(buf); + Ok(unsafe { + Vec::from_raw_parts( + this.iovecs[0].iov_base as _, + this.iovecs[0].iov_len, + this.state[0].total_bytes, + ) + }) + } +} + +impl TryFrom for Vec> { + type Error = Error; + + fn try_from(buf: Buffer) -> Result { + if buf + .state + .iter() + .any(|state| state.source != BufferSource::Vector) + { + return Err(Error( + std::io::Error::new( + std::io::ErrorKind::Other, + "the source of any vector of this Buffer is not Vec", + ), + buf, + )); + } + + let this = ManuallyDrop::new(buf); + let mut vecs = Vec::with_capacity(this.iovecs.len()); + for i in 0..this.iovecs.len() { + vecs.push(unsafe { + Vec::from_raw_parts( + this.iovecs[i].iov_base as _, + this.iovecs[i].iov_len, + this.state[i].total_bytes, + ) + }); + } + Ok(vecs) + } +} + +unsafe fn drop_raw_ptr(_iovec: libc::iovec, _total_bytes: usize) {} + +unsafe fn drop_vec(iovec: libc::iovec, total_bytes: usize) { + Vec::from_raw_parts(iovec.iov_base as _, iovec.iov_len, total_bytes); +} + +impl Index for Buffer { + type Output = [u8]; + + fn index(&self, index: usize) -> &Self::Output { + let iovec = &self.iovecs[index]; + unsafe { std::slice::from_raw_parts(iovec.iov_base as *const u8, iovec.iov_len) } + } +} + +impl IndexMut for Buffer { + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + let iovec = &mut self.iovecs[index]; + unsafe { std::slice::from_raw_parts_mut(iovec.iov_base as *mut u8, iovec.iov_len) } + } +} + +unsafe impl IoBuf for Buffer { + fn stable_ptr(&self) -> *const u8 { + if self.state.len() == 1 { + self.iovecs[0].iov_base as *const u8 + } else { + self.iovecs.as_ptr() as *const u8 + } + } + + fn bytes_init(&self) -> usize { + self.iovecs.iter().map(|iovec| iovec.iov_len).sum() + } + + fn bytes_total(&self) -> usize { + self.state.iter().map(|state| state.total_bytes).sum() + } +} + +unsafe impl IoBufMut for Buffer { + fn stable_mut_ptr(&mut self) -> *mut u8 { + if self.state.len() == 1 { + self.iovecs[0].iov_base as *mut u8 + } else { + self.iovecs.as_mut_ptr() as *mut u8 + } + } + + unsafe fn set_init(&mut self, mut pos: usize) { + for (iovec, state) in zip(&mut self.iovecs, &self.state) { + let size = std::cmp::min(state.total_bytes, pos); + iovec.iov_len = size; + pos -= size; + } + } +} diff --git a/src/fs/file.rs b/src/fs/file.rs index ccba74a5..06a9b1e3 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -1,13 +1,11 @@ use crate::buf::fixed::FixedBuf; -use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice}; +use crate::buf::{BoundedBuf, BoundedBufMut, Buffer, Slice}; use crate::fs::OpenOptions; use crate::io::SharedFd; -use crate::runtime::driver::op::{Op, Submit}; +use crate::runtime::driver::op::Op; use crate::MapResult; -use crate::{ - UnsubmittedOneshot, UnsubmittedRead, UnsubmittedReadv, UnsubmittedWrite, UnsubmittedWritev, -}; +use crate::Unsubmitted; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -43,7 +41,7 @@ use std::path::Path; /// let file = File::create("hello.txt").await?; /// /// // Write some data -/// let (n, buf) = file.write_at(&b"hello world"[..], 0).submit().await?; +/// let (n, buf) = file.write_at(b"hello world".to_vec().into(), 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -166,12 +164,12 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::open("foo.txt").await?; - /// let buffer = vec![0; 10]; + /// let buffer = vec![0; 10].into(); /// /// // Read up to 10 bytes /// let (n, buffer) = f.read_at(buffer, 0).submit().await?; /// - /// println!("The bytes: {:?}", &buffer[..n]); + /// println!("The bytes: {:?}", &buffer[0][..n]); /// /// // Close the file /// f.close().await?; @@ -179,8 +177,8 @@ impl File { /// }) /// } /// ``` - pub fn read_at(&self, buf: T, pos: u64) -> UnsubmittedRead { - UnsubmittedOneshot::read_at(&self.fd, buf, pos) + pub fn read_at(&self, buf: Buffer, pos: u64) -> Unsubmitted { + Unsubmitted::read_at(&self.fd, buf, pos) } /// Read some bytes at the specified offset from the file into the specified @@ -210,14 +208,15 @@ impl File { /// /// ```no_run /// use tokio_uring::fs::File; + /// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::open("foo.txt").await?; - /// let buffers = vec![Vec::::with_capacity(10), Vec::::with_capacity(10)]; + /// let buffers = vec![Vec::::with_capacity(10), Vec::::with_capacity(10)].into(); /// /// // Read up to 20 bytes - /// let (n, _) = f.readv_at(buffers, 0).submit().await?; + /// let (n, _) = f.read_at(buffers, 0).submit().await?; /// /// println!("Read {} bytes", n); /// @@ -227,9 +226,9 @@ impl File { /// }) /// } /// ``` - pub fn readv_at(&self, bufs: Vec, pos: u64) -> UnsubmittedReadv { - UnsubmittedOneshot::readv_at(&self.fd, bufs, pos) - } + // pub fn readv_at(&self, bufs: Vec, pos: u64) -> UnsubmittedReadv { + // UnsubmittedOneshot::readv_at(&self.fd, bufs, pos) + // } /// Write data from buffers into this file at the specified offset, /// returning how many bytes were written. @@ -258,14 +257,15 @@ impl File { /// /// ```no_run /// use tokio_uring::fs::File; + /// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()]; - /// let (n, _) = file.writev_at(bufs, 0).submit().await?; + /// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()].into(); + /// let (n, _) = file.write_at(bufs, 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -277,153 +277,9 @@ impl File { /// ``` /// /// [`Ok(n)`]: Ok - pub fn writev_at(&self, bufs: Vec, pos: u64) -> UnsubmittedWritev { - UnsubmittedOneshot::writev_at(&self.fd, bufs, pos) - } - - /// Like `writev_at` but will call the `io_uring` `writev` operation multiple times if - /// necessary. - /// - /// Parameter `pos` is an `Option` to allow this function to be used for both files that - /// are seekable and those that are not. The caller is responsible for knowing this. - /// - /// When `None` is supplied, the offset passed to the `io_uring` call will always be zero, even - /// if multiple writev calls are necessary; only the iovec information would be adjusted - /// between calls. A Unix pipe would fall into this category. - /// - /// When `Some(n)` is suppied, the offset passed to the writev call will be incremented by the - /// progress of prior writev calls. A file system's regular file would fall into this category. - /// - /// If the caller passes `Some(n)` for a file that is not seekable, the `io_uring` `writev` - /// operation will return an error once n is not zero. - /// - /// If the caller passes `None`, when the file *is* seekable, when multiple `writev` calls are - /// required to complete the writing of all the bytes, the bytes at position 0 of the file will - /// have been overwritten one or more times with incorrect data. This is true just as if the - /// caller had invoked seperate write calls to a file, all with position 0, when in fact the - /// file was seekable. - /// - /// Performance considerations: - /// - /// The user may want to check that this function is necessary in their use case or performs - /// better than a series of write_all operations would. There is overhead either way and it is - /// not clear which should be faster or give better throughput. - /// - /// This function causes the temporary allocation of a Vec one time to hold the array of iovec - /// that is passed to the kernel. The same array is used for any subsequent calls to get all - /// the bytes written. Whereas individual calls to write_all do not require the Vec to be - /// allocated, they do each incur the normal overhead of setting up the submission and - /// completion structures and going through the future poll mechanism. - /// - /// TODO decide, would a separate `writev_all` function for `file` that did not take a `pos` - /// make things less ambiguous? - /// - /// TODO more complete documentation here. - /// TODO define writev_all functions for net/unix/stream, net/tcp/stream, io/socket. - /// TODO remove usize from result, to be consistent with other write_all_vectored functions. - /// TODO find a way to test this with some stress to the file so the writev calls don't all - /// succeed on their first try. - /// TODO consider replacing the current `write_all` and `write_all_at` functions with a similar - /// mechanism so all the write-all logic is in one place, in the io/write_all.rs file. - pub async fn writev_at_all( - &self, - buf: Vec, - pos: Option, // Use None for files that can't seek - ) -> crate::Result> { - let op = crate::io::writev_at_all(&self.fd, buf, pos); - op.await - } - - /// Read the exact number of bytes required to fill `buf` at the specified - /// offset from the file. - /// - /// This function reads as many as bytes as necessary to completely fill the - /// specified buffer `buf`. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// as an argument. - /// - /// If the method returns [`Ok(())`], then the read was successful. - /// - /// # Errors - /// - /// If this function encounters an "end of file" before completely filling - /// the buffer, it returns an error of the kind [`ErrorKind::UnexpectedEof`]. - /// The buffer is returned on error. - /// - /// If this function encounters any form of I/O or other error, an error - /// variant will be returned. The buffer is returned on error. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let f = File::open("foo.txt").await?; - /// let buffer = Vec::with_capacity(10); - /// - /// // Read up to 10 bytes - /// let (_, buffer) = f.read_exact_at(buffer, 0).await?; - /// - /// println!("The bytes: {:?}", buffer); - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - /// - /// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof - pub async fn read_exact_at(&self, buf: T, pos: u64) -> crate::Result<(), T> - where - T: BoundedBufMut, - { - let orig_bounds = buf.bounds(); - self.read_exact_slice_at(buf.slice_full(), pos) - .await - .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) - } - - async fn read_exact_slice_at( - &self, - mut buf: Slice, - mut pos: u64, - ) -> crate::Result<(), T> { - if pos.checked_add(buf.bytes_total() as u64).is_none() { - return Err(crate::Error( - io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), - buf.into_inner(), - )); - } - - while buf.bytes_total() != 0 { - match self.read_at(buf, pos).submit().await { - Ok((0, slice)) => { - return Err(crate::Error( - io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"), - slice.into_inner(), - )) - } - Ok((n, slice)) => { - pos += n as u64; - buf = slice.slice(n..); - } - - // No match on an EINTR error is performed because this - // crate's design ensures we are not calling the 'wait' option - // in the ENTER syscall. Only an Enter with 'wait' can generate - // an EINTR according to the io_uring man pages. - Err(e) => return Err(e.map(|slice| slice.into_inner())), - }; - } - - Ok(((), buf.into_inner())) - } + // pub fn writev_at(&self, bufs: Vec, pos: u64) -> UnsubmittedWritev { + // UnsubmittedOneshot::writev_at(&self.fd, bufs, pos) + // } /// Like [`read_at`], but using a pre-mapped buffer /// registered with [`FixedBufRegistry`]. @@ -507,7 +363,7 @@ impl File { /// let file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (n, _) = file.write_at(&b"some bytes"[..], 0).submit().await?; + /// let (n, _) = file.write_at(b"some bytes".to_vec().into(), 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -519,94 +375,8 @@ impl File { /// ``` /// /// [`Ok(n)`]: Ok - pub fn write_at(&self, buf: T, pos: u64) -> UnsubmittedWrite { - UnsubmittedOneshot::write_at(&self.fd, buf, pos) - } - - /// Attempts to write an entire buffer into this file at the specified offset. - /// - /// This method will continuously call [`write_at`] until there is no more data - /// to be written or an error is returned. - /// This method will not return until the entire buffer has been successfully - /// written or an error occurs. - /// - /// If the buffer contains no data, this will never call [`write_at`]. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// in as an argument. - /// - /// # Errors - /// - /// This function will return the first error that [`write_at`] returns. - /// - /// # Examples - /// - /// ```no_run - /// use tokio_uring::fs::File; - /// - /// fn main() -> Result<(), Box> { - /// tokio_uring::start(async { - /// let file = File::create("foo.txt").await?; - /// - /// // Writes some prefix of the byte string, not necessarily all of it. - /// file.write_all_at(&b"some bytes"[..], 0).await?; - /// - /// println!("wrote all bytes"); - /// - /// // Close the file - /// file.close().await?; - /// Ok(()) - /// }) - /// } - /// ``` - /// - /// [`write_at`]: File::write_at - pub async fn write_all_at(&self, buf: T, pos: u64) -> crate::Result<(), T> - where - T: BoundedBuf, - { - let orig_bounds = buf.bounds(); - self.write_all_slice_at(buf.slice_full(), pos) - .await - .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) - } - - async fn write_all_slice_at( - &self, - mut buf: Slice, - mut pos: u64, - ) -> crate::Result<(), T> { - if pos.checked_add(buf.bytes_init() as u64).is_none() { - return Err(crate::Error( - io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), - buf.into_inner(), - )); - } - - while buf.bytes_init() != 0 { - match self.write_at(buf, pos).submit().await { - Ok((0, slice)) => { - return Err(crate::Error( - io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), - slice.into_inner(), - )) - } - Ok((n, slice)) => { - pos += n as u64; - buf = slice.slice(n..); - } - - // No match on an EINTR error is performed because this - // crate's design ensures we are not calling the 'wait' option - // in the ENTER syscall. Only an Enter with 'wait' can generate - // an EINTR according to the io_uring man pages. - Err(e) => return Err(e.map(|slice| slice.into_inner())), - }; - } - - Ok(((), buf.into_inner())) + pub fn write_at(&self, buf: Buffer, pos: u64) -> Unsubmitted { + Unsubmitted::write_at(&self.fd, buf, pos) } /// Like [`write_at`], but using a pre-mapped buffer @@ -740,7 +510,7 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::create("foo.txt").await?; - /// f.write_at(&b"Hello, world!"[..], 0).submit().await?; + /// f.write_at(b"Hello, world!".to_vec().into(), 0).submit().await?; /// /// f.sync_all().await?; /// @@ -777,7 +547,7 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::create("foo.txt").await?; - /// f.write_at(&b"Hello, world!"[..], 0).submit().await?; + /// f.write_at(b"Hello, world!".to_vec().into(), 0).submit().await?; /// /// f.sync_data().await?; /// diff --git a/src/io/mod.rs b/src/io/mod.rs index b4e8dd2a..b3bffd58 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -15,12 +15,8 @@ pub(crate) use noop::NoOp; mod open; -pub(crate) mod read; - mod read_fixed; -pub(crate) mod readv; - mod recv_from; mod recvmsg; @@ -48,11 +44,6 @@ mod unlink_at; mod util; pub(crate) use util::cstr; -pub(crate) mod write; +pub(crate) mod read_write; mod write_fixed; - -pub(crate) mod writev; - -mod writev_all; -pub(crate) use writev_all::writev_at_all; diff --git a/src/io/read.rs b/src/io/read.rs deleted file mode 100644 index 189ba005..00000000 --- a/src/io/read.rs +++ /dev/null @@ -1,69 +0,0 @@ -use io_uring::cqueue::Entry; - -use crate::buf::BoundedBufMut; -use crate::io::SharedFd; -use crate::{OneshotOutputTransform, Result, UnsubmittedOneshot, WithBuffer}; - -use std::io; -use std::marker::PhantomData; - -/// An unsubmitted read operation. -pub type UnsubmittedRead = UnsubmittedOneshot, ReadTransform>; - -#[allow(missing_docs)] -pub struct ReadData { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - _fd: SharedFd, - - buf: T, -} - -#[allow(missing_docs)] -pub struct ReadTransform { - _phantom: PhantomData, -} - -impl OneshotOutputTransform for ReadTransform -where - T: BoundedBufMut, -{ - type Output = Result; - type StoredData = ReadData; - - fn transform_oneshot_output(self, mut data: Self::StoredData, cqe: Entry) -> Self::Output { - let n = cqe.result(); - let res = if n >= 0 { - // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { data.buf.set_init(n as usize) }; - Ok(n as usize) - } else { - Err(io::Error::from_raw_os_error(-n)) - }; - - res.with_buffer(data.buf) - } -} - -impl UnsubmittedRead { - pub(crate) fn read_at(fd: &SharedFd, mut buf: T, offset: u64) -> Self { - use io_uring::{opcode, types}; - - // Get raw buffer info - let ptr = buf.stable_mut_ptr(); - let len = buf.bytes_total(); - - Self::new( - ReadData { - _fd: fd.clone(), - buf, - }, - ReadTransform { - _phantom: PhantomData, - }, - opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build(), - ) - } -} diff --git a/src/io/read_write.rs b/src/io/read_write.rs new file mode 100644 index 00000000..4449e50e --- /dev/null +++ b/src/io/read_write.rs @@ -0,0 +1,108 @@ +use libc::iovec; + +use crate::buf::{BoundedBufMut, Buffer}; +use crate::WithBuffer; +use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot}; +use std::io; + +#[allow(missing_docs)] +pub type Unsubmitted = UnsubmittedOneshot; + +#[allow(missing_docs)] +pub struct ReadWriteData { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + _fd: SharedFd, + + buf: Buffer, +} + +enum Kind { + Read, + Write, +} + +#[allow(missing_docs)] +pub struct ReadWriteTransform(Kind); + +impl OneshotOutputTransform for ReadWriteTransform { + type Output = Result; + + type StoredData = ReadWriteData; + + fn transform_oneshot_output( + self, + mut data: Self::StoredData, + cqe: io_uring::cqueue::Entry, + ) -> Self::Output { + let n = cqe.result(); + if n < 0 { + return Err(io::Error::from_raw_os_error(-n)).with_buffer(data.buf); + } + + if matches!(self.0, Kind::Read) { + // Safety: the kernel wrote `n` bytes to the buffer. + unsafe { data.buf.set_init(n as usize) }; + } + + Ok((n as usize, data.buf)) + } +} + +impl Unsubmitted { + pub(crate) fn write_at(fd: &SharedFd, buf: Buffer, offset: u64) -> Self { + use io_uring::{opcode, types}; + + // Get raw buffer info + let ptr = buf.stable_ptr(); + let len = buf.bytes_init(); + + let sqe = if buf.len() == 1 { + opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _) + .offset(offset as _) + .build() + } else { + opcode::Writev::new(types::Fd(fd.raw_fd()), ptr as *const iovec, buf.len() as _) + .offset(offset as _) + .build() + }; + + Self::new( + ReadWriteData { + _fd: fd.clone(), + buf, + }, + ReadWriteTransform(Kind::Write), + sqe, + ) + } + + pub(crate) fn read_at(fd: &SharedFd, mut buf: Buffer, offset: u64) -> Self { + use io_uring::{opcode, types}; + + // Get raw buffer info + let ptr = buf.stable_mut_ptr(); + let len = buf.bytes_total(); + + buf.fill(); + + let sqe = if buf.len() == 1 { + opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) + .offset(offset as _) + .build() + } else { + opcode::Readv::new(types::Fd(fd.raw_fd()), ptr as *mut iovec, buf.len() as _) + .offset(offset as _) + .build() + }; + + Self::new( + ReadWriteData { + _fd: fd.clone(), + buf, + }, + ReadWriteTransform(Kind::Read), + sqe, + ) + } +} diff --git a/src/io/readv.rs b/src/io/readv.rs deleted file mode 100644 index a048dd77..00000000 --- a/src/io/readv.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::buf::BoundedBufMut; -use crate::{OneshotOutputTransform, Result, UnsubmittedOneshot, WithBuffer}; - -use crate::io::SharedFd; -use io_uring::cqueue::Entry; -use libc::iovec; -use std::io; -use std::marker::PhantomData; - -/// An unsubmitted readv operation. -pub type UnsubmittedReadv = UnsubmittedOneshot, ReadvTransform>; - -#[allow(missing_docs)] -pub struct ReadvData { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, - - /// Reference to the in-flight buffer. - pub(crate) bufs: Vec, - - /// Parameter for `io_uring::op::readv`, referring `bufs`. - #[allow(dead_code)] - iovs: Vec, -} - -#[allow(missing_docs)] -pub struct ReadvTransform { - _phantom: PhantomData, -} - -impl OneshotOutputTransform for ReadvTransform -where - T: BoundedBufMut, -{ - type Output = Result>; - type StoredData = ReadvData; - - fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { - // Recover the buffer - let mut bufs = data.bufs; - - let res = if cqe.result() >= 0 { - // If the operation was successful, advance the initialized cursor. - let mut count = cqe.result() as usize; - for b in bufs.iter_mut() { - let sz = std::cmp::min(count, b.bytes_total() - b.bytes_init()); - let pos = b.bytes_init() + sz; - // Safety: the kernel returns bytes written, and we have ensured that `pos` is - // valid for current buffer. - unsafe { b.set_init(pos) }; - count -= sz; - if count == 0 { - break; - } - } - assert_eq!(count, 0); - Ok(cqe.result() as usize) - } else { - Err(io::Error::from_raw_os_error(-cqe.result())) - }; - - res.with_buffer(bufs) - } -} - -impl UnsubmittedReadv { - pub(crate) fn readv_at(fd: &SharedFd, mut bufs: Vec, offset: u64) -> Self { - use io_uring::{opcode, types}; - - let iovs: Vec = bufs - .iter_mut() - .map(|b| iovec { - // Safety guaranteed by `BoundedBufMut`. - iov_base: unsafe { b.stable_mut_ptr().add(b.bytes_init()) as *mut libc::c_void }, - iov_len: b.bytes_total() - b.bytes_init(), - }) - .collect(); - - // Get raw buffer info - let ptr = iovs.as_ptr(); - let len = iovs.len(); - - Self::new( - ReadvData { - fd: fd.clone(), - bufs, - iovs, - }, - ReadvTransform { - _phantom: PhantomData, - }, - opcode::Readv::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build(), - ) - } -} diff --git a/src/io/socket.rs b/src/io/socket.rs index ff675564..e462073b 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -1,8 +1,9 @@ -use crate::io::write::UnsubmittedWrite; +use crate::buf::Buffer; +use crate::io::read_write::Unsubmitted; use crate::runtime::driver::op::{Op, Submit}; use crate::{ buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut, IoBuf, Slice}, + buf::{BoundedBuf, BoundedBufMut, Slice}, io::SharedFd, UnsubmittedOneshot, }; @@ -43,44 +44,8 @@ impl Socket { Ok(Socket { fd }) } - pub(crate) fn write(&self, buf: T) -> UnsubmittedWrite { - UnsubmittedOneshot::write_at(&self.fd, buf, 0) - } - - pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { - let orig_bounds = buf.bounds(); - match self.write_all_slice(buf.slice_full()).await { - Ok((x, buf)) => Ok((x, T::from_buf_bounds(buf, orig_bounds))), - Err(e) => Err(e.map(|buf| T::from_buf_bounds(buf, orig_bounds))), - } - } - - async fn write_all_slice(&self, mut buf: Slice) -> crate::Result<(), T> { - while buf.bytes_init() != 0 { - let res = self.write(buf).submit().await; - match res { - Ok((0, slice)) => { - return Err(crate::Error( - std::io::Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - ), - slice.into_inner(), - )) - } - Ok((n, slice)) => { - buf = slice.slice(n..); - } - - // No match on an EINTR error is performed because this - // crate's design ensures we are not calling the 'wait' option - // in the ENTER syscall. Only an Enter with 'wait' can generate - // an EINTR according to the io_uring man pages. - Err(e) => return Err(e.map(|slice| slice.into_inner())), - } - } - - Ok(((), buf.into_inner())) + pub(crate) fn write(&self, buf: Buffer) -> Unsubmitted { + Unsubmitted::write_at(&self.fd, buf, 0) } pub(crate) async fn write_fixed(&self, buf: T) -> crate::Result @@ -130,12 +95,6 @@ impl Socket { Ok(((), buf.into_inner())) } - pub async fn writev(&self, bufs: Vec) -> crate::Result> { - UnsubmittedOneshot::writev_at(&self.fd, bufs, 0) - .submit() - .await - } - pub(crate) async fn send_to( &self, buf: T, @@ -170,7 +129,7 @@ impl Socket { op.await } - pub(crate) async fn read(&self, buf: T) -> crate::Result { + pub(crate) async fn read(&self, buf: Buffer) -> crate::Result { UnsubmittedOneshot::read_at(&self.fd, buf, 0).submit().await } diff --git a/src/io/write.rs b/src/io/write.rs deleted file mode 100644 index 01675a87..00000000 --- a/src/io/write.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot}; -use crate::WithBuffer; -use io_uring::cqueue::Entry; -use std::io; -use std::marker::PhantomData; - -/// An unsubmitted write operation. -pub type UnsubmittedWrite = UnsubmittedOneshot, WriteTransform>; - -#[allow(missing_docs)] -pub struct WriteData { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - _fd: SharedFd, - - buf: T, -} - -#[allow(missing_docs)] -pub struct WriteTransform { - _phantom: PhantomData, -} - -impl OneshotOutputTransform for WriteTransform { - type Output = Result; - type StoredData = WriteData; - - fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { - let res = if cqe.result() >= 0 { - Ok(cqe.result() as usize) - } else { - Err(io::Error::from_raw_os_error(-cqe.result())) - }; - - res.with_buffer(data.buf) - } -} - -impl UnsubmittedWrite { - pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> Self { - use io_uring::{opcode, types}; - - // Get raw buffer info - let ptr = buf.stable_ptr(); - let len = buf.bytes_init(); - - Self::new( - WriteData { - _fd: fd.clone(), - buf, - }, - WriteTransform { - _phantom: PhantomData, - }, - opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build(), - ) - } -} diff --git a/src/io/writev.rs b/src/io/writev.rs deleted file mode 100644 index 50edc2b4..00000000 --- a/src/io/writev.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::{buf::BoundedBuf, io::SharedFd, Result}; -use crate::{OneshotOutputTransform, UnsubmittedOneshot, WithBuffer}; -use io_uring::cqueue::Entry; -use libc::iovec; -use std::io; -use std::marker::PhantomData; - -/// An unsubmitted writev operation. -pub type UnsubmittedWritev = UnsubmittedOneshot, WritevTransform>; - -#[allow(missing_docs)] -pub struct WritevData { - /// Holds a strong ref to the FD, preventing the file from being closed - /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, - - pub(crate) bufs: Vec, - - /// Parameter for `io_uring::op::readv`, referring `bufs`. - #[allow(dead_code)] - iovs: Vec, -} - -#[allow(missing_docs)] -pub struct WritevTransform { - _phantom: PhantomData, -} - -impl OneshotOutputTransform for WritevTransform -where - T: BoundedBuf, -{ - type Output = Result>; - type StoredData = WritevData; - - fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { - let res = if cqe.result() >= 0 { - Ok(cqe.result() as usize) - } else { - Err(io::Error::from_raw_os_error(-cqe.result())) - }; - - res.with_buffer(data.bufs) - } -} - -impl UnsubmittedWritev { - pub(crate) fn writev_at(fd: &SharedFd, mut bufs: Vec, offset: u64) -> Self { - use io_uring::{opcode, types}; - - let iovs: Vec = bufs - .iter_mut() - .map(|b| iovec { - // Safety guaranteed by `BoundedBufMut`. - iov_base: b.stable_ptr() as *mut libc::c_void, - iov_len: b.bytes_init(), - }) - .collect(); - - // Get raw buffer info - let ptr = iovs.as_ptr(); - let len = iovs.len(); - - Self::new( - WritevData { - fd: fd.clone(), - bufs, - iovs, - }, - WritevTransform { - _phantom: PhantomData, - }, - opcode::Writev::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build(), - ) - } -} diff --git a/src/lib.rs b/src/lib.rs index 2d084526..12fd0054 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,14 +17,14 @@ //! // Open a file //! let file = File::open("hello.txt").await?; //! -//! let buf = vec![0; 4096]; +//! let buf = vec![0; 4096].into(); //! // Read some data, the buffer is passed by ownership and //! // submitted to the kernel. When the operation completes, //! // we get the buffer back. //! let (n, buf) = file.read_at(buf, 0).submit().await?; //! //! // Display the contents -//! println!("{:?}", &buf[..n]); +//! println!("{:?}", &buf[0][..n]); //! //! Ok(()) //! }) @@ -55,7 +55,6 @@ //! will happen in the background. There is no guarantee as to **when** the //! implicit close-on-drop operation happens, so it is recommended to explicitly //! call `close()`. - #![warn(missing_docs)] macro_rules! syscall { @@ -72,6 +71,7 @@ macro_rules! syscall { #[macro_use] mod future; mod io; +#[allow(missing_docs)] pub mod runtime; mod types; @@ -79,10 +79,8 @@ pub mod buf; pub mod fs; pub mod net; -pub use io::read::*; -pub use io::readv::*; -pub use io::write::*; -pub use io::writev::*; +pub use buf::Buffer; +pub use io::read_write::*; pub use runtime::driver::op::{ InFlightOneshot, Link, LinkedInFlightOneshot, OneshotOutputTransform, Submit, UnsubmittedOneshot, @@ -120,14 +118,14 @@ use std::future::Future; /// // Open a file /// let file = File::open("hello.txt").await?; /// -/// let buf = vec![0; 4096]; +/// let buf = vec![0; 4096].into(); /// // Read some data, the buffer is passed by ownership and /// // submitted to the kernel. When the operation completes, /// // we get the buffer back. /// let (n, buf) = file.read_at(buf, 0).submit().await?; /// /// // Display the contents -/// println!("{:?}", &buf[..n]); +/// println!("{:?}", &buf[0][..n]); /// /// Ok(()) /// }) diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 62d31a00..11981220 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -34,11 +34,11 @@ use std::{ /// let tx = TcpStream::connect("127.0.0.1:2345".parse().unwrap()).await.unwrap(); /// let rx = rx_ch.await.expect("The spawned task expected to send a TcpStream"); /// -/// tx.write(b"test" as &'static [u8]).submit().await.unwrap(); +/// tx.write(b"test".to_vec().into()).submit().await.unwrap(); /// -/// let (_, buf) = rx.read(vec![0; 4]).await.unwrap(); +/// let (_, buf) = rx.read(vec![0; 4].into()).await.unwrap(); /// -/// assert_eq!(buf, b"test"); +/// assert_eq!(&buf[0], b"test"); /// }); /// ``` pub struct TcpListener { diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 0dea1c4a..f274497c 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -5,10 +5,9 @@ use std::{ }; use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut, Buffer}, io::{SharedFd, Socket}, - UnsubmittedWrite, + Submit, Unsubmitted, }; /// A TCP stream between a local and a remote socket. @@ -29,7 +28,7 @@ use crate::{ /// let mut stream = TcpStream::connect("127.0.0.1:8080".parse().unwrap()).await?; /// /// // Write some data. -/// stream.write(b"hello world!".as_slice()).submit().await.unwrap(); +/// stream.write(b"hello world!".to_vec().into()).submit().await.unwrap(); /// /// Ok(()) /// }) @@ -74,7 +73,7 @@ impl TcpStream { /// Read some data from the stream into the buffer. /// /// Returns the original buffer and quantity of data read. - pub async fn read(&self, buf: T) -> crate::Result { + pub async fn read(&self, buf: Buffer) -> crate::Result { self.inner.read(buf).await } @@ -101,62 +100,10 @@ impl TcpStream { /// Write some data to the stream from the buffer. /// /// Returns the original buffer and quantity of data written. - pub fn write(&self, buf: T) -> UnsubmittedWrite { + pub fn write(&self, buf: Buffer) -> Unsubmitted { self.inner.write(buf) } - /// Attempts to write an entire buffer to the stream. - /// - /// This method will continuously call [`write`] until there is no more data to be - /// written or an error is returned. This method will not return until the entire - /// buffer has been successfully written or an error has occurred. - /// - /// If the buffer contains no data, this will never call [`write`]. - /// - /// # Errors - /// - /// This function will return the first error that [`write`] returns. - /// - /// # Examples - /// - /// ```no_run - /// use std::net::SocketAddr; - /// use tokio_uring::net::TcpListener; - /// use tokio_uring::buf::BoundedBuf; - /// - /// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - /// - /// tokio_uring::start(async { - /// let listener = TcpListener::bind(addr).unwrap(); - /// - /// println!("Listening on {}", listener.local_addr().unwrap()); - /// - /// loop { - /// let (stream, _) = listener.accept().await.unwrap(); - /// tokio_uring::spawn(async move { - /// let mut n = 0; - /// let mut buf = vec![0u8; 4096]; - /// loop { - /// let (read, nbuf) = stream.read(buf).await.unwrap(); - /// buf = nbuf; - /// if read == 0 { - /// break; - /// } - /// - /// let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); - /// buf = slice.into_inner(); - /// n += read; - /// } - /// }); - /// } - /// }); - /// ``` - /// - /// [`write`]: Self::write - pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { - self.inner.write_all(buf).await - } - /// Writes data into the socket from a registered buffer. /// /// Like [`write`], but using a pre-mapped buffer @@ -220,8 +167,8 @@ impl TcpStream { /// written to this writer. /// /// [`Ok(n)`]: Ok - pub async fn writev(&self, buf: Vec) -> crate::Result> { - self.inner.writev(buf).await + pub async fn writev(&self, buf: Buffer) -> crate::Result { + self.inner.write(buf).submit().await } /// Shuts down the read, write, or both halves of this connection. diff --git a/src/net/udp.rs b/src/net/udp.rs index 5a8497e1..eb57fb9c 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,8 +1,7 @@ use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut, Buffer}, io::{SharedFd, Socket}, - UnsubmittedWrite, + Unsubmitted, }; use socket2::SockAddr; use std::{ @@ -41,15 +40,15 @@ use std::{ /// socket.connect(second_addr).await.unwrap(); /// other_socket.connect(first_addr).await.unwrap(); /// -/// let buf = vec![0; 32]; +/// let buf = vec![0; 32].into(); /// /// // write data -/// socket.write(b"hello world".as_slice()).submit().await.unwrap(); +/// socket.write(b"hello world".to_vec().into()).submit().await.unwrap(); /// /// // read data /// let (n_bytes, buf) = other_socket.read(buf).await.unwrap(); /// -/// assert_eq!(b"hello world", &buf[..n_bytes]); +/// assert_eq!(b"hello world", &buf[0][..n_bytes]); /// /// // write data using send on connected socket /// socket.send(b"hello world via send".as_slice()).await.unwrap(); @@ -57,7 +56,7 @@ use std::{ /// // read data /// let (n_bytes, buf) = other_socket.read(buf).await.unwrap(); /// -/// assert_eq!(b"hello world via send", &buf[..n_bytes]); +/// assert_eq!(b"hello world via send", &buf[0][..n_bytes]); /// /// Ok(()) /// }) @@ -310,7 +309,7 @@ impl UdpSocket { /// Reads a packet of data from the socket into the buffer. /// /// Returns the original buffer and quantity of data read. - pub async fn read(&self, buf: T) -> crate::Result { + pub async fn read(&self, buf: Buffer) -> crate::Result { self.inner.read(buf).await } @@ -337,7 +336,7 @@ impl UdpSocket { /// Writes data into the socket from the specified buffer. /// /// Returns the original buffer and quantity of data written. - pub fn write(&self, buf: T) -> UnsubmittedWrite { + pub fn write(&self, buf: Buffer) -> Unsubmitted { self.inner.write(buf) } diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index 6f0c523f..9c600813 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -31,11 +31,11 @@ use std::{io, path::Path}; /// let tx = UnixStream::connect(&sock_file).await.unwrap(); /// let rx = rx_ch.await.expect("The spawned task expected to send a UnixStream"); /// -/// tx.write(b"test" as &'static [u8]).submit().await.unwrap(); +/// tx.write(b"test".to_vec().into()).submit().await.unwrap(); /// -/// let (_, buf) = rx.read(vec![0; 4]).await.unwrap(); +/// let (_, buf) = rx.read(vec![0; 4].into()).await.unwrap(); /// -/// assert_eq!(buf, b"test"); +/// assert_eq!(&buf[0], b"test"); /// }); /// /// std::fs::remove_file(&sock_file).unwrap(); diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index a0d100aa..50cadbaf 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -1,8 +1,7 @@ use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut, Buffer}, io::{SharedFd, Socket}, - UnsubmittedWrite, + Submit, Unsubmitted, }; use socket2::SockAddr; use std::{ @@ -29,7 +28,7 @@ use std::{ /// let mut stream = UnixStream::connect("/tmp/tokio-uring-unix-test.sock").await?; /// /// // Write some data. -/// stream.write(b"hello world!".as_slice()).submit().await.unwrap(); +/// stream.write(b"hello world!".to_vec().into()).submit().await.unwrap(); /// /// Ok(()) /// }) @@ -75,7 +74,7 @@ impl UnixStream { /// Read some data from the stream into the buffer, returning the original buffer and /// quantity of data read. - pub async fn read(&self, buf: T) -> crate::Result { + pub async fn read(&self, buf: Buffer) -> crate::Result { self.inner.read(buf).await } @@ -99,27 +98,10 @@ impl UnixStream { /// Write some data to the stream from the buffer, returning the original buffer and /// quantity of data written. - pub fn write(&self, buf: T) -> UnsubmittedWrite { + pub fn write(&self, buf: Buffer) -> Unsubmitted { self.inner.write(buf) } - /// Attempts to write an entire buffer to the stream. - /// - /// This method will continuously call [`write`] until there is no more data to be - /// written or an error is returned. This method will not return until the entire - /// buffer has been successfully written or an error has occurred. - /// - /// If the buffer contains no data, this will never call [`write`]. - /// - /// # Errors - /// - /// This function will return the first error that [`write`] returns. - /// - /// [`write`]: Self::write - pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { - self.inner.write_all(buf).await - } - /// Like [`write`], but using a pre-mapped buffer /// registered with [`FixedBufRegistry`]. /// @@ -182,8 +164,8 @@ impl UnixStream { /// written to this writer. /// /// [`Ok(n)`]: Ok - pub async fn writev(&self, buf: Vec) -> crate::Result> { - self.inner.writev(buf).await + pub async fn writev(&self, buf: Buffer) -> crate::Result { + self.inner.write(buf).submit().await } /// Shuts down the read, write, or both halves of this connection. diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 91d3ea21..ad4fec9d 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -125,13 +125,13 @@ impl Driver { } pub(crate) fn register_files(&mut self, fds: &[RawFd]) -> io::Result<()> { - unsafe { self.uring.submitter().register_files(fds) }?; + self.uring.submitter().register_files(fds)?; Ok(()) } pub(crate) fn unregister_files(&mut self) -> io::Result<()> { - unsafe { self.uring.submitter().unregister_files() }?; + self.uring.submitter().unregister_files()?; Ok(()) } diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 3bc03f57..8299dcd7 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -27,6 +27,7 @@ pub(crate) type Completion = SlabListEntry; pub struct UnsubmittedOneshot> { stable_data: D, post_op: T, + #[allow(missing_docs)] pub sqe: squeue::Entry, } @@ -62,7 +63,7 @@ impl> UnsubmittedOneshot { self } - // Create inflight from submitted index. + /// Create inflight from submitted index. pub fn inflight(self, index: usize) -> InFlightOneshot { let handle = CONTEXT .with(|x| x.handle()) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index dd75bb81..66c835f9 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,6 +10,7 @@ pub(crate) mod driver; pub(crate) use context::RuntimeContext; thread_local! { + #[allow(missing_docs)] pub static CONTEXT: RuntimeContext = const { RuntimeContext::new() }; } diff --git a/src/types.rs b/src/types.rs index 40742bd8..dbfe01b5 100644 --- a/src/types.rs +++ b/src/types.rs @@ -10,20 +10,21 @@ use std::fmt::{Debug, Display}; /// /// ```no_run /// use tokio_uring::fs::File; +/// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// // Open a file /// let file = File::open("hello.txt").await?; /// -/// let buf = vec![0; 4096]; +/// let buf = vec![0; 4096].into(); /// // Read some data, the buffer is passed by ownership and /// // submitted to the kernel. When the operation completes, /// // we get the buffer back. -/// let (n, buf) = file.read_at(buf, 0).await?; +/// let (n, buf) = file.read_at(buf, 0).submit().await?; /// /// // Display the contents -/// println!("{:?}", &buf[..n]); +/// println!("{:?}", &buf[0][..n]); /// /// Ok(()) /// }) diff --git a/tests/driver.rs b/tests/driver.rs index 9550f16c..40f8bc39 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -1,80 +1,11 @@ use tempfile::NamedTempFile; -use tokio_uring::{buf::IoBuf, fs::File, Submit}; +use tokio_uring::{fs::File, Submit}; #[path = "../src/future.rs"] #[allow(warnings)] mod future; -#[test] -fn complete_ops_on_drop() { - use std::sync::Arc; - - struct MyBuf { - data: Vec, - _ref_cnt: Arc<()>, - } - - unsafe impl IoBuf for MyBuf { - fn stable_ptr(&self) -> *const u8 { - self.data.stable_ptr() - } - - fn bytes_init(&self) -> usize { - self.data.bytes_init() - } - - fn bytes_total(&self) -> usize { - self.data.bytes_total() - } - } - - unsafe impl tokio_uring::buf::IoBufMut for MyBuf { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.data.stable_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - self.data.set_init(pos); - } - } - - // Used to test if the buffer dropped. - let ref_cnt = Arc::new(()); - - let tempfile = tempfile(); - - let vec = vec![0; 50 * 1024 * 1024]; - let mut file = std::fs::File::create(tempfile.path()).unwrap(); - std::io::Write::write_all(&mut file, &vec).unwrap(); - - let file = tokio_uring::start(async { - let file = File::create(tempfile.path()).await.unwrap(); - poll_once(async { - file.read_at( - MyBuf { - data: vec![0; 64 * 1024], - _ref_cnt: ref_cnt.clone(), - }, - 25 * 1024 * 1024, - ) - .submit() - .await - .unwrap(); - }) - .await; - - file - }); - - assert_eq!(Arc::strong_count(&ref_cnt), 1); - - // little sleep - std::thread::sleep(std::time::Duration::from_millis(100)); - - drop(file); -} - #[test] fn too_many_submissions() { let tempfile = tempfile(); @@ -83,7 +14,7 @@ fn too_many_submissions() { let file = File::create(tempfile.path()).await.unwrap(); for _ in 0..600 { poll_once(async { - file.write_at(b"hello world".to_vec(), 0) + file.write_at(b"hello world".to_vec().into(), 0) .submit() .await .unwrap(); diff --git a/tests/fs_file.rs b/tests/fs_file.rs index c91ac02e..02459ef1 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -18,10 +18,10 @@ mod future; const HELLO: &[u8] = b"hello world..."; async fn read_hello(file: &File) { - let buf = Vec::with_capacity(1024); + let buf = Vec::::with_capacity(1024).into(); let (n, buf) = file.read_at(buf, 0).submit().await.unwrap(); assert_eq!(n, HELLO.len()); - assert_eq!(&buf[..n], HELLO); + assert_eq!(&buf[0][..n], HELLO); } #[test] @@ -35,21 +35,6 @@ fn basic_read() { }); } -#[test] -fn basic_read_exact() { - tokio_uring::start(async { - let data = HELLO.repeat(1000); - let buf = Vec::with_capacity(data.len()); - - let mut tempfile = tempfile(); - tempfile.write_all(&data).unwrap(); - - let file = File::open(tempfile.path()).await.unwrap(); - let (_, buf) = file.read_exact_at(buf, 0).await.unwrap(); - assert_eq!(buf, data); - }); -} - #[test] fn basic_write() { tokio_uring::start(async { @@ -57,7 +42,10 @@ fn basic_write() { let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.unwrap(); + file.write_at(HELLO.to_vec().into(), 0) + .submit() + .await + .unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -71,8 +59,8 @@ fn vectored_read() { tempfile.write_all(HELLO).unwrap(); let file = File::open(tempfile.path()).await.unwrap(); - let bufs = vec![Vec::::with_capacity(5), Vec::::with_capacity(9)]; - let (n, bufs) = file.readv_at(bufs, 0).submit().await.unwrap(); + let bufs = vec![Vec::::with_capacity(5), Vec::::with_capacity(9)].into(); + let (n, bufs) = file.read_at(bufs, 0).submit().await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(bufs[1][0], b' '); @@ -87,30 +75,15 @@ fn vectored_write() { let file = File::create(tempfile.path()).await.unwrap(); let buf1 = "hello".to_owned().into_bytes(); let buf2 = " world...".to_owned().into_bytes(); - let bufs = vec![buf1, buf2]; + let bufs = vec![buf1, buf2].into(); - file.writev_at(bufs, 0).submit().await.0.unwrap(); + file.write_at(bufs, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); }); } -#[test] -fn basic_write_all() { - tokio_uring::start(async { - let data = HELLO.repeat(1000); - - let tempfile = tempfile(); - - let file = File::create(tempfile.path()).await.unwrap(); - let (_, data) = file.write_all_at(data, 0).await.unwrap(); - - let file = std::fs::read(tempfile.path()).unwrap(); - assert_eq!(file, data); - }); -} - #[test] fn cancel_read() { tokio_uring::start(async { @@ -150,7 +123,10 @@ fn drop_open() { // Do something else let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.unwrap(); + file.write_at(HELLO.to_vec().into(), 0) + .submit() + .await + .unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -178,7 +154,10 @@ fn sync_doesnt_kill_anything() { let file = File::create(tempfile.path()).await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); - file.write_at(&b"foo"[..], 0).submit().await.unwrap(); + file.write_at("foo".to_owned().into_bytes().into(), 0) + .submit() + .await + .unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); }); @@ -306,48 +285,22 @@ fn basic_fallocate() { }); } -#[test] -fn read_linked() { - tokio_uring::start(async { - let mut tempfile = tempfile(); - let file = File::open(tempfile.path()).await.unwrap(); - - tempfile.write_all(&[HELLO, HELLO].concat()).unwrap(); - - let buf1 = Vec::with_capacity(HELLO.len()); - let buf2 = Vec::with_capacity(HELLO.len()); - - let read1 = file.read_at(buf1, 0); - let read2 = file.read_at(buf2, HELLO.len() as u64); - - let future1 = read1.link(read2).submit(); - - let (res1, future2) = future1.await; - let res2 = future2.await; - - res1.0.unwrap(); - res2.0.unwrap(); - - assert_eq!([HELLO, HELLO].concat(), [res1.1, res2.1].concat()); - }); -} - #[test] fn write_linked() { tokio_uring::start(async { let tempfile = tempfile(); let file = File::create(tempfile.path()).await.unwrap(); - let write1 = file.write_at(HELLO, 0); - let write2 = file.write_at(HELLO, HELLO.len() as u64); + let write1 = file.write_at(HELLO.to_vec().into(), 0); + let write2 = file.write_at(HELLO.to_vec().into(), HELLO.len() as u64); let future1 = write1.link(write2).submit(); let (res1, future2) = future1.await; let res2 = future2.await; - res1.0.unwrap(); - res2.0.unwrap(); + res1.unwrap(); + res2.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, [HELLO, HELLO].concat());