Skip to content

Commit

Permalink
Merge pull request #375 from DUNE-DAQ/mroda/opmon
Browse files Browse the repository at this point in the history
Correct metric
  • Loading branch information
mroda88 authored Aug 23, 2024
2 parents eab8ce9 + b8f3d47 commit f7d882e
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 57 deletions.
31 changes: 16 additions & 15 deletions plugins/DataWriterModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "DataWriterModule.hpp"
#include "dfmodules/CommonIssues.hpp"
#include "dfmodules/opmon/DataWriter.pb.h"

#include "confmodel/Application.hpp"
#include "confmodel/Session.hpp"
Expand Down Expand Up @@ -118,21 +119,21 @@ DataWriterModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
}

// void
// DataWriterModule::get_info(opmonlib::InfoCollector& ci, int /*level*/)
// {
// datawriterinfo::Info dwi;
// Fills an opmon::DataWriterInfo message from this module's atomic counters
// and publishes it.
// Counters read with load() are left untouched; counters read with
// exchange(0) are set back to zero as they are read, so the published value
// is whatever accumulated since that counter was last reset.
void
DataWriterModule::generate_opmon_data() {

opmon::DataWriterInfo dwi;

// dwi.records_received = m_records_received_tot.load();
dwi.set_records_received(m_records_received_tot.load());
// dwi.new_records_received = m_records_received.exchange(0);
// dwi.records_written = m_records_written_tot.load();
// dwi.new_records_written = m_records_written.exchange(0);
// dwi.bytes_output = m_bytes_output_tot.load();
// dwi.new_bytes_output = m_bytes_output.exchange(0);
// dwi.writing_time = m_writing_ms.exchange(0);

// ci.add(dwi);
// }
dwi.set_records_written(m_records_written_tot.load());
// Read-and-reset: m_records_written is zeroed by this call.
dwi.set_new_records_written(m_records_written.exchange(0));
// dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
// dwi.new_bytes_output = m_bytes_output.exchange(0);
// Read-and-reset: m_writing_us is accumulated in microseconds by
// receive_trigger_record() and zeroed by this call.
dwi.set_writing_time_us(m_writing_us.exchange(0));

// Hand the filled message over to publish(); dwi must not be used afterwards.
publish(std::move(dwi));
}

void
DataWriterModule::do_conf(const data_t&)
Expand Down Expand Up @@ -333,8 +334,8 @@ DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::Trigger
} while (should_retry && m_running.load());

std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
std::chrono::milliseconds writing_time = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
m_writing_ms += writing_time.count();
auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
m_writing_us += writing_time.count();
} // if m_data_storage_is_enabled
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/DataWriterModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule
DataWriterModule& operator=(DataWriterModule&&) = delete; ///< DataWriterModule is not move-assignable

void init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg) override;
// void get_info(opmonlib::InfoCollector& ci, int level) override;
void generate_opmon_data() override;

private:
// Commands
Expand Down Expand Up @@ -93,7 +93,7 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule
std::atomic<uint64_t> m_records_written_tot = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_bytes_output = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_bytes_output_tot = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_writing_ms = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_writing_us = { 0 }; // NOLINT(build/unsigned)


// Other
Expand Down
24 changes: 14 additions & 10 deletions plugins/TRBModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,6 @@ TRBModule::generate_opmon_data()
i.set_pending_trigger_decisions(m_trigger_decisions_counter.load());
i.set_fragments_in_the_book(m_fragment_counter.load());
i.set_pending_fragments(m_pending_fragment_counter.load());

// error counters
i.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
i.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
i.set_unexpected_fragments(m_unexpected_fragments.load());
i.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
i.set_lost_fragments(m_lost_fragments.load());
i.set_invalid_requests(m_invalid_requests.load());
i.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());

// operation metrics
i.set_received_trigger_decisions(m_received_trigger_decisions.exchange(0));
Expand All @@ -284,14 +275,27 @@ TRBModule::generate_opmon_data()
i.set_sent_trmon(m_trmon_sent_counter.exchange(0));

publish(std::move(i));

opmon::TRBErrors err;
// error counters
err.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
err.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
err.set_unexpected_fragments(m_unexpected_fragments.load());
err.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
err.set_lost_fragments(m_lost_fragments.load());
err.set_invalid_requests(m_invalid_requests.load());
err.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());

publish(std::move(err));

}

void
TRBModule::do_conf(const data_t&)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";

m_trigger_timeout = duration_type(m_trb_conf->get_trigger_record_timeout_ms());
m_trigger_timeout = std::chrono::milliseconds(m_trb_conf->get_trigger_record_timeout_ms());

m_loop_sleep = m_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_queues_timeout());

Expand Down
17 changes: 0 additions & 17 deletions schema/dfmodules/info/fakedataprodinfo.jsonnet

This file was deleted.

13 changes: 13 additions & 0 deletions schema/dfmodules/opmon/DataWriter.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package dunedaq.dfmodules.opmon;

// Operational-monitoring message filled and published by
// DataWriterModule::generate_opmon_data().
message DataWriterInfo {

// Value of the module's m_records_received_tot counter (read without resetting it)
uint64 records_received = 1;
// Value of the module's m_records_written_tot counter (read without resetting it)
uint64 records_written = 2;
// Value of the module's m_records_written counter; that counter is set back to
// zero every time the message is generated
uint64 new_records_written = 3;
// Writing time in microseconds, as accumulated by the module in m_writing_us;
// that counter is set back to zero every time the message is generated
uint64 writing_time_us = 10; // in us

}

10 changes: 10 additions & 0 deletions schema/dfmodules/opmon/FakeDataProdModule.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";

package dunedaq.dfmodules.opmon;

// Operational-monitoring message for the fake data producer.
// NOTE(review): the code that fills this message is not visible here; the
// field descriptions below are inferred from the field names - confirm
// against the producing module.
message FakeDataProdInfo {

// presumably the number of data requests received by the module - TODO confirm
uint64 requests_received = 1;
// presumably the number of fragments sent out by the module - TODO confirm
uint64 fragments_sent = 2;

}
24 changes: 14 additions & 10 deletions schema/dfmodules/opmon/TRBModule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@ message TRBInfo {
uint64 fragments_in_the_book = 2; // Present number of fragments in the book
uint64 pending_fragments = 3; // Fragments to be expected based on the TR in the book

// error counters. These quantities are reset at start and not at regualar intervals
// hence they are about a whole run.
uint64 timed_out_trigger_records = 10; // Number of timed out triggers
uint64 unexpected_fragments = 11; // Number of unexpected fragments
uint64 unexpected_trigger_decisions = 12; // Number of unexpected trigger decisions
uint64 abandoned_trigger_records = 13; // Number of trigger records that failed to send to writing
uint64 lost_fragments = 14; // Number of fragments that not stored in a file
uint64 invalid_requests = 15; // Number of requests with unknown SourceID
uint64 duplicated_trigger_ids =16; // Number of TR not created because redundant

// operation metrics
uint64 received_trigger_decisions = 20; // Number of valid trigger decisions received in the run
uint64 generated_trigger_records = 21; // Number of trigger records produced
Expand All @@ -31,4 +21,18 @@ message TRBInfo {
uint64 received_trmon_requests = 28; // Number of requests coming from DQM
uint64 sent_trmon = 29; // Number of TRs sent to DQM

}

// Error counters of the trigger record builder, published by
// TRBModule::generate_opmon_data() as a message separate from TRBInfo.
message TRBErrors {

// error counters. These quantities are reset at start and not at regular intervals,
// hence they cover a whole run.
uint64 timed_out_trigger_records = 1; // Number of timed out triggers
uint64 unexpected_fragments = 2; // Number of unexpected fragments
uint64 unexpected_trigger_decisions = 3; // Number of unexpected trigger decisions
uint64 abandoned_trigger_records = 4; // Number of trigger records that failed to be sent to writing
uint64 lost_fragments = 5; // Number of fragments that were not stored in a file
uint64 invalid_requests = 6; // Number of requests with unknown SourceID
uint64 duplicated_trigger_ids = 7; // Number of TRs not created because they were redundant

}
8 changes: 5 additions & 3 deletions src/TriggerRecordBuilderData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,12 @@ TriggerRecordBuilderData::generate_opmon_data()
if ( completed_trigger_records > 0 ) {
m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0)); // in seconds
}

// prediction rate metrics
info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );

if ( m_last_average_time > 0. ) {
// prediction rate metrics
info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );
}

publish(std::move(info));

}
Expand Down

0 comments on commit f7d882e

Please sign in to comment.