Skip to content

Commit

Permalink
Fix the server to work with externally-allocated buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Jan 20, 2024
1 parent 7a0a925 commit 8a801e9
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 16 deletions.
7 changes: 5 additions & 2 deletions edge-http/src/io/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use embedded_io_async::{ErrorType, Read, Write};

use embedded_nal_async::{SocketAddr, TcpConnect};

use crate::ws::{upgrade_request_headers, MAX_BASE64_KEY_LEN, NONCE_LEN};
use crate::{
ws::{upgrade_request_headers, MAX_BASE64_KEY_LEN, NONCE_LEN},
DEFAULT_MAX_HEADERS_COUNT,
};

use super::{
send_headers, send_headers_end, send_request, Body, BodyType, Error, ResponseHeaders, SendBody,
Expand All @@ -18,7 +21,7 @@ use super::Method;

const COMPLETION_BUF_SIZE: usize = 64;

pub enum Connection<'b, T, const N: usize = 128>
pub enum Connection<'b, T, const N: usize = DEFAULT_MAX_HEADERS_COUNT>
where
T: TcpConnect,
{
Expand Down
45 changes: 35 additions & 10 deletions edge-http/src/io/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use core::fmt::{self, Debug};
use core::mem;
use core::mem::{self, MaybeUninit};
use core::pin::pin;

use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embedded_io_async::{ErrorType, Read, Write};

use log::{debug, info, warn};

use crate::DEFAULT_MAX_HEADERS_COUNT;

const DEFAULT_HANDLERS_COUNT: usize = 4;
const DEFAULT_BUF_SIZE: usize = 2048;

use super::{
send_headers, send_headers_end, send_status, Body, BodyType, Error, RequestHeaders, SendBody,
};
Expand All @@ -17,7 +22,7 @@ pub use embedded_svc_compat::*;

const COMPLETION_BUF_SIZE: usize = 64;

pub enum Connection<'b, T, const N: usize = 128> {
pub enum Connection<'b, T, const N: usize = DEFAULT_MAX_HEADERS_COUNT> {
Transition(TransitionState),
Unbound(T),
Request(RequestState<'b, T, N>),
Expand Down Expand Up @@ -286,20 +291,19 @@ where
}
}

pub async fn handle_connection<const N: usize, const B: usize, T, H>(
pub async fn handle_connection<const N: usize, T, H>(
mut io: T,
buf: &mut [u8],
handler_id: usize,
handler: &H,
) where
H: for<'b> Handler<'b, &'b mut T, N>,
T: Read + Write,
{
let mut buf = [0_u8; B];

loop {
debug!("Handler {}: Waiting for new request", handler_id);

let result = handle_request::<N, _, _>(&mut buf, &mut io, handler).await;
let result = handle_request::<N, _, _>(buf, &mut io, handler).await;

match result {
Err(e) => {
Expand Down Expand Up @@ -385,12 +389,23 @@ where
Ok(connection.needs_close())
}

pub struct Server<A, H, const N: usize = 128, const B: usize = 2048> {
pub struct ServerBuffers<const P: usize = DEFAULT_HANDLERS_COUNT, const B: usize = DEFAULT_BUF_SIZE>(
[MaybeUninit<[u8; B]>; P],
);

impl<const P: usize, const B: usize> ServerBuffers<P, B> {
#[inline(always)]
pub const fn new() -> Self {
Self([MaybeUninit::uninit(); P])
}
}

pub struct Server<A, H, const N: usize = DEFAULT_MAX_HEADERS_COUNT> {
acceptor: A,
handler: H,
}

impl<A, H, const N: usize, const B: usize> Server<A, H, N, B>
impl<A, H, const N: usize> Server<A, H, N>
where
A: embedded_nal_async_xtra::TcpAccept,
H: for<'b, 't> Handler<'b, &'b mut A::Connection<'t>, N>,
Expand All @@ -400,7 +415,10 @@ where
Self { acceptor, handler }
}

pub async fn process<const P: usize, const W: usize>(&mut self) -> Result<(), Error<A::Error>> {
pub async fn process<const W: usize, const P: usize, const B: usize>(
&mut self,
bufs: &mut ServerBuffers<P, B>,
) -> Result<(), Error<A::Error>> {
info!("Creating queue for {W} requests");
let channel = embassy_sync::channel::Channel::<NoopRawMutex, _, W>::new();

Expand All @@ -411,6 +429,7 @@ where
let channel = &channel;
let handler_id = index;
let handler = &self.handler;
let buf = bufs.0[index].as_mut_ptr();

handlers
.push(async move {
Expand All @@ -420,7 +439,13 @@ where
let io = channel.receive().await;
debug!("Handler {}: Got connection request", handler_id);

handle_connection::<N, B, _, _>(io, handler_id, handler).await;
handle_connection::<N, _, _>(
io,
unsafe { buf.as_mut() }.unwrap(),
handler_id,
handler,
)
.await;
}
})
.map_err(|_| ())
Expand Down
2 changes: 2 additions & 0 deletions edge-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use core::str;

use httparse::{Header, EMPTY_HEADER};

pub(crate) const DEFAULT_MAX_HEADERS_COUNT: usize = 64;

#[cfg(feature = "io")]
pub mod io;

Expand Down
12 changes: 8 additions & 4 deletions examples/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use edge_http::io::server::{Connection, Handler, Server};
use edge_http::io::server::{Connection, Handler, Server, ServerBuffers};
use edge_http::Method;

use edge_std_nal_async::StdTcpListen;
Expand All @@ -13,10 +13,14 @@ fn main() {
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

futures_lite::future::block_on(run()).unwrap();
let mut buffers: ServerBuffers = ServerBuffers::new();

futures_lite::future::block_on(run(&mut buffers)).unwrap();
}

pub async fn run() -> Result<(), anyhow::Error> {
pub async fn run<const P: usize, const B: usize>(
buffers: &mut ServerBuffers<P, B>,
) -> Result<(), anyhow::Error> {
let addr = "0.0.0.0:8881";

info!("Running HTTP server on {addr}");
Expand All @@ -25,7 +29,7 @@ pub async fn run() -> Result<(), anyhow::Error> {

let mut server: Server<_, _> = Server::new(acceptor, HttpHandler);

server.process::<4, 4>().await?;
server.process::<2, P, B>(buffers).await?;

Ok(())
}
Expand Down

0 comments on commit 8a801e9

Please sign in to comment.