From 2daddd29a69cd517cff2d186d1631f4d867ce82c Mon Sep 17 00:00:00 2001
From: Adam Wegrzynek
Date: Wed, 6 Sep 2023 16:10:57 +0200
Subject: [PATCH] [OMON-647] Scrape AliECS endpoint for inactive tasks (#330)

---
 Review note: examples/16-AliECS.cxx was revised after export (stripped include
 targets and template arguments restored, option errors handled). Its line counts
 below are updated; the blob id on its "index" line is not recomputed.

 CMakeLists.txt         |  76 +++---
 examples/16-AliECS.cxx | 105 +++++++
 proto/o2control.proto  | 525 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 670 insertions(+), 36 deletions(-)
 create mode 100644 examples/16-AliECS.cxx
 create mode 100644 proto/o2control.proto

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 471e08b1..3a583a92 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -303,44 +303,48 @@ endif()
 # gRPC
 ####################################
 if(Protobuf_FOUND AND gRPC_FOUND)
-  set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/proto/odc.proto)
-  get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
-  get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
-  set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
-  set(GRPC_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.grpc.pb.cc)
-
-  add_custom_command(
-    OUTPUT "${PROTO_CPP_OUTPUT}"
-    COMMAND protobuf::protoc
-    ARGS --proto_path ${PROTO_FILE_PREFIX}
-         --cpp_out ${CMAKE_CURRENT_BINARY_DIR}
-         ${PROTO_OUTPUT_NAME}.proto
-    DEPENDS ${PROTO_FILE}
-    COMMENT "Running protoc on ${PROTO_FILE}"
-    VERBATIM)
-
-  add_custom_command(
-    OUTPUT "${GRPC_CPP_OUTPUT}"
-    COMMAND protobuf::protoc
-    ARGS --proto_path ${PROTO_FILE_PREFIX}
-         --grpc_out=${CMAKE_CURRENT_BINARY_DIR}
-         --plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
-         ${PROTO_OUTPUT_NAME}.proto
-    DEPENDS ${PROTO_FILE}
-    COMMENT "Running protoc/gRPC on ${PROTO_FILE}"
-    VERBATIM)
+  set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/proto/odc.proto ${CMAKE_CURRENT_SOURCE_DIR}/proto/o2control.proto)
+  set(EXAMPLES examples/15-ODC.cxx examples/16-AliECS.cxx) # same order as PROTO_FILES: entry N is built from proto N
+  foreach(PROTO_FILE example IN ZIP_LISTS PROTO_FILES EXAMPLES) # walks both lists in lockstep; ZIP_LISTS needs CMake >= 3.17
+    get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
+    get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
+    set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
+    set(GRPC_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.grpc.pb.cc)
+
+    add_custom_command(
+      OUTPUT "${PROTO_CPP_OUTPUT}"
+      COMMAND protobuf::protoc
+      ARGS --proto_path ${PROTO_FILE_PREFIX}
+           --cpp_out ${CMAKE_CURRENT_BINARY_DIR}
+           ${PROTO_OUTPUT_NAME}.proto
+      DEPENDS ${PROTO_FILE}
+      COMMENT "Running protoc on ${PROTO_FILE}"
+      VERBATIM)
+
+    add_custom_command(
+      OUTPUT "${GRPC_CPP_OUTPUT}"
+      COMMAND protobuf::protoc
+      ARGS --proto_path ${PROTO_FILE_PREFIX}
+           --grpc_out=${CMAKE_CURRENT_BINARY_DIR}
+           --plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
+           ${PROTO_OUTPUT_NAME}.proto
+      DEPENDS ${PROTO_FILE}
+      COMMENT "Running protoc/gRPC on ${PROTO_FILE}"
+      VERBATIM)
 
-  set(example examples/15-ODC.cxx)
-  get_filename_component(example_name ${example} NAME)
-  string(REGEX REPLACE ".cxx" "" example_name ${example_name})
-  add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT} ${GRPC_CPP_OUTPUT})
-  target_link_libraries(${example_name} PRIVATE
-    gRPC::grpc++
-    protobuf::libprotobuf
-    Boost::program_options
-  )
-  target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
+    get_filename_component(example_name ${example} NAME)
+    string(REGEX REPLACE ".cxx" "" example_name ${example_name})
+    add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT} ${GRPC_CPP_OUTPUT})
+    target_link_libraries(${example_name} PRIVATE
+      Monitoring
+      gRPC::grpc++
+      protobuf::libprotobuf
+      Boost::program_options
+    )
+    target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) # generated *.pb.h headers live in the binary dir
+  endforeach()
   set_target_properties(15-ODC PROPERTIES OUTPUT_NAME "o2-monitoring-odc")
+  set_target_properties(16-AliECS PROPERTIES OUTPUT_NAME "o2-monitoring-aliecs-tasks")
 endif()
 
 ####################################
diff --git a/examples/16-AliECS.cxx b/examples/16-AliECS.cxx
new file mode 100644
index 00000000..b0de85de
--- /dev/null
+++ b/examples/16-AliECS.cxx
@@ -0,0 +1,105 @@
+///
+/// \file 16-AliECS.cxx
+/// \author Adam Wegrzynek
+///
+/// Polls the AliECS GetEnvironments endpoint every 15 seconds and writes the number of
+/// active and inactive tasks of each environment with a run number above 1 to InfluxDB.
+///
+
+#include <chrono>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+#include <boost/program_options.hpp>
+#include <grpcpp/grpcpp.h>
+#include "o2control.grpc.pb.h"
+#include "helpers/HttpConnection.h"
+#include "../src/Backends/InfluxDB.h"
+#include "../src/Transports/HTTP.h"
+#include "../src/MonLogger.h"
+
+using o2::monitoring::MonLogger;
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+using o2control::EnvironmentInfo;
+using namespace o2::monitoring;
+
+/// gRPC client of the AliECS (o2control) Control service
+class AliEcsClient {
+ public:
+  /// \param channel   gRPC channel the Control stub is created on
+  explicit AliEcsClient(std::shared_ptr<Channel> channel) : mStub(o2control::Control::NewStub(channel)) {}
+
+  /// Calls GetEnvironments and sends one "tasks" metric (active and inactive task counts)
+  /// per environment, passing the environment ID and run number along with it
+  /// \param influxBackend   backend receiving the metrics
+  void sendRunDetails(const std::unique_ptr<backends::InfluxDB>& influxBackend) {
+    o2control::GetEnvironmentsRequest request;
+    request.set_showall(false);
+    request.set_showtaskinfos(false);
+    o2control::GetEnvironmentsReply reply;
+    ClientContext context;
+    const Status status = mStub->GetEnvironments(&context, request, &reply);
+    if (!status.ok()) {
+      // Report the failure on stderr; the caller polls again later
+      std::cerr << status.error_code() << ": " << status.error_message() << '\n';
+      return;
+    }
+    MonLogger::Get() << "Status call OK" << MonLogger::End();
+    for (const auto& environment : reply.environments()) {
+      // NOTE(review): run number 1 is skipped as well - confirm that "> 0" was not intended
+      if (environment.currentrunnumber() > 1) {
+        MonLogger::Get() << "Env ID" << environment.id() << MonLogger::End();
+        auto metric = Metric{"tasks"};
+        metric.addValue(environment.numberofactivetasks(), "active").addValue(environment.numberofinactivetasks(), "inactive");
+        influxBackend->sendWithRun(metric, environment.id(), std::to_string(environment.currentrunnumber()));
+      }
+    }
+  }
+
+ private:
+  std::unique_ptr<o2control::Control::Stub> mStub; ///< generated Control service stub
+};
+
+/// Parses the options, creates the AliECS client and InfluxDB backend, then polls forever
+int main(int argc, char* argv[]) {
+  boost::program_options::options_description desc("Program options");
+  desc.add_options()
+    ("aliecs-host", boost::program_options::value<std::string>()->required(), "AliECS hostname")
+    ("aliecs-port", boost::program_options::value<std::uint16_t>()->required(), "AliECS port")
+    ("influxdb-url", boost::program_options::value<std::string>()->required(), "InfluxDB URL")
+    ("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token")
+    ("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation")
+    ("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket");
+  boost::program_options::variables_map vm;
+  try {
+    boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
+    boost::program_options::notify(vm);
+  } catch (const boost::program_options::error& e) {
+    // Missing or malformed option: print the reason and the usage instead of dying on an uncaught exception
+    std::cerr << e.what() << '\n' << desc << '\n';
+    return 1;
+  }
+  MonLogger::mLoggerSeverity = o2::monitoring::Severity::Debug;
+  const std::string aliecsEndpoint = vm["aliecs-host"].as<std::string>() + ":" + std::to_string(vm["aliecs-port"].as<std::uint16_t>());
+  MonLogger::Get() << "Connecting to AliECS server: " << aliecsEndpoint << MonLogger::End();
+  grpc::ChannelArguments args;
+  args.SetMaxReceiveMessageSize(20*1024*1024);
+  AliEcsClient client(grpc::CreateCustomChannel(aliecsEndpoint, grpc::InsecureChannelCredentials(), args));
+  auto httpTransport = std::make_unique<transports::HTTP>(
+    vm["influxdb-url"].as<std::string>() + "/api/v2/write?" +
+    "org=" + vm["influxdb-org"].as<std::string>() + "&" +
+    "bucket=" + vm["influxdb-bucket"].as<std::string>()
+  );
+  httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
+  auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));
+  for (;;) {
+    client.sendRunDetails(influxdbBackend);
+    std::this_thread::sleep_for(std::chrono::seconds(15));
+  }
+}
diff --git a/proto/o2control.proto b/proto/o2control.proto
new file mode 100644
index 00000000..b575da22
--- /dev/null
+++ b/proto/o2control.proto
@@ -0,0 +1,525 @@
+/*
+ * === This file is part of ALICE O² ===
+ *
+ * Copyright 2018 CERN and copyright holders of ALICE O².
+ * Author: Teo Mrnjavac
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * In applying this license CERN does not waive the privileges and
+ * immunities granted to it by virtue of its status as an
+ * Intergovernmental Organization or submit itself to any jurisdiction.
+ */
+
+syntax = "proto3";
+
+package o2control;
+option java_package = "ch.cern.alice.o2.control.rpcserver";
+option go_package = "github.com/AliceO2Group/Control/core/protos;pb";
+
+//////////////// Common event messages ///////////////
+
+message Event_MesosHeartbeat {
+}
+
+message Ev_EnvironmentEvent {
+    string environmentId = 1;
+    string state = 2;
+    uint32 currentRunNumber = 3;
+    string error = 4;
+    string message = 5;
+}
+
+message Ev_TaskEvent {
+    string name = 1;
+    string taskid = 2;
+    string state = 3;
+    string status = 4;
+    string hostname = 5;
+    string className = 6;
+}
+
+message Ev_RoleEvent {
+    string name = 1;
+    string status = 2;
+    string state = 3;
+    string rolePath = 4;
+}
+
+//////////////////////////////////////////////////////
+
+service Control {
+    rpc TrackStatus (StatusRequest) returns (stream StatusReply) {}
+
+    rpc GetFrameworkInfo (GetFrameworkInfoRequest) returns (GetFrameworkInfoReply) {}
+    rpc Teardown (TeardownRequest) returns (TeardownReply) {}
+
+    rpc GetEnvironments (GetEnvironmentsRequest) returns (GetEnvironmentsReply) {} // the call made by examples/16-AliECS.cxx
+    rpc NewAutoEnvironment (NewAutoEnvironmentRequest) returns (NewAutoEnvironmentReply) {}
+    rpc NewEnvironment (NewEnvironmentRequest) returns (NewEnvironmentReply) {}
+    rpc GetEnvironment (GetEnvironmentRequest) returns (GetEnvironmentReply) {}
+    rpc ControlEnvironment (ControlEnvironmentRequest) returns (ControlEnvironmentReply) {}
+    rpc ModifyEnvironment (ModifyEnvironmentRequest) returns (ModifyEnvironmentReply) {}
+    rpc DestroyEnvironment (DestroyEnvironmentRequest) returns (DestroyEnvironmentReply) {}
+    rpc GetActiveDetectors (Empty) returns (GetActiveDetectorsReply) {}
+
+//    rpc SetEnvironmentProperties (SetEnvironmentPropertiesRequest) returns (SetEnvironmentPropertiesReply) {}
+//    rpc GetEnvironmentProperties (GetEnvironmentPropertiesRequest) returns (GetEnvironmentPropertiesReply) {}
+
+    rpc GetTasks (GetTasksRequest) returns (GetTasksReply) {}
+    rpc GetTask(GetTaskRequest) returns (GetTaskReply) {}
+    rpc CleanupTasks(CleanupTasksRequest) returns (CleanupTasksReply) {}
+
+    rpc GetRoles (GetRolesRequest) returns (GetRolesReply) {}
+
+    rpc GetWorkflowTemplates (GetWorkflowTemplatesRequest) returns (GetWorkflowTemplatesReply) {}
+
+    rpc ListRepos(ListReposRequest) returns (ListReposReply) {}
+    rpc AddRepo(AddRepoRequest) returns (AddRepoReply) {}
+    rpc RemoveRepo(RemoveRepoRequest) returns (RemoveRepoReply) {}
+    rpc RefreshRepos(RefreshReposRequest) returns (Empty) {}
+    rpc SetDefaultRepo(SetDefaultRepoRequest) returns (Empty) {}
+    rpc SetGlobalDefaultRevision(SetGlobalDefaultRevisionRequest) returns (Empty) {}
+    rpc SetRepoDefaultRevision(SetRepoDefaultRevisionRequest) returns (SetRepoDefaultRevisionReply) {}
+    rpc Subscribe(SubscribeRequest) returns (stream Event);
+
+    rpc GetIntegratedServices(Empty) returns (ListIntegratedServicesReply) {}
+}
+
+////////////////////////////////////////
+// Global status
+////////////////////////////////////////
+message StatusRequest {}
+message StatusReply {
+    string state = 1;
+    repeated StatusUpdate statusUpdates = 2;
+}
+message StatusUpdate {
+    enum Level {
+        DEBUG = 0;
+        INFO = 1;
+        WARNING = 2;
+        ERROR = 3;
+    }
+    Level level = 1;
+    oneof Event {
+        Event_MesosHeartbeat mesosHeartbeat = 2;
+        //TODO add other events here and in events.proto
+    }
+}
+
+message Event {
+    string timestamp = 1;
+    oneof Payload {
+        Ev_EnvironmentEvent environmentEvent = 2;
+        Ev_TaskEvent taskEvent = 3;
+        Ev_RoleEvent roleEvent = 4;
+    }
+}
+
+message SubscribeRequest{
+    string id = 1;
+}
+////////////////////////////////////////
+// Framework
+////////////////////////////////////////
+message GetFrameworkInfoRequest {}
+message Version {
+    int32 major = 1;
+    int32 minor = 2;
+    int32 patch = 3;
+    string build = 4;
+    string productName = 5;
+    string versionStr = 6;
+}
+message GetFrameworkInfoReply {
+    string frameworkId = 1;
+    int32 environmentsCount = 2;
+    int32 tasksCount = 3;
+    string state = 4;
+    int32 hostsCount = 5;
+    string instanceName = 6;
+    Version version = 7;
+    string configurationEndpoint = 8;
+    repeated string detectorsInInstance = 9;
+    repeated string activeDetectors = 10;
+    repeated string availableDetectors = 11;
+}
+
+// Not implemented yet
+message TeardownRequest {
+    string reason = 1;
+}
+message TeardownReply {}
+
+////////////////////////////////////////
+// Environment
+////////////////////////////////////////
+message GetEnvironmentsRequest {
+    bool showAll = 1;
+    bool showTaskInfos = 2;
+}
+message GetEnvironmentsReply {
+    string frameworkId = 1;
+    repeated EnvironmentInfo environments = 2;
+}
+message EnvironmentInfo {
+    string id = 1;
+    int64 createdWhen = 2; // msec
+    string state = 3;
+    repeated ShortTaskInfo tasks = 4;
+    string rootRole = 5;
+    uint32 currentRunNumber = 6; // examples/16-AliECS.cxx reports only environments where this is above 1
+    map<string, string> defaults = 7;
+    map<string, string> vars = 8;
+    map<string, string> userVars = 9;
+    int32 numberOfFlps = 10;
+    repeated string includedDetectors = 11;
+    string description = 12;
+    int32 numberOfHosts = 13;
+    map<string, string> integratedServicesData = 14;
+    int32 numberOfTasks = 15;
+    string currentTransition = 16;
+    int32 numberOfActiveTasks = 17; // sent as "active" in the "tasks" metric by examples/16-AliECS.cxx
+    int32 numberOfInactiveTasks = 18; // sent as "inactive" in the "tasks" metric by examples/16-AliECS.cxx
+}
+
+message NewEnvironmentRequest {
+    string workflowTemplate = 1;
+    map<string, string> vars = 2;
+    bool public = 3;
+}
+message NewEnvironmentReply {
+    EnvironmentInfo environment = 1;
+    bool public = 2;
+}
+message NewAutoEnvironmentRequest {
+    string workflowTemplate = 1;
+    map<string, string> vars = 2;
+    string id = 3;
+}
+message NewAutoEnvironmentReply {
+
+}
+
+message GetEnvironmentRequest {
+    string id = 1;
+    bool showWorkflowTree = 2;
+}
+message GetEnvironmentReply {
+    EnvironmentInfo environment = 1;
+    RoleInfo workflow = 2;
+    bool public = 3;
+}
+
+message ControlEnvironmentRequest {
+    string id = 1;
+    enum Optype {
+        NOOP = 0;
+        START_ACTIVITY = 1;
+        STOP_ACTIVITY = 2;
+        CONFIGURE = 3;
+        RESET = 4;
+        GO_ERROR = 5;
+        DEPLOY = 6;
+    }
+    Optype type = 2;
+}
+message ControlEnvironmentReply {
+    string id = 1;
+    string state = 2;
+    uint32 currentRunNumber = 3;
+    // All times are in milliseconds
+    int64 startOfTransition = 4;
+    int64 endOfTransition = 5;
+    int64 transitionDuration = 6;
+}
+
+message ModifyEnvironmentRequest {
+    string id = 1;
+    repeated EnvironmentOperation operations = 2;
+    bool reconfigureAll = 3;
+}
+message EnvironmentOperation {
+    enum Optype {
+        NOOP = 0;
+        REMOVE_ROLE = 3;
+        ADD_ROLE = 4;
+    }
+    Optype type = 1;
+    string roleName = 2;
+}
+message ModifyEnvironmentReply {
+    repeated EnvironmentOperation failedOperations = 1;
+    string id = 2;
+    string state = 3;
+}
+
+message DestroyEnvironmentRequest {
+    string id = 1;
+    bool keepTasks = 2;
+    bool allowInRunningState = 3;
+    bool force = 4;
+}
+message DestroyEnvironmentReply {
+    CleanupTasksReply cleanupTasksReply = 1;
+}
+
+message GetActiveDetectorsReply {
+    repeated string detectors = 1;
+}
+
+////////////////////////////////////////
+// Environment, GET/SET properties
+////////////////////////////////////////
+message SetEnvironmentPropertiesRequest {
+    string id = 1;
+    // If properties == nil, the core sets nothing
+    // and reply ok
+    map<string, string> properties = 2;
+}
+message SetEnvironmentPropertiesReply {}
+
+message GetEnvironmentPropertiesRequest {
+    string id = 1;
+    // If len(queries) == 0, we return an
+    // empty map.
+    // To retrieve all KVs, use query '*'
+    repeated string queries = 2;
+    bool excludeGlobals = 3;
+}
+message GetEnvironmentPropertiesReply {
+    map<string, string> properties = 1;
+}
+
+
+////////////////////////////////////////
+// Tasks
+////////////////////////////////////////
+message ShortTaskInfo {
+    string name = 1;
+    bool locked = 2;
+    string taskId = 3;
+    string status = 4;
+    string state = 5;
+    string className = 6;
+    TaskDeploymentInfo deploymentInfo = 7;
+    string pid = 8;
+    string sandboxStdout = 9;
+    bool claimable = 10;
+}
+message TaskDeploymentInfo {
+    string hostname = 1;
+    string agentId = 2;
+    string offerId = 3;
+    string executorId = 4;
+}
+
+message GetTasksRequest {}
+message GetTasksReply {
+    repeated ShortTaskInfo tasks = 1;
+}
+message GetTaskRequest {
+    string taskId = 1;
+}
+message GetTaskReply {
+    TaskInfo task = 1;
+}
+
+message TaskClassInfo {
+    string name = 1;
+    string controlMode = 2;
+}
+message CommandInfo {
+    repeated string env = 1;
+    bool shell = 2;
+    string value = 3;
+    repeated string arguments = 4;
+    string user = 5;
+}
+message ChannelInfo {
+    string name = 1;
+    string type = 2;
+    string target = 3;
+}
+message TaskInfo {
+    ShortTaskInfo shortInfo = 1;
+    TaskClassInfo classInfo = 2;
+    repeated ChannelInfo inboundChannels = 3;
+    repeated ChannelInfo outboundChannels = 4;
+    CommandInfo commandInfo = 5;
+    string taskPath = 6;
+    string envId = 7;
+    map<string, string> properties = 9;
+}
+
+message CleanupTasksRequest {
+    repeated string taskIds = 1;
+}
+message CleanupTasksReply {
+    repeated ShortTaskInfo killedTasks = 1;
+    repeated ShortTaskInfo runningTasks = 2;
+}
+
+////////////////////////////////////////
+// Roles
+////////////////////////////////////////
+message GetRolesRequest {
+    string envId = 1;
+    string pathSpec = 2;
+}
+
+message RoleInfo {
+    string name = 1;
+    string status = 2;
+    string state = 3;
+    string fullPath = 4;
+    repeated string taskIds = 5;
+    repeated RoleInfo roles = 6;
+    map<string, string> defaults = 7;
+    map<string, string> vars = 8;
+    map<string, string> userVars = 9;
+    map<string, string> consolidatedStack = 10;
+    string description = 11;
+}
+
+message GetRolesReply {
+    repeated RoleInfo roles = 1;
+}
+
+message GetWorkflowTemplatesRequest{
+    string repoPattern = 1;
+    string revisionPattern = 2;
+    bool allBranches = 3;
+    bool allTags = 4;
+    bool allWorkflows = 5;
+}
+
+message VarSpecMessage {
+    enum UiWidget {
+        editBox = 0; // plain string input line, can accept types number (like a spinBox) and string
+        slider = 1; // input widget exclusively for numbers, range allowedValues[0]-[1]
+        listBox = 2; // displays a list of items, can accept types number, string or list; if number/string ==> single selection, otherwise multiple selection allowed
+        dropDownBox = 3;
+        comboBox = 4;
+        radioButtonBox = 5;
+        checkBox = 6;
+    }
+
+    enum Type {
+        string = 0;
+        number = 1;
+        bool = 2;
+        list = 3;
+        map = 4;
+    }
+
+    string defaultValue = 1;
+    Type type = 2;
+    string label = 3;
+    string description = 4;
+    UiWidget widget = 5;
+    string panel = 6; // hint for the UI on where to put or group the given variable input
+    repeated string allowedValues = 7; // list of offered values from which to choose (only for some UiWidgets)
+    int32 index = 8;
+    string visibleIf = 9; // JS expression that evaluates to bool
+    string enabledIf = 10; // JS expression that evaluates to bool
+}
+
+message WorkflowTemplateInfo {
+    string repo = 1;
+    string template = 2;
+    string revision = 3;
+    map<string, VarSpecMessage> varSpecMap = 4;
+    string description = 5;
+}
+
+message GetWorkflowTemplatesReply{
+    repeated WorkflowTemplateInfo workflowTemplates = 1;
+}
+
+////////////////////////////////////////
+// Repos
+////////////////////////////////////////
+
+message ListReposRequest {
+    bool getRevisions = 1;
+}
+
+message RepoInfo {
+    string name = 1;
+    bool default = 2;
+    string defaultRevision = 3;
+    repeated string revisions = 4;
+}
+
+message ListReposReply {
+    repeated RepoInfo repos = 1;
+    string globalDefaultRevision = 2;
+}
+
+message AddRepoRequest {
+    string name = 1;
+    string defaultRevision = 2;
+}
+
+message AddRepoReply {
+    string newDefaultRevision = 1;
+    string info = 2;
+}
+
+message RemoveRepoRequest {
+    int32 index = 1;
+}
+
+message RemoveRepoReply {
+    string newDefaultRepo = 1;
+}
+
+message RefreshReposRequest {
+    int32 index = 1;
+}
+
+message SetDefaultRepoRequest {
+    int32 index = 1;
+}
+
+message SetGlobalDefaultRevisionRequest {
+    string revision = 1;
+}
+
+message SetRepoDefaultRevisionRequest {
+    int32 index = 1;
+    string revision = 2;
+}
+
+message SetRepoDefaultRevisionReply {
+    string info = 1;
+}
+
+message Empty {
+
+}
+
+message ListIntegratedServicesReply {
+    map<string, IntegratedServiceInfo> services = 1; // keys are IDs (e.g. "ddsched"), the service name should be displayed to users instead
+}
+
+message IntegratedServiceInfo {
+    string name = 1; // user-visible service name, e.g. "DD scheduler"
+    bool enabled = 2;
+    string endpoint = 3;
+    string connectionState = 4; // allowed values: READY, CONNECTING, TRANSIENT_FAILURE, IDLE, SHUTDOWN
+    string data = 5; // always a JSON payload with a map inside.
+}
\ No newline at end of file