
Merge pull request #364 from DUNE-DAQ/patch/fddaq-v4.4.x
Patch/fddaq v4.4.x
jcfreeman2 authored Jul 10, 2024
2 parents 72d5963 + cf2d114 commit aba48e0
Showing 9 changed files with 156 additions and 52 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
project(dfmodules VERSION 2.14.2)
project(dfmodules VERSION 2.14.3)

find_package(daq-cmake REQUIRED)
daq_setup_environment()
11 changes: 11 additions & 0 deletions include/dfmodules/DataStore.hpp
@@ -75,6 +75,17 @@ ERS_DECLARE_ISSUE(dfmodules,
((std::string)mod_name)((std::string)description))
/// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Issue for DataStore problems in which it is
* reasonable to skip any warning or error message.
* @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
*/
ERS_DECLARE_ISSUE(dfmodules,
IgnorableDataStoreProblem,
"Module " << mod_name << ": A problem was encountered when " << description,
((std::string)mod_name)((std::string)description))
/// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Issue for DataStore problems in which it is
* not clear whether retrying the operation might succeed or not.
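The new IgnorableDataStoreProblem sits between the existing retryable and fatal DataStore issues. As a minimal caller-side sketch of how the hierarchy is meant to be consumed (write_one_record and store are placeholders, not from this commit; assume the code lives in the same namespace as the issues, as TPStreamWriter does):

// Sketch: handling the DataStore issue severities on the caller side.
// Ordering matters because ers::Issue derives from std::exception.
void write_one_record(DataStore& store, const daqdataformats::TimeSlice& ts)
{
  try {
    store.write(ts);
  } catch (const IgnorableDataStoreProblem& excpt) {
    ers::warning(excpt);   // safe to skip: warn (at most) and move on
  } catch (const RetryableDataStoreProblem& excpt) {
    throw;                 // worth another attempt; TPStreamWriter retries with backoff
  } catch (const std::exception& excpt) {
    throw;                 // anything else is not recoverable here
  }
}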
12 changes: 6 additions & 6 deletions integtest/disabled_output_test.py
@@ -61,12 +61,12 @@
triggeractivity_frag_params={"fragment_type_description": "Trigger Activity",
"fragment_type": "Trigger_Activity",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": 1,
"expected_fragment_count": 3,
"min_size_bytes": 72, "max_size_bytes": 216}
triggertp_frag_params={"fragment_type_description": "Trigger with TPs",
"fragment_type": "Trigger_Primitive",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": 2, # number of readout apps (1) times 2
"expected_fragment_count": 3, # number of readout apps (1) times 3, one per plane
"min_size_bytes": 72, "max_size_bytes": 16000}
hsi_frag_params ={"fragment_type_description": "HSI",
"fragment_type": "Hardware_Signal",
@@ -116,10 +116,10 @@

# The commands to run in nanorc, as a list
nanorc_command_list="integtest-partition boot conf".split()
nanorc_command_list+="start_run --disable-data-storage 101 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run 102 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run --disable-data-storage 103 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="start_run 104 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="start_run --disable-data-storage --wait 2 101 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run --wait 2 102 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run --disable-data-storage --wait 2 103 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="start_run --wait 2 104 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="scrap terminate".split()

# The tests themselves
6 changes: 3 additions & 3 deletions integtest/multi_output_file_test.py
@@ -55,12 +55,12 @@
triggeractivity_frag_params={"fragment_type_description": "Trigger Activity",
"fragment_type": "Trigger_Activity",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": number_of_readout_apps,
"expected_fragment_count": (3*number_of_readout_apps),
"min_size_bytes": 72, "max_size_bytes": 520}
triggertp_frag_params={"fragment_type_description": "Trigger with TPs",
"fragment_type": "Trigger_Primitive",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": (2*number_of_readout_apps),
"expected_fragment_count": (3*number_of_readout_apps),
"min_size_bytes": 72, "max_size_bytes": 16000}
hsi_frag_params ={"fragment_type_description": "HSI",
"fragment_type": "Hardware_Signal",
@@ -132,7 +132,7 @@
}

# The commands to run in nanorc, as a list
nanorc_command_list="integtest-partition boot conf start_run 101 wait 180 disable_triggers wait 2 stop_run wait 21 start_run 102 wait 120 disable_triggers wait 2 stop_run wait 21 scrap terminate".split()
nanorc_command_list="integtest-partition boot conf start_run --wait 3 101 wait 180 disable_triggers wait 2 stop_run wait 21 start_run --wait 3 102 wait 120 disable_triggers wait 2 stop_run wait 21 scrap terminate".split()

# The tests themselves

80 changes: 51 additions & 29 deletions plugins/HDF5DataStore.hpp
@@ -135,6 +135,7 @@ class HDF5DataStore : public DataStore

m_file_index = 0;
m_recorded_size = 0;
m_current_record_number = std::numeric_limits<size_t>::max();

if (m_operation_mode != "one-event-per-file"
//&& m_operation_mode != "one-fragment-per-file"
@@ -162,7 +163,7 @@
virtual void write(const daqdataformats::TriggerRecord& tr)
{

// check if there is sufficient space for this data block
// check if there is sufficient space for this record
size_t current_free_space = get_free_space(m_path);
size_t tr_size = tr.get_total_size_bytes();
if (current_free_space < (m_free_space_safety_factor_for_write * tr_size)) {
@@ -178,12 +179,19 @@
throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
}

// check if a new file should be opened for this data block
increment_file_index_if_needed(tr_size);
// check if a new file should be opened for this record
if (! increment_file_index_if_needed(tr_size)) {
if (m_config_params.mode == "one-event-per-file") {
if (m_current_record_number != std::numeric_limits<size_t>::max() &&
tr.get_header_ref().get_trigger_number() != m_current_record_number) {
++m_file_index;
}
}
}
m_current_record_number = tr.get_header_ref().get_trigger_number();

// determine the filename from Storage Key + configuration parameters
std::string full_filename =
get_file_name(tr.get_header_ref().get_trigger_number(), tr.get_header_ref().get_run_number());
std::string full_filename = get_file_name(tr.get_header_ref().get_run_number());

try {
open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
@@ -194,7 +202,7 @@
throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
}

// write the data block
// write the record
m_file_handle->write(tr);
m_recorded_size = m_file_handle->get_recorded_size();
}
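A note on the rollover logic above: increment_file_index_if_needed() now reports whether it already started a new file because of the size limit (its new definition appears further down). Only when it has not, and the store is in "one-event-per-file" mode, does a change in record number bump the file index, so repeated write() calls for the same record (long-readout windows arrive in several pieces) keep appending to one file. A self-contained sketch of that decision, with assumed semantics:

#include <cstddef>
#include <limits>
#include <string>

// Sketch of the record-boundary check added above.  Returns true when the
// file index should be bumped because a *different* record is arriving in
// "one-event-per-file" mode; size-based rollover is assumed to have been
// handled already (in which case rolled_over_for_size is true).
bool needs_record_boundary_rollover(bool rolled_over_for_size,
                                    const std::string& mode,
                                    std::size_t current_record_number,
                                    std::size_t incoming_record_number)
{
  if (rolled_over_for_size) { return false; }  // index already incremented
  return mode == "one-event-per-file" &&
         current_record_number != std::numeric_limits<std::size_t>::max() &&  // max() == "no record yet"
         incoming_record_number != current_record_number;
}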
@@ -209,7 +217,7 @@
virtual void write(const daqdataformats::TimeSlice& ts)
{

// check if there is sufficient space for this data block
// check if there is sufficient space for this record
size_t current_free_space = get_free_space(m_path);
size_t ts_size = ts.get_total_size_bytes();
if (current_free_space < (m_free_space_safety_factor_for_write * ts_size)) {
@@ -225,11 +233,19 @@
throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
}

// check if a new file should be opened for this data block
increment_file_index_if_needed(ts_size);
// check if a new file should be opened for this record
if (! increment_file_index_if_needed(ts_size)) {
if (m_config_params.mode == "one-event-per-file") {
if (m_current_record_number != std::numeric_limits<size_t>::max() &&
ts.get_header().timeslice_number != m_current_record_number) {
++m_file_index;
}
}
}
m_current_record_number = ts.get_header().timeslice_number;

// determine the filename from Storage Key + configuration parameters
std::string full_filename = get_file_name(ts.get_header().timeslice_number, ts.get_header().run_number);
std::string full_filename = get_file_name(ts.get_header().run_number);

try {
open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
@@ -240,13 +256,18 @@
throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
}

// write the data block
m_file_handle->write(ts);
m_recorded_size = m_file_handle->get_recorded_size();
// write the record
try {
m_file_handle->write(ts);
m_recorded_size = m_file_handle->get_recorded_size();
} catch (hdf5libs::TimeSliceAlreadyExists const& excpt) {
std::string msg = "writing a time slice to file " + m_file_handle->get_file_name();
throw IgnorableDataStoreProblem(ERS_HERE, get_name(), msg, excpt);
}
}

/**
* @brief Informs the HDF5DataStore that writes or reads of data blocks
* @brief Informs the HDF5DataStore that writes or reads of records
* associated with the specified run number will soon be requested.
* This allows the DataStore to test that the output file path is valid
* and any other checks that are useful in advance of the first data
@@ -280,10 +301,11 @@

m_file_index = 0;
m_recorded_size = 0;
m_current_record_number = std::numeric_limits<size_t>::max();
}

/**
* @brief Informs the HD5DataStore that writes or reads of data blocks
* @brief Informs the HD5DataStore that writes or reads of records
* associated with the specified run number have finished, for now.
* This allows the DataStore to close open files and do any other
* cleanup or shutdown operations that are useful once the writes or
@@ -324,8 +346,13 @@
// Total number of generated files
size_t m_file_index;

// Total size of data being written
size_t m_recorded_size;
// Size of data being written
size_t m_recorded_size; // per file

// Record number for the record that is currently being written out
// This is only useful for long-readout windows, in which there may
// be multiple calls to write()
size_t m_current_record_number;

// Configuration
hdf5datastore::ConfParams m_config_params;
@@ -340,8 +367,7 @@
/**
* @brief Translates the specified input parameters into the appropriate filename.
*/
std::string get_file_name(uint64_t record_number, // NOLINT(build/unsigned)
daqdataformats::run_number_t run_number)
std::string get_file_name(daqdataformats::run_number_t run_number)
{
std::ostringstream work_oss;
work_oss << m_config_params.directory_path;
@@ -356,28 +382,24 @@
work_oss << m_config_params.filename_parameters.run_number_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_run_number) << std::setfill('0') << run_number;
work_oss << "_";
if (m_config_params.mode == "one-event-per-file") {

work_oss << m_config_params.filename_parameters.trigger_number_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_trigger_number) << std::setfill('0')
<< record_number;
} else if (m_config_params.mode == "all-per-file") {
work_oss << m_config_params.filename_parameters.file_index_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
<< m_file_index;

work_oss << m_config_params.filename_parameters.file_index_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
<< m_file_index;
}
work_oss << "_" << m_config_params.filename_parameters.writer_identifier;
work_oss << ".hdf5";
return work_oss.str();
}
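With the record number dropped from get_file_name(), every operation mode now names files by run number plus the monotonically increasing file index. A rough sketch of the resulting format, using made-up parameter values (the real prefixes, digit counts, leading file-name components, and writer identifier come from hdf5datastore::ConfParams and are partly outside this excerpt):

#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>

int main()
{
  // Illustrative values only; not taken from this commit's configuration.
  const std::string directory_path = "/data";
  const std::string run_number_prefix = "run";
  const std::string file_index_prefix = "";
  const int digits_for_run_number = 6;
  const int digits_for_file_index = 4;
  const std::string writer_identifier = "dataflow0_datawriter_0";

  std::ostringstream work_oss;
  work_oss << directory_path << "/swtest_" << run_number_prefix
           << std::setw(digits_for_run_number) << std::setfill('0') << 25069 << "_"
           << file_index_prefix
           << std::setw(digits_for_file_index) << std::setfill('0') << 2
           << "_" << writer_identifier << ".hdf5";
  std::cout << work_oss.str() << "\n";  // /data/swtest_run025069_0002_dataflow0_datawriter_0.hdf5
}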

void increment_file_index_if_needed(size_t size_of_next_write)
bool increment_file_index_if_needed(size_t size_of_next_write)
{
if ((m_recorded_size + size_of_next_write) > m_max_file_size && m_recorded_size > 0) {
++m_file_index;
m_recorded_size = 0;
return true;
}
return false;
}

void open_file_if_needed(const std::string& file_name, unsigned open_flags = HighFive::File::ReadOnly)
54 changes: 49 additions & 5 deletions plugins/TPStreamWriter.cpp
@@ -60,9 +60,15 @@ TPStreamWriter::get_info(opmonlib::InfoCollector& ci, int /*level*/)
{
tpstreamwriterinfo::Info info;

info.tpset_received = m_tpset_received.exchange(0);
info.tpset_written = m_tpset_written.exchange(0);
info.heartbeat_tpsets_received = m_heartbeat_tpsets.exchange(0);
info.tpsets_with_tps_received = m_tpsets_with_tps.exchange(0);
info.tps_received = m_tps_received.exchange(0);
info.tps_written = m_tps_written.exchange(0);
info.timeslices_written = m_timeslices_written.exchange(0);
info.bytes_output = m_bytes_output.exchange(0);
info.tardy_timeslice_max_seconds = m_tardy_timeslice_max_seconds.exchange(0.0);
info.total_tps_received = m_total_tps_received.load();
info.total_tps_written = m_total_tps_written.load();

ci.add(info);
}
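The reworked get_info() mixes two counter styles: per-interval counters are drained with exchange(0) every time operational monitoring collects, while the total_tps_* counters are only load()ed here (they are reset in do_start(), so they accumulate across the run). A minimal sketch of the pattern, with names shortened from the ones above:

#include <atomic>
#include <cstdint>

std::atomic<uint64_t> tps_received{0};        // per reporting interval
std::atomic<uint64_t> total_tps_received{0};  // per run

void record_tps(uint64_t n)                   // called for every TPSet with TPs
{
  tps_received += n;
  total_tps_received += n;
}

// At each monitoring interval: report-and-reset vs. report-only.
uint64_t interval_count() { return tps_received.exchange(0); }
uint64_t run_total()      { return total_tps_received.load(); }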
@@ -76,6 +82,8 @@
m_accumulation_inactivity_time_before_write =
std::chrono::milliseconds(static_cast<int>(1000*conf_params.tp_accumulation_inactivity_time_before_write_sec));
m_source_id = conf_params.source_id;
warn_user_when_tardy_tps_are_discarded = conf_params.warn_user_when_tardy_tps_are_discarded;
m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;

// create the DataStore instance here
try {
@@ -98,6 +106,8 @@
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
m_run_number = start_params.run;
m_total_tps_received.store(0);
m_total_tps_written.store(0);

// 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
// I've put this call fairly early in this method because it could throw an
@@ -159,15 +169,17 @@
TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);

bool possible_pending_data = true;
size_t largest_timeslice_number = 0;
while (running_flag.load() || possible_pending_data) {
trigger::TPSet tpset;
try {
tpset = m_tpset_source->receive(m_queue_timeout);
++n_tpset_received;
++m_tpset_received;

if (tpset.type == trigger::TPSet::Type::kHeartbeat)
if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
++m_heartbeat_tpsets;
continue;
}

TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is " << tpset.origin
<< ", seqno is " << tpset.seqno << ", start timestamp is " << tpset.start_time << ", run number is "
@@ -181,8 +193,12 @@
<< m_run_number << "), Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
continue;
}
++m_tpsets_with_tps;

size_t num_tps_in_tpset = tpset.objects.size();
tp_bundle_handler.add_tpset(std::move(tpset));
m_tps_received += num_tps_in_tpset;
m_total_tps_received += num_tps_in_tpset;
} catch (iomanager::TimeoutExpired&) {
if (running_flag.load()) {continue;}
}
@@ -194,6 +210,13 @@
list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
possible_pending_data = false;
}

// keep track of the largest timeslice number (for reporting on tardy ones)
for (auto& timeslice_ptr : list_of_timeslices) {
largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
}

// attempt to write out each TimeSlice
for (auto& timeslice_ptr : list_of_timeslices) {
daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
timeslice_ptr->set_element_id(sid);
@@ -205,8 +228,11 @@
should_retry = false;
try {
m_data_writer->write(*timeslice_ptr);
++m_tpset_written;
++m_timeslices_written;
m_bytes_output += timeslice_ptr->get_total_size_bytes();
size_t number_of_tps_written = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
m_tps_written += number_of_tps_written;
m_total_tps_written += number_of_tps_written;
} catch (const RetryableDataStoreProblem& excpt) {
should_retry = true;
ers::error(DataWritingProblem(ERS_HERE,
@@ -219,6 +245,24 @@
}
usleep(retry_wait_usec);
retry_wait_usec *= 2;
} catch (const IgnorableDataStoreProblem& excpt) {
int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
if (warn_user_when_tardy_tps_are_discarded) {
std::ostringstream sid_list;
bool first_frag = true;
for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
if (first_frag) {first_frag = false;}
else {sid_list << ",";}
sid_list << frag_ptr->get_element_id().to_string();
}
ers::warning(TardyTPsDiscarded(ERS_HERE,
get_name(),
sid_list.str(),
timeslice_ptr->get_header().timeslice_number,
seconds_too_late));
}
} catch (const std::exception& excpt) {
ers::warning(DataWritingProblem(ERS_HERE,
get_name(),
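For the tardy-TP bookkeeping above: do_conf() converts the accumulation interval from clock ticks to seconds using the 62.5 MHz clock (62 500 000 ticks per second), and the IgnorableDataStoreProblem handler multiplies that interval by how many timeslice numbers the late slice trails the largest one seen. A worked example with an assumed interval of one second:

#include <cstdint>
#include <iostream>

int main()
{
  const uint64_t accumulation_interval_ticks = 62500000;  // assumed config value
  const double interval_sec = accumulation_interval_ticks / 62500000.0;  // 1.0 s, as in do_conf()

  const uint64_t largest_timeslice_number = 105;  // newest slice handled so far
  const uint64_t tardy_timeslice_number = 100;    // slice whose write was rejected
  const double seconds_too_late =
      interval_sec * (largest_timeslice_number - tardy_timeslice_number);
  std::cout << "discarded TPs were ~" << seconds_too_late << " s late\n";  // ~5 s
}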
(Diffs for the remaining 3 changed files are not shown.)
