Skip to content

Commit

Permalink
Merge pull request #348 from DUNE-DAQ/asztuc/mlt_v4_to_v5
Browse files Browse the repository at this point in the history
Asztuc/mlt v4 to v5
  • Loading branch information
ArturSztuc authored Oct 16, 2024
2 parents 3bdd5e1 + 6449dbf commit 6d9bb86
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 42 deletions.
7 changes: 7 additions & 0 deletions include/trigger/Issues.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ ERS_DECLARE_ISSUE(trigger,
"TD trigger number " << tn << " time stamp " << ts,
((uint64_t)tn) ((uint64_t)ts))

ERS_DECLARE_ISSUE_BASE(trigger,
MLTConfigurationProblem,
appfwk::GeneralDAQModuleIssue,
"Configuration error: " << item,
((std::string)name),
((std::string)item))

} // namespace dunedaq

#endif // TRIGGER_INCLUDE_TRIGGER_ISSUES_HPP_
84 changes: 81 additions & 3 deletions plugins/MLTModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ MLTModule::MLTModule(const std::string& name)
// clang-format on
}


std::map<std::string, int>
MLTModule::decode_geoid(uint64_t _geoid_int)
{

std::map<std::string, int> geoid;

// Extract stream_id (stored in the top 16 bits)
geoid["stream_id"] = (_geoid_int >> 48) & 0xFFFF;

// Extract slot_id (stored in the next 16 bits)
geoid["slot_id"] = (_geoid_int >> 32) & 0xFFFF;

// Extract crate_id (stored in the next 16 bits)
geoid["crate_id"] = (_geoid_int >> 16) & 0xFFFF;

// Extract det_id (stored in the lowest 16 bits)
geoid["detector_id"] = _geoid_int & 0xFFFF;

return geoid;
}

void
MLTModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
{
Expand All @@ -65,6 +87,47 @@ MLTModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
m_decision_output = get_iom_sender<dfmessages::TriggerDecision>(con->UID());
}

// Get the session to access the detector configuration
auto session = mcfg->configuration_manager()->session();

hdf5libs::HDF5SourceIDHandler::source_id_geo_id_map_t geoidmap = hdf5libs::HDF5SourceIDHandler::make_source_id_geo_id_map(session);

// Fill the SourceID -- Subdetector map
for (auto const& [sourceid, geoids] : geoidmap) {
TLOG() << "SourceID: " << sourceid;
for (auto const& geoid : geoids) {
std::map<std::string, int> gidmap = decode_geoid(geoid);
SubdetectorID detid = static_cast<SubdetectorID>(gidmap["detector_id"]);
if (m_srcid_detid_map.contains(sourceid) &&
!(m_srcid_detid_map[sourceid] == detid)) {
throw MLTConfigurationProblem(ERS_HERE, get_name(),
"Multiple subdetector types for ine SourceID not suported in trigger system!");
}
m_srcid_detid_map[sourceid] = detid;
}
TLOG() << " * Subdetector type: " << m_srcid_detid_map[sourceid];
}

for (auto subdet_readout_window : mtrg->get_configuration()->get_subdetector_readout_map()) {
std::string subdetector_name = subdet_readout_window->get_subdetector();
SubdetectorID detid = dunedaq::detdataformats::DetID::string_to_subdetector(subdetector_name);
if (detid == detdataformats::DetID::Subdetector::kUnknown) {
throw MLTConfigurationProblem(ERS_HERE, get_name(),
"Unknown Subdetector supplied to MLT subdetector-readout window map");
}

if (m_subdetector_readout_window_map.count(detid)) {
throw MLTConfigurationProblem(ERS_HERE, get_name(),
"Supplied more than one of the same Subdetector name to MLT subdetector-readout window map");
}

m_subdetector_readout_window_map[detid] = std::make_pair(subdet_readout_window->get_time_before(),
subdet_readout_window->get_time_after());

TLOG() << "[MLT] Custom readout map for subdetector: " << detid
<< " time_start: " << subdet_readout_window->get_time_before() << " time_after: " << subdet_readout_window->get_time_after();
}

// Latency related
m_latency_monitoring.store( mtrg->get_configuration()->get_latency_monitoring() );

Expand Down Expand Up @@ -154,11 +217,8 @@ MLTModule::do_start(const nlohmann::json& startobj)

m_inhibit_input->add_callback(std::bind(&MLTModule::dfo_busy_callback, this, std::placeholders::_1));
m_decision_input->add_callback(std::bind(&MLTModule::trigger_decisions_callback, this, std::placeholders::_1));
//m_send_trigger_decisions_thread = std::thread(&MLTModule::send_trigger_decisions, this);
//pthread_setname_np(m_send_trigger_decisions_thread.native_handle(), "mlt-dec"); // TODO: originally mlt-trig-dec

ers::info(TriggerStartOfRun(ERS_HERE, m_run_number));

}

void
Expand Down Expand Up @@ -231,6 +291,23 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
decision.run_number = m_run_number;
decision.trigger_number = m_last_trigger_number;

// Overwrite the component's readout window if we have custom
// subdetector--readout window map
for ( const auto& [subdetectorid, window] : m_subdetector_readout_window_map ) {
for (auto& request: decision.components) {
if (request.component.subsystem != daqdataformats::SourceID::Subsystem::kDetectorReadout) {
continue;
}

if (subdetectorid != m_srcid_detid_map[request.component]) {
continue;
}

request.window_begin = decision.trigger_timestamp - window.first;
request.window_end = decision.trigger_timestamp + window.second;
}
}

TLOG() << "Received decision with timestamp "
<< decision.trigger_timestamp ;

Expand All @@ -240,6 +317,7 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
<< " number of links " << decision.components.size();

// readout window latency update
// TODO: The latency will be different for different components, since they might have different readout windows
if (m_latency_monitoring.load()) {
m_latency_requests_instance.update_latency_in( decision.components.front().window_begin );
m_latency_requests_instance.update_latency_out( decision.components.front().window_end );
Expand Down
14 changes: 14 additions & 0 deletions plugins/MLTModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
#include "appmodel/TCReadoutMap.hpp"
#include "appmodel/ROIGroupConf.hpp"
#include "appmodel/SourceIDConf.hpp"
#include "appmodel/SubdetectorReadoutWindowMap.hpp"

#include "confmodel/Connection.hpp"
#include "confmodel/GeoId.hpp"

#include "daqdataformats/SourceID.hpp"
#include "hdf5libs/HDF5RawDataFile.hpp"
#include "dfmessages/TriggerDecision.hpp"
#include "dfmessages/TriggerDecisionToken.hpp"
#include "dfmessages/TriggerInhibit.hpp"
Expand Down Expand Up @@ -59,6 +62,7 @@ namespace trigger {
class MLTModule : public dunedaq::appfwk::DAQModule
{
public:
typedef dunedaq::detdataformats::DetID::Subdetector SubdetectorID;
/**
* @brief MLTModule Constructor
* @param name Instance name for this MLTModule instance
Expand All @@ -83,6 +87,8 @@ class MLTModule : public dunedaq::appfwk::DAQModule
void trigger_decisions_callback(dfmessages::TriggerDecision& decision);
void dfo_busy_callback(dfmessages::TriggerInhibit& inhibit);

std::map<std::string, int> decode_geoid(uint64_t _geoid_int);

// Queue sources and sinks
std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TriggerDecision>> m_decision_input;
std::shared_ptr<iomanager::SenderConcept<dfmessages::TriggerDecision>> m_decision_output;
Expand Down Expand Up @@ -208,6 +214,14 @@ class MLTModule : public dunedaq::appfwk::DAQModule
bool m_ignoring_tc_types;
bool check_trigger_type_ignore(unsigned int tc_type);
*/

/// @brief SourceID -- SubdetectorID map
std::map<dfmessages::SourceID, SubdetectorID> m_srcid_detid_map;

/// @brief Subdetector--readout-window map config
std::map<SubdetectorID, std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>>
m_subdetector_readout_window_map;

// Opmon variables
using metric_counter_type = uint64_t ; //decltype(moduleleveltriggerinfo::Info::tc_received_count);
std::atomic<metric_counter_type> m_td_msg_received_count{ 0 };
Expand Down
96 changes: 57 additions & 39 deletions src/TCProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,17 @@ TCProcessor::conf(const appmodel::DataHandlerModule* cfg)

m_hsi_passthrough = proc_conf->get_hsi_trigger_type_passthrough();
m_tc_merging = proc_conf->get_merge_overlapping_tcs();
m_ignore_tc_pileup = proc_conf->get_ignore_overlapping_tcs();
m_buffer_timeout = proc_conf->get_buffer_timeout();
m_send_timed_out_tds = proc_conf->get_td_out_of_timeout();
m_send_timed_out_tds = (m_ignore_tc_pileup) ? false : proc_conf->get_td_out_of_timeout();
m_td_readout_limit = proc_conf->get_td_readout_limit();
m_ignored_tc_types = proc_conf->get_ignore_tc();
m_ignoring_tc_types = (m_ignored_tc_types.size() > 0) ? true : false;
m_use_readout_map = proc_conf->get_use_readout_map();
m_use_roi_readout = proc_conf->get_use_roi_readout();
m_use_bitwords = proc_conf->get_use_bitwords();
TLOG_DEBUG(3) << "Allow merging: " << m_tc_merging;
TLOG_DEBUG(3) << "Ignore pileup: " << m_ignore_tc_pileup;
TLOG_DEBUG(3) << "Buffer timeout: " << m_buffer_timeout;
TLOG_DEBUG(3) << "Should send timed out TDs: " << m_send_timed_out_tds;
TLOG_DEBUG(3) << "TD readout limit: " << m_td_readout_limit;
Expand Down Expand Up @@ -375,55 +377,71 @@ TCProcessor::call_tc_decision(const TCProcessor::PendingTD& pending_td)
void
TCProcessor::add_tc(const triggeralgs::TriggerCandidate tc)
{
bool added_to_existing = false;
bool tc_dealt = false;
int64_t tc_wallclock_arrived =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();

if (m_tc_merging) {
if (m_tc_merging || m_ignore_tc_pileup) {

for (std::vector<PendingTD>::iterator it = m_pending_tds.begin(); it != m_pending_tds.end();) {
if (check_overlap(tc, *it)) {
it->contributing_tcs.push_back(tc);
if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
TLOG_DEBUG(3) << "TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
<< tc.time_candidate + m_readout_window_map[tc.type].second
<< " overlaps with pending TD with start/end times " << it->readout_start << "/"
<< it->readout_end;
it->readout_start = ((tc.time_candidate - m_readout_window_map[tc.type].first) >= it->readout_start)
? it->readout_start
: (tc.time_candidate - m_readout_window_map[tc.type].first);
it->readout_end = ((tc.time_candidate + m_readout_window_map[tc.type].second) >= it->readout_end)
? (tc.time_candidate + m_readout_window_map[tc.type].second)
: it->readout_end;
} else {
TLOG_DEBUG(3) << "TC with start/end times " << tc.time_start << "/" << tc.time_end
<< " overlaps with pending TD with start/end times " << it->readout_start << "/"
<< it->readout_end;
it->readout_start = (tc.time_start >= it->readout_start) ? it->readout_start : tc.time_start;
it->readout_end = (tc.time_end >= it->readout_end) ? tc.time_end : it->readout_end;
}
it->walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
added_to_existing = true;
// Don't deal with TC here if there's no overlap
if (!check_overlap(tc, *it)) {
it++;
continue;
}

// If overlap and ignoring, we drop the TC and flag it as dealt with.
if (m_ignore_tc_pileup) {
m_tds_dropped_tc_count++;
tc_dealt = true;
TLOG_DEBUG(3) << "TC overlapping with a previous TD, dropping!";
break;
}
++it;

// If we're here, TC merging must be on, in which case we're actually
// going to merge the TC into the TD.
it->contributing_tcs.push_back(tc);
if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
TLOG_DEBUG(3) << "TC with start/end times " << tc.time_candidate - m_readout_window_map[tc.type].first << "/"
<< tc.time_candidate + m_readout_window_map[tc.type].second
<< " overlaps with pending TD with start/end times " << it->readout_start << "/"
<< it->readout_end;
it->readout_start = ((tc.time_candidate - m_readout_window_map[tc.type].first) >= it->readout_start)
? it->readout_start
: (tc.time_candidate - m_readout_window_map[tc.type].first);
it->readout_end = ((tc.time_candidate + m_readout_window_map[tc.type].second) >= it->readout_end)
? (tc.time_candidate + m_readout_window_map[tc.type].second)
: it->readout_end;
} else {
TLOG_DEBUG(3) << "TC with start/end times " << tc.time_start << "/" << tc.time_end
<< " overlaps with pending TD with start/end times " << it->readout_start << "/"
<< it->readout_end;
it->readout_start = (tc.time_start >= it->readout_start) ? it->readout_start : tc.time_start;
it->readout_end = (tc.time_end >= it->readout_end) ? tc.time_end : it->readout_end;
}
it->walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
tc_dealt = true;
break;
}
}

if (!added_to_existing) {
PendingTD td_candidate;
td_candidate.contributing_tcs.push_back(tc);
if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
td_candidate.readout_start = tc.time_candidate - m_readout_window_map[tc.type].first;
td_candidate.readout_end = tc.time_candidate + m_readout_window_map[tc.type].second;
} else {
td_candidate.readout_start = tc.time_start;
td_candidate.readout_end = tc.time_end;
}
td_candidate.walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
m_pending_tds.push_back(td_candidate);
// Don't do anything else if we've already dealt with the TC
if (tc_dealt) {
return;
}
return;

// Create a new TD out of the TC
PendingTD td_candidate;
td_candidate.contributing_tcs.push_back(tc);
if ( (m_use_readout_map) && (m_readout_window_map.count(tc.type)) ){
td_candidate.readout_start = tc.time_candidate - m_readout_window_map[tc.type].first;
td_candidate.readout_end = tc.time_candidate + m_readout_window_map[tc.type].second;
} else {
td_candidate.readout_start = tc.time_start;
td_candidate.readout_end = tc.time_end;
}
td_candidate.walltime_expiration = tc_wallclock_arrived + m_buffer_timeout;
m_pending_tds.push_back(td_candidate);
}

void
Expand Down
3 changes: 3 additions & 0 deletions src/trigger/TCProcessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class TCProcessor : public datahandlinglibs::TaskRawDataProcessorModel<TCWrapper
std::atomic<bool> m_hsi_passthrough;
std::atomic<bool> m_tc_merging;

/// @brief Ignore TCs that overlap with already made TD
bool m_ignore_tc_pileup;

dfmessages::trigger_number_t m_last_trigger_number;

dfmessages::run_number_t m_run_number;
Expand Down

0 comments on commit 6d9bb86

Please sign in to comment.