Skip to content

Commit

Permalink
Simple async IO server for edge-captive
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Jan 5, 2024
1 parent af81c7a commit ff38c62
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 149 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ embedded-svc = { git = "https://github.com/esp-rs/embedded-svc", branch = "rust-

[features]
default = ["io"]
std = ["edge-http/std", "edge-captive/std", "edge-mqtt", "edge-std-nal-async"]
std = ["edge-http/std", "edge-captive/std", "edge-mdns/std", "edge-mqtt", "edge-std-nal-async"]
io = ["edge-captive/io", "edge-dhcp/io", "edge-http/io", "edge-mdns/io", "edge-raw/io", "edge-ws/io", "embedded-nal-async-xtra"]
nightly = []
embedded-svc = ["edge-http/embedded-svc", "edge-mqtt/embedded-svc", "edge-ws/embedded-svc"]
Expand Down
5 changes: 3 additions & 2 deletions edge-captive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ edition = "2021"
rust-version = "1.75"

[features]
std = [] # TODO
default = ["io"]
io = [] # TODO
std = ["domain/std"]
io = ["embedded-nal-async"]

[dependencies]
log = { workspace = true }
domain = { workspace = true }
octseq = { workspace = true }
embedded-nal-async = { workspace = true, optional = true }
205 changes: 60 additions & 145 deletions edge-captive/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,155 +1,70 @@
#[cfg(feature = "std")]
pub mod server {
use std::{
io, mem,
net::{Ipv4Addr, SocketAddrV4, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle},
time::Duration,
};
use core::time::Duration;

use log::*;
use embedded_nal_async::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpStack, UnconnectedUdp};

#[derive(Clone, Debug)]
pub struct DnsConf {
pub bind_ip: Ipv4Addr,
pub bind_port: u16,
pub ip: Ipv4Addr,
pub ttl: Duration,
}

impl DnsConf {
pub fn new(ip: Ipv4Addr) -> Self {
Self {
bind_ip: Ipv4Addr::new(0, 0, 0, 0),
bind_port: 53,
ip,
ttl: Duration::from_secs(60),
}
}
}

#[derive(Debug)]
pub enum Status {
Stopped,
Started,
Error(io::Error),
}

pub struct DnsServer {
conf: DnsConf,
status: Status,
running: Arc<AtomicBool>,
handle: Option<JoinHandle<Result<(), io::Error>>>,
}

impl DnsServer {
pub fn new(conf: DnsConf) -> Self {
Self {
conf,
status: Status::Stopped,
running: Arc::new(AtomicBool::new(false)),
handle: None,
}
}

pub fn get_status(&mut self) -> &Status {
self.cleanup();
&self.status
}

pub fn start(&mut self) -> Result<(), io::Error> {
if matches!(self.get_status(), Status::Started) {
return Ok(());
}
let socket_address = SocketAddrV4::new(self.conf.bind_ip, self.conf.bind_port);
let running = self.running.clone();
let ip = self.conf.ip;
let ttl = self.conf.ttl;

self.running.store(true, Ordering::Relaxed);
self.handle = Some(
thread::Builder::new()
// default stack size is not enough
// 9000 was found via trial and error
.stack_size(9000)
.spawn(move || {
// Socket is not movable across thread bounds
// Otherwise we run into an assertion error here: https://github.com/espressif/esp-idf/blob/master/components/lwip/port/esp32/freertos/sys_arch.c#L103
let socket = UdpSocket::bind(socket_address)?;
socket.set_read_timeout(Some(Duration::from_secs(1)))?;
let result = Self::run(&running, ip, ttl, socket);

running.store(false, Ordering::Relaxed);
use log::*;

result
})
.unwrap(),
);
use super::*;

Ok(())
}
pub const DEFAULT_SOCKET: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), PORT);

pub fn stop(&mut self) -> Result<(), io::Error> {
if matches!(self.get_status(), Status::Stopped) {
return Ok(());
}
const PORT: u16 = 5353;

self.running.store(false, Ordering::Relaxed);
self.cleanup();

let mut status = Status::Stopped;
mem::swap(&mut self.status, &mut status);

match status {
Status::Error(e) => Err(e),
_ => Ok(()),
}
}

fn cleanup(&mut self) {
if !self.running.load(Ordering::Relaxed) && self.handle.is_some() {
self.status = match mem::take(&mut self.handle).unwrap().join().unwrap() {
Ok(_) => Status::Stopped,
Err(e) => Status::Error(e),
};
}
}

fn run(
running: &AtomicBool,
ip: Ipv4Addr,
ttl: Duration,
socket: UdpSocket,
) -> Result<(), io::Error> {
while running.load(Ordering::Relaxed) {
let mut request_arr = [0_u8; 512];
debug!("Waiting for data");
let (request_len, source_addr) = match socket.recv_from(&mut request_arr) {
Ok(value) => value,
Err(err) => match err.kind() {
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => continue,
_ => return Err(err),
},
};

let request = &request_arr[..request_len];

debug!("Received {} bytes from {source_addr}", request.len());

let mut reply_arr = [0_u8; 1280];
let len = crate::reply(request, &ip.octets(), ttl, &mut reply_arr)
.map_err(|_| io::ErrorKind::Other)?;

socket.send_to(&reply_arr[..len], source_addr)?;
#[derive(Debug)]
pub enum DnsIoError<E> {
DnsError(DnsError),
IoError(E),
}

debug!("Sent {len} bytes to {source_addr}");
}
impl<E> From<DnsError> for DnsIoError<E> {
fn from(err: DnsError) -> Self {
Self::DnsError(err)
}
}

Ok(())
}
pub async fn run<S>(
stack: &S,
socket: SocketAddr,
tx_buf: &mut [u8],
rx_buf: &mut [u8],
ip: Ipv4Addr,
ttl: Duration,
) -> Result<(), DnsIoError<S::Error>>
where
S: UdpStack,
{
let (_, mut udp) = stack
.bind_single(socket)
.await
.map_err(DnsIoError::IoError)?;

loop {
debug!("Waiting for data");

let (len, local, remote) = udp
.receive_into(rx_buf)
.await
.map_err(DnsIoError::IoError)?;

let request = &rx_buf[..len];

debug!("Received {} bytes from {remote}", request.len());

let len = match crate::reply(request, &ip.octets(), ttl, tx_buf) {
Ok(len) => len,
Err(err) => match err {
DnsError::InvalidMessage => {
warn!("Got invalid message from {remote}, skipping");
continue;
}
other => Err(other)?,
},
};

udp.send(local, remote, &tx_buf[..len])
.await
.map_err(DnsIoError::IoError)?;

debug!("Sent {len} bytes to {remote}");
}
}
1 change: 1 addition & 0 deletions edge-mdns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"

[features]
default = ["io"]
std = []
io = ["embassy-futures", "embassy-sync", "embassy-time", "embedded-nal-async", "embedded-nal-async-xtra"]

[dependencies]
Expand Down
5 changes: 4 additions & 1 deletion edge-mdns/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use super::*;

mod split;

pub const DEFAULT_SOCKET: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), PORT);

const IP_BROADCAST_ADDR: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const IPV6_BROADCAST_ADDR: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb);

Expand Down Expand Up @@ -58,6 +60,7 @@ pub async fn run<T, S>(
interface: Option<u32>,
services: T,
stack: &S,
socket: SocketAddr,
udp_buffer: &mut UdpSplitBuffer,
buffers: &mut MdnsRunBuffers,
) -> Result<(), MdnsIoError<S::Error>>
Expand All @@ -67,7 +70,7 @@ where
S::UniquelyBound: Multicast<Error = S::Error>,
{
let (local_addr, mut udp) = stack
.bind_single(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), PORT))
.bind_single(socket)
.await
.map_err(MdnsIoError::IoError)?;

Expand Down

0 comments on commit ff38c62

Please sign in to comment.