From dda814f834dc0fb0193ef6182bdffd8944ac9e25 Mon Sep 17 00:00:00 2001 From: Rainer Zaiser Date: Sun, 8 Sep 2024 16:18:50 +0200 Subject: [PATCH] tokio xcp server as async task only, for flexibility --- README.md | 7 +- build.rs | 9 +- examples/tokio_demo/src/main.rs | 57 ++++++--- examples/tokio_demo/src/xcp_server.rs | 175 ++++++++++++++++---------- src/xcp.rs | 27 +++- src/xcplite.rs | 6 + xcp_client/src/xcp_client.rs | 2 +- xcplib/src/xcpEthTl.c | 10 +- xcplib/src/xcpLite.c | 3 +- 9 files changed, 196 insertions(+), 100 deletions(-) diff --git a/README.md b/README.md index fc7ffbd..78b76f7 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,11 @@ Shows how to measure and calibrate in a task instanciated in multiple threads wi Use CANape to observe rayon workers calculating a mandelbrot set by lines ### tokio_demo -Observe tokio tasks - +Demonstrates using the XCP server tokio task (xcp_server::xcp_task) with tokio::net::UdpSocket, no threads running in xcplib anymore +TCP not implemented yet +Demo the usual measurement and calibration operations +Demo which visualizes multiples tokio tasks start and stop executing in the tokio worker thread pool (similar to rayon_demo) + ### point_cloud_demo Measure a lidar point cloud and visualize it in CANapes 3D scene window Use CDR serialization over XCP and the CDR/IDL schema generator proc-macro diff --git a/build.rs b/build.rs index df35877..37baba4 100644 --- a/build.rs +++ b/build.rs @@ -8,22 +8,28 @@ fn main() { .header("xcplib/wrapper.h") .clang_arg("-Ixcplib/src") .clang_arg("-Ixcplib") - // Tell cargo to invalidate the built crate whenever any of the included header files changed. .parse_callbacks(Box::new(bindgen::CargoCallbacks::new())) // .blocklist_type("T_CLOCK_INFO") + // Protocol layer .allowlist_function("XcpInit") .allowlist_function("XcpStart") + .allowlist_function("XcpDisconnect") + // Transport layer .allowlist_function("XcpTlInit") + .allowlist_function("XcpTlShutdown") .allowlist_function("XcpTlCommand") .allowlist_function("XcpTlTransmitQueuePeek") .allowlist_function("XcpTlTransmitQueueNext") + // ETH server .allowlist_function("XcpEthServerInit") .allowlist_function("XcpEthServerShutdown") .allowlist_function("XcpEthServerStatus") .allowlist_function("XcpGetSessionStatus") + // DAQ .allowlist_function("XcpEvent") .allowlist_function("XcpEventExt") + // Misc .allowlist_function("XcpPrint") .allowlist_function("ApplXcpSetLogLevel") .allowlist_function("ApplXcpSetA2lName") @@ -46,6 +52,7 @@ fn main() { .flag("-O2") .compile("xcplib"); + // Tell cargo to invalidate the built crate whenever any of these files changed. println!("cargo:rerun-if-changed=xcplib/wrapper.h"); println!("cargo:rerun-if-changed=xcplib/main_cfg.h"); println!("cargo:rerun-if-changed=xcplib/xcptl_cfg.h"); diff --git a/examples/tokio_demo/src/main.rs b/examples/tokio_demo/src/main.rs index 12b0ab6..834c765 100644 --- a/examples/tokio_demo/src/main.rs +++ b/examples/tokio_demo/src/main.rs @@ -1,5 +1,7 @@ // xcp-lite - tokio_demo -// Visualizes in CANape how tokio starts tasks in its worker threaad pool +// Implement the XCP server as a tokio task (xcp_server::xcp_task), no threads running in xcplib anymore +// Demo the usual measurement and calibration operations +// Demo how to visualize tokio tasks start/stop in tokios worker thread pool mod xcp_server; @@ -15,6 +17,7 @@ use xcp_type_description::prelude::*; //----------------------------------------------------------------------------- // Demo calibration parameters (static) +// Does not create memory segments in A2L, manually added characterics in group "cal" struct CalPage { run: bool, @@ -29,13 +32,14 @@ struct CalPage0 { } static CAL_PAGE0: once_cell::sync::OnceCell = once_cell::sync::OnceCell::with_value(CalPage0 { - task1_cycle_time_us: 100000, // 100ms - task2_cycle_time_us: 1000, // 1ms + task1_cycle_time_us: 10000, // 10ms + task2_cycle_time_us: 1000, // 1ms task_count: 10, }); //----------------------------------------------------------------------------- -// Demo calibration parameters (dynamic) +// Demo calibration parameters (dynamic, with auto A2L generation derive and attribute macros) +// Creates a memory segment "CalPage1" with 2 pages, a default "FLASH" page and a mutable "RAM" page // Define a struct with calibration parameters #[derive(Debug, Clone, Copy, Serialize, Deserialize, XcpTypeDescription)] @@ -59,7 +63,7 @@ struct CalPage1 { } // Default calibration values -// This will be the FLASH page in the calibration memory segment +// This will be the read only default (FLASH) page in the calibration memory segment CalPage1 const CAL_PAGE1: CalPage1 = CalPage1 { ampl: 100.0, period: 5.0, @@ -67,38 +71,50 @@ const CAL_PAGE1: CalPage1 = CalPage1 { }; //----------------------------------------------------------------------------- -// Asynchronous task, measures index, sleeps 100ms, measures -index and ends +// Experimental +// Asynchronous task, trigger measurement of local variable index, sleep 200us, measure -index and stop // Demonstrates multi instance measurement -// There will be an event and an instance of index for each worker thread tokio uses +// There will be an event instance and an instance of variable index for each worker thread tokio uses +// Note: +// Once the A2L registry is created on XCP client connect, the event and variable instances are fixed and addional instances are not visible +// Tokio occasionally creates new worker threads and destroys old ones very late, so the number of instances may change +// Check what happens, when increasing/decreasing calpage0.task_count #[allow(dead_code)] async fn task(task_index: u16) { let mut index: i16 = task_index as i16; trace!("task {} start", index); - //let event = daq_create_event_instance!("task"); - //daq_register!(index, event, "Task index", ""); - // event.trigger(); + let event = daq_create_event_instance!("task"); + daq_register!(index, event, "Task index", ""); + event.trigger(); - tokio::time::sleep(tokio::time::Duration::from_millis(2)).await; + tokio::time::sleep(tokio::time::Duration::from_micros(200)).await; index = -index; - //event.trigger(); + event.trigger(); trace!("task {} end", index); } //----------------------------------------------------------------------------- +// Main + #[tokio::main] async fn main() -> Result<(), Box> { println!("xcp-lite tokio demo"); // Initialize logger - env_logger::Builder::new().filter_level(log::LevelFilter::Debug).init(); + env_logger::Builder::new().filter_level(log::LevelFilter::Info).init(); // Start tokio XCP server - let (rx_task, tx_task) = xcp_server::start_async_xcp_server("127.0.0.1:5555".to_string()).await?; - let xcp = Xcp::get(); + // Initialize the xcplib transport and protocol layer only, not the server + let xcp: &'static Xcp = XcpBuilder::new("tokio_demo").set_log_level(XcpLogLevel::Debug).enable_a2l(true).tl_start().unwrap(); + + let xcp_task = tokio::spawn(xcp_server::xcp_task(xcp, [127, 0, 0, 1], 5555)); + + // let mut xcp_server = xcp_server::XcpServer::new([127, 0, 0, 1], 5555); + // let xcp = xcp_server.start_xcp(xcp).await?; // Create and register a static calibration parameter set let calpage = CAL_PAGE.get().unwrap(); @@ -140,13 +156,14 @@ async fn main() -> Result<(), Box> { loop { // Stop if !calpage.run { + info!("mainloop stopped by calpage.run=false"); break; } // Sleep for a calibratable amount of microseconds tokio::time::sleep(tokio::time::Duration::from_micros(calpage0.task1_cycle_time_us as u64)).await; - // Start a number of asynchronous tasks and wait for them to finish + // Start a number of short running asynchronous tasks and wait for them to finish let mut tasks = Vec::new(); for i in 1..=calpage0.task_count { tasks.push(tokio::spawn(task(i))); @@ -175,11 +192,11 @@ async fn main() -> Result<(), Box> { calseg.sync(); } - info!("Stop"); + info!("mainloop stopped"); - let _ = tokio::join!(rx_task); - let _ = tokio::join!(tx_task); + xcp_task.abort(); + xcp.tl_shutdown(); + //xcp_task.await.unwrap()?; - xcp.stop_server(); Ok(()) } diff --git a/examples/tokio_demo/src/xcp_server.rs b/examples/tokio_demo/src/xcp_server.rs index 503116d..d174958 100644 --- a/examples/tokio_demo/src/xcp_server.rs +++ b/examples/tokio_demo/src/xcp_server.rs @@ -1,80 +1,125 @@ -// #![warn(rust_2018_idioms)] - -use std::error::Error; use std::io; -use std::net::SocketAddr; - -use log::info; +use std::net::Ipv4Addr; +use std::time::Duration; -use once_cell::sync::OnceCell; +use log::{error, info, trace, warn}; -//use tokio::join; use tokio::net::UdpSocket; +use tokio::time::timeout; use xcp::*; -#[derive(Debug)] -struct Server { - socket: UdpSocket, -} - -#[derive(Debug)] -struct Client { - client: SocketAddr, -} +pub async fn xcp_task(xcp: &'static Xcp, addr: A, port: u16) -> Result<(), io::Error> +where + A: Into, +{ + info!("xcp_task: start"); -static ASYNC_XCP_SERVER: OnceCell = OnceCell::new(); -static ASYNC_XCP_CLIENT: OnceCell = OnceCell::new(); + // Bind to address + let addr = addr.into(); + let socket = UdpSocket::bind((addr, port)).await?; + info!("xcp_task: bind to {}:{}", addr, port); -async fn rx_task() -> Result<(), io::Error> { - let server = ASYNC_XCP_SERVER.get().unwrap(); - let xcp = Xcp::get(); + let mut client_addr = None; let mut buf = vec![0u8; 1024]; - loop { - let res: (usize, SocketAddr) = server.socket.recv_from(&mut buf).await?; - info!("rx_task: recv {} bytes from {}, buf_len={}", res.0, res.1, buf.len()); - - if let Some(c) = ASYNC_XCP_CLIENT.get() { - assert_eq!(c.client, res.1); - } else { - ASYNC_XCP_CLIENT.set(Client { client: res.1 }).unwrap(); - } - xcp.tl_command(&buf); - } -} - -async fn tx_task() -> Result<(), io::Error> { - let server = ASYNC_XCP_SERVER.get().unwrap(); - - let xcp = Xcp::get(); loop { - while let Some(buf) = xcp.tl_transmit_queue_peek() { - let client = ASYNC_XCP_CLIENT.get().unwrap(); - server.socket.send_to(buf, &client.client).await?; - xcp.tl_transmit_queue_next(); - info!("Sent {} bytes to {}", buf.len(), client.client); + let rx_future = socket.recv_from(&mut buf); + let res = timeout(Duration::from_millis(10), rx_future).await; + match res { + Err(_) => { + trace!("xcp_task: timeout"); + } + + Ok(rx) => match rx { + Err(e) => { + error!("xcp_task: xcp_task stop, recv error: {}", e); + return Err(e); + } + + Ok((size, addr)) => { + if size == 0 { + warn!("xcp_task: xcp_task stop, recv 0 bytes from {}, socket closed", addr); + return Ok(()); + } else { + info!("xcp_task: recv {} bytes from {}, buf_len={}", size, addr, buf.len()); + + // Set client address, do not accept new clients while being connected + if let Some(c) = client_addr { + if c != addr && xcp.is_connected() { + error!("xcp_task: client addr changed to {} while beeing connected to {}", addr, c); + assert_eq!(c, addr); + } + } else { + client_addr = Some(addr); + info!("xcp_task: set client to {}", addr); + } + + // Execute command + xcp.tl_command(&buf); + } + } + }, // match + } // match res + + // Transmit + // Check if client address is valid + if let Some(addr) = client_addr { + trace!("xcp_task: read transmit queue "); + + // Empty the transmit queue + while let Some(buf) = xcp.tl_transmit_queue_peek() { + socket.send_to(buf, addr).await?; + xcp.tl_transmit_queue_next(); + info!("xcp_task: Sent {} bytes to {}", buf.len(), client_addr.unwrap()); + } } - - tokio::time::sleep(tokio::time::Duration::from_millis(2)).await; - } + } // loop } -pub async fn start_async_xcp_server(addr: String) -> Result<(tokio::task::JoinHandle>, tokio::task::JoinHandle>), Box> { - let socket = UdpSocket::bind(&addr).await?; - println!("Bind to {}", socket.local_addr()?); - - // Initialize the XCP driver transport layer only, not the server - let _xcp = XcpBuilder::new("tokio_demo").set_log_level(XcpLogLevel::Debug).enable_a2l(true).start_protocol_layer().unwrap(); - - // Start the tokio server - let server = Server { socket }; - if ASYNC_XCP_SERVER.get().is_some() { - return Err("Server already started".into()); - } - ASYNC_XCP_SERVER.set(server).unwrap(); - let rx_task = tokio::spawn(rx_task()); - let tx_task: tokio::task::JoinHandle> = tokio::spawn(tx_task()); - - Ok((rx_task, tx_task)) -} +//----------------------------------------------------------------------------- + +// #[derive(Debug)] +// pub struct XcpServer { +// addr: Ipv4Addr, +// port: u16, +// task: Option>>, +// } + +// impl Drop for XcpServer { +// fn drop(&mut self) { +// // Cancel the task +// if let Some(task) = self.task.take() { +// task.abort(); +// } +// } +// } + +// impl XcpServer { +// pub fn new(addr: A, port: u16) -> Self +// where +// A: Into, +// { +// Self { addr: addr.into(), port, task: None } +// } + +// pub async fn start_xcp(&mut self, xcp: &Xcp) -> Result<&Xcp, Box> { +// // Start server +// let task = tokio::spawn(xcp_task(xcp, self.addr, self.port)); +// self.task = Some(task); + +// Ok(xcp) +// } + +// pub fn get_xcp(&self) -> &Xcp { +// Xcp::get() +// } + +// pub async fn stop_xcp(&mut self) -> Result<(), Box> { +// // Cancel the task +// if let Some(task) = self.task.take() { +// task.abort(); +// } +// Ok(()) +// } +// } diff --git a/src/xcp.rs b/src/xcp.rs index 8aceeaa..f36411e 100644 --- a/src/xcp.rs +++ b/src/xcp.rs @@ -384,9 +384,10 @@ impl XcpBuilder { self } - /// Start the XCP protocol layer only - /// If external server is used - pub fn start_protocol_layer(self) -> Result<&'static Xcp, &'static str> { + /// Start the XCP transport and protocol layer in external server mode + /// The server must be started after this call + /// Server example in tokio_demo::xcp_server::xcp_task + pub fn tl_start(self) -> Result<&'static Xcp, &'static str> { let xcp = Xcp::get(); info!("Start XCP protocol layer and transport layer"); @@ -413,7 +414,7 @@ impl XcpBuilder { } /// Start the XCP on Ethernet Server - + /// Use the server rx and tx threads in xcplib pub fn start_server(self, tl: XcpTransportLayer, addr: A, port: u16) -> Result<&'static Xcp, &'static str> where A: Into, @@ -531,6 +532,16 @@ impl Xcp { XcpSessionStatus::from_bits(session_status).unwrap() } + /// Check if a client is connected + pub fn is_connected(&self) -> bool { + self.get_session_status().contains(XcpSessionStatus::SS_CONNECTED) + } + + /// Check if measurement is started + pub fn is_daq_running(&self) -> bool { + self.get_session_status().contains(XcpSessionStatus::SS_DAQ) + } + /// Set the log level for XCP protocol layer pub fn set_log_level(&self, level: XcpLogLevel) { // @@@@ unsafe - C library call @@ -577,7 +588,12 @@ impl Xcp { } } - pub fn tl_shutdown(&self) {} + pub fn tl_shutdown(&self) { + // @@@@ unsafe - C library call + unsafe { + xcplib::XcpTlShutdown(); + } + } //------------------------------------------------------------------------------------------ // Server mode @@ -595,6 +611,7 @@ impl Xcp { pub fn stop_server(&self) { // @@@@ unsafe - C library call unsafe { + xcplib::XcpDisconnect(); xcplib::XcpEthServerShutdown(); } } diff --git a/src/xcplite.rs b/src/xcplite.rs index 8bf5395..11eaeaa 100644 --- a/src/xcplite.rs +++ b/src/xcplite.rs @@ -24,6 +24,9 @@ extern "C" { extern "C" { pub fn XcpTlInit() -> u8; } +extern "C" { + pub fn XcpTlShutdown(); +} extern "C" { pub fn XcpTlCommand(msgLen: u16, msgBuf: *const u8) -> u8; } @@ -39,6 +42,9 @@ extern "C" { extern "C" { pub fn XcpStart(); } +extern "C" { + pub fn XcpDisconnect(); +} extern "C" { pub fn XcpEvent(event: u16); } diff --git a/xcp_client/src/xcp_client.rs b/xcp_client/src/xcp_client.rs index a4f206e..cd59488 100644 --- a/xcp_client/src/xcp_client.rs +++ b/xcp_client/src/xcp_client.rs @@ -26,7 +26,7 @@ use crate::a2l::a2l_reader::{a2l_find_characteristic, a2l_find_measurement, a2l_ //-------------------------------------------------------------------------------------------------------------------------------------------------- // XCP Parameters -const CMD_TIMEOUT: Duration = Duration::from_secs(1); +const CMD_TIMEOUT: Duration = Duration::from_secs(2); //-------------------------------------------------------------------------------------------------------------------------------------------------- // XCP error type diff --git a/xcplib/src/xcpEthTl.c b/xcplib/src/xcpEthTl.c index 7eb13c3..577bd6c 100644 --- a/xcplib/src/xcpEthTl.c +++ b/xcplib/src/xcpEthTl.c @@ -140,14 +140,14 @@ BOOL XcpTlInit() { DBG_PRINT3("\nInit XCP transport layer\n"); DBG_PRINTF3(" SEGMENT_SIZE=%u, MAX_CTO_SIZE=%u, QUEUE_SIZE=%u, ALIGNMENT=%u, %uKiB memory used\n", XCPTL_MAX_SEGMENT_SIZE, XCPTL_MAX_CTO_SIZE, XCPTL_QUEUE_SIZE, XCPTL_PACKET_ALIGNMENT, (unsigned int)sizeof(gXcpTl) / 1024); - DBG_PRINT3(" Options=("); // Print activated XCP transport layer options + DBG_PRINT3(" Note: These parameters in xcptl_cfg.h need to be configured for optimal memory consumption and performance!\n"); #ifdef XCPTL_ENABLE_MULTICAST - DBG_PRINT3("ENABLE_MULTICAST,"); + DBG_PRINT3(" Option ENABLE_MULTICAST is not recommended\n"); #endif -#ifdef XCPTL_QUEUED_CRM - DBG_PRINT3("QUEUED_CRM,"); +#ifndef XCPTL_QUEUED_CRM + DBG_PRINT3(" Option QUEUED_CRM is disabled, enabled is recommended\n"); #endif - DBG_PRINT3(")\n"); + return TRUE; } diff --git a/xcplib/src/xcpLite.c b/xcplib/src/xcpLite.c index f5773b6..536ff9a 100644 --- a/xcplib/src/xcpLite.c +++ b/xcplib/src/xcpLite.c @@ -1917,11 +1917,12 @@ void XcpStart() #endif DBG_PRINTF3(" Version=%u.%u, MAXEV=%u, MAXCTO=%u, MAXDTO=%u, DAQMEM=%u, MAXDAQ=%u, MAXENTRY=%u, MAXENTRYSIZE=%u\n", XCP_PROTOCOL_LAYER_VERSION >> 8, XCP_PROTOCOL_LAYER_VERSION & 0xFF, XCP_MAX_EVENT, XCPTL_MAX_CTO_SIZE, XCPTL_MAX_DTO_SIZE, XCP_DAQ_MEM_SIZE, (1 << sizeof(uint16_t) * 8) - 1, (1 << sizeof(uint16_t) * 8) - 1, (1 << (sizeof(uint8_t) * 8)) - 1); DBG_PRINTF3(" %u KiB memory used\n", (unsigned int)sizeof(gXcp) / 1024); + DBG_PRINT3(" Note: These parameters in xcp_cfg.h need to be configured for optimal memory consumption and performance!\n"); DBG_PRINT3(" Options=("); // Print activated XCP protocol options #ifdef XCP_ENABLE_DAQ_CLOCK_MULTICAST // Enable GET_DAQ_CLOCK_MULTICAST - DBG_PRINT3("DAQ_CLK_MULTICAST,"); + DBG_PRINT3("DAQ_CLK_MULTICAST (not recomended),"); #endif #ifdef XCP_DAQ_CLOCK_64BIT // Use 64 Bit time stamps DBG_PRINT3("DAQ_CLK_64BIT,");