Skip to content

Commit

Permalink
[OMON-647] Scrape AliECS endpoint for inactive tasks (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
awegrzyn authored Sep 6, 2023
1 parent 9f7aa55 commit 2daddd2
Show file tree
Hide file tree
Showing 3 changed files with 651 additions and 36 deletions.
76 changes: 40 additions & 36 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -303,44 +303,48 @@ endif()
# gRPC
####################################
if(Protobuf_FOUND AND gRPC_FOUND)
set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/proto/odc.proto)
get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
set(GRPC_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.grpc.pb.cc)

add_custom_command(
OUTPUT "${PROTO_CPP_OUTPUT}"
COMMAND protobuf::protoc
ARGS --proto_path ${PROTO_FILE_PREFIX}
--cpp_out ${CMAKE_CURRENT_BINARY_DIR}
${PROTO_OUTPUT_NAME}.proto
DEPENDS ${PROTO_FILE}
COMMENT "Running protoc on ${PROTO_FILE}"
VERBATIM)

add_custom_command(
OUTPUT "${GRPC_CPP_OUTPUT}"
COMMAND protobuf::protoc
ARGS --proto_path ${PROTO_FILE_PREFIX}
--grpc_out=${CMAKE_CURRENT_BINARY_DIR}
--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
${PROTO_OUTPUT_NAME}.proto
DEPENDS ${PROTO_FILE}
COMMENT "Running protoc/gRPC on ${PROTO_FILE}"
VERBATIM)
set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/proto/odc.proto ${CMAKE_CURRENT_SOURCE_DIR}/proto/o2control.proto)
set(EXAMPLES examples/15-ODC.cxx examples/16-AliECS.cxx)
foreach(PROTO_FILE example IN ZIP_LISTS PROTO_FILES EXAMPLES)
get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
set(GRPC_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.grpc.pb.cc)

add_custom_command(
OUTPUT "${PROTO_CPP_OUTPUT}"
COMMAND protobuf::protoc
ARGS --proto_path ${PROTO_FILE_PREFIX}
--cpp_out ${CMAKE_CURRENT_BINARY_DIR}
${PROTO_OUTPUT_NAME}.proto
DEPENDS ${PROTO_FILE}
COMMENT "Running protoc on ${PROTO_FILE}"
VERBATIM)

add_custom_command(
OUTPUT "${GRPC_CPP_OUTPUT}"
COMMAND protobuf::protoc
ARGS --proto_path ${PROTO_FILE_PREFIX}
--grpc_out=${CMAKE_CURRENT_BINARY_DIR}
--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
${PROTO_OUTPUT_NAME}.proto
DEPENDS ${PROTO_FILE}
COMMENT "Running protoc/gRPC on ${PROTO_FILE}"
VERBATIM)

set(example examples/15-ODC.cxx)
get_filename_component(example_name ${example} NAME)
string(REGEX REPLACE ".cxx" "" example_name ${example_name})
add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT} ${GRPC_CPP_OUTPUT})
target_link_libraries(${example_name} PRIVATE
gRPC::grpc++
protobuf::libprotobuf
Boost::program_options
)
target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
get_filename_component(example_name ${example} NAME)
string(REGEX REPLACE ".cxx" "" example_name ${example_name})
add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT} ${GRPC_CPP_OUTPUT})
target_link_libraries(${example_name} PRIVATE
Monitoring
gRPC::grpc++
protobuf::libprotobuf
Boost::program_options
)
target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
endforeach()
set_target_properties(15-ODC PROPERTIES OUTPUT_NAME "o2-monitoring-odc")
set_target_properties(16-AliECS PROPERTIES OUTPUT_NAME "o2-monitoring-aliecs-tasks")
endif()

####################################
Expand Down
86 changes: 86 additions & 0 deletions examples/16-AliECS.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
///
/// \file 16-AliECS.cxx
/// \author Adam Wegrzynek <adam.wegrzynek@cern.ch>
///

#include <iostream>
#include <grpc++/grpc++.h>
#include "o2control.grpc.pb.h"
#include "helpers/HttpConnection.h"
#include <boost/program_options.hpp>
#include <thread>
#include <regex>
#include "../src/Backends/InfluxDB.h"
#include "../src/Transports/HTTP.h"
#include "../src/MonLogger.h"

using o2::monitoring::MonLogger;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;

using o2control::EnvironmentInfo;
using namespace o2::monitoring;

class AliEcsClient {
public:
AliEcsClient(std::shared_ptr<Channel> channel) : mStub(o2control::Control::NewStub(channel)) {}
void sendRunDetails(const auto& influxBackend) {
o2control::GetEnvironmentsRequest request;
request.set_showall(false);
request.set_showtaskinfos(false);
o2control::GetEnvironmentsReply reply;
ClientContext context;
Status status = mStub->GetEnvironments(&context, request, &reply);
if (status.ok()) {
MonLogger::Get() << "Status call OK" << MonLogger::End();
for (int i = 0; i < reply.environments_size(); i++) {
if (reply.environments(i).currentrunnumber() > 1) {
MonLogger::Get() << "Env ID" << reply.environments(i).id() << MonLogger::End();
auto metric = Metric{"tasks"};
metric.addValue(reply.environments(i).numberofactivetasks(), "active").addValue(reply.environments(i).numberofinactivetasks(), "inactive");
influxBackend->sendWithRun(metric, reply.environments(i).id(), std::to_string(reply.environments(i).currentrunnumber()));
}
}
} else {
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<o2control::Control::Stub> mStub;
};


int main(int argc, char* argv[]) {
boost::program_options::options_description desc("Program options");
desc.add_options()
("aliecs-host", boost::program_options::value<std::string>()->required(), "AliECS hostname")
("aliecs-port", boost::program_options::value<unsigned short>()->required(), "AliECS port")
("influxdb-url", boost::program_options::value<std::string>()->required(), "InfluxDB hostname")
("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token")
("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation")
("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);
MonLogger::mLoggerSeverity = o2::monitoring::Severity::Debug;
MonLogger::Get() << "Connected to AliECS server: " << vm["aliecs-host"].as<std::string>() << ":" << vm["aliecs-port"].as<unsigned short>() << MonLogger::End();
grpc::ChannelArguments args;
args.SetMaxReceiveMessageSize(20*1024*1024);
AliEcsClient client(grpc::CreateCustomChannel(
vm["aliecs-host"].as<std::string>() + ":" + std::to_string(vm["aliecs-port"].as<unsigned short>()),
grpc::InsecureChannelCredentials(),
args
));
auto httpTransport = std::make_unique<transports::HTTP>(
vm["influxdb-url"].as<std::string>() + "/api/v2/write?" +
"org=" + vm["influxdb-org"].as<std::string>() + "&" +
"bucket=" + vm["influxdb-bucket"].as<std::string>()
);
httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));
for (;;) {
client.sendRunDetails(influxdbBackend);
std::this_thread::sleep_for(std::chrono::seconds(15));
}
}
Loading

0 comments on commit 2daddd2

Please sign in to comment.