From 9b707c15dc21814d35bc544dc374aec0ff922334 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 29 Jan 2024 20:35:58 +0000 Subject: [PATCH] Unify Server and ServerBuffers; TaskHandler; remove unnecessary lifetimes --- edge-http/CHANGELOG.md | 11 ++ edge-http/README.md | 14 +- edge-http/src/io/client.rs | 10 +- edge-http/src/io/server.rs | 269 +++++++++++++++++++++++++++++-------- edge-ws/README.md | 14 +- examples/http_server.rs | 14 +- examples/ws_server.rs | 14 +- 7 files changed, 247 insertions(+), 99 deletions(-) create mode 100644 edge-http/CHANGELOG.md diff --git a/edge-http/CHANGELOG.md b/edge-http/CHANGELOG.md new file mode 100644 index 0000000..c8bbbb0 --- /dev/null +++ b/edge-http/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.2.0] - ????-??-?? +* Remove unnecessary lifetimes when implementing the `embedded-svc` traits +* Server: new trait, `TaskHandler` which has an extra `task_id` parameter of type `usize`. This allows the request handling code to take advantage of the fact that - since the number of handlers when running a `Server` instance is fixed - it can store data related to handlers in a simple static array of the same size as the number of handlers that the server is running +* Breaking change: structures `Server` and `ServerBuffers` united, because `Server` was actually stateless. Turbofish syntax for specifying max number of HTTP headers and queue size is no longer necessary diff --git a/edge-http/README.md b/edge-http/README.md index e481c39..c302f5e 100644 --- a/edge-http/README.md +++ b/edge-http/README.md @@ -93,7 +93,7 @@ async fn request<'b, const N: usize, T: TcpConnect>( ### HTTP server ```rust -use edge_http::io::server::{Connection, Handler, Server, ServerBuffers}; +use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::Method; use embedded_nal_async_xtra::TcpListen; @@ -107,14 +107,12 @@ fn main() { env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), ); - let mut buffers: ServerBuffers = ServerBuffers::new(); + let mut server = DefaultServer::new(); - futures_lite::future::block_on(run(&mut buffers)).unwrap(); + futures_lite::future::block_on(run(&mut server)).unwrap(); } -pub async fn run( - buffers: &mut ServerBuffers, -) -> Result<(), anyhow::Error> { +pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { let addr = "0.0.0.0:8881"; info!("Running HTTP server on {addr}"); @@ -123,9 +121,7 @@ pub async fn run( .listen(addr.parse().unwrap()) .await?; - let mut server: Server<_, _> = Server::new(acceptor, HttpHandler); - - server.process::<2, P, B>(buffers).await?; + server.run(acceptor, HttpHandler).await?; Ok(()) } diff --git a/edge-http/src/io/client.rs b/edge-http/src/io/client.rs index 60d3429..d5f24a2 100644 --- a/edge-http/src/io/client.rs +++ b/edge-http/src/io/client.rs @@ -347,7 +347,7 @@ where impl<'b, T, const N: usize> Read for Connection<'b, T, N> where - T: TcpConnect + 'b, + T: TcpConnect, { async fn read(&mut self, buf: &mut [u8]) -> Result { self.response_mut()?.io.read(buf).await @@ -356,7 +356,7 @@ where impl<'b, T, const N: usize> Write for Connection<'b, T, N> where - T: TcpConnect + 'b, + T: TcpConnect, { async fn write(&mut self, buf: &[u8]) -> Result { self.request_mut()?.io.write(buf).await @@ -408,7 +408,7 @@ mod embedded_svc_compat { impl<'b, T, const N: usize> Headers for super::Connection<'b, T, N> where - T: TcpConnect + 'b, + T: TcpConnect, { fn header(&self, name: &str) -> Option<&'_ str> { let response = self.response_ref().expect("Not in response state"); @@ -419,7 +419,7 @@ mod embedded_svc_compat { impl<'b, T, const N: usize> Status for super::Connection<'b, T, N> where - T: TcpConnect + 'b, + T: TcpConnect, { fn status(&self) -> u16 { let response = self.response_ref().expect("Not in response state"); @@ -436,7 +436,7 @@ mod embedded_svc_compat { impl<'b, T, const N: usize> Connection for super::Connection<'b, T, N> where - T: TcpConnect + 'b, + T: TcpConnect, { type Read = Body<'b, T::Connection<'b>>; diff --git a/edge-http/src/io/server.rs b/edge-http/src/io/server.rs index c3c0fd6..7cb7eda 100644 --- a/edge-http/src/io/server.rs +++ b/edge-http/src/io/server.rs @@ -2,6 +2,7 @@ use core::fmt::{self, Debug}; use core::mem::{self, MaybeUninit}; use core::pin::pin; +use embassy_futures::select::Either; use embassy_sync::blocking_mutex::raw::NoopRawMutex; use embedded_io_async::{ErrorType, Read, Write}; @@ -10,7 +11,7 @@ use log::{debug, info, warn}; use crate::ws::{upgrade_response_headers, MAX_BASE64_KEY_RESPONSE_LEN}; use crate::DEFAULT_MAX_HEADERS_COUNT; -const DEFAULT_HANDLERS_COUNT: usize = 4; +const DEFAULT_HANDLER_TASKS_COUNT: usize = 4; const DEFAULT_BUF_SIZE: usize = 2048; use super::{ @@ -294,7 +295,7 @@ type ResponseState = SendBody; pub trait Handler<'b, T, const N: usize> where - T: ErrorType, + T: Read + Write, { type Error: Debug; @@ -303,8 +304,8 @@ where impl<'b, const N: usize, T, H> Handler<'b, T, N> for &H where - H: Handler<'b, T, N>, T: Read + Write, + H: Handler<'b, T, N>, { type Error = H::Error; @@ -313,25 +314,92 @@ where } } -pub async fn handle_connection( +pub trait TaskHandler<'b, T, const N: usize> +where + T: Read + Write, +{ + type Error: Debug; + + async fn handle( + &self, + task_id: usize, + connection: &mut Connection<'b, T, N>, + ) -> Result<(), Self::Error>; +} + +impl<'b, const N: usize, T, H> TaskHandler<'b, T, N> for &H +where + T: Read + Write, + H: TaskHandler<'b, T, N>, +{ + type Error = H::Error; + + async fn handle( + &self, + task_id: usize, + connection: &mut Connection<'b, T, N>, + ) -> Result<(), Self::Error> { + (**self).handle(task_id, connection).await + } +} + +pub struct TaskHandlerAdaptor(H); + +impl TaskHandlerAdaptor { + pub const fn new(handler: H) -> Self { + Self(handler) + } +} + +impl From for TaskHandlerAdaptor { + fn from(value: H) -> Self { + TaskHandlerAdaptor(value) + } +} + +impl<'b, const N: usize, T, H> TaskHandler<'b, T, N> for TaskHandlerAdaptor +where + T: Read + Write, + H: Handler<'b, T, N>, +{ + type Error = H::Error; + + async fn handle( + &self, + _task_id: usize, + connection: &mut Connection<'b, T, N>, + ) -> Result<(), Self::Error> { + self.0.handle(connection).await + } +} + +pub async fn handle_connection(io: T, buf: &mut [u8], handler: H) +where + H: for<'b> Handler<'b, &'b mut T, N>, + T: Read + Write, +{ + handle_task_connection(io, buf, 0, TaskHandlerAdaptor::new(handler)).await +} + +pub async fn handle_task_connection( mut io: T, buf: &mut [u8], - handler_id: usize, - handler: &H, + task_id: usize, + handler: H, ) where - H: for<'b> Handler<'b, &'b mut T, N>, + H: for<'b> TaskHandler<'b, &'b mut T, N>, T: Read + Write, { loop { - debug!("Handler {}: Waiting for new request", handler_id); + debug!("Handler task {}: Waiting for new request", task_id); - let result = handle_request::(buf, &mut io, handler).await; + let result = handle_task_request::(buf, &mut io, task_id, &handler).await; match result { Err(e) => { warn!( - "Handler {}: Error when handling request: {:?}", - handler_id, e + "Handler task {}: Error when handling request: {:?}", + task_id, e ); break; @@ -339,12 +407,12 @@ pub async fn handle_connection( Ok(needs_close) => { if needs_close { debug!( - "Handler {}: Request complete; closing connection", - handler_id + "Handler task {}: Request complete; closing connection", + task_id ); break; } else { - debug!("Handler {}: Request complete", handler_id); + debug!("Handler task {}: Request complete", task_id); } } } @@ -387,15 +455,28 @@ where pub async fn handle_request<'b, const N: usize, H, T>( buf: &'b mut [u8], io: T, - handler: &H, + handler: H, ) -> Result> where H: Handler<'b, T, N>, T: Read + Write, +{ + handle_task_request(buf, io, 0, TaskHandlerAdaptor::new(handler)).await +} + +pub async fn handle_task_request<'b, const N: usize, H, T>( + buf: &'b mut [u8], + io: T, + task_id: usize, + handler: H, +) -> Result> +where + H: TaskHandler<'b, T, N>, + T: Read + Write, { let mut connection = Connection::<_, N>::new(buf, io).await?; - let result = handler.handle(&mut connection).await; + let result = handler.handle(task_id, &mut connection).await; match result { Result::Ok(_) => connection.complete().await?, @@ -408,65 +489,131 @@ where Ok(connection.needs_close()) } -pub type DefaultServerBuffers = ServerBuffers; - -pub struct ServerBuffers( - [MaybeUninit<[u8; B]>; P], -); +pub type DefaultServer = + Server<{ DEFAULT_HANDLER_TASKS_COUNT }, { DEFAULT_BUF_SIZE }, { DEFAULT_MAX_HEADERS_COUNT }, 2>; -impl ServerBuffers { - pub const HANDLERS_COUNT: usize = P; - pub const BUF_SIZE: usize = B; +pub struct Server< + const P: usize = DEFAULT_HANDLER_TASKS_COUNT, + const B: usize = DEFAULT_BUF_SIZE, + const N: usize = DEFAULT_MAX_HEADERS_COUNT, + const W: usize = 2, +>([MaybeUninit<[u8; B]>; P]); +impl Server { #[inline(always)] pub const fn new() -> Self { Self([MaybeUninit::uninit(); P]) } -} -pub struct Server { - acceptor: A, - handler: H, -} + pub async fn run(&mut self, acceptor: A, handler: H) -> Result<(), Error> + where + A: embedded_nal_async_xtra::TcpAccept, + H: for<'b, 't> Handler<'b, &'b mut A::Connection<'t>, N>, + { + let handler = TaskHandlerAdaptor::new(handler); -impl Server -where - A: embedded_nal_async_xtra::TcpAccept, - H: for<'b, 't> Handler<'b, &'b mut A::Connection<'t>, N>, -{ - #[inline(always)] - pub const fn new(acceptor: A, handler: H) -> Self { - Self { acceptor, handler } + // TODO: Figure out what is going on with the lifetimes so as to avoid this horrible code duplication + + info!("Creating queue for {W} requests"); + let channel = embassy_sync::channel::Channel::::new(); + + debug!("Creating {P} handler tasks"); + let mut tasks = heapless::Vec::<_, P>::new(); + + for index in 0..P { + let channel = &channel; + let task_id = index; + let handler = &handler; + let buf = self.0[index].as_mut_ptr(); + + tasks + .push(async move { + loop { + debug!("Handler task {}: Waiting for connection", task_id); + + let io = channel.receive().await; + debug!("Handler task {}: Got connection request", task_id); + + handle_task_connection::( + io, + unsafe { buf.as_mut() }.unwrap(), + task_id, + handler, + ) + .await; + } + }) + .map_err(|_| ()) + .unwrap(); + } + + let accept = pin!(async { + loop { + debug!("Acceptor: waiting for new connection"); + + match acceptor.accept().await.map_err(Error::Io) { + Ok(io) => { + debug!("Acceptor: got new connection"); + channel.send(io).await; + debug!("Acceptor: connection sent"); + } + Err(e) => { + debug!("Got error when accepting a new connection: {:?}", e); + break Err(e); + } + } + } + }); + + let result = embassy_futures::select::select( + accept, + embassy_futures::select::select_slice(&mut tasks), + ) + .await; + + let result = match result { + Either::First(result) => result, + Either::Second((result, _)) => result, + }; + + info!("Server processing loop quit"); + + result } - pub async fn process( + pub async fn run_with_task_id( &mut self, - bufs: &mut ServerBuffers, - ) -> Result<(), Error> { + acceptor: A, + handler: H, + ) -> Result<(), Error> + where + A: embedded_nal_async_xtra::TcpAccept, + H: for<'b, 't> TaskHandler<'b, &'b mut A::Connection<'t>, N>, + { info!("Creating queue for {W} requests"); let channel = embassy_sync::channel::Channel::::new(); - debug!("Creating {P} handlers"); - let mut handlers = heapless::Vec::<_, P>::new(); + debug!("Creating {P} handler tasks"); + let mut tasks = heapless::Vec::<_, P>::new(); for index in 0..P { let channel = &channel; - let handler_id = index; - let handler = &self.handler; - let buf = bufs.0[index].as_mut_ptr(); + let task_id = index; + let handler = &handler; + let buf = self.0[index].as_mut_ptr(); - handlers + tasks .push(async move { loop { - debug!("Handler {}: Waiting for connection", handler_id); + debug!("Handler task {}: Waiting for connection", task_id); let io = channel.receive().await; - debug!("Handler {}: Got connection request", handler_id); + debug!("Handler task {}: Got connection request", task_id); - handle_connection::( + handle_task_connection::( io, unsafe { buf.as_mut() }.unwrap(), - handler_id, + task_id, handler, ) .await; @@ -476,11 +623,11 @@ where .unwrap(); } - let mut accept = pin!(async { + let accept = pin!(async { loop { debug!("Acceptor: waiting for new connection"); - match self.acceptor.accept().await.map_err(Error::Io) { + match acceptor.accept().await.map_err(Error::Io) { Ok(io) => { debug!("Acceptor: got new connection"); channel.send(io).await; @@ -488,20 +635,26 @@ where } Err(e) => { debug!("Got error when accepting a new connection: {:?}", e); + break Err(e); } } } }); - embassy_futures::select::select( - &mut accept, - embassy_futures::select::select_slice(&mut handlers), + let result = embassy_futures::select::select( + accept, + embassy_futures::select::select_slice(&mut tasks), ) .await; + let result = match result { + Either::First(result) => result, + Either::Second((result, _)) => result, + }; + info!("Server processing loop quit"); - Ok(()) + result } } @@ -550,7 +703,7 @@ mod embedded_svc_compat { impl<'b, T, const N: usize> Connection for super::Connection<'b, T, N> where - T: Read + Write + 'b, + T: Read + Write, { type Headers = RequestHeaders<'b, N>; @@ -604,7 +757,7 @@ mod embedded_svc_compat { H: embedded_svc::http::server::asynch::Handler>, Q: Handler<'b, T, N>, Q::Error: Into, - T: Read + Write + 'b, + T: Read + Write, { type Error = H::Error; diff --git a/edge-ws/README.md b/edge-ws/README.md index e64f380..5f877aa 100644 --- a/edge-ws/README.md +++ b/edge-ws/README.md @@ -144,7 +144,7 @@ where ### Websocket echo server ```rust -use edge_http::io::server::{Connection, Handler, Server, ServerBuffers}; +use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::Method; use edge_ws::{FrameHeader, FrameType}; @@ -159,14 +159,12 @@ fn main() { env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), ); - let mut buffers: ServerBuffers = ServerBuffers::new(); + let mut server = DefaultServer::new(); - futures_lite::future::block_on(run(&mut buffers)).unwrap(); + futures_lite::future::block_on(run(&mut server)).unwrap(); } -pub async fn run( - buffers: &mut ServerBuffers, -) -> Result<(), anyhow::Error> { +pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { let addr = "0.0.0.0:8881"; info!("Running HTTP server on {addr}"); @@ -175,9 +173,7 @@ pub async fn run( .listen(addr.parse().unwrap()) .await?; - let mut server: Server<_, _> = Server::new(acceptor, WsHandler); - - server.process::<2, P, B>(buffers).await?; + server.run(acceptor, WsHandler).await?; Ok(()) } diff --git a/examples/http_server.rs b/examples/http_server.rs index 69db7ff..440d3bc 100644 --- a/examples/http_server.rs +++ b/examples/http_server.rs @@ -1,4 +1,4 @@ -use edge_http::io::server::{Connection, Handler, Server, ServerBuffers}; +use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::Method; use embedded_nal_async_xtra::TcpListen; @@ -12,14 +12,12 @@ fn main() { env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), ); - let mut buffers: ServerBuffers = ServerBuffers::new(); + let mut server = DefaultServer::new(); - futures_lite::future::block_on(run(&mut buffers)).unwrap(); + futures_lite::future::block_on(run(&mut server)).unwrap(); } -pub async fn run( - buffers: &mut ServerBuffers, -) -> Result<(), anyhow::Error> { +pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { let addr = "0.0.0.0:8881"; info!("Running HTTP server on {addr}"); @@ -28,9 +26,7 @@ pub async fn run( .listen(addr.parse().unwrap()) .await?; - let mut server: Server<_, _> = Server::new(acceptor, HttpHandler); - - server.process::<2, P, B>(buffers).await?; + server.run(acceptor, HttpHandler).await?; Ok(()) } diff --git a/examples/ws_server.rs b/examples/ws_server.rs index 947857c..22af535 100644 --- a/examples/ws_server.rs +++ b/examples/ws_server.rs @@ -1,4 +1,4 @@ -use edge_http::io::server::{Connection, Handler, Server, ServerBuffers}; +use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::Method; use edge_ws::{FrameHeader, FrameType}; @@ -13,14 +13,12 @@ fn main() { env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), ); - let mut buffers: ServerBuffers = ServerBuffers::new(); + let mut server = DefaultServer::new(); - futures_lite::future::block_on(run(&mut buffers)).unwrap(); + futures_lite::future::block_on(run(&mut server)).unwrap(); } -pub async fn run( - buffers: &mut ServerBuffers, -) -> Result<(), anyhow::Error> { +pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { let addr = "0.0.0.0:8881"; info!("Running HTTP server on {addr}"); @@ -29,9 +27,7 @@ pub async fn run( .listen(addr.parse().unwrap()) .await?; - let mut server: Server<_, _> = Server::new(acceptor, WsHandler); - - server.process::<2, P, B>(buffers).await?; + server.run(acceptor, WsHandler).await?; Ok(()) }