From f6ad5add506c72e4b6f04b47dec8992a103777ab Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 24 May 2024 19:01:12 +0200 Subject: [PATCH 1/5] Compiling version for trigger counts, but we need to sort out the threading of the system --- plugins/DataFlowOrchestrator.cpp | 17 +++++++++++++++-- plugins/DataFlowOrchestrator.hpp | 11 +++++++++++ .../info/datafloworchestratorinfo.jsonnet | 7 ++++++- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/plugins/DataFlowOrchestrator.cpp b/plugins/DataFlowOrchestrator.cpp index b45a7911..408eea84 100644 --- a/plugins/DataFlowOrchestrator.cpp +++ b/plugins/DataFlowOrchestrator.cpp @@ -180,9 +180,10 @@ DataFlowOrchestrator::receive_trigger_decision(const dfmessages::TriggerDecision return; } - ++m_received_decisions; auto decision_received = std::chrono::steady_clock::now(); - + ++m_received_decisions; + ++m_trigger_counters[(trgdataformats::TriggerCandidateData::Type)decision.trigger_type].received; + std::chrono::steady_clock::time_point decision_assigned; do { @@ -300,6 +301,16 @@ DataFlowOrchestrator::get_info(opmonlib::InfoCollector& ci, int level) ci.add(name, tmp_ic); } + for (auto& [type, data] : m_trigger_counters) { + opmonlib::InfoCollector tmp_ic; + datafloworchestratorinfo::TriggerInfo i; + i.received = data.received.exchange(0); + i.completed = data.completed.exchange(0); + tmp_ic.add(i); + auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; + ci.add(name, tmp_ic); + } + datafloworchestratorinfo::Info info; info.tokens_received = m_received_tokens.exchange(0); info.decisions_sent = m_sent_decisions.exchange(0); @@ -309,6 +320,7 @@ DataFlowOrchestrator::get_info(opmonlib::InfoCollector& ci, int level) info.forwarding_decision = m_forwarding_decision.exchange(0); info.waiting_for_token = m_waiting_for_token.exchange(0); info.processing_token = m_processing_token.exchange(0); + ci.add(info); } @@ -352,6 +364,7 @@ DataFlowOrchestrator::receive_trigger_complete_token(const dfmessages::TriggerDe try { auto dec_ptr = app_it->second.complete_assignment(token.trigger_number, m_metadata_function); + ++m_trigger_counters[(trgdataformats::TriggerCandidateData::Type)dec_ptr->decision.trigger_type].completed; } catch (AssignedTriggerDecisionNotFound const& err) { ers::error(err); } diff --git a/plugins/DataFlowOrchestrator.hpp b/plugins/DataFlowOrchestrator.hpp index 501e44ab..8753ef65 100644 --- a/plugins/DataFlowOrchestrator.hpp +++ b/plugins/DataFlowOrchestrator.hpp @@ -18,6 +18,8 @@ #include "dfmessages/TriggerDecision.hpp" #include "dfmessages/TriggerDecisionToken.hpp" #include "dfmessages/TriggerInhibit.hpp" +#include "trgdataformats/TriggerCandidateData.hpp" + #include "iomanager/Sender.hpp" @@ -29,6 +31,7 @@ #include #include #include +#include namespace dunedaq { @@ -132,6 +135,12 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule std::chrono::steady_clock::time_point m_last_token_received; std::chrono::steady_clock::time_point m_last_td_received; + // Struct for statistic + struct TriggerData { + std::atomic received{0}; + std::atomic completed{0}; + }; + // Statistics std::atomic m_received_tokens{ 0 }; // NOLINT (build/unsigned) std::atomic m_sent_decisions{ 0 }; // NOLINT (build/unsigned) @@ -141,6 +150,8 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule std::atomic m_forwarding_decision{ 0 }; // NOLINT (build/unsigned) std::atomic m_waiting_for_token{ 0 }; // NOLINT (build/unsigned) std::atomic m_processing_token{ 0 }; // NOLINT (build/unsigned) + std::map m_trigger_counters; + std::mutex m_trigger_mutex; // used to safely handle the map above }; } // namespace dfmodules } // namespace dunedaq diff --git a/schema/dfmodules/info/datafloworchestratorinfo.jsonnet b/schema/dfmodules/info/datafloworchestratorinfo.jsonnet index 92856079..45eab867 100755 --- a/schema/dfmodules/info/datafloworchestratorinfo.jsonnet +++ b/schema/dfmodules/info/datafloworchestratorinfo.jsonnet @@ -20,7 +20,12 @@ local info = { s.field("average_time_since_assignment", self.uint8, 0, doc="average time since assignment for current TDs (ms)"), s.field("min_time_since_assignment", self.uint8, 0, doc="shortest time since assignment among current TDs (ms)"), s.field("max_time_since_assignment", self.uint8, 0, doc="longest time since assignment among current TDs (ms)") - ], doc="Data Flow Orchestrator information") + ], doc="Data Flow Orchestrator information"), + + trigger_info : s.record("TriggerInfo", [ + s.field("received", self.uint8, 0, doc="Number of received triggers"), + s.field("completed", self.uint8, 0, doc="number of completed triggers") + ], doc = "Trigger info generated by the DFO expected to be different for each trigger type") }; moo.oschema.sort_select(info) \ No newline at end of file From a128709afbfb28237a0ad5f833e0be54c287d454 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 24 May 2024 19:18:41 +0200 Subject: [PATCH 2/5] Compiling version with lock guards --- plugins/DataFlowOrchestrator.cpp | 30 +++++++++++++++++------------- plugins/DataFlowOrchestrator.hpp | 8 ++++++++ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/plugins/DataFlowOrchestrator.cpp b/plugins/DataFlowOrchestrator.cpp index 408eea84..82fe4ec1 100644 --- a/plugins/DataFlowOrchestrator.cpp +++ b/plugins/DataFlowOrchestrator.cpp @@ -153,6 +153,9 @@ DataFlowOrchestrator::do_stop(const data_t& /*args*/) ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_run_number)); } + std::lock_guard guard(m_trigger_mutex); + m_trigger_counters.clear(); + TLOG() << get_name() << " successfully stopped"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; } @@ -182,7 +185,7 @@ DataFlowOrchestrator::receive_trigger_decision(const dfmessages::TriggerDecision auto decision_received = std::chrono::steady_clock::now(); ++m_received_decisions; - ++m_trigger_counters[(trgdataformats::TriggerCandidateData::Type)decision.trigger_type].received; + ++get_trigger_counter(decision.trigger_type).received; std::chrono::steady_clock::time_point decision_assigned; do { @@ -301,16 +304,6 @@ DataFlowOrchestrator::get_info(opmonlib::InfoCollector& ci, int level) ci.add(name, tmp_ic); } - for (auto& [type, data] : m_trigger_counters) { - opmonlib::InfoCollector tmp_ic; - datafloworchestratorinfo::TriggerInfo i; - i.received = data.received.exchange(0); - i.completed = data.completed.exchange(0); - tmp_ic.add(i); - auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; - ci.add(name, tmp_ic); - } - datafloworchestratorinfo::Info info; info.tokens_received = m_received_tokens.exchange(0); info.decisions_sent = m_sent_decisions.exchange(0); @@ -320,8 +313,19 @@ DataFlowOrchestrator::get_info(opmonlib::InfoCollector& ci, int level) info.forwarding_decision = m_forwarding_decision.exchange(0); info.waiting_for_token = m_waiting_for_token.exchange(0); info.processing_token = m_processing_token.exchange(0); - ci.add(info); + + std::lock_guard guard(m_trigger_mutex); + for (auto& [type, data] : m_trigger_counters) { + opmonlib::InfoCollector tmp_ic; + datafloworchestratorinfo::TriggerInfo i; + i.received = data.received.exchange(0); + i.completed = data.completed.exchange(0); + tmp_ic.add(i); + auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; + ci.add(name, tmp_ic); + } + } void @@ -364,7 +368,7 @@ DataFlowOrchestrator::receive_trigger_complete_token(const dfmessages::TriggerDe try { auto dec_ptr = app_it->second.complete_assignment(token.trigger_number, m_metadata_function); - ++m_trigger_counters[(trgdataformats::TriggerCandidateData::Type)dec_ptr->decision.trigger_type].completed; + ++ get_trigger_counter(dec_ptr->decision.trigger_type).completed; } catch (AssignedTriggerDecisionNotFound const& err) { ers::error(err); } diff --git a/plugins/DataFlowOrchestrator.hpp b/plugins/DataFlowOrchestrator.hpp index 8753ef65..e9f48a97 100644 --- a/plugins/DataFlowOrchestrator.hpp +++ b/plugins/DataFlowOrchestrator.hpp @@ -152,6 +152,14 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule std::atomic m_processing_token{ 0 }; // NOLINT (build/unsigned) std::map m_trigger_counters; std::mutex m_trigger_mutex; // used to safely handle the map above + TriggerData & get_trigger_counter(decltype(dfmessages::TriggerDecision::trigger_type) t ) { + auto type = (trgdataformats::TriggerCandidateData::Type) t; + auto it = m_trigger_counters.find(type); + if (it != m_trigger_counters.end()) return it->second; + + std::lock_guard guard(m_trigger_mutex); + return m_trigger_counters[type]; + } }; } // namespace dfmodules } // namespace dunedaq From dddb1c834eeb56e42b55426ffbbc0b815a607cbf Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 27 May 2024 13:40:12 +0200 Subject: [PATCH 3/5] Compiling version with correct extraction logic --- plugins/DataFlowOrchestrator.cpp | 15 ++++++++++----- plugins/DataFlowOrchestrator.hpp | 18 +++++++++++++++--- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/plugins/DataFlowOrchestrator.cpp b/plugins/DataFlowOrchestrator.cpp index 82fe4ec1..d1ea94bb 100644 --- a/plugins/DataFlowOrchestrator.cpp +++ b/plugins/DataFlowOrchestrator.cpp @@ -185,7 +185,10 @@ DataFlowOrchestrator::receive_trigger_decision(const dfmessages::TriggerDecision auto decision_received = std::chrono::steady_clock::now(); ++m_received_decisions; - ++get_trigger_counter(decision.trigger_type).received; + auto trigger_types = unpack_types(decision.trigger_type); + for ( const auto t : trigger_types ) { + ++get_trigger_counter(t).received; + } std::chrono::steady_clock::time_point decision_assigned; do { @@ -316,11 +319,11 @@ DataFlowOrchestrator::get_info(opmonlib::InfoCollector& ci, int level) ci.add(info); std::lock_guard guard(m_trigger_mutex); - for (auto& [type, data] : m_trigger_counters) { + for ( auto & [type, counts] : m_trigger_counters ) { opmonlib::InfoCollector tmp_ic; datafloworchestratorinfo::TriggerInfo i; - i.received = data.received.exchange(0); - i.completed = data.completed.exchange(0); + i.received = counts.received.exchange(0); + i.completed = counts.completed.exchange(0); tmp_ic.add(i); auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; ci.add(name, tmp_ic); @@ -368,7 +371,8 @@ DataFlowOrchestrator::receive_trigger_complete_token(const dfmessages::TriggerDe try { auto dec_ptr = app_it->second.complete_assignment(token.trigger_number, m_metadata_function); - ++ get_trigger_counter(dec_ptr->decision.trigger_type).completed; + auto trigger_types = unpack_types(dec_ptr->decision.trigger_type); + for ( const auto t : trigger_types ) ++ get_trigger_counter(t).completed; } catch (AssignedTriggerDecisionNotFound const& err) { ers::error(err); } @@ -489,4 +493,5 @@ DataFlowOrchestrator::assign_trigger_decision(const std::shared_ptr received{0}; std::atomic completed{0}; + std::string type; + TriggerData(trgdataformats::TriggerCandidateData::Type t) + : type(dunedaq::trgdataformats::get_trigger_candidate_type_names()[t]) {;} }; + static std::set + unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) { + std::set results; + const std::bitset<64> bits(t); + for( size_t i = 0; i < bits.size(); ++i ) { + if ( bits[i] ) results.insert((trgdataformats::TriggerCandidateData::Type)i); + } + return results; + } // Statistics std::atomic m_received_tokens{ 0 }; // NOLINT (build/unsigned) @@ -152,13 +164,13 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule std::atomic m_processing_token{ 0 }; // NOLINT (build/unsigned) std::map m_trigger_counters; std::mutex m_trigger_mutex; // used to safely handle the map above - TriggerData & get_trigger_counter(decltype(dfmessages::TriggerDecision::trigger_type) t ) { - auto type = (trgdataformats::TriggerCandidateData::Type) t; + TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) { auto it = m_trigger_counters.find(type); if (it != m_trigger_counters.end()) return it->second; std::lock_guard guard(m_trigger_mutex); - return m_trigger_counters[type]; + const TriggerData e (type); + return m_trigger_counters[type] = e; } }; } // namespace dfmodules From 97193b23cb1b200e6dd38b18da6cba192da6d5b3 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 27 May 2024 16:18:22 +0200 Subject: [PATCH 4/5] Check on valid trigger type --- plugins/DataFlowOrchestrator.hpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/plugins/DataFlowOrchestrator.hpp b/plugins/DataFlowOrchestrator.hpp index 70ad1c53..f37c7301 100644 --- a/plugins/DataFlowOrchestrator.hpp +++ b/plugins/DataFlowOrchestrator.hpp @@ -139,13 +139,12 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule struct TriggerData { std::atomic received{0}; std::atomic completed{0}; - std::string type; - TriggerData(trgdataformats::TriggerCandidateData::Type t) - : type(dunedaq::trgdataformats::get_trigger_candidate_type_names()[t]) {;} }; static std::set unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) { std::set results; + if (t == dfmessages::TypeDefaults::s_invalid_trigger_type) + return results; const std::bitset<64> bits(t); for( size_t i = 0; i < bits.size(); ++i ) { if ( bits[i] ) results.insert((trgdataformats::TriggerCandidateData::Type)i); @@ -167,10 +166,9 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) { auto it = m_trigger_counters.find(type); if (it != m_trigger_counters.end()) return it->second; - + std::lock_guard guard(m_trigger_mutex); - const TriggerData e (type); - return m_trigger_counters[type] = e; + return m_trigger_counters[type]; } }; } // namespace dfmodules From f4702985356a718f0632bc3c963d9fbace65cc59 Mon Sep 17 00:00:00 2001 From: Kurt Biery Date: Thu, 30 May 2024 16:48:21 -0500 Subject: [PATCH 5/5] Updated version to v2.14.2 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 17c0ed39..c7a5c030 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(dfmodules VERSION 2.14.1) +project(dfmodules VERSION 2.14.2) find_package(daq-cmake REQUIRED) daq_setup_environment()