Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct metric #375

Merged
merged 7 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions plugins/DataWriterModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "DataWriterModule.hpp"
#include "dfmodules/CommonIssues.hpp"
#include "dfmodules/opmon/DataWriter.pb.h"

#include "confmodel/Application.hpp"
#include "confmodel/Session.hpp"
Expand Down Expand Up @@ -118,21 +119,21 @@ DataWriterModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
}

// void
// DataWriterModule::get_info(opmonlib::InfoCollector& ci, int /*level*/)
// {
// datawriterinfo::Info dwi;
// Fills an opmon::DataWriterInfo message from this module's atomic counters
// and hands it to publish().
//
// Counters read with load() are left untouched by this call. Counters read
// with exchange(0) are set back to zero as they are read, so the published
// value covers only what was accumulated since the previous call.
void
DataWriterModule::generate_opmon_data() {

opmon::DataWriterInfo dwi;

// dwi.records_received = m_records_received_tot.load();
dwi.set_records_received(m_records_received_tot.load());
// NOTE(review): the former new_records_received metric (m_records_received.exchange(0))
// has no counterpart below — confirm that dropping it is intended.
// dwi.new_records_received = m_records_received.exchange(0);
// dwi.records_written = m_records_written_tot.load();
// dwi.new_records_written = m_records_written.exchange(0);
// dwi.bytes_output = m_bytes_output_tot.load();
// dwi.new_bytes_output = m_bytes_output.exchange(0);
// dwi.writing_time = m_writing_ms.exchange(0);

// ci.add(dwi);
// }
dwi.set_records_written(m_records_written_tot.load());
// Read-and-reset: reports the records written since the previous publication.
dwi.set_new_records_written(m_records_written.exchange(0));
// dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
// dwi.new_bytes_output = m_bytes_output.exchange(0);
// Read-and-reset: m_writing_us accumulates microsecond durations in receive_trigger_record().
dwi.set_writing_time_us(m_writing_us.exchange(0));

publish(std::move(dwi));
}

void
DataWriterModule::do_conf(const data_t&)
Expand Down Expand Up @@ -333,8 +334,8 @@ DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::Trigger
} while (should_retry && m_running.load());

std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
std::chrono::milliseconds writing_time = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
m_writing_ms += writing_time.count();
auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
m_writing_us += writing_time.count();
} // if m_data_storage_is_enabled
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/DataWriterModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule
DataWriterModule& operator=(DataWriterModule&&) = delete; ///< DataWriterModule is not move-assignable

void init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg) override;
// void get_info(opmonlib::InfoCollector& ci, int level) override;
void generate_opmon_data() override;

private:
// Commands
Expand Down Expand Up @@ -93,7 +93,7 @@ class DataWriterModule : public dunedaq::appfwk::DAQModule
std::atomic<uint64_t> m_records_written_tot = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_bytes_output = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_bytes_output_tot = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_writing_ms = { 0 }; // NOLINT(build/unsigned)
std::atomic<uint64_t> m_writing_us = { 0 }; // NOLINT(build/unsigned)


// Other
Expand Down
24 changes: 14 additions & 10 deletions plugins/TRBModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,6 @@ TRBModule::generate_opmon_data()
i.set_pending_trigger_decisions(m_trigger_decisions_counter.load());
i.set_fragments_in_the_book(m_fragment_counter.load());
i.set_pending_fragments(m_pending_fragment_counter.load());

// error counters
i.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
i.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
i.set_unexpected_fragments(m_unexpected_fragments.load());
i.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
i.set_lost_fragments(m_lost_fragments.load());
i.set_invalid_requests(m_invalid_requests.load());
i.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());

// operation metrics
i.set_received_trigger_decisions(m_received_trigger_decisions.exchange(0));
Expand All @@ -284,14 +275,27 @@ TRBModule::generate_opmon_data()
i.set_sent_trmon(m_trmon_sent_counter.exchange(0));

publish(std::move(i));

opmon::TRBErrors err;
// error counters
err.set_timed_out_trigger_records(m_timed_out_trigger_records.load());
err.set_abandoned_trigger_records(m_abandoned_trigger_records.load());
err.set_unexpected_fragments(m_unexpected_fragments.load());
err.set_unexpected_trigger_decisions(m_unexpected_trigger_decisions.load());
err.set_lost_fragments(m_lost_fragments.load());
err.set_invalid_requests(m_invalid_requests.load());
err.set_duplicated_trigger_ids(m_duplicated_trigger_ids.load());

publish(std::move(err));

}

void
TRBModule::do_conf(const data_t&)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";

m_trigger_timeout = duration_type(m_trb_conf->get_trigger_record_timeout_ms());
m_trigger_timeout = std::chrono::milliseconds(m_trb_conf->get_trigger_record_timeout_ms());

m_loop_sleep = m_queue_timeout = std::chrono::milliseconds(m_trb_conf->get_queues_timeout());

Expand Down
17 changes: 0 additions & 17 deletions schema/dfmodules/info/fakedataprodinfo.jsonnet

This file was deleted.

13 changes: 13 additions & 0 deletions schema/dfmodules/opmon/DataWriter.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package dunedaq.dfmodules.opmon;

// Operational monitoring message filled and published by
// DataWriterModule::generate_opmon_data().
message DataWriterInfo {

uint64 records_received = 1; // Value of the module's m_records_received_tot counter (read without being reset)
uint64 records_written = 2; // Value of the module's m_records_written_tot counter (read without being reset)
uint64 new_records_written = 3; // Records written since the previous publication (source counter is reset to 0 when read)
uint64 writing_time_us = 10; // Writing time in microseconds accumulated since the previous publication (source counter is reset to 0 when read)

}

10 changes: 10 additions & 0 deletions schema/dfmodules/opmon/FakeDataProdModule.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";

package dunedaq.dfmodules.opmon;

// Operational monitoring message for the fake data producer.
// NOTE(review): the code that fills this message is not shown here; the field
// descriptions below are inferred from the field names — confirm against the publisher.
message FakeDataProdInfo {

uint64 requests_received = 1; // presumably the number of data requests received — TODO confirm
uint64 fragments_sent = 2; // presumably the number of fragments sent — TODO confirm

}
24 changes: 14 additions & 10 deletions schema/dfmodules/opmon/TRBModule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@ message TRBInfo {
uint64 fragments_in_the_book = 2; // Present number of fragments in the book
uint64 pending_fragments = 3; // Fragments to be expected based on the TR in the book

// Error counters. These quantities are reset at start and not at regular intervals,
// hence they describe a whole run.
uint64 timed_out_trigger_records = 10; // Number of timed out triggers
uint64 unexpected_fragments = 11; // Number of unexpected fragments
uint64 unexpected_trigger_decisions = 12; // Number of unexpected trigger decisions
uint64 abandoned_trigger_records = 13; // Number of trigger records that failed to send to writing
uint64 lost_fragments = 14; // Number of fragments that not stored in a file
uint64 invalid_requests = 15; // Number of requests with unknown SourceID
uint64 duplicated_trigger_ids =16; // Number of TR not created because redundant

// operation metrics
uint64 received_trigger_decisions = 20; // Number of valid trigger decisions received in the run
uint64 generated_trigger_records = 21; // Number of trigger records produced
Expand All @@ -31,4 +21,18 @@ message TRBInfo {
uint64 received_trmon_requests = 28; // Number of requests coming from DQM
uint64 sent_trmon = 29; // Number of TRs sent to DQM

}

// Error counters of the trigger record builder, published by
// TRBModule::generate_opmon_data() as a message separate from TRBInfo.
message TRBErrors {

// Error counters. These quantities are reset at start and not at regular intervals,
// hence they describe a whole run.
uint64 timed_out_trigger_records = 1; // Number of timed-out trigger records
uint64 unexpected_fragments = 2; // Number of unexpected fragments
uint64 unexpected_trigger_decisions = 3; // Number of unexpected trigger decisions
uint64 abandoned_trigger_records = 4; // Number of trigger records that failed to be sent to writing
uint64 lost_fragments = 5; // Number of fragments that were not stored in a file
uint64 invalid_requests = 6; // Number of requests with an unknown SourceID
uint64 duplicated_trigger_ids = 7; // Number of TRs not created because they were redundant

}
8 changes: 5 additions & 3 deletions src/TriggerRecordBuilderData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,12 @@ TriggerRecordBuilderData::generate_opmon_data()
if ( completed_trigger_records > 0 ) {
m_last_average_time = 1e-6*0.5*(m_min_complete_time.exchange(0) + m_max_complete_time.exchange(0)); // in seconds
}

// prediction rate metrics
info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );

if ( m_last_average_time > 0. ) {
// prediction rate metrics
info.set_capacity_rate( 0.5*(m_busy_threshold.load()+m_free_threshold.load())/m_last_average_time );
}

publish(std::move(info));

}
Expand Down
Loading