Skip to content

Commit

Permalink
Add InfluxDB 2.x optional backend (#193)
Browse files Browse the repository at this point in the history
* Add CURL to deps list

* Add URL parsing

* First working version

* Clean up and docs
  • Loading branch information
awegrzyn authored Apr 23, 2020
1 parent c169cc5 commit a4e4c9e
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 7 deletions.
14 changes: 11 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system filesystem)
find_package(Git QUIET)
find_package(ApMon MODULE)
find_package(CURL MODULE)
find_package(RdKafka CONFIG)

####################################
Expand Down Expand Up @@ -105,11 +106,12 @@ add_library(Monitoring SHARED
src/Exceptions/MonitoringException.cxx
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/Kafka.cxx>
$<$<BOOL:${CURL_FOUND}>:src/Transports/HTTP.cxx>
)

target_include_directories(Monitoring
PUBLIC
$<INSTALL_INTERFACE:include>
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src
Expand All @@ -127,6 +129,7 @@ target_link_libraries(Monitoring
pthread
$<$<BOOL:${ApMon_FOUND}>:ApMon::ApMon>
$<$<BOOL:${RdKafka_FOUND}>:RdKafka::rdkafka++>
$<$<BOOL:${CURL_FOUND}>:CURL::libcurl>
)

# Handle ApMon optional dependency
Expand All @@ -138,6 +141,10 @@ if(RdKafka_FOUND)
message(STATUS " Compiling Kafka transport")
endif()

if(CURL_FOUND)
message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend")
endif()

# Detect operating system
if (UNIX AND NOT APPLE)
message(STATUS "Detected Linux: Process monitor enabled")
Expand All @@ -155,6 +162,7 @@ target_compile_definitions(Monitoring
$<$<BOOL:${LINUX}>:O2_MONITORING_OS_LINUX>
$<$<BOOL:${ApMon_FOUND}>:O2_MONITORING_WITH_APPMON>
$<$<BOOL:${RdKafka_FOUND}>:O2_MONITORING_WITH_KAFKA>
$<$<BOOL:${CURL_FOUND}>:O2_MONITORING_WITH_CURL>
)

# Use C++17
Expand Down Expand Up @@ -217,7 +225,7 @@ foreach (test ${TEST_SRCS})

add_executable(${test_name} ${test})
target_link_libraries(${test_name}
PRIVATE
PRIVATE
Monitoring Boost::unit_test_framework Boost::filesystem
)
add_test(NAME ${test_name} COMMAND ${test_name})
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ See the table below to find `URI`s for supported backends:
| InfluxDB | Unix socket | `influxdb-unix` | - | `info` |
| InfluxDB | StdOut | `influxdb-stdout` | - | `info` |
| InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` |
| InfluxDB 2.x | HTTP | `influxdbv2` | `org=ORG&bucket=BUCKET&token=TOKEN` | `info` |
| ApMon | UDP | `apmon` | - | `info` |
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |
Expand All @@ -62,7 +63,7 @@ A metric consist of 5 parameters:
| Parameter name | Type | Required | Default |
| -------------- |:--------------------------------:|:--------:| -----------------------:|
| name | string | yes | - |
| values | map&lt;string, int/double/string/uint64_t&gt; | no/1 | - |
| values | map&lt;string, int/double/string/uint64_t&gt; | no/1 | - |
| timestamp | time_point&lt;system_clock&gt; | no | current time |
| verbosity | Enum (Debug/Info/Prod) | no | Verbosity::Info |
| tags | map | no | host and process names |
Expand Down
36 changes: 36 additions & 0 deletions src/MonitoringFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "Transports/Kafka.h"
#endif

#ifdef O2_MONITORING_WITH_CURL
#include "Transports/HTTP.h"
#endif

namespace o2
{
/// ALICE O2 Monitoring system
Expand All @@ -56,6 +60,37 @@ std::unique_ptr<Backend> getStdOut(http::url uri)
}
}

/// Extracts token from header add sets it as addition HTTP header
/// http://localhost:9999/?org=YOUR_ORG&bucket=YOUR_BUCKET&token=AUTH_TOKEN
/// ->
/// http://localhost:9999/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET
/// --header "Authorization: Token YOURAUTHTOKEN"
std::unique_ptr<Backend> getInfluxDbv2(http::url uri)
{
#ifdef O2_MONITORING_WITH_CURL
std::string tokenLabel = "token=";
std::string path = "/api/v2/write";
std::string query = uri.search;

auto tokenStart = query.find(tokenLabel);
auto tokenEnd = query.find('&', tokenStart);
if (tokenEnd == std::string::npos) {
tokenEnd = query.length();
}
std::string token = query.substr(tokenStart + tokenLabel.length(), tokenEnd-(tokenStart + tokenLabel.length()));
// make sure ampersand is removed
if (tokenEnd < query.length() && query.at(tokenEnd) == '&') tokenEnd++;
if (tokenStart > 0 && query.at(tokenStart-1) == '&') tokenStart--;
query.erase(tokenStart, tokenEnd - tokenStart);

auto transport = std::make_unique<transports::HTTP>("http://" + uri.host + ':' + std::to_string(uri.port) + path + '?' + query);
transport->addHeader("Authorization: Token " + token);
return std::make_unique<backends::InfluxDB>(std::move(transport));
#else
throw std::runtime_error("HTTP transport is not enabled");
#endif
}

std::unique_ptr<Backend> getInfluxDb(http::url uri)
{
auto const position = uri.protocol.find_last_of('-');
Expand Down Expand Up @@ -129,6 +164,7 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url)
{"influxdb-unix", getInfluxDb},
{"influxdb-stdout", getInfluxDb},
{"influxdb-kafka", getInfluxDb},
{"influxdbv2", getInfluxDbv2},
{"apmon", getApMon},
{"no-op", getNoop}
};
Expand Down
68 changes: 68 additions & 0 deletions src/Transports/HTTP.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
///
/// \file HTTP.cxx
/// \author Adam Wegrzynek <adam.wegrzynek@cern.ch>
///

#include "HTTP.h"
#include "../MonLogger.h"
#include "../Exceptions/MonitoringException.h"
#include <boost/algorithm/string.hpp>

namespace o2
{
/// ALICE O2 Monitoring system
namespace monitoring
{
/// Monitoring transports
namespace transports
{

HTTP::HTTP(const std::string& url)
{
mHeaders = NULL;
mCurl = curl_easy_init();
curl_easy_setopt(mCurl, CURLOPT_URL, url.c_str());
curl_easy_setopt(mCurl, CURLOPT_SSL_VERIFYPEER, 0);
curl_easy_setopt(mCurl, CURLOPT_CONNECTTIMEOUT, 10);
curl_easy_setopt(mCurl, CURLOPT_TIMEOUT, 10);
curl_easy_setopt(mCurl, CURLOPT_POST, 1);
curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPIDLE, 120L);
curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPINTVL, 60L);
FILE *devnull = fopen("/dev/null", "w+");
curl_easy_setopt(mCurl, CURLOPT_WRITEDATA, devnull);

MonLogger::Get() << "HTTP transport initialized (" << url << ")" << MonLogger::End();
}

HTTP::~HTTP()
{
curl_slist_free_all(mHeaders);
curl_easy_cleanup(mCurl);
curl_global_cleanup();
}

void HTTP::addHeader(const std::string& header)
{
mHeaders = curl_slist_append(mHeaders, header.c_str());
curl_easy_setopt(mCurl, CURLOPT_HTTPHEADER, mHeaders);
}

void HTTP::send(std::string&& post)
{
CURLcode response;
long responseCode;
curl_easy_setopt(mCurl, CURLOPT_POSTFIELDS, post.c_str());
curl_easy_setopt(mCurl, CURLOPT_POSTFIELDSIZE, (long) post.length());
response = curl_easy_perform(mCurl);
curl_easy_getinfo(mCurl, CURLINFO_RESPONSE_CODE, &responseCode);
if (response != CURLE_OK) {
MonLogger::Get() << "HTTP Tranport " << curl_easy_strerror(response) << MonLogger::End();
}
if (responseCode < 200 || responseCode > 206) {
MonLogger::Get() << "HTTP Transport: Response code : " << std::to_string(responseCode) << MonLogger::End();
}
}

} // namespace transports
} // namespace monitoring
} // namespace o2
54 changes: 54 additions & 0 deletions src/Transports/HTTP.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
///
/// \file HTTP.h
/// \author Adam Wegrzynek <adam.wegrzynek@cern.ch>
///

#ifndef ALICEO2_MONITORING_TRANSPORTS_HTTP_H
#define ALICEO2_MONITORING_TRANSPORTS_HTTP_H

#include "TransportInterface.h"

#include <curl/curl.h>
#include <string>

namespace o2
{
/// ALICE O2 Monitoring system
namespace monitoring
{
/// Monitoring transports
namespace transports
{

/// \brief HTTP POST transport
///
/// Allows to push string formatted metrics as HTTP POST requests via cURL
class HTTP : public TransportInterface
{
public:
/// Constructor
/// \param url URL of HTTP server endpoint
HTTP(const std::string& url);

/// Destructor
~HTTP();

/// Sends metric via HTTP POST
/// \param post r-value reference string formatted metric
void send(std::string&& post);

/// Adds custom HTTP header
void addHeader(const std::string& header);
private:
/// CURL pointers
CURL *mCurl;

/// HTTP headers struct
struct curl_slist *mHeaders;
};

} // namespace transports
} // namespace monitoring
} // namespace o2

#endif // ALICEO2_MONITORING_TRANSPORTS_HTTP_H
4 changes: 2 additions & 2 deletions src/UriParser/UriParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
namespace http
{
struct url {
std::string protocol, user, password, host, path, search;
std::string protocol, user, password, host, path, search, url;
int port;
};

Expand Down Expand Up @@ -89,7 +89,7 @@ static inline url ParseHttpUrl(std::string& in)
{
url ret;
ret.port = -1;

ret.url = in;
ret.protocol = ExtractProtocol(in);
ret.search = ExtractSearch(in);
ret.path = ExtractPath(in);
Expand Down
6 changes: 5 additions & 1 deletion test/testInfluxDb.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace monitoring
{
namespace Test
{

BOOST_AUTO_TEST_CASE(simplySendMetric)
{
auto monitoring = MonitoringFactory::Get("influxdb-udp://localhost:1000");
Expand All @@ -32,6 +31,11 @@ BOOST_AUTO_TEST_CASE(simplySendMetric2)
monitoring->send(Metric{10, "myCrazyMetric"});
}

BOOST_AUTO_TEST_CASE(InfluxDbv2)
{
auto monitoring = MonitoringFactory::Get("influxdbv2://localhost:9999?org=cern&bucket=test&token=TOKEN");
}

} // namespace Test
} // namespace monitoring
} // namespace o2

0 comments on commit a4e4c9e

Please sign in to comment.