diff --git a/CMakeLists.txt b/CMakeLists.txt
index c7a5c03..fd5f463 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,5 +1,5 @@
 cmake_minimum_required(VERSION 3.12)
-project(dfmodules VERSION 2.14.2)
+project(dfmodules VERSION 2.14.3)
 
 find_package(daq-cmake REQUIRED)
 daq_setup_environment()
diff --git a/include/dfmodules/DataStore.hpp b/include/dfmodules/DataStore.hpp
index c75f197..b78e4f8 100644
--- a/include/dfmodules/DataStore.hpp
+++ b/include/dfmodules/DataStore.hpp
@@ -75,6 +75,17 @@ ERS_DECLARE_ISSUE(dfmodules,
                   ((std::string)mod_name)((std::string)description))
 /// @endcond LCOV_EXCL_STOP
 
+/**
+ * @brief An ERS Issue for DataStore problems in which it is
+ * reasonable to skip any warning or error message.
+ * @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
+ */
+ERS_DECLARE_ISSUE(dfmodules,
+                  IgnorableDataStoreProblem,
+                  "Module " << mod_name << ": A problem was encountered when " << description,
+                  ((std::string)mod_name)((std::string)description))
+/// @endcond LCOV_EXCL_STOP
+
 /**
  * @brief An ERS Issue for DataStore problems in which it is
  * not clear whether retrying the operation might succeed or not.
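
Note on the new issue type: IgnorableDataStoreProblem extends the existing family of DataStore issues (alongside the retryable and fatal variants) to cover failures that a caller may safely skip. A minimal caller-side sketch follows; the wrapper function and handler bodies are illustrative assumptions, and only the issue names and the write() interface come from this repository:

    #include <exception>
    #include "daqdataformats/TimeSlice.hpp"
    #include "dfmodules/DataStore.hpp" // declares the DataStore issue types

    // Hypothetical classification of a single write attempt.
    void write_one_record(dunedaq::dfmodules::DataStore& writer,
                          const dunedaq::daqdataformats::TimeSlice& ts)
    {
      try {
        writer.write(ts);
      } catch (const dunedaq::dfmodules::IgnorableDataStoreProblem& excpt) {
        // lowest severity: safe to drop this record and move on
        // (TPStreamWriter uses this path for tardy TPs; see below)
      } catch (const dunedaq::dfmodules::RetryableDataStoreProblem& excpt) {
        // transient condition such as low free space: back off and retry
      } catch (const std::exception& excpt) {
        // anything else is reported as a genuine data-writing problem
      }
    }
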
diff --git a/integtest/disabled_output_test.py b/integtest/disabled_output_test.py
index 6c50e91..8b17f16 100644
--- a/integtest/disabled_output_test.py
+++ b/integtest/disabled_output_test.py
@@ -61,12 +61,12 @@
 triggeractivity_frag_params={"fragment_type_description": "Trigger Activity",
                              "fragment_type": "Trigger_Activity",
                              "hdf5_source_subsystem": "Trigger",
-                             "expected_fragment_count": 1,
+                             "expected_fragment_count": 3,
                              "min_size_bytes": 72, "max_size_bytes": 216}
 triggertp_frag_params={"fragment_type_description": "Trigger with TPs",
                        "fragment_type": "Trigger_Primitive",
                        "hdf5_source_subsystem": "Trigger",
-                       "expected_fragment_count": 2, # number of readout apps (1) times 2
+                       "expected_fragment_count": 3, # number of readout apps (1) times 3, one per plane
                        "min_size_bytes": 72, "max_size_bytes": 16000}
 hsi_frag_params ={"fragment_type_description": "HSI",
                   "fragment_type": "Hardware_Signal",
@@ -116,10 +116,10 @@
 
 # The commands to run in nanorc, as a list
 nanorc_command_list="integtest-partition boot conf".split()
-nanorc_command_list+="start_run --disable-data-storage 101 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
-nanorc_command_list+="start_run 102 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
-nanorc_command_list+="start_run --disable-data-storage 103 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
-nanorc_command_list+="start_run 104 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
+nanorc_command_list+="start_run --disable-data-storage --wait 2 101 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
+nanorc_command_list+="start_run --wait 2 102 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
+nanorc_command_list+="start_run --disable-data-storage --wait 2 103 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
+nanorc_command_list+="start_run --wait 2 104 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
 nanorc_command_list+="scrap terminate".split()
 
 # The tests themselves
diff --git a/integtest/multi_output_file_test.py b/integtest/multi_output_file_test.py
index 2eb5fc2..cb141cd 100644
--- a/integtest/multi_output_file_test.py
+++ b/integtest/multi_output_file_test.py
@@ -55,12 +55,12 @@
 triggeractivity_frag_params={"fragment_type_description": "Trigger Activity",
                              "fragment_type": "Trigger_Activity",
                              "hdf5_source_subsystem": "Trigger",
-                             "expected_fragment_count": number_of_readout_apps,
+                             "expected_fragment_count": (3*number_of_readout_apps),
                              "min_size_bytes": 72, "max_size_bytes": 520}
 triggertp_frag_params={"fragment_type_description": "Trigger with TPs",
                        "fragment_type": "Trigger_Primitive",
                        "hdf5_source_subsystem": "Trigger",
-                       "expected_fragment_count": (2*number_of_readout_apps),
+                       "expected_fragment_count": (3*number_of_readout_apps),
                        "min_size_bytes": 72, "max_size_bytes": 16000}
 hsi_frag_params ={"fragment_type_description": "HSI",
                   "fragment_type": "Hardware_Signal",
@@ -132,7 +132,7 @@
 }
 
 # The commands to run in nanorc, as a list
-nanorc_command_list="integtest-partition boot conf start_run 101 wait 180 disable_triggers wait 2 stop_run wait 21 start_run 102 wait 120 disable_triggers wait 2 stop_run wait 21 scrap terminate".split()
+nanorc_command_list="integtest-partition boot conf start_run --wait 3 101 wait 180 disable_triggers wait 2 stop_run wait 21 start_run --wait 3 102 wait 120 disable_triggers wait 2 stop_run wait 21 scrap terminate".split()
 
 # The tests themselves
diff --git a/plugins/HDF5DataStore.hpp b/plugins/HDF5DataStore.hpp
index 25d6a8b..53ceb54 100644
--- a/plugins/HDF5DataStore.hpp
+++ b/plugins/HDF5DataStore.hpp
@@ -135,6 +135,7 @@ class HDF5DataStore : public DataStore
 
     m_file_index = 0;
     m_recorded_size = 0;
+    m_current_record_number = std::numeric_limits<size_t>::max();
 
     if (m_operation_mode != "one-event-per-file"
         //&& m_operation_mode != "one-fragment-per-file"
@@ -162,7 +163,7 @@ class HDF5DataStore : public DataStore
   virtual void write(const daqdataformats::TriggerRecord& tr)
   {
-    // check if there is sufficient space for this data block
+    // check if there is sufficient space for this record
     size_t current_free_space = get_free_space(m_path);
     size_t tr_size = tr.get_total_size_bytes();
     if (current_free_space < (m_free_space_safety_factor_for_write * tr_size)) {
@@ -178,12 +179,19 @@ class HDF5DataStore : public DataStore
       throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
     }
 
-    // check if a new file should be opened for this data block
-    increment_file_index_if_needed(tr_size);
+    // check if a new file should be opened for this record
+    if (! increment_file_index_if_needed(tr_size)) {
+      if (m_config_params.mode == "one-event-per-file") {
+        if (m_current_record_number != std::numeric_limits<size_t>::max() &&
+            tr.get_header_ref().get_trigger_number() != m_current_record_number) {
+          ++m_file_index;
+        }
+      }
+    }
+    m_current_record_number = tr.get_header_ref().get_trigger_number();
 
     // determine the filename from Storage Key + configuration parameters
-    std::string full_filename =
-      get_file_name(tr.get_header_ref().get_trigger_number(), tr.get_header_ref().get_run_number());
+    std::string full_filename = get_file_name(tr.get_header_ref().get_run_number());
 
     try {
       open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
@@ -194,7 +202,7 @@ class HDF5DataStore : public DataStore
       throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
     }
 
-    // write the data block
+    // write the record
     m_file_handle->write(tr);
     m_recorded_size = m_file_handle->get_recorded_size();
   }
@@ -209,7 +217,7 @@ class HDF5DataStore : public DataStore
   virtual void write(const daqdataformats::TimeSlice& ts)
   {
-    // check if there is sufficient space for this data block
+    // check if there is sufficient space for this record
     size_t current_free_space = get_free_space(m_path);
     size_t ts_size = ts.get_total_size_bytes();
     if (current_free_space < (m_free_space_safety_factor_for_write * ts_size)) {
@@ -225,11 +233,19 @@ class HDF5DataStore : public DataStore
       throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
     }
 
-    // check if a new file should be opened for this data block
-    increment_file_index_if_needed(ts_size);
+    // check if a new file should be opened for this record
+    if (! increment_file_index_if_needed(ts_size)) {
+      if (m_config_params.mode == "one-event-per-file") {
+        if (m_current_record_number != std::numeric_limits<size_t>::max() &&
+            ts.get_header().timeslice_number != m_current_record_number) {
+          ++m_file_index;
+        }
+      }
+    }
+    m_current_record_number = ts.get_header().timeslice_number;
 
     // determine the filename from Storage Key + configuration parameters
-    std::string full_filename = get_file_name(ts.get_header().timeslice_number, ts.get_header().run_number);
+    std::string full_filename = get_file_name(ts.get_header().run_number);
 
     try {
       open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
@@ -240,13 +256,18 @@ class HDF5DataStore : public DataStore
       throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
     }
 
-    // write the data block
-    m_file_handle->write(ts);
-    m_recorded_size = m_file_handle->get_recorded_size();
+    // write the record
+    try {
+      m_file_handle->write(ts);
+      m_recorded_size = m_file_handle->get_recorded_size();
+    } catch (hdf5libs::TimeSliceAlreadyExists const& excpt) {
+      std::string msg = "writing a time slice to file " + m_file_handle->get_file_name();
+      throw IgnorableDataStoreProblem(ERS_HERE, get_name(), msg, excpt);
+    }
   }
 
   /**
-   * @brief Informs the HDF5DataStore that writes or reads of data blocks
+   * @brief Informs the HDF5DataStore that writes or reads of records
    * associated with the specified run number will soon be requested.
    * This allows the DataStore to test that the output file path is valid
    * and any other checks that are useful in advance of the first data
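
With this change, write() has two independent reasons to move to a new file, and increment_file_index_if_needed() now reports whether the size-based check already fired, so the index is never bumped twice for one record. A condensed restatement of the combined rule; the free-function form and the SENTINEL name are illustrative, not part of the patch:

    #include <cstddef>
    #include <limits>
    #include <string>

    // "No record seen yet in this run" marker, mirroring the
    // std::numeric_limits<size_t>::max() sentinel used above.
    constexpr std::size_t SENTINEL = std::numeric_limits<std::size_t>::max();

    bool should_start_new_file(bool size_limit_reached,            // increment_file_index_if_needed() result
                               const std::string& mode,            // m_config_params.mode
                               std::size_t previous_record_number, // m_current_record_number
                               std::size_t incoming_record_number)
    {
      if (size_limit_reached) {
        return true; // the size check has already bumped the file index
      }
      // In one-event-per-file mode, a change of record number also starts a
      // new file, so repeated write() calls for one long-readout record
      // still land in the same file.
      return mode == "one-event-per-file" && previous_record_number != SENTINEL &&
             incoming_record_number != previous_record_number;
    }
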
@@ -280,10 +301,11 @@ class HDF5DataStore : public DataStore
 
     m_file_index = 0;
     m_recorded_size = 0;
+    m_current_record_number = std::numeric_limits<size_t>::max();
   }
 
   /**
-   * @brief Informs the HD5DataStore that writes or reads of data blocks
+   * @brief Informs the HDF5DataStore that writes or reads of records
    * associated with the specified run number have finished, for now.
    * This allows the DataStore to close open files and do any other
    * cleanup or shutdown operations that are useful once the writes or
@@ -324,8 +346,13 @@ class HDF5DataStore : public DataStore
 
   // Total number of generated files
   size_t m_file_index;
 
-  // Total size of data being written
-  size_t m_recorded_size;
+  // Size of data being written
+  size_t m_recorded_size; // per file
+
+  // Record number for the record that is currently being written out
+  // This is only useful for long-readout windows, in which there may
+  // be multiple calls to write()
+  size_t m_current_record_number;
 
   // Configuration
   hdf5datastore::ConfParams m_config_params;
@@ -340,8 +367,7 @@ class HDF5DataStore : public DataStore
   /**
    * @brief Translates the specified input parameters into the appropriate filename.
    */
-  std::string get_file_name(uint64_t record_number, // NOLINT(build/unsigned)
-                            daqdataformats::run_number_t run_number)
+  std::string get_file_name(daqdataformats::run_number_t run_number)
   {
     std::ostringstream work_oss;
     work_oss << m_config_params.directory_path;
@@ -356,28 +382,24 @@ class HDF5DataStore : public DataStore
     work_oss << m_config_params.filename_parameters.run_number_prefix;
     work_oss << std::setw(m_config_params.filename_parameters.digits_for_run_number) << std::setfill('0') << run_number;
     work_oss << "_";
-    if (m_config_params.mode == "one-event-per-file") {
-      work_oss << m_config_params.filename_parameters.trigger_number_prefix;
-      work_oss << std::setw(m_config_params.filename_parameters.digits_for_trigger_number) << std::setfill('0')
-               << record_number;
-    } else if (m_config_params.mode == "all-per-file") {
+    work_oss << m_config_params.filename_parameters.file_index_prefix;
+    work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
+             << m_file_index;
-      work_oss << m_config_params.filename_parameters.file_index_prefix;
-      work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
-               << m_file_index;
-    }
     work_oss << "_" << m_config_params.filename_parameters.writer_identifier;
     work_oss << ".hdf5";
     return work_oss.str();
   }
 
-  void increment_file_index_if_needed(size_t size_of_next_write)
+  bool increment_file_index_if_needed(size_t size_of_next_write)
   {
     if ((m_recorded_size + size_of_next_write) > m_max_file_size && m_recorded_size > 0) {
       ++m_file_index;
       m_recorded_size = 0;
+      return true;
     }
+    return false;
   }
 
   void open_file_if_needed(const std::string& file_name, unsigned open_flags = HighFive::File::ReadOnly)
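
Since trigger-number-based naming is gone, every operation mode now names files by run number plus a zero-padded file index. A standalone sketch of the resulting pattern; all prefix and digit values below are hypothetical placeholders, not schema defaults:

    #include <iomanip>
    #include <iostream>
    #include <sstream>

    int main()
    {
      std::ostringstream work_oss;
      work_oss << "swtest_run" << std::setw(6) << std::setfill('0') << 123; // run number
      work_oss << "_" << std::setw(4) << std::setfill('0') << 7;            // file index
      work_oss << "_dataflow0_datawriter_0.hdf5";                           // writer identifier
      std::cout << work_oss.str() << std::endl;
      // prints: swtest_run000123_0007_dataflow0_datawriter_0.hdf5
    }

In one-event-per-file mode each event still lands in its own file, because the record-number check in write() advances m_file_index between events.
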
diff --git a/plugins/TPStreamWriter.cpp b/plugins/TPStreamWriter.cpp
index 9b81c99..0432569 100644
--- a/plugins/TPStreamWriter.cpp
+++ b/plugins/TPStreamWriter.cpp
@@ -60,9 +60,15 @@ TPStreamWriter::get_info(opmonlib::InfoCollector& ci, int /*level*/)
 {
   tpstreamwriterinfo::Info info;
 
-  info.tpset_received = m_tpset_received.exchange(0);
-  info.tpset_written = m_tpset_written.exchange(0);
+  info.heartbeat_tpsets_received = m_heartbeat_tpsets.exchange(0);
+  info.tpsets_with_tps_received = m_tpsets_with_tps.exchange(0);
+  info.tps_received = m_tps_received.exchange(0);
+  info.tps_written = m_tps_written.exchange(0);
+  info.timeslices_written = m_timeslices_written.exchange(0);
   info.bytes_output = m_bytes_output.exchange(0);
+  info.tardy_timeslice_max_seconds = m_tardy_timeslice_max_seconds.exchange(0.0);
+  info.total_tps_received = m_total_tps_received.load();
+  info.total_tps_written = m_total_tps_written.load();
 
   ci.add(info);
 }
@@ -76,6 +82,8 @@ TPStreamWriter::do_conf(const data_t& payload)
   m_accumulation_inactivity_time_before_write =
     std::chrono::milliseconds(static_cast<int>(1000*conf_params.tp_accumulation_inactivity_time_before_write_sec));
   m_source_id = conf_params.source_id;
+  warn_user_when_tardy_tps_are_discarded = conf_params.warn_user_when_tardy_tps_are_discarded;
+  m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;
 
   // create the DataStore instance here
   try {
@@ -98,6 +106,8 @@ TPStreamWriter::do_start(const nlohmann::json& payload)
   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
   rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
   m_run_number = start_params.run;
+  m_total_tps_received.store(0);
+  m_total_tps_written.store(0);
 
   // 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
   // I've put this call fairly early in this method because it could throw an
@@ -159,15 +169,17 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
   TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);
 
   bool possible_pending_data = true;
+  size_t largest_timeslice_number = 0;
   while (running_flag.load() || possible_pending_data) {
     trigger::TPSet tpset;
     try {
       tpset = m_tpset_source->receive(m_queue_timeout);
       ++n_tpset_received;
-      ++m_tpset_received;
 
-      if (tpset.type == trigger::TPSet::Type::kHeartbeat)
+      if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
+        ++m_heartbeat_tpsets;
         continue;
+      }
 
       TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is "
                      << tpset.origin << ", seqno is " << tpset.seqno << ", start timestamp is "
                      << tpset.start_time << ", run number is "
@@ -181,8 +193,12 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
                      << m_run_number << "), Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
         continue;
       }
+      ++m_tpsets_with_tps;
 
+      size_t num_tps_in_tpset = tpset.objects.size();
       tp_bundle_handler.add_tpset(std::move(tpset));
+      m_tps_received += num_tps_in_tpset;
+      m_total_tps_received += num_tps_in_tpset;
     } catch (iomanager::TimeoutExpired&) {
       if (running_flag.load()) {continue;}
     }
@@ -194,6 +210,13 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
       list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
       possible_pending_data = false;
     }
+
+    // keep track of the largest timeslice number (for reporting on tardy ones)
+    for (auto& timeslice_ptr : list_of_timeslices) {
+      largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
+    }
+
+    // attempt to write out each TimeSlice
     for (auto& timeslice_ptr : list_of_timeslices) {
       daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
       timeslice_ptr->set_element_id(sid);
@@ -205,8 +228,11 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
         should_retry = false;
         try {
           m_data_writer->write(*timeslice_ptr);
-          ++m_tpset_written;
+          ++m_timeslices_written;
           m_bytes_output += timeslice_ptr->get_total_size_bytes();
+          size_t number_of_tps_written = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
+          m_tps_written += number_of_tps_written;
+          m_total_tps_written += number_of_tps_written;
         } catch (const RetryableDataStoreProblem& excpt) {
           should_retry = true;
           ers::error(DataWritingProblem(ERS_HERE,
@@ -219,6 +245,24 @@ TPStreamWriter::do_work(std::atomic<bool>& running_flag)
           }
           usleep(retry_wait_usec);
           retry_wait_usec *= 2;
+        } catch (const IgnorableDataStoreProblem& excpt) {
+          int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
+          double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
+          m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
+          if (warn_user_when_tardy_tps_are_discarded) {
+            std::ostringstream sid_list;
+            bool first_frag = true;
+            for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
+              if (first_frag) {first_frag = false;}
+              else {sid_list << ",";}
+              sid_list << frag_ptr->get_element_id().to_string();
+            }
+            ers::warning(TardyTPsDiscarded(ERS_HERE,
+                                           get_name(),
+                                           sid_list.str(),
+                                           timeslice_ptr->get_header().timeslice_number,
+                                           seconds_too_late));
+          }
         } catch (const std::exception& excpt) {
           ers::warning(DataWritingProblem(ERS_HERE,
                                           get_name(),
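
A note on the tardiness estimate: do_conf() converts the accumulation window from 62.5 MHz clock ticks to seconds, and the IgnorableDataStoreProblem handler multiplies that by how many slice numbers the tardy TimeSlice lags behind the largest one seen. A quick worked check (standalone; the slice gap of 3 is just an example):

    #include <cstdint>
    #include <iostream>

    int main()
    {
      // Schema default: tp_accumulation_interval_ticks = 62500000,
      // i.e. one second at the 62.5 MHz clock.
      const uint64_t accumulation_interval_ticks = 62500000;
      const double accumulation_interval_seconds =
        static_cast<double>(accumulation_interval_ticks) / 62500000.0;

      // A TimeSlice arriving 3 slice numbers behind the largest-seen number
      // is estimated to be ~3 seconds too late.
      const int timeslice_number_diff = 3;
      std::cout << (accumulation_interval_seconds * timeslice_number_diff)
                << " sec too late" << std::endl; // prints: 3 sec too late
    }
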
diff --git a/plugins/TPStreamWriter.hpp b/plugins/TPStreamWriter.hpp
index 49935f1..4d44f67 100644
--- a/plugins/TPStreamWriter.hpp
+++ b/plugins/TPStreamWriter.hpp
@@ -14,8 +14,8 @@
 #include "dfmodules/DataStore.hpp"
 
 #include "appfwk/DAQModule.hpp"
-#include "iomanager/Receiver.hpp"
 #include "daqdataformats/TimeSlice.hpp"
+#include "iomanager/Receiver.hpp"
 #include "trigger/TPSet.hpp"
 #include "utilities/WorkerThread.hpp"
 
@@ -63,6 +63,8 @@ class TPStreamWriter : public dunedaq::appfwk::DAQModule
   std::chrono::steady_clock::duration m_accumulation_inactivity_time_before_write;
   daqdataformats::run_number_t m_run_number;
   uint32_t m_source_id; // NOLINT(build/unsigned)
+  bool warn_user_when_tardy_tps_are_discarded;
+  double m_accumulation_interval_seconds;
 
   // Queue sources and sinks
   using incoming_t = trigger::TPSet;
@@ -73,10 +75,15 @@ class TPStreamWriter : public dunedaq::appfwk::DAQModule
   std::unique_ptr<DataStore> m_data_writer;
 
   // Metrics
-  std::atomic<uint64_t> m_tpset_received = { 0 }; // NOLINT(build/unsigned)
-  std::atomic<uint64_t> m_tpset_written = { 0 };  // NOLINT(build/unsigned)
-  std::atomic<uint64_t> m_bytes_output = { 0 };   // NOLINT(build/unsigned)
-
+  std::atomic<uint64_t> m_heartbeat_tpsets = { 0 };   // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_tpsets_with_tps = { 0 };    // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_tps_received = { 0 };       // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_tps_written = { 0 };        // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_timeslices_written = { 0 }; // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_bytes_output = { 0 };       // NOLINT(build/unsigned)
+  std::atomic<double> m_tardy_timeslice_max_seconds = { 0.0 };
+  std::atomic<uint64_t> m_total_tps_received = { 0 }; // NOLINT(build/unsigned)
+  std::atomic<uint64_t> m_total_tps_written = { 0 };  // NOLINT(build/unsigned)
 };
 } // namespace dfmodules
 
@@ -95,6 +102,14 @@ ERS_DECLARE_ISSUE_BASE(dfmodules,
                        ((std::string)name),
                        ((size_t)trnum)((size_t)runnum))
 
+ERS_DECLARE_ISSUE_BASE(dfmodules,
+                       TardyTPsDiscarded,
+                       appfwk::GeneralDAQModuleIssue,
+                       "Tardy TPs from SourceIDs [" << sid_list << "] were discarded from TimeSlice number "
+                         << trnum << " (~" << sec_too_late << " sec too late)",
+                       ((std::string)name),
+                       ((std::string)sid_list)((size_t)trnum)((float)sec_too_late))
+
 } // namespace dunedaq
 
 #endif // DFMODULES_PLUGINS_TPSTREAMWRITER_HPP_
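
The metrics split visible in get_info() above is deliberate: per-interval counters are drained with exchange(0) at every report, while per-run totals are only read with load() and reset in do_start(). A minimal standalone sketch of that pattern (names shortened, standard library only):

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    std::atomic<uint64_t> tps_written{ 0 };       // incremental: reset at each report
    std::atomic<uint64_t> total_tps_written{ 0 }; // cumulative: reset only at run start

    void report()
    {
      std::cout << "since last report: " << tps_written.exchange(0)
                << ", run total: " << total_tps_written.load() << "\n";
    }

    int main()
    {
      tps_written += 5; total_tps_written += 5;
      report(); // since last report: 5, run total: 5
      tps_written += 2; total_tps_written += 2;
      report(); // since last report: 2, run total: 7
    }
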
diff --git a/schema/dfmodules/info/tpstreamwriterinfo.jsonnet b/schema/dfmodules/info/tpstreamwriterinfo.jsonnet
index ca976e4..51526ca 100644
--- a/schema/dfmodules/info/tpstreamwriterinfo.jsonnet
+++ b/schema/dfmodules/info/tpstreamwriterinfo.jsonnet
@@ -8,10 +8,18 @@ local s = moo.oschema.schema("dunedaq.dfmodules.tpstreamwriterinfo");
 
 local info = {
     uint8  : s.number("uint8", "u8", doc="An unsigned of 8 bytes"),
+    float4 : s.number("float4", "f4", doc="A float of 4 bytes"),
+
     info: s.record("Info", [
-       s.field("tpset_received", self.uint8, 0, doc="incremental received tpset counter"),
-       s.field("tpset_written", self.uint8, 0, doc="incremental written tpset counter"),
-       s.field("bytes_output", self.uint8, 0, doc="incremental number of bytes that have been written out"),
+       s.field("heartbeat_tpsets_received", self.uint8, 0, doc="incremental count of heartbeat TPSets received"),
+       s.field("tpsets_with_tps_received", self.uint8, 0, doc="incremental count of TPSets received that contain TPs"),
+       s.field("tps_received", self.uint8, 0, doc="incremental count of TPs that have been received"),
+       s.field("tps_written", self.uint8, 0, doc="incremental count of TPs that have been written out"),
+       s.field("timeslices_written", self.uint8, 0, doc="incremental count of TimeSlices that have been written out"),
+       s.field("bytes_output", self.uint8, 0, doc="incremental number of bytes that have been written out"),
+       s.field("tardy_timeslice_max_seconds", self.float4, 0, doc="incremental max amount of time that a TimeSlice was tardy"),
+       s.field("total_tps_received", self.uint8, 0, doc="count of TPs that have been received in the current run"),
+       s.field("total_tps_written", self.uint8, 0, doc="count of TPs that have been written out in the current run"),
    ], doc="TPSet writer information")
 };
 
diff --git a/schema/dfmodules/tpstreamwriter.jsonnet b/schema/dfmodules/tpstreamwriter.jsonnet
index 8130c40..75c111c 100644
--- a/schema/dfmodules/tpstreamwriter.jsonnet
+++ b/schema/dfmodules/tpstreamwriter.jsonnet
@@ -11,6 +11,8 @@ local types = {
     float : s.number("Float", "f4", doc="A floating point number of 4 bytes"),
 
+    flag: s.boolean("Flag", doc="Parameter that can be used to enable or disable functionality"),
+
     conf: s.record("ConfParams", [
         s.field("tp_accumulation_interval_ticks", self.size, 62500000,
                 doc="Size of the TP accumulation window, measured in clock ticks"),
@@ -19,6 +21,8 @@ local types = {
         s.field("data_store_parameters", self.dsparams,
                 doc="Parameters that configure the DataStore associated with this TPStreamWriter"),
         s.field("source_id", self.sourceid_number, 999, doc="Source ID of TPSW instance, added to time slice header"),
+        s.field("warn_user_when_tardy_tps_are_discarded", self.flag, true,
+                doc="Whether to warn users when TimeSlices that contain tardy TPs are discarded"),
     ], doc="TPStreamWriter configuration parameters"),
 };