diff --git a/Cargo.toml b/Cargo.toml index 6519234..03e89a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,10 @@ lazy_static = "1.4.0" once_cell = "1.19.0" static_cell = "2.1.0" +# More compact and efficient implementations of the standard synchronization primitives +# Used for the mutex in CalSeg::sync() +parking_lot = "0.12.3" + # proc-macro A2L serializer for structs xcp_type_description = { path = "./xcp_type_description/", optional = true} xcp_type_description_derive = { path = "./xcp_type_description/xcp_type_description_derive/", optional = true } diff --git a/README.md b/README.md index e372b28..c0ebf33 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,7 @@ All measurement and calibration code instrumentation is non blocking and the tri There are no heap allocation during runtime, except for the lazy registrations of and for A2L generation. build.rs automatically builds a minimum static C library from individially preconfigured core XCPlite sources. -On C level, there is a synchronisation mutex or spinlock for the mpsc transmit queue. +On C level, there is a synchronisation mutex for the mpsc transmit queue. The C code has the option to start the server with 2 normal threads for rx and tx socket handling. The generated A2L file is finalized on XCP connect and provided for upload via XCP. @@ -288,7 +288,7 @@ These concepts are currently not supported by the A2L update tools, though A2L g The EPK version string in the A2L file can be set by the application. It resides a seperate, hardcoded const memory segment. -## Future improvements +## Possible improvements - Create a minimal lock MPSC event queue, increase queue efficiency (optimize mutex contention) for many daq lists and events - Support more types of calibration parameters, including types for curves and maps with axis @@ -296,7 +296,8 @@ The EPK version string in the A2L file can be set by the application. 
It resides - Improve the meta data annotations of the A2L serializer - Reduce the number of heap allocations and strings, reduce the overall memory footprint - Add sub groups of measurements for event instances -- Add support to decribe the application clock domain in rust +- Improve the pointer provenance checks in XcpEvent +- Add support to describe the application clock domain in rust - Provide a no-std version and create a embassy example diff --git a/examples/multi_thread_demo/src/main.rs b/examples/multi_thread_demo/src/main.rs index 9950bb6..ffe59b8 100644 --- a/examples/multi_thread_demo/src/main.rs +++ b/examples/multi_thread_demo/src/main.rs @@ -126,7 +126,7 @@ fn main() { // Test thread::sleep(Duration::from_millis(1000)); - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write t.into_iter().for_each(|t| t.join().unwrap()); diff --git a/examples/point_cloud_demo/src/main.rs b/examples/point_cloud_demo/src/main.rs index b69f6a2..242792a 100644 --- a/examples/point_cloud_demo/src/main.rs +++ b/examples/point_cloud_demo/src/main.rs @@ -162,6 +162,6 @@ fn main() { event_point_cloud.trigger(); params.sync(); - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write } } diff --git a/examples/protobuf_demo/src/main.rs b/examples/protobuf_demo/src/main.rs index dc0fb2d..08ac774 100644 --- a/examples/protobuf_demo/src/main.rs +++ b/examples/protobuf_demo/src/main.rs @@ -218,6 +218,6 @@ fn main() { thread::sleep(Duration::from_micros(1000000)); - xcp.write_a2l().unwrap(); // @@@@ test + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write } } diff --git a/examples/single_thread_demo/src/main.rs b/examples/single_thread_demo/src/main.rs index 8292477..80bad9d 100644 --- a/examples/single_thread_demo/src/main.rs +++ b/examples/single_thread_demo/src/main.rs @@ -111,7 +111,7 @@ fn main() { thread::sleep(Duration::from_millis(10)); // 100 Hz - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // 
@@@@ Remove: force A2L write } // Stop the XCP server diff --git a/src/cal/cal_seg.rs b/src/cal/cal_seg.rs index e288bc0..95a9ff6 100644 --- a/src/cal/cal_seg.rs +++ b/src/cal/cal_seg.rs @@ -4,11 +4,12 @@ // Module cal_seg // Calibration Segment -use std::{ - marker::PhantomData, - ops::Deref, - sync::{Arc, Mutex}, -}; +use std::{marker::PhantomData, ops::Deref, sync::Arc}; + +// Mutex used by CalSeg +// parking_lot is about 2 times faster in this use case +//use std::sync::Mutex; +use parking_lot::Mutex; #[allow(unused_imports)] use log::{debug, error, info, trace, warn}; @@ -145,14 +146,12 @@ where index, default_page, ecu_page: Box::new(CalPage { - // Heap allocation ctr: 0, init_request: false, freeze_request: false, page: init_page, }), xcp_page: Arc::new(Mutex::new(CalPage { - // Heap allocation ctr: 0, init_request: false, freeze_request: false, @@ -200,64 +199,62 @@ where /// # Returns /// true, if the calibration segment was modified pub fn sync(&self) -> bool { - let _xcp = Xcp::get(); let mut modified = false; // Check for modifications and copy xcp_page to ecu_page, when active page is "RAM" - /*if xcp.get_xcp_cal_page() == XcpCalPage::Ram */ + // let xcp = Xcp::get(); + // if xcp.get_xcp_cal_page() == XcpCalPage::Ram { // @@@@ ToDo: Avoid the lock, when there is no pending modification for the XCP page - { - let mut xcp_page = self.xcp_page.lock().unwrap(); - - // Freeze - save xcp page to json file - #[cfg(feature = "json")] - if xcp_page.freeze_request { - xcp_page.freeze_request = false; - trace!("freeze: {})", self.get_name(),); - // Reinitialize the calibration segment from default page - let path = format!("{}.json", self.get_name()); - self.ecu_page.page.save_to_file(&path); - } + let mut xcp_page = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard + + // Freeze - save xcp page to json file + #[cfg(feature = "json")] + if xcp_page.freeze_request { + xcp_page.freeze_request = false; + info!("freeze: {})", self.get_name(),); + // 
Reinitialize the calibration segment from default page + let path = format!("{}.json", self.get_name()); + self.ecu_page.page.save_to_file(&path).unwrap(); + } + + // Init - copy the default calibration page back to xcp page to reset it to default values + if xcp_page.init_request { + xcp_page.init_request = false; + // @@@@ unsafe - Implementation of init cal page in sync() with non mut self + unsafe { + info!("init: {}: default_page => xcp_page ({})", self.get_name(), xcp_page.ctr,); - // Init - copy the default calibration page back to xcp page to reset it to default values - if xcp_page.init_request { - xcp_page.init_request = false; - // @@@@ unsafe - Implementation of init cal page in sync() with non mut self - unsafe { - trace!("init: {}: default_page => xcp_page ({})", self.get_name(), xcp_page.ctr,); - - let src_ptr = self.default_page as *const T; - let dst_ptr = &xcp_page.page as *const _ as *mut T; - core::ptr::copy_nonoverlapping(src_ptr, dst_ptr, 1); - - // Increment the modification counter to distribute the new xcp page to all clones - xcp_page.ctr += 1; - modified = true; - } + let src_ptr = self.default_page as *const T; + let dst_ptr = &xcp_page.page as *const _ as *mut T; + core::ptr::copy_nonoverlapping(src_ptr, dst_ptr, 1); } - // Copy shared (ctr,xcp_page) to (ctr,ecu_page) in this clone of the calibration segment - if xcp_page.ctr != self.ecu_page.ctr { - trace!( - "sync: {}-{:04X}: xcp_page ({}) => ecu_page ({})", - self.get_name(), - self.ecu_page.as_ref() as *const _ as u16, - xcp_page.ctr, - self.ecu_page.ctr - ); - // @@@@ unsafe - Copy xcp_page to ecu_page - unsafe { - let dst_ptr: *mut u8 = self.ecu_page.as_ref() as *const _ as *mut u8; - let src_ptr: *const u8 = &*xcp_page as *const _ as *const u8; - let size: usize = std::mem::size_of::<(usize, T)>(); - core::ptr::copy_nonoverlapping(src_ptr, dst_ptr, size); - } - modified = true; + // Increment the modification counter to distribute the new xcp page to all clones + xcp_page.ctr += 
1; + } + + // Sync - Copy shared (ctr,xcp_page) to (ctr,ecu_page) in this clone of the calibration segment + if xcp_page.ctr != self.ecu_page.ctr { + trace!( + "sync: {}-{:04X}: xcp_page ({}) => ecu_page ({})", + self.get_name(), + self.ecu_page.as_ref() as *const _ as u16, + xcp_page.ctr, + self.ecu_page.ctr + ); + // @@@@ unsafe - Copy xcp_page to ecu_page + unsafe { + let dst_ptr: *mut u8 = self.ecu_page.as_ref() as *const _ as *mut u8; + let src_ptr: *const u8 = &*xcp_page as *const _ as *const u8; + let size: usize = std::mem::size_of::<(usize, T)>(); + core::ptr::copy_nonoverlapping(src_ptr, dst_ptr, size); } + modified = true; } + + modified } - modified } } @@ -308,19 +305,19 @@ where self.index } fn set_freeze_request(&self) { - let mut m = self.xcp_page.lock().unwrap(); + let mut m = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard m.freeze_request = true; } fn set_init_request(&self) { - let mut m = self.xcp_page.lock().unwrap(); + let mut m = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard m.init_request = true; } unsafe fn read(&self, offset: u16, len: u8, dst: *mut u8) -> bool { assert!(offset as usize + len as usize <= std::mem::size_of::()); if Xcp::get().get_xcp_cal_page() == XcpCalPage::Ram { - let xcp_page = self.xcp_page.lock().unwrap(); + let xcp_page = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard let src: *const u8 = (&xcp_page.page as *const _ as *const u8).add(offset as usize); core::ptr::copy_nonoverlapping(src, dst, len as usize); true @@ -334,7 +331,7 @@ where unsafe fn write(&self, offset: u16, len: u8, src: *const u8, delay: u8) -> bool { assert!(offset as usize + len as usize <= std::mem::size_of::()); if Xcp::get().get_xcp_cal_page() == XcpCalPage::Ram { - let mut xcp_page = self.xcp_page.lock().unwrap(); + let mut xcp_page = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard let dst: *mut u8 = (&xcp_page.page as *const _ as *mut u8).add(offset as usize); 
core::ptr::copy_nonoverlapping(src, dst, len as usize); if delay == 0 { @@ -348,7 +345,7 @@ where } fn flush(&self) { - let mut xcp_page = self.xcp_page.lock().unwrap(); + let mut xcp_page = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard xcp_page.ctr = xcp_page.ctr.wrapping_add(1); // Increment modification counter } } @@ -377,18 +374,18 @@ where // @@@@ For testing only // Deref to XCP page and increment the modification counter // This is undefined behaviour, because the reference to XCP data page will escape from its mutex -// impl DerefMut for CalSeg -// where -// T: CalPageTrait, -// { -// fn deref_mut(&mut self) -> &mut Self::Target { -// warn!("Unsafe deref mut to XCP page of {}, this is undefined behaviour !!", self.get_name()); -// let mut p = self.xcp_page.lock().unwrap(); -// p.ctr = p.ctr.wrapping_add(1); -// let r: *mut T = &mut p.page; -// unsafe { &mut *r } -// } -// } +impl std::ops::DerefMut for CalSeg +where + T: CalPageTrait, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + warn!("Unsafe deref mut to XCP page of {}, this is undefined behaviour !!", self.get_name()); + let mut p = self.xcp_page.lock(); // .unwrap(); // std::sync::MutexGuard + p.ctr = p.ctr.wrapping_add(1); + let r: *mut T = &mut p.page; + unsafe { &mut *r } + } +} //---------------------------------------------------------------------------------------------- // Implement Clone for CalSegSync @@ -465,9 +462,10 @@ mod cal_tests { use xcp_type_description_derive::XcpTypeDescription; use serde::{Deserialize, Serialize}; - use std::sync::{mpsc, mpsc::Sender, Arc, Mutex, Once, RwLock}; + use std::sync::{mpsc, mpsc::Sender, Arc, Once, RwLock}; use std::thread; use std::thread::sleep; + use std::thread::JoinHandle; use std::time::{Duration, Instant}; //----------------------------------------------------------------------------- @@ -490,25 +488,19 @@ mod cal_tests { test: u8, } - fn task_calseg(cal_seg: CalSeg) { - info!("task_calseg"); - loop { + fn 
task_calseg(cal_seg: CalSeg) -> u32 { + trace!("task_calseg start"); + let mut i: u32 = 0; + for _ in 0..1000000 { + i += 1; + thread::yield_now(); if cal_seg.stop { break; } cal_seg.sync(); } - info!("task_calseg end"); - } - - fn task_mutex(stop: Arc>) { - info!("task_stop"); - loop { - if **(stop.lock().unwrap()) { - break; - } - } - info!("task_calseg end"); + trace!("task_calseg end, loop count = {}", i); + i } #[test] @@ -558,7 +550,7 @@ mod cal_tests { // The only effect would be, that we hold a reference to the wrong page, as demonstrated here const CAL_PAGE2: CalPage4 = CalPage4 { test: 0x55 }; // FLASH let cal_page2 = CalPage4 { test: 0xAA }; // RAM - cal_page2.save_to_file("calseg2.json"); + cal_page2.save_to_file("calseg2.json").unwrap(); let cal_seg2 = xcp.create_calseg("calseg2", &CAL_PAGE2, true); Xcp::get().set_ecu_cal_page(XcpCalPage::Ram); let r = &cal_seg2.test; @@ -570,6 +562,40 @@ mod cal_tests { std::fs::remove_file("calseg2.json").ok(); } + #[test] + fn test_calibration_segment_performance() { + let xcp = xcp_test::test_setup(log::LevelFilter::Info); + + const CAL_PAGE: CalPage0 = CalPage0 { stop: false }; + + let mut cal_seg1 = xcp.create_calseg("calseg1", &CAL_PAGE, false); + cal_seg1.sync(); + assert!(!cal_seg1.stop); + + // Create 10 tasks with 10 clones of cal_seg1 + let mut t = Vec::new(); + let loop_count = Arc::new(parking_lot::Mutex::new(Vec::with_capacity(10))); + let start = Instant::now(); + for i in 0..10 { + let c = CalSeg::clone(&cal_seg1); + trace!("task {} clone = {}", i, cal_seg1.get_clone_count()); + let l = loop_count.clone(); + t.push(thread::spawn(move || { + let n = task_calseg(c); + l.lock().push(n); + })); + } + thread::sleep(Duration::from_millis(1000)); + cal_seg1.stop = true; + t.into_iter().for_each(|t| t.join().unwrap()); + + let duration = start.elapsed().as_micros(); + info!("Duration: {}us", duration); + let tot_loop_count: u32 = loop_count.lock().iter().sum(); + info!("Loop counts: tot = {}, {:.3}us per 
loop", tot_loop_count, duration as f64 / tot_loop_count as f64); + info!(" {:?}", loop_count); + } + //----------------------------------------------------------------------------- // Test file read and write of a cal_seg @@ -602,7 +628,7 @@ mod cal_tests { // Create a test_cal_page.json file with values from CAL_PAR_RAM let mut_page: Box = Box::new(CAL_PAR_RAM); - mut_page.save_to_file("test_cal_seg.json"); + mut_page.save_to_file("test_cal_seg.json").unwrap(); // Create a cal_seg with a mut_page from file test_cal_seg.json aka CAL_PAR_RAM, and a default page from CAL_PAR_FLASH let cal_seg = &xcp.create_calseg("test_cal_seg", &CAL_PAR_FLASH, true); @@ -679,8 +705,8 @@ mod cal_tests { xcp_test::test_setup(log::LevelFilter::Info); let xcp = Xcp::get(); let mut_page: CalPage2 = CalPage2 { a: 1, b: 3, c: 5 }; - mut_page.save_to_file("test1.json"); - mut_page.save_to_file("test2.json"); + mut_page.save_to_file("test1.json").unwrap(); + mut_page.save_to_file("test2.json").unwrap(); let cal_seg = xcp.create_calseg("test1", &FLASH_PAGE2, true); // active page is RAM from test1.json assert_eq!(xcp.get_ecu_cal_page(), XcpCalPage::Ram, "XCP should be on RAM page here, there is no independant page switching yet"); test_is_mut!(cal_seg); // Default page must be mut_page @@ -720,7 +746,7 @@ mod cal_tests { assert!(std::mem::size_of::() == 12); let mut_page1: CalPage1 = CalPage1 { a: 1, b: 3, c: 5 }; - mut_page1.save_to_file("test1.json"); + mut_page1.save_to_file("test1.json").unwrap(); // Create calseg1 from def let calseg1 = xcp.create_calseg("test1", &FLASH_PAGE1, true); @@ -789,7 +815,7 @@ mod cal_tests { info!(" {}: {}", i, s.get_name()); } - let a: Arc>>> = Arc::new(Mutex::new(Vec::new())); + let a: Arc>>> = Arc::new(std::sync::Mutex::new(Vec::new())); { let mut v = a.lock().unwrap(); v.push(Box::new(s1.clone())); diff --git a/src/daq/daq_event.rs b/src/daq/daq_event.rs index fec55d1..8b49e98 100644 --- a/src/daq/daq_event.rs +++ b/src/daq/daq_event.rs @@ -637,7 +637,7 @@ 
mod daq_tests { break; } } - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write } //----------------------------------------------------------------------------- @@ -672,7 +672,7 @@ mod daq_tests { break; } } - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write } //----------------------------------------------------------------------------- @@ -731,6 +731,6 @@ mod daq_tests { // daq_register_instance!(channel6, event5); // panic: duplicate measurement - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write } } diff --git a/src/main.rs b/src/main.rs index 18011af..8838d72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -486,7 +486,7 @@ fn main() { // Without this, the A2L file will be automatically written on XCP connect, to be available for download by CANape if idle_time >= 2.0 { // Test A2L write - xcp.write_a2l().unwrap(); + xcp.write_a2l().unwrap(); // @@@@ Remove: force A2L write // Test init request // xcp.set_init_request(); diff --git a/src/reg/registry/a2l_writer.rs b/src/reg/registry/a2l_writer.rs index fdbaf95..6ea0c7b 100644 --- a/src/reg/registry/a2l_writer.rs +++ b/src/reg/registry/a2l_writer.rs @@ -145,7 +145,7 @@ impl GenerateA2l for RegistryMeasurement { // r#"/begin BLOB {name} "{comment}" 0x{addr:X} {buffer_size} ECU_ADDRESS_EXTENSION {ext} {annotation} /begin IF_DATA XCP /begin DAQ_EVENT FIXED_EVENT_LIST EVENT {event} /end DAQ_EVENT /end IF_DATA /end BLOB"# // )?; - // @@@@: Intermediate solution + // @@@@ ToDo: Intermediate solution // As ASCII string (old representation) write!( writer, diff --git a/src/xcp.rs b/src/xcp.rs index 5d213df..66747bf 100644 --- a/src/xcp.rs +++ b/src/xcp.rs @@ -253,15 +253,6 @@ impl EventList { } fn register(&mut self) { - // Check event list is in untransformed order, may happen during testing - // @@@@ Remove this - for (i, e) in self.0.iter_mut().enumerate() { - if e.event.channel != i as u16 || 
e.event.get_channel() != i as u16 { - warn!("Event list is not in untransformed order"); - break; - } - } - // Sort the event list by name and then instance index self.sort_by_name_and_index(); @@ -753,13 +744,11 @@ impl Xcp { // Calibration page switching /// Set the active calibration page for the ECU access (used for test only) - // @@@@ ToDo: remove this pub pub fn set_ecu_cal_page(&self, page: XcpCalPage) { self.ecu_cal_page.store(page as u8, Ordering::Relaxed); } /// Set the active calibration page for the XCP access (used for test only) - // @@@@ ToDo: remove this pub pub fn set_xcp_cal_page(&self, page: XcpCalPage) { self.xcp_cal_page.store(page as u8, Ordering::Relaxed); } diff --git a/tests/test_multi_thread.rs b/tests/test_multi_thread.rs index fa70653..f473d63 100644 --- a/tests/test_multi_thread.rs +++ b/tests/test_multi_thread.rs @@ -2,7 +2,7 @@ // Integration test for XCP in a multi threaded application // Uses the test XCP client in xcp_client -// cargo test --features=json --features=auto_reg -- --test-threads=1 --nocapture --test test_multi_thread +// cargo test --features=json --features=auto_reg --features=a2l_reader -- --test-threads=1 --nocapture --test test_multi_thread #![allow(unused_assignments)] diff --git a/tests/test_single_thread.rs b/tests/test_single_thread.rs index 799217b..c1cad09 100644 --- a/tests/test_single_thread.rs +++ b/tests/test_single_thread.rs @@ -2,8 +2,7 @@ // Integration test for XCP in a single thread application // Uses the test XCP client in module xcp_client -// cargo test --features=json --features=auto_reg -- --test-threads=1 --nocapture --test test_single_thread - +// cargo test --features=json --features=auto_reg --features=a2l_reader -- --test-threads=1 --nocapture --test test_single_thread use xcp::*; use xcp_type_description::prelude::*; diff --git a/tests/test_tokio_multi_thread.rs b/tests/test_tokio_multi_thread.rs index 4a94e79..42729ed 100644 --- a/tests/test_tokio_multi_thread.rs +++ 
b/tests/test_tokio_multi_thread.rs @@ -278,7 +278,7 @@ async fn task(index: usize, cal_seg: CalSeg) { //----------------------------------------------------------------------------- // Integration test multi thread measurememt and calibration -//#[ignore] +#[ignore] #[tokio::test] async fn test_tokio_multi_thread() { env_logger::Builder::new().filter_level(OPTION_LOG_LEVEL.to_log_level_filter()).init(); diff --git a/tests/test_tokio_single_thread.rs b/tests/test_tokio_single_thread.rs index aa37e26..935f253 100644 --- a/tests/test_tokio_single_thread.rs +++ b/tests/test_tokio_single_thread.rs @@ -151,7 +151,7 @@ fn task(cal_seg: CalSeg) { //----------------------------------------------------------------------------- // Integration test single thread measurement and calibration -//#[ignore] +#[ignore] #[tokio::test] async fn test_tokio_single_thread() { env_logger::Builder::new().filter_level(OPTION_LOG_LEVEL.to_log_level_filter()).try_init().ok(); diff --git a/tests/xcp_test_executor.rs b/tests/xcp_test_executor.rs index 501b663..9fdcf3f 100644 --- a/tests/xcp_test_executor.rs +++ b/tests/xcp_test_executor.rs @@ -25,8 +25,8 @@ pub const OPTION_LOG_LEVEL: xcp::XcpLogLevel = xcp::XcpLogLevel::Info; pub const OPTION_XCP_LOG_LEVEL: xcp::XcpLogLevel = xcp::XcpLogLevel::Info; // Test parameters -pub const MULTI_THREAD_TASK_COUNT: usize = 32; // Number of threads -pub const DAQ_TEST_TASK_SLEEP_TIME_US: u64 = 100; // us +pub const MULTI_THREAD_TASK_COUNT: usize = 16; // Number of threads +pub const DAQ_TEST_TASK_SLEEP_TIME_US: u64 = 1000; // us const DAQ_TEST_DURATION_MS: u64 = 4000; // ms const CAL_TEST_MAX_ITER: u32 = 4000; // Number of calibrations const CAL_TEST_TASK_SLEEP_TIME_US: u64 = 50; // Checking task cycle time in us diff --git a/xcp_lite.a2l b/xcp_lite.a2l index ae76cef..f84e5fc 100644 --- a/xcp_lite.a2l +++ b/xcp_lite.a2l @@ -191,10 +191,10 @@ /begin GROUP mainloop "" ROOT /begin REF_MEASUREMENT mainloop_counter1 mainloop_counter2 /end REF_MEASUREMENT /end 
GROUP /begin GROUP task1 "" ROOT /begin REF_MEASUREMENT array1 counter counter_u16 counter_u32 counter_u64 counter_u8 /end REF_MEASUREMENT /end GROUP /begin GROUP task2_inst "" ROOT /begin REF_MEASUREMENT channel_1 channel_10 channel_2 channel_3 channel_4 channel_5 channel_6 channel_7 channel_8 channel_9 /end REF_MEASUREMENT /end GROUP -/begin CHARACTERISTIC calpage00.task1_cycle_time_us "task1 cycle time" VALUE 0x14800C U32 0 NO_COMPU_METHOD 0 4294967295 PHYS_UNIT "us" ECU_ADDRESS_EXTENSION 1 /end CHARACTERISTIC -/begin CHARACTERISTIC calpage00.task2_cycle_time_us "task2 cycle time" VALUE 0x148010 U32 0 NO_COMPU_METHOD 0 4294967295 PHYS_UNIT "us" ECU_ADDRESS_EXTENSION 1 /end CHARACTERISTIC -/begin CHARACTERISTIC static_vars.test_f64 "Test static f64" VALUE 0x14827C F32 0 NO_COMPU_METHOD -1000000000000 1000000000000 ECU_ADDRESS_EXTENSION 1 /begin IF_DATA XCP /begin DAQ_EVENT FIXED_EVENT_LIST EVENT 2 /end DAQ_EVENT /end IF_DATA /end CHARACTERISTIC -/begin CHARACTERISTIC static_vars.test_u32 "Test static u32" VALUE 0x148278 U32 0 NO_COMPU_METHOD 0 4294967295 ECU_ADDRESS_EXTENSION 1 /begin IF_DATA XCP /begin DAQ_EVENT FIXED_EVENT_LIST EVENT 2 /end DAQ_EVENT /end IF_DATA /end CHARACTERISTIC +/begin CHARACTERISTIC calpage00.task1_cycle_time_us "task1 cycle time" VALUE 0x14C00C U32 0 NO_COMPU_METHOD 0 4294967295 PHYS_UNIT "us" ECU_ADDRESS_EXTENSION 1 /end CHARACTERISTIC +/begin CHARACTERISTIC calpage00.task2_cycle_time_us "task2 cycle time" VALUE 0x14C010 U32 0 NO_COMPU_METHOD 0 4294967295 PHYS_UNIT "us" ECU_ADDRESS_EXTENSION 1 /end CHARACTERISTIC +/begin CHARACTERISTIC static_vars.test_f64 "Test static f64" VALUE 0x14C3EC F32 0 NO_COMPU_METHOD -1000000000000 1000000000000 ECU_ADDRESS_EXTENSION 1 /begin IF_DATA XCP /begin DAQ_EVENT FIXED_EVENT_LIST EVENT 2 /end DAQ_EVENT /end IF_DATA /end CHARACTERISTIC +/begin CHARACTERISTIC static_vars.test_u32 "Test static u32" VALUE 0x14C3E8 U32 0 NO_COMPU_METHOD 0 4294967295 ECU_ADDRESS_EXTENSION 1 /begin IF_DATA XCP /begin 
DAQ_EVENT FIXED_EVENT_LIST EVENT 2 /end DAQ_EVENT /end IF_DATA /end CHARACTERISTIC /begin CHARACTERISTIC CalPage.cycle_time_ms "main task cycle time" VALUE 0x80010000 U32 0 NO_COMPU_METHOD 0 4294967295 PHYS_UNIT "ms" /end CHARACTERISTIC /begin CHARACTERISTIC CalPage.run "" VALUE 0x80010004 U8 0 NO_COMPU_METHOD 0 1 PHYS_UNIT "bool" /end CHARACTERISTIC /begin CHARACTERISTIC CalPage.run1 "" VALUE 0x80010005 U8 0 NO_COMPU_METHOD 0 1 PHYS_UNIT "bool" /end CHARACTERISTIC diff --git a/xcplib/src/platform.h b/xcplib/src/platform.h index 3b83816..1b00371 100644 --- a/xcplib/src/platform.h +++ b/xcplib/src/platform.h @@ -81,9 +81,10 @@ typedef HANDLE tXcpThread; #elif defined(_LINUX) // Linux typedef pthread_t tXcpThread; -#define create_thread(h,t) pthread_create(h, NULL, t, NULL); -#define join_thread(h) pthread_join(h,NULL); +#define create_thread(h,t) pthread_create(h, NULL, t, NULL) +#define join_thread(h) pthread_join(h,NULL) #define cancel_thread(h) { pthread_detach(h); pthread_cancel(h); } +#define yield_thread() sched_yield() #endif diff --git a/xcplib/src/xcpEthTl.c b/xcplib/src/xcpEthTl.c index e3bb7d4..75f4f3b 100644 --- a/xcplib/src/xcpEthTl.c +++ b/xcplib/src/xcpEthTl.c @@ -43,11 +43,6 @@ static struct { SOCKET MulticastSock; #endif -#if defined(_WIN) // Windows - HANDLE queue_event; - uint64_t queue_event_time; -#endif - } gXcpTl; #endif @@ -154,10 +149,9 @@ void XcpEthTlSendMulticastCrm(const uint8_t* packet, uint16_t packet_size, const //------------------------------------------------------------------------------ -static int handleXcpCommand(int n, tXcpCtoMessage *p, uint8_t *srcAddr, uint16_t srcPort) { +static int handleXcpCommand(tXcpCtoMessage *p, uint8_t *srcAddr, uint16_t srcPort) { int connected; - (void)n; // gXcpTl.LastCrmCtr = p->ctr; connected = XcpIsConnected(); @@ -193,8 +187,8 @@ static int handleXcpCommand(int n, tXcpCtoMessage *p, uint8_t *srcAddr, uint16_t } } #endif // UDP - - XcpCommand((const uint32_t*)&p->packet[0], p->dlc); // Handle 
command + if (p->dlc>XCPTL_MAX_CTO_SIZE) return 0; + XcpCommand((const uint32_t*)&p->packet[0], (uint8_t)p->dlc); // Handle command } /* Not connected yet */ @@ -209,7 +203,7 @@ static int handleXcpCommand(int n, tXcpCtoMessage *p, uint8_t *srcAddr, uint16_t } #endif // UDP XcpTlResetTransmitQueue(); - XcpCommand((const uint32_t*)&p->packet[0],p->dlc); // Handle CONNECT command + XcpCommand((const uint32_t*)&p->packet[0],(uint8_t)p->dlc); // Handle CONNECT command } else { DBG_PRINT_WARNING("WARNING: handleXcpCommand: no valid CONNECT command\n"); @@ -268,7 +262,7 @@ BOOL XcpEthTlHandleCommands(uint32_t timeout_ms) { n = socketRecv(gXcpTl.Sock, (uint8_t*)&msgBuf.packet, msgBuf.dlc, TRUE); // packet, recv blocking if (n > 0) { if (n == msgBuf.dlc) { - return handleXcpCommand(n, &msgBuf, NULL, 0); + return handleXcpCommand(&msgBuf, NULL, 0); } else { socketShutdown(gXcpTl.Sock); // Let the receive thread terminate without error message @@ -303,7 +297,7 @@ BOOL XcpEthTlHandleCommands(uint32_t timeout_ms) { DBG_PRINT_ERROR("ERROR: corrupt message received!\n"); return FALSE; // Error } - return handleXcpCommand(n, &msgBuf, srcAddr, srcPort); + return handleXcpCommand(&msgBuf, srcAddr, srcPort); } } #endif // UDP @@ -327,7 +321,8 @@ static int handleXcpMulticastCommand(int n, tXcpCtoMessage* p, uint8_t* dstAddr, // Valid socket data received, at least transport layer header and 1 byte if (n >= XCPTL_TRANSPORT_LAYER_HEADER_SIZE + 1 && p->dlc <= n- XCPTL_TRANSPORT_LAYER_HEADER_SIZE) { - XcpCommand((const uint32_t*)&p->packet[0],p->dlc); // Handle command + if (p->dlc >= XCPTL_MAX_CTO_SIZE) return 0; // Error + XcpCommand((const uint32_t*)&p->packet[0],(uint8_t)p->dlc); // Handle command } else { printf("MULTICAST ignored\n"); diff --git a/xcplib/src/xcpLite.c b/xcplib/src/xcpLite.c index fd0b210..747769a 100644 --- a/xcplib/src/xcpLite.c +++ b/xcplib/src/xcpLite.c @@ -289,7 +289,7 @@ static tXcpData gXcp = { 0 }; #define CRM_WORD(x) (gXcp.Crm.w[x]) #define CRM_DWORD(x) 
(gXcp.Crm.dw[x]) -static uint8_t XcpAsyncCommand( BOOL async, const uint32_t* cmdBuf, uint16_t cmdLen ); +static uint8_t XcpAsyncCommand( BOOL async, const uint32_t* cmdBuf, uint8_t cmdLen ); /****************************************************************************/ @@ -686,8 +686,7 @@ static uint8_t XcpSetDaqListMode(uint16_t daq, uint16_t event, uint8_t mode, uin // Check all DAQ lists with same event have the same address extension uint8_t ext = DaqListAddrExt(daq); for (uint16_t daq0=0;daq0dlc>XCPTL_MAX_CTO_SIZE) return CRC_CMD_SYNTAX; return XcpCommand((const uint32_t*)&p->packet[0], p->dlc); // Handle command } @@ -84,7 +93,7 @@ uint8_t XcpTlCommand( uint16_t msgLen, const uint8_t* msgBuf) { /* Check for CONNECT command ? */ if (p->dlc == 2 && p->packet[0] == CC_CONNECT) { XcpTlResetTransmitQueue(); - return XcpCommand((const uint32_t*)&p->packet[0],p->dlc); // Handle CONNECT command + return XcpCommand((const uint32_t*)&p->packet[0],(uint8_t)p->dlc); // Handle CONNECT command } else { DBG_PRINTF_WARNING("WARNING: XcpTlCommand: no valid CONNECT command, dlc=%u, data=%02X\n", p->dlc, p->packet[0]); diff --git a/xcplib/src/xcpTlQueue.c b/xcplib/src/xcpTlQueue.c index a0c1e14..7eb9b8a 100644 --- a/xcplib/src/xcpTlQueue.c +++ b/xcplib/src/xcpTlQueue.c @@ -16,10 +16,67 @@ #include "dbg_print.h" #include "xcpLite.h" +// Experimental +// Use spinlock/mutex instead of mutex for producer lock +// This naive approach is usually not faster than a mutex and can produce higher latencies and a hard-to-predict impact on other threads +// It might be a better solution for non preemptive tasks +//#define USE_SPINLOCK +//#define USE_YIELD +//#define TEST_LOCK_TIMING + +/* +Test results from test_multi_thread with 32 tasks and 200us sleep time: +maxLock and avgLock time in ns + +SPINLOCK+YIELD + lockCount=501170, maxLock=296000, avgLock=768 + lockCount=501019, maxLock=195000, avgLock=744 + lockCount=500966, maxLock=210000, avgLock=724 + +SPINLOCK without cache friendly
lock check + lockCount=492952, maxLock=10115000, avgLock=1541 + +SPINLOCK + lockCount=497254, maxLock=9935000, avgLock=512 + lockCount=494866, maxLock=11935000, avgLock=1322 + lockCount=490923, maxLock=10019000, avgLock=2073 + lockCount=489831, maxLock=10024000, avgLock=1980 + +MUTEX + lockCount=499798, maxLock=114000, avgLock=840 + lockCount=500202, maxLock=135000, avgLock=806 + lockCount=499972, maxLock=130000, avgLock=790 + lockCount=500703, maxLock=124000, avgLock=755 + lockCount=500773, maxLock=126000, avgLock=669 +*/ + +#ifdef TEST_LOCK_TIMING +static uint64_t lockTimeMax = 0; +static uint64_t lockTimeSum = 0; +static uint64_t lockCount = 0; +#endif + +#ifndef _WIN + #include -// Use spinlock instead of mutex for producer lock -//#define USE_SPINLOCK +#else + +#ifdef _WIN32_ +#error "Windows32 not implemented yet" +#else + +#undef USE_SPINLOCK +#define atomic_uint_fast64_t uint64_t +#define atomic_store(a,b) (*a)=(b) +#define atomic_load(a) (*a) +#define atomic_load_explicit(a,b) (*a) +#define atomic_fetch_add(a,b) { mutexLock(&gXcpTlQueue.mutex); (*a)+=(b); mutexUnlock(&gXcpTlQueue.mutex);} + +#endif + +#endif + // Queue entry states #define RESERVED 0 // Reserved by producer @@ -63,8 +120,8 @@ void XcpTlInitTransmitQueue() { gXcpTlQueue.tail_len = 0; #ifdef USE_SPINLOCK assert(atomic_is_lock_free(&lock)!=0); -#endif assert(atomic_is_lock_free(&gXcpTlQueue.head)!=0); +#endif } void XcpTlResetTransmitQueue() { @@ -79,6 +136,10 @@ void XcpTlFreeTransmitQueue() { #ifndef USE_SPINLOCK mutexDestroy(&gXcpTlQueue.mutex); #endif + +#ifdef TEST_LOCK_TIMING + DBG_PRINTF3("XcpTlFreeTransmitQueue: overruns=%u, lockCount=%llu, maxLock=%llu, avgLock=%llu\n", gXcpTlQueue.overruns, lockCount, lockTimeMax, lockTimeSum/lockCount); +#endif } @@ -106,11 +167,27 @@ uint8_t* XcpTlGetTransmitBuffer(void** handle, uint16_t packet_len) { DBG_PRINTF5("XcpTlGetTransmitBuffer: len=%d\n", packet_len); // Producer lock +#ifdef TEST_LOCK_TIMING + uint64_t c = clockGet(); +#endif #ifdef 
USE_SPINLOCK - while (atomic_flag_test_and_set(&lock)); + for (uint32_t n = 1;1;n++) { + BOOL locked = atomic_load_explicit(&lock._Value, memory_order_relaxed); + if (!locked && !atomic_flag_test_and_set_explicit(&lock, memory_order_acquire)) break; + //if ( !atomic_flag_test_and_set_explicit(&lock, memory_order_acquire)) break; + #ifdef USE_YIELD + if (n%16==0) yield_thread(); + #endif + } #else mutexLock(&gXcpTlQueue.mutex); #endif +#ifdef TEST_LOCK_TIMING + uint64_t d = clockGet() - c; + if (d>lockTimeMax) lockTimeMax = d; + lockTimeSum += d; + lockCount++; +#endif uint64_t head = atomic_load(&gXcpTlQueue.head); uint64_t tail = atomic_load_explicit(&gXcpTlQueue.tail,memory_order_relaxed); @@ -120,13 +197,13 @@ uint8_t* XcpTlGetTransmitBuffer(void** handle, uint16_t packet_len) { // Use the ctr as commmit state uint32_t offset = head % MPSC_QUEUE_SIZE; entry = (tXcpDtoMessage *)(gXcpTlQueue.buffer + offset); - entry->ctr = RESERVED; + entry->ctr = RESERVED; atomic_store(&gXcpTlQueue.head, head+msg_len); } #ifdef USE_SPINLOCK - atomic_flag_clear(&lock); + atomic_flag_clear_explicit(&lock, memory_order_release); #else mutexUnlock(&gXcpTlQueue.mutex); #endif @@ -167,10 +244,10 @@ void XcpTlFlushTransmitBuffer() { // Get transmit queue level in bytes -static int32_t XcpTlGetTransmitQueueLevel() { +static uint32_t XcpTlGetTransmitQueueLevel() { uint64_t head = atomic_load(&gXcpTlQueue.head); uint64_t tail = atomic_load(&gXcpTlQueue.tail); - return head-tail; + return (uint32_t)(head-tail); } // Wait (sleep) until transmit queue is empty @@ -204,22 +281,20 @@ const uint8_t * XcpTlTransmitQueuePeekMsg( uint16_t* msg_len ) { uint64_t head = atomic_load(&gXcpTlQueue.head); uint64_t tail = atomic_load(&gXcpTlQueue.tail); - if (head == tail) return NULL; // Queue is empty - - uint32_t level = head-tail; - assert(level <= MPSC_QUEUE_SIZE); // Overrun not handled - DBG_PRINTF5("XcpTlTransmitQueuePeekMsg: level = %u, ctr=%u\n", level, gXcpTlQueue.ctr ); + 
assert(head-tail<=MPSC_QUEUE_SIZE); // Overrun not handled + uint32_t level = (uint32_t)(head-tail); + DBG_PRINTF5("XcpTlTransmitQueuePeekMsg: level=%u, ctr=%u\n", level, gXcpTlQueue.ctr ); uint32_t tail_offset = tail % MPSC_QUEUE_SIZE; - tXcpDtoMessage *entry = (tXcpDtoMessage *)(gXcpTlQueue.buffer + tail_offset); + tXcpDtoMessage *entry1 = (tXcpDtoMessage *)(gXcpTlQueue.buffer + tail_offset); if (gXcpTlQueue.tail_len==0) { - uint16_t ctr = entry->ctr; // entry ctr may be concurrently changed by producer, when committed - if (ctr==RESERVED) return NULL; // Not commited yet - assert(ctr==COMMITTED); - assert(entry->dlc<=XCPTL_MAX_DTO_SIZE); // Max DTO size + uint16_t ctr1 = entry1->ctr; // entry ctr may be concurrently changed by producer, when committed + if (ctr1==RESERVED) return NULL; // Not commited yet + assert(ctr1==COMMITTED); + assert(entry1->dlc<=XCPTL_MAX_DTO_SIZE); // Max DTO size if (gXcpTlQueue.overruns) { // Add the number of overruns DBG_PRINTF3("XcpTlTransmitQueuePeekMsg: overruns=%u\n", gXcpTlQueue.overruns); @@ -227,8 +302,8 @@ const uint8_t * XcpTlTransmitQueuePeekMsg( uint16_t* msg_len ) { gXcpTlQueue.overruns = 0; } - entry->ctr = gXcpTlQueue.ctr++; // Set the transport layer packet counter - uint16_t len = entry->dlc + XCPTL_TRANSPORT_LAYER_HEADER_SIZE; + entry1->ctr = gXcpTlQueue.ctr++; // Set the transport layer packet counter + uint16_t len = entry1->dlc + XCPTL_TRANSPORT_LAYER_HEADER_SIZE; // Check for more packets to concatenate in a meassage segment uint16_t len1 = len; @@ -259,7 +334,7 @@ const uint8_t * XcpTlTransmitQueuePeekMsg( uint16_t* msg_len ) { //DBG_PRINTF5("XcpTlTransmitQueuePeekMsg: msg_len = %u\n", gXcpTlQueue.tail_len ); *msg_len = gXcpTlQueue.tail_len; - return (uint8_t*)entry; + return (uint8_t*)entry1; } diff --git a/xcplib/xcpAppl.c b/xcplib/xcpAppl.c index 96aa2f4..3dd9af5 100644 --- a/xcplib/xcpAppl.c +++ b/xcplib/xcpAppl.c @@ -380,7 +380,7 @@ uint8_t ApplXcpFreezeCalPage(uint8_t segment) { } uint8_t 
ApplXcpGetCalPageMode(uint8_t segment) { if (segment>0) return 0; - return 0x01; // @@@@ Implement: support multiple segments + return 0x01; // @@@@ ToDo: Support multiple segments, CANape does not support switching individual memory segment } #endif