From 7f9820168c305ae2824d2c0ee3919469366d5c42 Mon Sep 17 00:00:00 2001 From: Caleb Gilmour Date: Wed, 15 Apr 2020 12:17:13 +1200 Subject: [PATCH] Add support for traces sent using UNIX domain sockets (#133) --- include/datadog/opentracing.h | 5 +++ src/agent_writer.cpp | 72 ++++++++++++++++++++++++++++------- src/agent_writer.h | 10 +++-- src/opentracing_agent.cpp | 2 +- src/tracer_factory.cpp | 5 ++- src/tracer_options.cpp | 5 +++ test/agent_writer_test.cpp | 50 ++++++++++++++++++++---- test/tracer_factory_test.cpp | 41 +++++++++++++++++++- 8 files changed, 163 insertions(+), 27 deletions(-) diff --git a/include/datadog/opentracing.h b/include/datadog/opentracing.h index 63f234d7..8d5021ef 100644 --- a/include/datadog/opentracing.h +++ b/include/datadog/opentracing.h @@ -82,6 +82,11 @@ struct TracerOptions { // The version of the overall application being traced. Can also be set by the environment // variable DD_VERSION. std::string version = ""; + // The URL to use for submitting traces to the agent. If set, this will be used instead of + // agent_host / agent_port. This URL supports http, https and unix address schemes. + // If no scheme is set in the URL, a path to a UNIX domain socket is assumed. + // Can also be set by the environment variable DD_TRACE_AGENT_URL. + std::string agent_url = ""; }; // TraceEncoder exposes the data required to encode and submit traces to the diff --git a/src/agent_writer.cpp b/src/agent_writer.cpp index ef5d7ed7..b1466c89 100644 --- a/src/agent_writer.cpp +++ b/src/agent_writer.cpp @@ -20,38 +20,84 @@ const std::vector default_retry_periods{ const long default_timeout_ms = 2000L; } // namespace -AgentWriter::AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period, +AgentWriter::AgentWriter(std::string host, uint32_t port, std::string url, + std::chrono::milliseconds write_period, std::shared_ptr sampler) : AgentWriter(std::unique_ptr{new CurlHandle{}}, write_period, max_queued_traces, - default_retry_periods, host, port, sampler){}; + default_retry_periods, host, port, url, sampler){}; AgentWriter::AgentWriter(std::unique_ptr handle, std::chrono::milliseconds write_period, size_t max_queued_traces, std::vector retry_periods, std::string host, - uint32_t port, std::shared_ptr sampler) + uint32_t port, std::string url, std::shared_ptr sampler) : Writer(sampler), write_period_(write_period), max_queued_traces_(max_queued_traces), retry_periods_(retry_periods) { - setUpHandle(handle, host, port); + setUpHandle(handle, host, port, url); startWriting(std::move(handle)); } -void AgentWriter::setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port) { +void AgentWriter::setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port, + std::string url) { // Some options are the same for all actions, set them here. - // Set the agent URI. - std::stringstream agent_uri; - agent_uri << agent_protocol << host << ":" << std::to_string(port) << trace_encoder_->path(); - auto rcode = handle->setopt(CURLOPT_URL, agent_uri.str().c_str()); - if (rcode != CURLE_OK) { - throw std::runtime_error(std::string("Unable to set agent URL: ") + curl_easy_strerror(rcode)); + // Set the agent URL. + // The URL can be either + // - http://host + // - http://host:port + // - https://host + // - https://host:port + // - unix:///path/to/trace-agent.socket + // - /path/to/trace-agent.socket + bool urlopt_set = false; + if (!url.empty()) { + const std::string http_scheme = "http://"; + const std::string https_scheme = "https://"; + const std::string unix_scheme = "unix://"; + if (url.substr(0, http_scheme.size()) == http_scheme || + url.substr(0, https_scheme.size()) == https_scheme) { + std::string agent_uri = url + trace_encoder_->path(); + // http:// or https:// + auto rcode = handle->setopt(CURLOPT_URL, agent_uri.c_str()); + if (rcode != CURLE_OK) { + throw std::runtime_error(std::string("Unable to set agent URL: ") + + curl_easy_strerror(rcode)); + } + urlopt_set = true; + } else if (url.substr(0, unix_scheme.size()) == unix_scheme) { + // unix:// + url = url.substr(unix_scheme.size()); + auto rcode = handle->setopt(CURLOPT_UNIX_SOCKET_PATH, url.c_str()); + if (rcode != CURLE_OK) { + throw std::runtime_error(std::string("Unable to set unix socket path: ") + + curl_easy_strerror(rcode)); + } + } else if (url.substr(0, 1) == "/") { + // plain file path + auto rcode = handle->setopt(CURLOPT_UNIX_SOCKET_PATH, url.c_str()); + if (rcode != CURLE_OK) { + throw std::runtime_error(std::string("Unable to set unix socket path: ") + + curl_easy_strerror(rcode)); + } + } else { + throw std::runtime_error(std::string("Unable to set agent URL: unknown url scheme: " + url)); + } } - rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms); + if (!urlopt_set) { + std::string agent_uri = + agent_protocol + host + ":" + std::to_string(port) + trace_encoder_->path(); + auto rcode = handle->setopt(CURLOPT_URL, agent_uri.c_str()); + if (rcode != CURLE_OK) { + throw std::runtime_error(std::string("Unable to set agent URL: ") + + curl_easy_strerror(rcode)); + } + } + auto rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms); if (rcode != CURLE_OK) { throw std::runtime_error(std::string("Unable to set agent timeout: ") + curl_easy_strerror(rcode)); } -} // namespace opentracing +} AgentWriter::~AgentWriter() { stop(); } diff --git a/src/agent_writer.h b/src/agent_writer.h index 1b559b90..e370a7d0 100644 --- a/src/agent_writer.h +++ b/src/agent_writer.h @@ -23,12 +23,13 @@ class AgentWriter : public Writer { public: // Creates an AgentWriter that uses curl to send Traces to a Datadog agent. May throw a // runtime_exception. - AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period, - std::shared_ptr sampler); + AgentWriter(std::string host, uint32_t port, std::string unix_socket, + std::chrono::milliseconds write_period, std::shared_ptr sampler); AgentWriter(std::unique_ptr handle, std::chrono::milliseconds write_period, size_t max_queued_traces, std::vector retry_periods, - std::string host, uint32_t port, std::shared_ptr sampler); + std::string host, uint32_t port, std::string unix_socket, + std::shared_ptr sampler); // Does not flush on destruction, buffered traces may be lost. Stops all threads. ~AgentWriter() override; @@ -44,7 +45,8 @@ class AgentWriter : public Writer { private: // Initialises the curl handle. May throw a runtime_exception. - void setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port); + void setUpHandle(std::unique_ptr &handle, std::string host, uint32_t port, + std::string unix_socket); // Starts asynchronously writing traces. They will be written periodically (set by write_period_) // or when flush() is called manually. diff --git a/src/opentracing_agent.cpp b/src/opentracing_agent.cpp index 3299160b..ddd844ab 100644 --- a/src/opentracing_agent.cpp +++ b/src/opentracing_agent.cpp @@ -26,7 +26,7 @@ std::shared_ptr makeTracer(const TracerOptions &options) { auto sampler = std::make_shared(); auto writer = std::shared_ptr{ - new AgentWriter(opts.agent_host, opts.agent_port, + new AgentWriter(opts.agent_host, opts.agent_port, opts.agent_url, std::chrono::milliseconds(llabs(opts.write_period_ms)), sampler)}; return std::shared_ptr{new Tracer{opts, writer, sampler}}; } diff --git a/src/tracer_factory.cpp b/src/tracer_factory.cpp index 8294341d..79e85d93 100644 --- a/src/tracer_factory.cpp +++ b/src/tracer_factory.cpp @@ -33,6 +33,9 @@ ot::expected optionsFromConfig(const char *configuration, if (config.find("agent_port") != config.end()) { config.at("agent_port").get_to(options.agent_port); } + if (config.find("agent_url") != config.end()) { + config.at("agent_url").get_to(options.agent_url); + } if (config.find("type") != config.end()) { config.at("type").get_to(options.type); } @@ -133,7 +136,7 @@ ot::expected> TracerFactory::MakeTracer( auto sampler = std::make_shared(); auto writer = std::shared_ptr{ - new AgentWriter(options.agent_host, options.agent_port, + new AgentWriter(options.agent_host, options.agent_port, options.agent_url, std::chrono::milliseconds(llabs(options.write_period_ms)), sampler)}; return std::shared_ptr{new TracerImpl{options, writer, sampler}}; diff --git a/src/tracer_options.cpp b/src/tracer_options.cpp index 9691763a..e6452f73 100644 --- a/src/tracer_options.cpp +++ b/src/tracer_options.cpp @@ -129,6 +129,11 @@ ot::expected applyTracerOptionsFromEnvironment( opts.sampling_rules = sampling_rules; } + auto trace_agent_url = std::getenv("DD_TRACE_AGENT_URL"); + if (trace_agent_url != nullptr && std::strlen(trace_agent_url) > 0) { + opts.agent_url = trace_agent_url; + } + auto extract = std::getenv("DD_PROPAGATION_STYLE_EXTRACT"); if (extract != nullptr && std::strlen(extract) > 0) { std::stringstream words{extract}; diff --git a/test/agent_writer_test.cpp b/test/agent_writer_test.cpp index 907c9b0e..2676c9d5 100644 --- a/test/agent_writer_test.cpp +++ b/test/agent_writer_test.cpp @@ -17,6 +17,43 @@ Trace make_trace(std::initializer_list spans) { } TEST_CASE("writer") { + SECTION("initializes handle correctly") { + std::atomic handle_destructed{false}; + std::unique_ptr handle_ptr{new MockHandle{&handle_destructed}}; + MockHandle* handle = handle_ptr.get(); + auto sampler = std::make_shared(); + struct InitializationTestCase { + std::string host; + uint32_t port; + std::string url; + std::unordered_map expected_opts; + }; + auto test_case = GENERATE(values({ + {"hostname", 1234, "", {{CURLOPT_URL, "http://hostname:1234/v0.4/traces"}}}, + {"hostname", + 1234, + "http://override:5678", + {{CURLOPT_URL, "http://override:5678/v0.4/traces"}}}, + {"", 0, "https://localhost:8126", {{CURLOPT_URL, "https://localhost:8126/v0.4/traces"}}}, + {"localhost", + 8126, + "unix:///path/to/trace-agent.socket", + {{CURLOPT_UNIX_SOCKET_PATH, "/path/to/trace-agent.socket"}, + {CURLOPT_URL, "http://localhost:8126/v0.4/traces"}}}, + {"localhost", + 8126, + "/path/to/trace-agent.socket", + {{CURLOPT_UNIX_SOCKET_PATH, "/path/to/trace-agent.socket"}, + {CURLOPT_URL, "http://localhost:8126/v0.4/traces"}}}, + })); + test_case.expected_opts[CURLOPT_TIMEOUT_MS] = "2000"; + + AgentWriter writer{std::move(handle_ptr), std::chrono::seconds(1), 100, {}, + test_case.host, test_case.port, test_case.url, sampler}; + + REQUIRE(handle->options == test_case.expected_opts); + } + std::atomic handle_destructed{false}; std::unique_ptr handle_ptr{new MockHandle{&handle_destructed}}; MockHandle* handle = handle_ptr.get(); @@ -26,20 +63,16 @@ TEST_CASE("writer") { auto only_send_traces_when_we_flush = std::chrono::seconds(3600); size_t max_queued_traces = 25; std::vector disable_retry; + AgentWriter writer{std::move(handle_ptr), only_send_traces_when_we_flush, max_queued_traces, disable_retry, "hostname", 6319, + "", sampler}; - SECTION("initilises handle correctly") { - REQUIRE(handle->options == - std::unordered_map{ - {CURLOPT_URL, "http://hostname:6319/v0.4/traces"}, {CURLOPT_TIMEOUT_MS, "2000"}}); - } - SECTION("traces can be sent") { writer.write(make_trace( {TestSpanData{"web", "service", "resource", "service.name", 1, 1, 0, 69, 420, 0}})); @@ -142,7 +175,7 @@ TEST_CASE("writer") { std::unique_ptr handle_ptr{new MockHandle{}}; handle_ptr->rcode = CURLE_OPERATION_TIMEDOUT; REQUIRE_THROWS(AgentWriter{std::move(handle_ptr), only_send_traces_when_we_flush, - max_queued_traces, disable_retry, "hostname", 6319, + max_queued_traces, disable_retry, "hostname", 6319, "", std::make_shared()}); } @@ -253,6 +286,7 @@ TEST_CASE("writer") { disable_retry, "hostname", 6319, + "", std::make_shared()}; // Send 7 traces at 1 trace per second. Since the write period is 2s, there should be 4 // different writes. We don't count the number of writes because that could flake, but we do @@ -291,6 +325,7 @@ TEST_CASE("writer") { retry_periods, "hostname", 6319, + "", std::make_shared()}; // Redirect cerr, so the the terminal output doesn't imply failure. std::stringstream error_message; @@ -346,6 +381,7 @@ TEST_CASE("flush") { retry_periods, "hostname", 6319, + "", std::make_shared()}; // Redirect cerr, so the the terminal output doesn't imply failure. std::stringstream error_message; diff --git a/test/tracer_factory_test.cpp b/test/tracer_factory_test.cpp index c794f9c8..fcc95ba5 100644 --- a/test/tracer_factory_test.cpp +++ b/test/tracer_factory_test.cpp @@ -345,6 +345,7 @@ TEST_CASE("tracer factory") { REQUIRE(tracer->opts.service == "my-service"); REQUIRE(tracer->opts.type == "web"); } + SECTION("DD_TRACE_AGENT_PORT overrides default") { ::setenv("DD_TRACE_AGENT_PORT", "1234", 0); std::string input{R"( @@ -364,7 +365,6 @@ TEST_CASE("tracer factory") { REQUIRE(tracer->opts.service == "my-service"); REQUIRE(tracer->opts.type == "web"); } - SECTION("DD_TRACE_AGENT_PORT overrides configuration") { ::setenv("DD_TRACE_AGENT_PORT", "1234", 0); std::string input{R"( @@ -419,6 +419,7 @@ TEST_CASE("tracer factory") { REQUIRE(!result); REQUIRE(result.error() == std::make_error_code(std::errc::invalid_argument)); } + SECTION("DD_TRACE_SAMPLING_RULES overrides configuration") { std::string sampling_rules = R"([{"sample_rate":0.99}])"; ::setenv("DD_TRACE_SAMPLING_RULES", sampling_rules.c_str(), 0); @@ -438,6 +439,43 @@ TEST_CASE("tracer factory") { auto tracer = dynamic_cast(result->get()); REQUIRE(tracer->opts.sampling_rules == sampling_rules); } + + SECTION("DD_TRACE_AGENT_URL overrides default") { + ::setenv("DD_TRACE_AGENT_URL", "/path/to/trace-agent.socket", 0); + std::string input{R"( + { + "service": "my-service" + } + )"}; + std::string error = ""; + auto result = factory.MakeTracer(input.c_str(), error); + ::unsetenv("DD_TRACE_AGENT_URL"); + + REQUIRE(error == ""); + REQUIRE(result->get() != nullptr); + auto tracer = dynamic_cast(result->get()); + REQUIRE(tracer->opts.service == "my-service"); + REQUIRE(tracer->opts.agent_url == "/path/to/trace-agent.socket"); + } + SECTION("DD_TRACE_AGENT_URL overrides configuration") { + ::setenv("DD_TRACE_AGENT_URL", "/path/to/trace-agent.socket", 0); + std::string input{R"( + { + "service": "my-service", + "agent_url": "/configured/trace-agent.socket" + } + )"}; + std::string error = ""; + auto result = factory.MakeTracer(input.c_str(), error); + ::unsetenv("DD_TRACE_AGENT_URL"); + + REQUIRE(error == ""); + REQUIRE(result->get() != nullptr); + auto tracer = dynamic_cast(result->get()); + REQUIRE(tracer->opts.service == "my-service"); + REQUIRE(tracer->opts.agent_url == "/path/to/trace-agent.socket"); + } + SECTION("DD_TRACE_REPORT_HOSTNAME overrides default") { ::setenv("DD_TRACE_REPORT_HOSTNAME", "true", 0); std::string input{R"( @@ -471,6 +509,7 @@ TEST_CASE("tracer factory") { auto tracer = dynamic_cast(result->get()); REQUIRE(!tracer->opts.report_hostname); } + SECTION("DD_TRACE_ANALYTICS_ENABLED overrides default") { ::setenv("DD_TRACE_ANALYTICS_ENABLED", "true", 0); std::string input{R"(