Skip to content

Commit

Permalink
Add better tests, fix them, bump version
Browse files Browse the repository at this point in the history
  • Loading branch information
Xaeroxe committed Jan 1, 2023
1 parent c6f4336 commit f52b92b
Show file tree
Hide file tree
Showing 3 changed files with 456 additions and 23 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "async-io-typed"
version = "1.0.2"
version = "1.0.3"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "Adapts any AsyncRead or AsyncWrite type to send serde compatible types"
Expand All @@ -21,4 +21,6 @@ thiserror = "1.0.37"

[dev-dependencies]
rand = "0.8"
tokio = { version = "1.22.0", features = ["rt-multi-thread"]}
tokio = { version = "1.22.0", features = ["rt-multi-thread", "sync", "macros", "time"]}
tokio-util = "0.7"
futures-util = { version = "0.3", features = ["io"] }
52 changes: 31 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use futures_io::{AsyncRead, AsyncWrite};
use futures_util::{stream::Stream, Sink, SinkExt};
use serde::{de::DeserializeOwned, Serialize};

#[cfg(test)]
mod tests;

const U16_MARKER: u8 = 252;
const U32_MARKER: u8 = 253;
const U64_MARKER: u8 = 254;
Expand Down Expand Up @@ -313,24 +316,22 @@ impl<R: AsyncRead + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncReadTyp
let mut buf = [0; 8];
let accumulated = *len_in_progress_assigned as usize;
let slice = match len_read_mode {
LenReadMode::U16 => &mut buf[0..(2 - accumulated)],
LenReadMode::U32 => &mut buf[0..(4 - accumulated)],
LenReadMode::U64 => &mut buf[0..(8 - accumulated)],
LenReadMode::U16 => &mut buf[accumulated..2],
LenReadMode::U32 => &mut buf[accumulated..4],
LenReadMode::U64 => &mut buf[accumulated..8],
};
let len = futures_core::ready!(Pin::new(&mut raw).poll_read(cx, slice))?;
len_in_progress[accumulated..(accumulated + slice.len())]
len_in_progress[accumulated..(accumulated + len)]
.copy_from_slice(&slice[..len]);
*len_in_progress_assigned += len as u8;
if len == slice.len() {
let new_len = match len_read_mode {
LenReadMode::U16 => u16::from_le_bytes(
(&len_in_progress[0..2]).try_into().expect("infallible"),
)
as u64,
) as u64,
LenReadMode::U32 => u32::from_le_bytes(
(&len_in_progress[0..4]).try_into().expect("infallible"),
)
as u64,
) as u64,
LenReadMode::U64 => u64::from_le_bytes(*len_in_progress),
};
if new_len > size_limit {
Expand All @@ -344,7 +345,9 @@ impl<R: AsyncRead + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncReadTyp
}
AsyncReadState::ReadingItem { ref mut len_read } => {
while *len_read < item_buffer.len() {
let len = futures_core::ready!(Pin::new(&mut raw).poll_read(cx, &mut item_buffer[*len_read..]))?;
let len = futures_core::ready!(
Pin::new(&mut raw).poll_read(cx, &mut item_buffer[*len_read..])
)?;
*len_read += len;
if *len_read == item_buffer.len() {
break;
Expand Down Expand Up @@ -386,7 +389,8 @@ enum AsyncWriteState {
Idle,
WritingLen {
current_len: [u8; 9],
len_to_be_sent: u8,
len_to_be_sent: usize,
len_sent: usize,
},
WritingValue {
bytes_sent: usize,
Expand Down Expand Up @@ -513,15 +517,19 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
};
*state = AsyncWriteState::WritingLen {
current_len: new_current_len,
len_to_be_sent: to_be_sent as u8,
len_to_be_sent: to_be_sent,
len_sent: 0,
};
let len = futures_core::ready!(Pin::new(&mut *raw).poll_write(cx, &new_current_len[0..to_be_sent]))?;
let len = futures_core::ready!(
Pin::new(&mut *raw).poll_write(cx, &new_current_len[0..to_be_sent])
)?;
*state = if len == to_be_sent {
AsyncWriteState::WritingValue { bytes_sent: 0 }
} else {
AsyncWriteState::WritingLen {
current_len: new_current_len,
len_to_be_sent: (to_be_sent - len) as u8,
len_to_be_sent: to_be_sent,
len_sent: len,
}
};
continue;
Expand All @@ -533,20 +541,22 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
}
}
AsyncWriteState::WritingLen {
current_len,
len_to_be_sent,
ref current_len,
ref len_to_be_sent,
ref mut len_sent,
} => {
let len = futures_core::ready!(Pin::new(&mut *raw)
.poll_write(cx, &current_len[0..(*len_to_be_sent as usize)]))?;
if len == *len_to_be_sent as usize {
.poll_write(cx, &current_len[(*len_sent)..(*len_to_be_sent)]))?;
*len_sent += len;
if *len_sent == *len_to_be_sent {
*state = AsyncWriteState::WritingValue { bytes_sent: 0 };
} else {
*len_to_be_sent -= len as u8;
}
continue;
}
AsyncWriteState::WritingValue { bytes_sent } => {
let len = futures_core::ready!(Pin::new(&mut *raw).poll_write(cx, &write_buffer[*bytes_sent..]))?;
let len = futures_core::ready!(
Pin::new(&mut *raw).poll_write(cx, &write_buffer[*bytes_sent..])
)?;
*bytes_sent += len;
if *bytes_sent == write_buffer.len() {
*state = AsyncWriteState::Idle;
Expand All @@ -564,7 +574,7 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
} else {
continue;
}
},
}
AsyncWriteState::Closed => Poll::Ready(Ok(None)),
};
}
Expand Down
Loading

0 comments on commit f52b92b

Please sign in to comment.