diff --git a/plugins/DataWriterModule.cpp b/plugins/DataWriterModule.cpp index 91b735fc..9b6f57c0 100644 --- a/plugins/DataWriterModule.cpp +++ b/plugins/DataWriterModule.cpp @@ -8,6 +8,7 @@ #include "DataWriterModule.hpp" #include "dfmodules/CommonIssues.hpp" +#include "dfmodules/opmon/DataWriter.pb.h" #include "confmodel/Application.hpp" #include "confmodel/Session.hpp" @@ -118,21 +119,21 @@ DataWriterModule::init(std::shared_ptr mcfg) TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } -// void -// DataWriterModule::get_info(opmonlib::InfoCollector& ci, int /*level*/) -// { -// datawriterinfo::Info dwi; +void +DataWriterModule::generate_opmon_data() { + + opmon::DataWriterInfo dwi; -// dwi.records_received = m_records_received_tot.load(); + dwi.set_records_received(m_records_received_tot.load()); // dwi.new_records_received = m_records_received.exchange(0); -// dwi.records_written = m_records_written_tot.load(); -// dwi.new_records_written = m_records_written.exchange(0); -// dwi.bytes_output = m_bytes_output_tot.load(); -// dwi.new_bytes_output = m_bytes_output.exchange(0); -// dwi.writing_time = m_writing_ms.exchange(0); - -// ci.add(dwi); -// } + dwi.set_records_written(m_records_written_tot.load()); + dwi.set_new_records_written(m_records_written.exchange(0)); +// dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage +// dwi.new_bytes_output = m_bytes_output.exchange(0); + dwi.set_writing_time_us(m_writing_us.exchange(0)); + + publish(std::move(dwi)); +} void DataWriterModule::do_conf(const data_t&) @@ -333,8 +334,8 @@ DataWriterModule::receive_trigger_record(std::unique_ptr(end_time - start_time); - m_writing_ms += writing_time.count(); + auto writing_time = std::chrono::duration_cast(end_time - start_time); + m_writing_us += writing_time.count(); } // if m_data_storage_is_enabled } diff --git a/plugins/DataWriterModule.hpp b/plugins/DataWriterModule.hpp index 4cd24461..f85e6c8c 100644 --- a/plugins/DataWriterModule.hpp +++ b/plugins/DataWriterModule.hpp @@ -46,7 +46,7 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule DataWriterModule& operator=(DataWriterModule&&) = delete; ///< DataWriterModule is not move-assignable void init(std::shared_ptr mcfg) override; - // void get_info(opmonlib::InfoCollector& ci, int level) override; + void generate_opmon_data() override; private: // Commands @@ -93,7 +93,7 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule std::atomic m_records_written_tot = { 0 }; // NOLINT(build/unsigned) std::atomic m_bytes_output = { 0 }; // NOLINT(build/unsigned) std::atomic m_bytes_output_tot = { 0 }; // NOLINT(build/unsigned) - std::atomic m_writing_ms = { 0 }; // NOLINT(build/unsigned) + std::atomic m_writing_us = { 0 }; // NOLINT(build/unsigned) // Other diff --git a/plugins/TRBModule.cpp b/plugins/TRBModule.cpp index 22c54875..8c2d4bea 100644 --- a/plugins/TRBModule.cpp +++ b/plugins/TRBModule.cpp @@ -261,15 +261,6 @@ TRBModule::generate_opmon_data() i.set_pending_trigger_decisions(m_trigger_decisions_counter.load()); i.set_fragments_in_the_book(m_fragment_counter.load()); i.set_pending_fragments(m_pending_fragment_counter.load()); - - // error counters - i.set_timed_out_trigger_records(m_timed_out_trigger_records.load()); - i.set_abandoned_trigger_records(m_abandoned_trigger_records.load()); - i.set_unexpected_fragments(m_unexpected_fragments.load()); - i.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load()); - i.set_lost_fragments(m_lost_fragments.load()); - i.set_invalid_requests(m_invalid_requests.load()); - i.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load()); // operation metrics i.set_received_trigger_decisions(m_received_trigger_decisions.exchange(0)); @@ -284,6 +275,19 @@ TRBModule::generate_opmon_data() i.set_sent_trmon(m_trmon_sent_counter.exchange(0)); publish(std::move(i)); + + opmon::TRBErrors err; + // error counters + err.set_timed_out_trigger_records(m_timed_out_trigger_records.load()); + err.set_abandoned_trigger_records(m_abandoned_trigger_records.load()); + err.set_unexpected_fragments(m_unexpected_fragments.load()); + err.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load()); + err.set_lost_fragments(m_lost_fragments.load()); + err.set_invalid_requests(m_invalid_requests.load()); + err.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load()); + + publish(std::move(err)); + } void @@ -291,7 +295,7 @@ TRBModule::do_conf(const data_t&) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method"; - m_trigger_timeout = duration_type(m_trb_conf->get_trigger_record_timeout_ms()); + m_trigger_timeout = std::chrono::milliseconds(m_trb_conf->get_trigger_record_timeout_ms()); m_loop_sleep = m_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_queues_timeout()); diff --git a/schema/dfmodules/info/fakedataprodinfo.jsonnet b/schema/dfmodules/info/fakedataprodinfo.jsonnet deleted file mode 100644 index b395572b..00000000 --- a/schema/dfmodules/info/fakedataprodinfo.jsonnet +++ /dev/null @@ -1,17 +0,0 @@ -// This is the application info schema used by the fake data producer. -// It describes the information object structure passed by the application -// for operational monitoring - -local moo = import "moo.jsonnet"; -local s = moo.oschema.schema("dunedaq.dfmodules.fakedataprodinfo"); - -local info = { - uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"), - - info: s.record("Info", [ - s.field("requests_received", self.uint8, 0, doc="Number of received requests"), - s.field("fragments_sent", self.uint8, 0, doc="Number of sent fragments"), - ], doc="Data writer information") -}; - -moo.oschema.sort_select(info) \ No newline at end of file diff --git a/schema/dfmodules/opmon/DataWriter.proto b/schema/dfmodules/opmon/DataWriter.proto new file mode 100644 index 00000000..68da8b26 --- /dev/null +++ b/schema/dfmodules/opmon/DataWriter.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package dunedaq.dfmodules.opmon; + +message DataWriterInfo { + + uint64 records_received = 1; + uint64 records_written = 2; + uint64 new_records_written = 3; + uint64 writing_time_us = 10; // in us + +} + diff --git a/schema/dfmodules/opmon/FakeDataProdModule.proto b/schema/dfmodules/opmon/FakeDataProdModule.proto new file mode 100644 index 00000000..400f148e --- /dev/null +++ b/schema/dfmodules/opmon/FakeDataProdModule.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package dunedaq.dfmodules.opmon; + +message FakeDataProdInfo { + + uint64 requests_received = 1; + uint64 fragments_sent = 2; + +} \ No newline at end of file diff --git a/schema/dfmodules/opmon/TRBModule.proto b/schema/dfmodules/opmon/TRBModule.proto index 8bed24bb..ce2025eb 100644 --- a/schema/dfmodules/opmon/TRBModule.proto +++ b/schema/dfmodules/opmon/TRBModule.proto @@ -9,16 +9,6 @@ message TRBInfo { uint64 fragments_in_the_book = 2; // Present number of fragments in the book uint64 pending_fragments = 3; // Fragments to be expected based on the TR in the book - // error counters. These quantities are reset at start and not at regualar intervals - // hence they are about a whole run. - uint64 timed_out_trigger_records = 10; // Number of timed out triggers - uint64 unexpected_fragments = 11; // Number of unexpected fragments - uint64 unexpected_trigger_decisions = 12; // Number of unexpected trigger decisions - uint64 abandoned_trigger_records = 13; // Number of trigger records that failed to send to writing - uint64 lost_fragments = 14; // Number of fragments that not stored in a file - uint64 invalid_requests = 15; // Number of requests with unknown SourceID - uint64 duplicated_trigger_ids =16; // Number of TR not created because redundant - // operation metrics uint64 received_trigger_decisions = 20; // Number of valid trigger decisions received in the run uint64 generated_trigger_records = 21; // Number of trigger records produced @@ -31,4 +21,18 @@ message TRBInfo { uint64 received_trmon_requests = 28; // Number of requests coming from DQM uint64 sent_trmon = 29; // Number of TRs sent to DQM +} + +message TRBErrors { + + // error counters. These quantities are reset at start and not at regualar intervals + // hence they are about a whole run. + uint64 timed_out_trigger_records = 1; // Number of timed out triggers + uint64 unexpected_fragments = 2; // Number of unexpected fragments + uint64 unexpected_trigger_decisions = 3; // Number of unexpected trigger decisions + uint64 abandoned_trigger_records = 4; // Number of trigger records that failed to send to writing + uint64 lost_fragments = 5; // Number of fragments that not stored in a file + uint64 invalid_requests = 6; // Number of requests with unknown SourceID + uint64 duplicated_trigger_ids = 7; // Number of TR not created because redundant + } \ No newline at end of file diff --git a/src/TriggerRecordBuilderData.cpp b/src/TriggerRecordBuilderData.cpp index 8ccb410b..d159eab9 100644 --- a/src/TriggerRecordBuilderData.cpp +++ b/src/TriggerRecordBuilderData.cpp @@ -195,10 +195,12 @@ TriggerRecordBuilderData::generate_opmon_data() if ( completed_trigger_records > 0 ) { m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0)); // in seconds } - - // prediction rate metrics - info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time ); + if ( m_last_average_time > 0. ) { + // prediction rate metrics + info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time ); + } + publish(std::move(info)); }