Skip to content

Commit

Permalink
Merge pull request #353 from DUNE-DAQ/patch/fddaq-v4.4.x
Browse files Browse the repository at this point in the history
Patch/fddaq v4.4.x
  • Loading branch information
jcfreeman2 authored Jun 10, 2024
2 parents 284c485 + 1def8db commit 72d5963
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
project(dfmodules VERSION 2.14.1)
project(dfmodules VERSION 2.14.2)

find_package(daq-cmake REQUIRED)
daq_setup_environment()
Expand Down
26 changes: 24 additions & 2 deletions plugins/DataFlowOrchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ DataFlowOrchestrator::do_stop(const data_t& /*args*/)
ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_run_number));
}

std::lock_guard<std::mutex> guard(m_trigger_mutex);
m_trigger_counters.clear();

TLOG() << get_name() << " successfully stopped";
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
}
Expand Down Expand Up @@ -180,9 +183,13 @@ DataFlowOrchestrator::receive_trigger_decision(const dfmessages::TriggerDecision
return;
}

++m_received_decisions;
auto decision_received = std::chrono::steady_clock::now();

++m_received_decisions;
auto trigger_types = unpack_types(decision.trigger_type);
for ( const auto t : trigger_types ) {
++get_trigger_counter(t).received;
}

std::chrono::steady_clock::time_point decision_assigned;
do {

Expand Down Expand Up @@ -310,6 +317,18 @@ DataFlowOrchestrator::get_info(opmonlib::InfoCollector& ci, int level)
info.waiting_for_token = m_waiting_for_token.exchange(0);
info.processing_token = m_processing_token.exchange(0);
ci.add(info);

std::lock_guard<std::mutex> guard(m_trigger_mutex);
for ( auto & [type, counts] : m_trigger_counters ) {
opmonlib::InfoCollector tmp_ic;
datafloworchestratorinfo::TriggerInfo i;
i.received = counts.received.exchange(0);
i.completed = counts.completed.exchange(0);
tmp_ic.add(i);
auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type];
ci.add(name, tmp_ic);
}

}

void
Expand Down Expand Up @@ -352,6 +371,8 @@ DataFlowOrchestrator::receive_trigger_complete_token(const dfmessages::TriggerDe

try {
auto dec_ptr = app_it->second.complete_assignment(token.trigger_number, m_metadata_function);
auto trigger_types = unpack_types(dec_ptr->decision.trigger_type);
for ( const auto t : trigger_types ) ++ get_trigger_counter(t).completed;
} catch (AssignedTriggerDecisionNotFound const& err) {
ers::error(err);
}
Expand Down Expand Up @@ -472,4 +493,5 @@ DataFlowOrchestrator::assign_trigger_decision(const std::shared_ptr<AssignedTrig

} // namespace dunedaq::dfmodules


DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DataFlowOrchestrator)
29 changes: 29 additions & 0 deletions plugins/DataFlowOrchestrator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "dfmessages/TriggerDecision.hpp"
#include "dfmessages/TriggerDecisionToken.hpp"
#include "dfmessages/TriggerInhibit.hpp"
#include "trgdataformats/TriggerCandidateData.hpp"


#include "iomanager/Sender.hpp"

Expand All @@ -29,6 +31,7 @@
#include <string>
#include <utility>
#include <vector>
#include <mutex>

namespace dunedaq {

Expand Down Expand Up @@ -132,6 +135,23 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule
std::chrono::steady_clock::time_point m_last_token_received;
std::chrono::steady_clock::time_point m_last_td_received;

// Struct for statistic
struct TriggerData {
std::atomic<uint64_t> received{0};
std::atomic<uint64_t> completed{0};
};
static std::set<trgdataformats::TriggerCandidateData::Type>
unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) {
std::set<trgdataformats::TriggerCandidateData::Type> results;
if (t == dfmessages::TypeDefaults::s_invalid_trigger_type)
return results;
const std::bitset<64> bits(t);
for( size_t i = 0; i < bits.size(); ++i ) {
if ( bits[i] ) results.insert((trgdataformats::TriggerCandidateData::Type)i);
}
return results;
}

// Statistics
std::atomic<uint64_t> m_received_tokens{ 0 }; // NOLINT (build/unsigned)
std::atomic<uint64_t> m_sent_decisions{ 0 }; // NOLINT (build/unsigned)
Expand All @@ -141,6 +161,15 @@ class DataFlowOrchestrator : public dunedaq::appfwk::DAQModule
std::atomic<uint64_t> m_forwarding_decision{ 0 }; // NOLINT (build/unsigned)
std::atomic<uint64_t> m_waiting_for_token{ 0 }; // NOLINT (build/unsigned)
std::atomic<uint64_t> m_processing_token{ 0 }; // NOLINT (build/unsigned)
std::map<dunedaq::trgdataformats::TriggerCandidateData::Type, TriggerData> m_trigger_counters;
std::mutex m_trigger_mutex; // used to safely handle the map above
TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) {
auto it = m_trigger_counters.find(type);
if (it != m_trigger_counters.end()) return it->second;

std::lock_guard<std::mutex> guard(m_trigger_mutex);
return m_trigger_counters[type];
}
};
} // namespace dfmodules
} // namespace dunedaq
Expand Down
7 changes: 6 additions & 1 deletion schema/dfmodules/info/datafloworchestratorinfo.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ local info = {
s.field("average_time_since_assignment", self.uint8, 0, doc="average time since assignment for current TDs (ms)"),
s.field("min_time_since_assignment", self.uint8, 0, doc="shortest time since assignment among current TDs (ms)"),
s.field("max_time_since_assignment", self.uint8, 0, doc="longest time since assignment among current TDs (ms)")
], doc="Data Flow Orchestrator information")
], doc="Data Flow Orchestrator information"),

trigger_info : s.record("TriggerInfo", [
s.field("received", self.uint8, 0, doc="Number of received triggers"),
s.field("completed", self.uint8, 0, doc="number of completed triggers")
], doc = "Trigger info generated by the DFO expected to be different for each trigger type")
};

moo.oschema.sort_select(info)

0 comments on commit 72d5963

Please sign in to comment.