Skip to content

Commit

Permalink
QC-271: Implement SOR/EOR triggers in post-processing (#2356)
Browse files Browse the repository at this point in the history
* init

* midwork

* linking moderncppkafka

* consuming kafka with simple running test

* draft for SOR trigger

* logic fully implemented

* correct usage of timestamp; activities being filled from kafka

* implementation of manual tests

* build with new alibuild rdkafka recipe

* inverted action filtering; we are returning SOR and EOR triggers if they do not match previous action

* SOSOR and SOEOR messages trigger SOR and EOR trigger object

---------

Co-authored-by: Michal Tichák <michal.tichak@cern.ch>
  • Loading branch information
justonedev1 and Michal Tichák authored Aug 2, 2024
1 parent ef2601a commit a57944a
Show file tree
Hide file tree
Showing 12 changed files with 899 additions and 37 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ find_package(FairLogger REQUIRED)
find_package(Occ REQUIRED)
find_package(ROOT 6.06.02 COMPONENTS RHTTP Gui REQUIRED)

# rdkafka is built with its own configure script and ships no CMake package
# config or find module, so the library and headers are located by hand
# relative to RDKAFKA_ROOT.
find_library(RDKAFKA_LIB rdkafka REQUIRED PATHS "${RDKAFKA_ROOT}/lib")
set(RDKAFKA_INCLUDE "${RDKAFKA_ROOT}/include")
# Backward-compatible alias: the variable was first introduced with a typo
# ("RDFKAFKA") and is still read under that name elsewhere in the build.
set(RDFKAFKA_INCLUDE "${RDKAFKA_INCLUDE}")

configure_file(getTestDataDirectory.cxx.in getTestDataDirectory.cxx)

# ---- Subdirectories ----
Expand Down
34 changes: 30 additions & 4 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ add_root_dictionary(O2QualityControlTypes
include/QualityControl/Activity.h
LINKDEF include/QualityControl/TypesLinkDef.h)

# ---- Kafka ----

# Object library whose sources are generated by protoc from proto/events.proto.
add_library(O2QualityControlKafkaProtos OBJECT
  proto/events.proto
)

# Run protoc on the .proto sources of the target. Imports are resolved against
# the source-tree proto/ directory and the generated files are written to
# <current binary dir>/proto.
protobuf_generate(
  TARGET O2QualityControlKafkaProtos
  IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}/proto"
  PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}/proto"
)

# Expose the binary dir (build tree only) so that the generated header can be
# included with its proto/ prefix.
target_include_directories(O2QualityControlKafkaProtos
  PUBLIC "$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>"
)

# The generated code needs the protobuf runtime; propagate it to consumers.
target_link_libraries(O2QualityControlKafkaProtos
  PUBLIC protobuf::libprotobuf
)

# ---- Library ----

add_library(O2QualityControl
Expand Down Expand Up @@ -100,14 +118,18 @@ add_library(O2QualityControl
src/WorkflowType.cxx
src/TimekeeperFactory.cxx
src/RootFileStorage.cxx
src/ReductorHelpers.cxx)
src/ReductorHelpers.cxx
src/KafkaPoller.cxx)

target_include_directories(
O2QualityControl
PUBLIC $<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
${MODERNCPPKAFKA_ROOT}
$<BUILD_INTERFACE:${RDFKAFKA_INCLUDE}>
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/include>)
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/include>
)

target_link_libraries(O2QualityControl
PUBLIC Boost::boost
Expand All @@ -130,8 +152,11 @@ target_link_libraries(O2QualityControl
O2::DataFormatsQualityControl
O2::DetectorsBase
O2::GlobalTracking
O2QualityControlKafkaProtos
${RDKAFKA_LIB}
PRIVATE Boost::system
CURL::libcurl)
CURL::libcurl
)

add_root_dictionary(O2QualityControl
HEADERS
Expand Down Expand Up @@ -250,6 +275,7 @@ add_executable(o2-qc-test-core
test/testVersion.cxx
test/testMonitorObjectCollection.cxx
test/testTrendingTask.cxx
test/testKafkaTests.cxx
)
set_property(TARGET o2-qc-test-core
PROPERTY RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/tests)
Expand Down Expand Up @@ -392,7 +418,7 @@ endif()
unset(isSystemDir)

# Install library and binaries
install(TARGETS O2QualityControl O2QualityControlTypes ${EXE_NAMES}
install(TARGETS O2QualityControl O2QualityControlTypes O2QualityControlKafkaProtos ${EXE_NAMES}
EXPORT QualityControlTargets
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
Expand Down
62 changes: 62 additions & 0 deletions Framework/include/QualityControl/KafkaPoller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019-2024 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
/// @file KafkaPoller.h
/// @author Michal Tichak

#ifndef QC_CORE_KAFKA_CONSUMER_H
#define QC_CORE_KAFKA_CONSUMER_H

#include <chrono>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

#include <kafka/KafkaConsumer.h>
#include <proto/events.pb.h>
#include "Activity.h"

namespace o2::quality_control::core
{

namespace proto
{

auto recordToEvent(const kafka::Value&) -> std::optional<events::Event>;

namespace start_of_run
{
void fillActivity(const events::Event& event, Activity& activity);
bool isValid(const events::Event& event, const std::string& environmentID = "", int runNumber = 0);
} // namespace start_of_run

namespace end_of_run
{
void fillActivity(const events::Event& event, Activity& activity);
bool isValid(const events::Event& event, const std::string& environmentID = "", int runNumber = 0);
} // namespace end_of_run

} // namespace proto

/// \brief Owns a kafka::clients::consumer::KafkaConsumer and exposes topic subscription and polling.
class KafkaPoller
{
 public:
  /// Collection of records handed back by poll().
  using KafkaRecords = std::vector<kafka::clients::consumer::ConsumerRecord>;

  /// \param brokers address(es) of the Kafka brokers to connect to
  /// \param groupId consumer group id used by the underlying consumer
  explicit KafkaPoller(const std::string& brokers, const std::string& groupId);

  /// \brief Subscribes the consumer to \p topic.
  /// \param topic name of the Kafka topic
  /// \param numberOfRetries presumably the maximum number of subscription attempts (confirm in the implementation)
  void subscribe(const std::string& topic, std::size_t numberOfRetries = 5);

  /// \brief Fetches the records currently available on the subscribed topic(s).
  /// \param timeout how long to wait when there are no messages
  /// \return the records received
  auto poll(std::chrono::milliseconds timeout = std::chrono::milliseconds{ 10 }) -> KafkaRecords;

 private:
  kafka::clients::consumer::KafkaConsumer mConsumer;
};

} // namespace o2::quality_control::core

#endif
6 changes: 4 additions & 2 deletions Framework/include/QualityControl/PostProcessingConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
namespace o2::quality_control::postprocessing
{

//todo pretty print
// todo pretty print

/// \brief Post-processing configuration structure
struct PostProcessingConfig {
Expand All @@ -45,6 +45,8 @@ struct PostProcessingConfig {
std::string qcdbUrl;
std::string ccdbUrl;
std::string consulUrl;
std::string kafkaBrokersUrl;
std::string kafkaTopic;
core::Activity activity;
bool matchAnyRunNumber = false;
bool critical;
Expand All @@ -53,4 +55,4 @@ struct PostProcessingConfig {

} // namespace o2::quality_control::postprocessing

#endif //QUALITYCONTROL_POSTPROCESSINGCONFIG_H
#endif // QUALITYCONTROL_POSTPROCESSINGCONFIG_H
6 changes: 3 additions & 3 deletions Framework/include/QualityControl/Triggers.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ namespace triggers
{

/// \brief Triggers when it detects a Start Of Run during its uptime (once per each)
TriggerFcn StartOfRun(const core::Activity& = {});
TriggerFcn StartOfRun(const std::string& kafkaBrokers, const std::string& topic, const std::string& detector, const std::string& taskName, const core::Activity& = {});
/// \brief Triggers when it detects an End Of Run during its uptime (once per each)
TriggerFcn EndOfRun(const core::Activity& = {});
TriggerFcn EndOfRun(const std::string& kafkaBrokers, const std::string& topic, const std::string& detector, const std::string& taskName, const core::Activity& = {});
/// \brief Triggers when it detects Stable Beams during its uptime (once per each)
TriggerFcn StartOfFill(const core::Activity& = {});
/// \brief Triggers when it detects an event dump during its uptime (once per each)
Expand All @@ -103,4 +103,4 @@ TriggerFcn Never(const core::Activity& = {});

} // namespace o2::quality_control::postprocessing

#endif //QUALITYCONTROL_TRIGGERS_H
#endif // QUALITYCONTROL_TRIGGERS_H
142 changes: 142 additions & 0 deletions Framework/proto/events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2024 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

syntax = "proto3";

package events;
option java_package = "ch.cern.alice.o2.control.events";
option go_package = "github.com/AliceO2Group/Control/common/protos;pb";

//////////////// Common event messages ///////////////

// Progress / outcome of an operation. Used by the transitionStatus, callStatus,
// operationStatus and operationStepStatus fields of the event messages below.
// NULL is the zero value and therefore the proto3 default.
enum OpStatus {
  NULL = 0;
  STARTED = 1;
  ONGOING = 2;
  DONE_OK = 3;
  DONE_ERROR = 4;
  DONE_TIMEOUT = 5;
}

// Field-less payload carried in Event.mesosHeartbeatEvent.
message Ev_MetaEvent_MesosHeartbeat {}

// Payload carried in Event.coreStartEvent.
message Ev_MetaEvent_CoreStart {
  string frameworkId = 1;
}

// Payload carried in Event.frameworkEvent.
message Ev_MetaEvent_FrameworkEvent {
  string frameworkId = 1;
  string message = 2;
}

// Payload for environment-related events (carried in Event.environmentEvent).
message Ev_EnvironmentEvent {
  string environmentId = 1;
  string state = 2;
  // Only set when the environment is in the running state.
  uint32 runNumber = 3;
  string error = 4;
  // Any additional message concerning the current state or transition.
  string message = 5;
  string transition = 6;
  string transitionStep = 7;
  OpStatus transitionStatus = 8;
  // Consolidated environment variables at the root role of the environment.
  map<string, string> vars = 9;
}

// Traits attached to task and call events (see Ev_TaskEvent.traits and
// Ev_CallEvent.traits).
message Traits {
  string trigger = 1;
  string await = 2;
  string timeout = 3;
  bool critical = 4;
}

// Payload for task-related events (carried in Event.taskEvent).
message Ev_TaskEvent {
  // Task name, based on the name of the task class.
  string name = 1;
  // Unique task id.
  string taskid = 2;
  // State machine state for this task.
  string state = 3;
  // Active/inactive etc.
  string status = 4;
  string hostname = 5;
  // Name of the task class from which this task was spawned.
  string className = 6;
  Traits traits = 7;
  string environmentId = 8;
  // Path to the parent taskRole of this task within the environment.
  string path = 9;
}

// Payload for call-related events (carried in Event.callEvent).
message Ev_CallEvent {
  // Name of the function being called, within the workflow template context.
  string func = 1;
  // Progress or success/failure state of the call.
  OpStatus callStatus = 2;
  // Return value of the function.
  string return = 3;
  Traits traits = 4;
  // Any additional output of the function.
  string output = 5;
  // Error value, if returned.
  string error = 6;
  string environmentId = 7;
  // Path to the parent callRole of this call within the environment.
  string path = 8;
}

// Payload for role-related events (carried in Event.roleEvent).
message Ev_RoleEvent {
  // Role name.
  string name = 1;
  // Active/inactive etc., derived from the state of child tasks, calls or
  // other roles.
  string status = 2;
  // State machine state for this role.
  string state = 3;
  // Path to this role within the environment.
  string rolePath = 4;
  string environmentId = 5;
}

// Payload for integrated-service events (carried in
// Event.integratedServiceEvent).
message Ev_IntegratedServiceEvent {
  // Name of the context, usually the path of the callRole that calls a given
  // integrated service function, e.g. readout-dataflow.dd-scheduler.terminate
  string name = 1;
  // Error message, if any.
  string error = 2;
  // Name of the operation, usually the name of the integrated service
  // function being called, e.g. ddsched.PartitionTerminate()
  string operationName = 3;
  // Progress or success/failure state of the operation.
  OpStatus operationStatus = 4;
  // If the operation has substeps, this is the name of the current substep,
  // like an API call or polling phase.
  string operationStep = 5;
  // Progress or success/failure state of the current substep.
  OpStatus operationStepStatus = 6;
  string environmentId = 7;
  // Any additional payload, depending on the integrated service; there is no
  // schema, it can even be the raw return structure of a remote API call.
  string payload = 8;
}

// Payload for run-related events (carried in Event.runEvent).
message Ev_RunEvent {
  string environmentId = 1;
  uint32 runNumber = 2;
  string state = 3;
  string error = 4;
  string transition = 5;
  OpStatus transitionStatus = 6;
  map<string, string> vars = 7;
}

// Envelope message: a timestamp plus at most one payload selected through the
// Payload oneof.
message Event {
  // NOTE(review): the time unit is not stated in this file; confirm with the producer.
  int64 timestamp = 1;

  // Field numbers kept out of use.
  reserved 2 to 10;
  reserved 17 to 100;

  oneof Payload {
    // Regular events: field numbers 11-16.
    Ev_EnvironmentEvent environmentEvent = 11;
    Ev_TaskEvent taskEvent = 12;
    Ev_RoleEvent roleEvent = 13;
    Ev_CallEvent callEvent = 14;
    Ev_IntegratedServiceEvent integratedServiceEvent = 15;
    Ev_RunEvent runEvent = 16;

    // Meta events: field numbers 101 and up.
    Ev_MetaEvent_FrameworkEvent frameworkEvent = 101;
    Ev_MetaEvent_MesosHeartbeat mesosHeartbeatEvent = 102;
    Ev_MetaEvent_CoreStart coreStartEvent = 103;
  }
}
Loading

0 comments on commit a57944a

Please sign in to comment.