Skip to content

Commit

Permalink
tokio xcp server as async task only, for flexibility
Browse files Browse the repository at this point in the history
  • Loading branch information
RainerZ committed Sep 8, 2024
1 parent e4dfd3d commit dda814f
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 100 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ Shows how to measure and calibrate in a task instanciated in multiple threads wi
Use CANape to observe rayon workers calculating a mandelbrot set by lines

### tokio_demo
Observe tokio tasks

Demonstrates using the XCP server tokio task (xcp_server::xcp_task) with tokio::net::UdpSocket, no threads running in xcplib anymore
TCP not implemented yet
Demo the usual measurement and calibration operations
Demo which visualizes multiples tokio tasks start and stop executing in the tokio worker thread pool (similar to rayon_demo)

### point_cloud_demo
Measure a lidar point cloud and visualize it in CANapes 3D scene window
Use CDR serialization over XCP and the CDR/IDL schema generator proc-macro
Expand Down
9 changes: 8 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,28 @@ fn main() {
.header("xcplib/wrapper.h")
.clang_arg("-Ixcplib/src")
.clang_arg("-Ixcplib")
// Tell cargo to invalidate the built crate whenever any of the included header files changed.
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
//
.blocklist_type("T_CLOCK_INFO")
// Protocol layer
.allowlist_function("XcpInit")
.allowlist_function("XcpStart")
.allowlist_function("XcpDisconnect")
// Transport layer
.allowlist_function("XcpTlInit")
.allowlist_function("XcpTlShutdown")
.allowlist_function("XcpTlCommand")
.allowlist_function("XcpTlTransmitQueuePeek")
.allowlist_function("XcpTlTransmitQueueNext")
// ETH server
.allowlist_function("XcpEthServerInit")
.allowlist_function("XcpEthServerShutdown")
.allowlist_function("XcpEthServerStatus")
.allowlist_function("XcpGetSessionStatus")
// DAQ
.allowlist_function("XcpEvent")
.allowlist_function("XcpEventExt")
// Misc
.allowlist_function("XcpPrint")
.allowlist_function("ApplXcpSetLogLevel")
.allowlist_function("ApplXcpSetA2lName")
Expand All @@ -46,6 +52,7 @@ fn main() {
.flag("-O2")
.compile("xcplib");

// Tell cargo to invalidate the built crate whenever any of these files changed.
println!("cargo:rerun-if-changed=xcplib/wrapper.h");
println!("cargo:rerun-if-changed=xcplib/main_cfg.h");
println!("cargo:rerun-if-changed=xcplib/xcptl_cfg.h");
Expand Down
57 changes: 37 additions & 20 deletions examples/tokio_demo/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// xcp-lite - tokio_demo
// Visualizes in CANape how tokio starts tasks in its worker threaad pool
// Implement the XCP server as a tokio task (xcp_server::xcp_task), no threads running in xcplib anymore
// Demo the usual measurement and calibration operations
// Demo how to visualize tokio tasks start/stop in tokios worker thread pool

mod xcp_server;

Expand All @@ -15,6 +17,7 @@ use xcp_type_description::prelude::*;

//-----------------------------------------------------------------------------
// Demo calibration parameters (static)
// Does not create memory segments in A2L, manually added characterics in group "cal"

struct CalPage {
run: bool,
Expand All @@ -29,13 +32,14 @@ struct CalPage0 {
}

static CAL_PAGE0: once_cell::sync::OnceCell<CalPage0> = once_cell::sync::OnceCell::with_value(CalPage0 {
task1_cycle_time_us: 100000, // 100ms
task2_cycle_time_us: 1000, // 1ms
task1_cycle_time_us: 10000, // 10ms
task2_cycle_time_us: 1000, // 1ms
task_count: 10,
});

//-----------------------------------------------------------------------------
// Demo calibration parameters (dynamic)
// Demo calibration parameters (dynamic, with auto A2L generation derive and attribute macros)
// Creates a memory segment "CalPage1" with 2 pages, a default "FLASH" page and a mutable "RAM" page

// Define a struct with calibration parameters
#[derive(Debug, Clone, Copy, Serialize, Deserialize, XcpTypeDescription)]
Expand All @@ -59,46 +63,58 @@ struct CalPage1 {
}

// Default calibration values
// This will be the FLASH page in the calibration memory segment
// This will be the read only default (FLASH) page in the calibration memory segment CalPage1
const CAL_PAGE1: CalPage1 = CalPage1 {
ampl: 100.0,
period: 5.0,
counter_max: 100,
};

//-----------------------------------------------------------------------------
// Asynchronous task, measures index, sleeps 100ms, measures -index and ends
// Experimental
// Asynchronous task, trigger measurement of local variable index, sleep 200us, measure -index and stop
// Demonstrates multi instance measurement
// There will be an event and an instance of index for each worker thread tokio uses
// There will be an event instance and an instance of variable index for each worker thread tokio uses
// Note:
// Once the A2L registry is created on XCP client connect, the event and variable instances are fixed and addional instances are not visible
// Tokio occasionally creates new worker threads and destroys old ones very late, so the number of instances may change
// Check what happens, when increasing/decreasing calpage0.task_count
#[allow(dead_code)]
async fn task(task_index: u16) {
let mut index: i16 = task_index as i16;

trace!("task {} start", index);

//let event = daq_create_event_instance!("task");
//daq_register!(index, event, "Task index", "");
// event.trigger();
let event = daq_create_event_instance!("task");
daq_register!(index, event, "Task index", "");
event.trigger();

tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
tokio::time::sleep(tokio::time::Duration::from_micros(200)).await;
index = -index;

//event.trigger();
event.trigger();

trace!("task {} end", index);
}

//-----------------------------------------------------------------------------
// Main

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
println!("xcp-lite tokio demo");

// Initialize logger
env_logger::Builder::new().filter_level(log::LevelFilter::Debug).init();
env_logger::Builder::new().filter_level(log::LevelFilter::Info).init();

// Start tokio XCP server
let (rx_task, tx_task) = xcp_server::start_async_xcp_server("127.0.0.1:5555".to_string()).await?;
let xcp = Xcp::get();
// Initialize the xcplib transport and protocol layer only, not the server
let xcp: &'static Xcp = XcpBuilder::new("tokio_demo").set_log_level(XcpLogLevel::Debug).enable_a2l(true).tl_start().unwrap();

let xcp_task = tokio::spawn(xcp_server::xcp_task(xcp, [127, 0, 0, 1], 5555));

// let mut xcp_server = xcp_server::XcpServer::new([127, 0, 0, 1], 5555);
// let xcp = xcp_server.start_xcp(xcp).await?;

// Create and register a static calibration parameter set
let calpage = CAL_PAGE.get().unwrap();
Expand Down Expand Up @@ -140,13 +156,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
// Stop
if !calpage.run {
info!("mainloop stopped by calpage.run=false");
break;
}

// Sleep for a calibratable amount of microseconds
tokio::time::sleep(tokio::time::Duration::from_micros(calpage0.task1_cycle_time_us as u64)).await;

// Start a number of asynchronous tasks and wait for them to finish
// Start a number of short running asynchronous tasks and wait for them to finish
let mut tasks = Vec::new();
for i in 1..=calpage0.task_count {
tasks.push(tokio::spawn(task(i)));
Expand Down Expand Up @@ -175,11 +192,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
calseg.sync();
}

info!("Stop");
info!("mainloop stopped");

let _ = tokio::join!(rx_task);
let _ = tokio::join!(tx_task);
xcp_task.abort();
xcp.tl_shutdown();
//xcp_task.await.unwrap()?;

xcp.stop_server();
Ok(())
}
175 changes: 110 additions & 65 deletions examples/tokio_demo/src/xcp_server.rs
Original file line number Diff line number Diff line change
@@ -1,80 +1,125 @@
// #![warn(rust_2018_idioms)]

use std::error::Error;
use std::io;
use std::net::SocketAddr;

use log::info;
use std::net::Ipv4Addr;
use std::time::Duration;

use once_cell::sync::OnceCell;
use log::{error, info, trace, warn};

//use tokio::join;
use tokio::net::UdpSocket;
use tokio::time::timeout;

use xcp::*;

#[derive(Debug)]
struct Server {
socket: UdpSocket,
}

#[derive(Debug)]
struct Client {
client: SocketAddr,
}
pub async fn xcp_task<A>(xcp: &'static Xcp, addr: A, port: u16) -> Result<(), io::Error>
where
A: Into<Ipv4Addr>,
{
info!("xcp_task: start");

static ASYNC_XCP_SERVER: OnceCell<Server> = OnceCell::new();
static ASYNC_XCP_CLIENT: OnceCell<Client> = OnceCell::new();
// Bind to address
let addr = addr.into();
let socket = UdpSocket::bind((addr, port)).await?;
info!("xcp_task: bind to {}:{}", addr, port);

async fn rx_task() -> Result<(), io::Error> {
let server = ASYNC_XCP_SERVER.get().unwrap();
let xcp = Xcp::get();
let mut client_addr = None;
let mut buf = vec![0u8; 1024];
loop {
let res: (usize, SocketAddr) = server.socket.recv_from(&mut buf).await?;
info!("rx_task: recv {} bytes from {}, buf_len={}", res.0, res.1, buf.len());

if let Some(c) = ASYNC_XCP_CLIENT.get() {
assert_eq!(c.client, res.1);
} else {
ASYNC_XCP_CLIENT.set(Client { client: res.1 }).unwrap();
}

xcp.tl_command(&buf);
}
}

async fn tx_task() -> Result<(), io::Error> {
let server = ASYNC_XCP_SERVER.get().unwrap();

let xcp = Xcp::get();
loop {
while let Some(buf) = xcp.tl_transmit_queue_peek() {
let client = ASYNC_XCP_CLIENT.get().unwrap();
server.socket.send_to(buf, &client.client).await?;
xcp.tl_transmit_queue_next();
info!("Sent {} bytes to {}", buf.len(), client.client);
let rx_future = socket.recv_from(&mut buf);
let res = timeout(Duration::from_millis(10), rx_future).await;
match res {
Err(_) => {
trace!("xcp_task: timeout");
}

Ok(rx) => match rx {
Err(e) => {
error!("xcp_task: xcp_task stop, recv error: {}", e);
return Err(e);
}

Ok((size, addr)) => {
if size == 0 {
warn!("xcp_task: xcp_task stop, recv 0 bytes from {}, socket closed", addr);
return Ok(());
} else {
info!("xcp_task: recv {} bytes from {}, buf_len={}", size, addr, buf.len());

// Set client address, do not accept new clients while being connected
if let Some(c) = client_addr {
if c != addr && xcp.is_connected() {
error!("xcp_task: client addr changed to {} while beeing connected to {}", addr, c);
assert_eq!(c, addr);
}
} else {
client_addr = Some(addr);
info!("xcp_task: set client to {}", addr);
}

// Execute command
xcp.tl_command(&buf);
}
}
}, // match
} // match res

// Transmit
// Check if client address is valid
if let Some(addr) = client_addr {
trace!("xcp_task: read transmit queue ");

// Empty the transmit queue
while let Some(buf) = xcp.tl_transmit_queue_peek() {
socket.send_to(buf, addr).await?;
xcp.tl_transmit_queue_next();
info!("xcp_task: Sent {} bytes to {}", buf.len(), client_addr.unwrap());
}
}

tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
}
} // loop
}

pub async fn start_async_xcp_server(addr: String) -> Result<(tokio::task::JoinHandle<Result<(), io::Error>>, tokio::task::JoinHandle<Result<(), io::Error>>), Box<dyn Error>> {
let socket = UdpSocket::bind(&addr).await?;
println!("Bind to {}", socket.local_addr()?);

// Initialize the XCP driver transport layer only, not the server
let _xcp = XcpBuilder::new("tokio_demo").set_log_level(XcpLogLevel::Debug).enable_a2l(true).start_protocol_layer().unwrap();

// Start the tokio server
let server = Server { socket };
if ASYNC_XCP_SERVER.get().is_some() {
return Err("Server already started".into());
}
ASYNC_XCP_SERVER.set(server).unwrap();
let rx_task = tokio::spawn(rx_task());
let tx_task: tokio::task::JoinHandle<Result<(), io::Error>> = tokio::spawn(tx_task());

Ok((rx_task, tx_task))
}
//-----------------------------------------------------------------------------

// #[derive(Debug)]
// pub struct XcpServer {
// addr: Ipv4Addr,
// port: u16,
// task: Option<tokio::task::JoinHandle<Result<(), io::Error>>>,
// }

// impl Drop for XcpServer {
// fn drop(&mut self) {
// // Cancel the task
// if let Some(task) = self.task.take() {
// task.abort();
// }
// }
// }

// impl XcpServer {
// pub fn new<A>(addr: A, port: u16) -> Self
// where
// A: Into<Ipv4Addr>,
// {
// Self { addr: addr.into(), port, task: None }
// }

// pub async fn start_xcp(&mut self, xcp: &Xcp) -> Result<&Xcp, Box<dyn Error>> {
// // Start server
// let task = tokio::spawn(xcp_task(xcp, self.addr, self.port));
// self.task = Some(task);

// Ok(xcp)
// }

// pub fn get_xcp(&self) -> &Xcp {
// Xcp::get()
// }

// pub async fn stop_xcp(&mut self) -> Result<(), Box<dyn Error>> {
// // Cancel the task
// if let Some(task) = self.task.take() {
// task.abort();
// }
// Ok(())
// }
// }
Loading

0 comments on commit dda814f

Please sign in to comment.