Skip to content

Commit

Permalink
Add support for traces sent using UNIX domain sockets (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgilmour authored Apr 15, 2020
1 parent 0ec6af4 commit 7f98201
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 27 deletions.
5 changes: 5 additions & 0 deletions include/datadog/opentracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ struct TracerOptions {
// The version of the overall application being traced. Can also be set by the environment
// variable DD_VERSION.
std::string version = "";
// The URL to use for submitting traces to the agent. If set, this will be used instead of
// agent_host / agent_port. This URL supports http, https and unix address schemes.
// If no scheme is set in the URL, a path to a UNIX domain socket is assumed.
// Can also be set by the environment variable DD_TRACE_AGENT_URL.
std::string agent_url = "";
};

// TraceEncoder exposes the data required to encode and submit traces to the
Expand Down
72 changes: 59 additions & 13 deletions src/agent_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,84 @@ const std::vector<std::chrono::milliseconds> default_retry_periods{
const long default_timeout_ms = 2000L;
} // namespace

AgentWriter::AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period,
AgentWriter::AgentWriter(std::string host, uint32_t port, std::string url,
std::chrono::milliseconds write_period,
std::shared_ptr<RulesSampler> sampler)
: AgentWriter(std::unique_ptr<Handle>{new CurlHandle{}}, write_period, max_queued_traces,
default_retry_periods, host, port, sampler){};
default_retry_periods, host, port, url, sampler){};

AgentWriter::AgentWriter(std::unique_ptr<Handle> handle, std::chrono::milliseconds write_period,
size_t max_queued_traces,
std::vector<std::chrono::milliseconds> retry_periods, std::string host,
uint32_t port, std::shared_ptr<RulesSampler> sampler)
uint32_t port, std::string url, std::shared_ptr<RulesSampler> sampler)
: Writer(sampler),
write_period_(write_period),
max_queued_traces_(max_queued_traces),
retry_periods_(retry_periods) {
setUpHandle(handle, host, port);
setUpHandle(handle, host, port, url);
startWriting(std::move(handle));
}

void AgentWriter::setUpHandle(std::unique_ptr<Handle> &handle, std::string host, uint32_t port) {
void AgentWriter::setUpHandle(std::unique_ptr<Handle> &handle, std::string host, uint32_t port,
std::string url) {
// Some options are the same for all actions, set them here.
// Set the agent URI.
std::stringstream agent_uri;
agent_uri << agent_protocol << host << ":" << std::to_string(port) << trace_encoder_->path();
auto rcode = handle->setopt(CURLOPT_URL, agent_uri.str().c_str());
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set agent URL: ") + curl_easy_strerror(rcode));
// Set the agent URL.
// The URL can be either
// - http://host
// - http://host:port
// - https://host
// - https://host:port
// - unix:///path/to/trace-agent.socket
// - /path/to/trace-agent.socket
bool urlopt_set = false;
if (!url.empty()) {
const std::string http_scheme = "http://";
const std::string https_scheme = "https://";
const std::string unix_scheme = "unix://";
if (url.substr(0, http_scheme.size()) == http_scheme ||
url.substr(0, https_scheme.size()) == https_scheme) {
std::string agent_uri = url + trace_encoder_->path();
// http:// or https://
auto rcode = handle->setopt(CURLOPT_URL, agent_uri.c_str());
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set agent URL: ") +
curl_easy_strerror(rcode));
}
urlopt_set = true;
} else if (url.substr(0, unix_scheme.size()) == unix_scheme) {
// unix://
url = url.substr(unix_scheme.size());
auto rcode = handle->setopt(CURLOPT_UNIX_SOCKET_PATH, url.c_str());
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set unix socket path: ") +
curl_easy_strerror(rcode));
}
} else if (url.substr(0, 1) == "/") {
// plain file path
auto rcode = handle->setopt(CURLOPT_UNIX_SOCKET_PATH, url.c_str());
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set unix socket path: ") +
curl_easy_strerror(rcode));
}
} else {
throw std::runtime_error(std::string("Unable to set agent URL: unknown url scheme: " + url));
}
}
rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms);
if (!urlopt_set) {
std::string agent_uri =
agent_protocol + host + ":" + std::to_string(port) + trace_encoder_->path();
auto rcode = handle->setopt(CURLOPT_URL, agent_uri.c_str());
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set agent URL: ") +
curl_easy_strerror(rcode));
}
}
auto rcode = handle->setopt(CURLOPT_TIMEOUT_MS, default_timeout_ms);
if (rcode != CURLE_OK) {
throw std::runtime_error(std::string("Unable to set agent timeout: ") +
curl_easy_strerror(rcode));
}
} // namespace opentracing
}

AgentWriter::~AgentWriter() { stop(); }

Expand Down
10 changes: 6 additions & 4 deletions src/agent_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ class AgentWriter : public Writer {
public:
// Creates an AgentWriter that uses curl to send Traces to a Datadog agent. May throw a
// runtime_exception.
AgentWriter(std::string host, uint32_t port, std::chrono::milliseconds write_period,
std::shared_ptr<RulesSampler> sampler);
AgentWriter(std::string host, uint32_t port, std::string unix_socket,
std::chrono::milliseconds write_period, std::shared_ptr<RulesSampler> sampler);

AgentWriter(std::unique_ptr<Handle> handle, std::chrono::milliseconds write_period,
size_t max_queued_traces, std::vector<std::chrono::milliseconds> retry_periods,
std::string host, uint32_t port, std::shared_ptr<RulesSampler> sampler);
std::string host, uint32_t port, std::string unix_socket,
std::shared_ptr<RulesSampler> sampler);

// Does not flush on destruction, buffered traces may be lost. Stops all threads.
~AgentWriter() override;
Expand All @@ -44,7 +45,8 @@ class AgentWriter : public Writer {

private:
// Initialises the curl handle. May throw a runtime_exception.
void setUpHandle(std::unique_ptr<Handle> &handle, std::string host, uint32_t port);
void setUpHandle(std::unique_ptr<Handle> &handle, std::string host, uint32_t port,
std::string unix_socket);

// Starts asynchronously writing traces. They will be written periodically (set by write_period_)
// or when flush() is called manually.
Expand Down
2 changes: 1 addition & 1 deletion src/opentracing_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ std::shared_ptr<ot::Tracer> makeTracer(const TracerOptions &options) {

auto sampler = std::make_shared<RulesSampler>();
auto writer = std::shared_ptr<Writer>{
new AgentWriter(opts.agent_host, opts.agent_port,
new AgentWriter(opts.agent_host, opts.agent_port, opts.agent_url,
std::chrono::milliseconds(llabs(opts.write_period_ms)), sampler)};
return std::shared_ptr<ot::Tracer>{new Tracer{opts, writer, sampler}};
}
Expand Down
5 changes: 4 additions & 1 deletion src/tracer_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ ot::expected<TracerOptions> optionsFromConfig(const char *configuration,
if (config.find("agent_port") != config.end()) {
config.at("agent_port").get_to(options.agent_port);
}
if (config.find("agent_url") != config.end()) {
config.at("agent_url").get_to(options.agent_url);
}
if (config.find("type") != config.end()) {
config.at("type").get_to(options.type);
}
Expand Down Expand Up @@ -133,7 +136,7 @@ ot::expected<std::shared_ptr<ot::Tracer>> TracerFactory<TracerImpl>::MakeTracer(

auto sampler = std::make_shared<RulesSampler>();
auto writer = std::shared_ptr<Writer>{
new AgentWriter(options.agent_host, options.agent_port,
new AgentWriter(options.agent_host, options.agent_port, options.agent_url,
std::chrono::milliseconds(llabs(options.write_period_ms)), sampler)};

return std::shared_ptr<ot::Tracer>{new TracerImpl{options, writer, sampler}};
Expand Down
5 changes: 5 additions & 0 deletions src/tracer_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ ot::expected<TracerOptions, const char *> applyTracerOptionsFromEnvironment(
opts.sampling_rules = sampling_rules;
}

auto trace_agent_url = std::getenv("DD_TRACE_AGENT_URL");
if (trace_agent_url != nullptr && std::strlen(trace_agent_url) > 0) {
opts.agent_url = trace_agent_url;
}

auto extract = std::getenv("DD_PROPAGATION_STYLE_EXTRACT");
if (extract != nullptr && std::strlen(extract) > 0) {
std::stringstream words{extract};
Expand Down
50 changes: 43 additions & 7 deletions test/agent_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,43 @@ Trace make_trace(std::initializer_list<TestSpanData> spans) {
}

TEST_CASE("writer") {
SECTION("initializes handle correctly") {
std::atomic<bool> handle_destructed{false};
std::unique_ptr<MockHandle> handle_ptr{new MockHandle{&handle_destructed}};
MockHandle* handle = handle_ptr.get();
auto sampler = std::make_shared<RulesSampler>();
struct InitializationTestCase {
std::string host;
uint32_t port;
std::string url;
std::unordered_map<CURLoption, std::string, EnumClassHash> expected_opts;
};
auto test_case = GENERATE(values<InitializationTestCase>({
{"hostname", 1234, "", {{CURLOPT_URL, "http://hostname:1234/v0.4/traces"}}},
{"hostname",
1234,
"http://override:5678",
{{CURLOPT_URL, "http://override:5678/v0.4/traces"}}},
{"", 0, "https://localhost:8126", {{CURLOPT_URL, "https://localhost:8126/v0.4/traces"}}},
{"localhost",
8126,
"unix:///path/to/trace-agent.socket",
{{CURLOPT_UNIX_SOCKET_PATH, "/path/to/trace-agent.socket"},
{CURLOPT_URL, "http://localhost:8126/v0.4/traces"}}},
{"localhost",
8126,
"/path/to/trace-agent.socket",
{{CURLOPT_UNIX_SOCKET_PATH, "/path/to/trace-agent.socket"},
{CURLOPT_URL, "http://localhost:8126/v0.4/traces"}}},
}));
test_case.expected_opts[CURLOPT_TIMEOUT_MS] = "2000";

AgentWriter writer{std::move(handle_ptr), std::chrono::seconds(1), 100, {},
test_case.host, test_case.port, test_case.url, sampler};

REQUIRE(handle->options == test_case.expected_opts);
}

std::atomic<bool> handle_destructed{false};
std::unique_ptr<MockHandle> handle_ptr{new MockHandle{&handle_destructed}};
MockHandle* handle = handle_ptr.get();
Expand All @@ -26,20 +63,16 @@ TEST_CASE("writer") {
auto only_send_traces_when_we_flush = std::chrono::seconds(3600);
size_t max_queued_traces = 25;
std::vector<std::chrono::milliseconds> disable_retry;

AgentWriter writer{std::move(handle_ptr),
only_send_traces_when_we_flush,
max_queued_traces,
disable_retry,
"hostname",
6319,
"",
sampler};

SECTION("initilises handle correctly") {
REQUIRE(handle->options ==
std::unordered_map<CURLoption, std::string, EnumClassHash>{
{CURLOPT_URL, "http://hostname:6319/v0.4/traces"}, {CURLOPT_TIMEOUT_MS, "2000"}});
}

SECTION("traces can be sent") {
writer.write(make_trace(
{TestSpanData{"web", "service", "resource", "service.name", 1, 1, 0, 69, 420, 0}}));
Expand Down Expand Up @@ -142,7 +175,7 @@ TEST_CASE("writer") {
std::unique_ptr<MockHandle> handle_ptr{new MockHandle{}};
handle_ptr->rcode = CURLE_OPERATION_TIMEDOUT;
REQUIRE_THROWS(AgentWriter{std::move(handle_ptr), only_send_traces_when_we_flush,
max_queued_traces, disable_retry, "hostname", 6319,
max_queued_traces, disable_retry, "hostname", 6319, "",
std::make_shared<RulesSampler>()});
}

Expand Down Expand Up @@ -253,6 +286,7 @@ TEST_CASE("writer") {
disable_retry,
"hostname",
6319,
"",
std::make_shared<RulesSampler>()};
// Send 7 traces at 1 trace per second. Since the write period is 2s, there should be 4
// different writes. We don't count the number of writes because that could flake, but we do
Expand Down Expand Up @@ -291,6 +325,7 @@ TEST_CASE("writer") {
retry_periods,
"hostname",
6319,
"",
std::make_shared<RulesSampler>()};
// Redirect cerr, so the the terminal output doesn't imply failure.
std::stringstream error_message;
Expand Down Expand Up @@ -346,6 +381,7 @@ TEST_CASE("flush") {
retry_periods,
"hostname",
6319,
"",
std::make_shared<RulesSampler>()};
// Redirect cerr, so the the terminal output doesn't imply failure.
std::stringstream error_message;
Expand Down
41 changes: 40 additions & 1 deletion test/tracer_factory_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ TEST_CASE("tracer factory") {
REQUIRE(tracer->opts.service == "my-service");
REQUIRE(tracer->opts.type == "web");
}

SECTION("DD_TRACE_AGENT_PORT overrides default") {
::setenv("DD_TRACE_AGENT_PORT", "1234", 0);
std::string input{R"(
Expand All @@ -364,7 +365,6 @@ TEST_CASE("tracer factory") {
REQUIRE(tracer->opts.service == "my-service");
REQUIRE(tracer->opts.type == "web");
}

SECTION("DD_TRACE_AGENT_PORT overrides configuration") {
::setenv("DD_TRACE_AGENT_PORT", "1234", 0);
std::string input{R"(
Expand Down Expand Up @@ -419,6 +419,7 @@ TEST_CASE("tracer factory") {
REQUIRE(!result);
REQUIRE(result.error() == std::make_error_code(std::errc::invalid_argument));
}

SECTION("DD_TRACE_SAMPLING_RULES overrides configuration") {
std::string sampling_rules = R"([{"sample_rate":0.99}])";
::setenv("DD_TRACE_SAMPLING_RULES", sampling_rules.c_str(), 0);
Expand All @@ -438,6 +439,43 @@ TEST_CASE("tracer factory") {
auto tracer = dynamic_cast<MockTracer *>(result->get());
REQUIRE(tracer->opts.sampling_rules == sampling_rules);
}

SECTION("DD_TRACE_AGENT_URL overrides default") {
::setenv("DD_TRACE_AGENT_URL", "/path/to/trace-agent.socket", 0);
std::string input{R"(
{
"service": "my-service"
}
)"};
std::string error = "";
auto result = factory.MakeTracer(input.c_str(), error);
::unsetenv("DD_TRACE_AGENT_URL");

REQUIRE(error == "");
REQUIRE(result->get() != nullptr);
auto tracer = dynamic_cast<MockTracer *>(result->get());
REQUIRE(tracer->opts.service == "my-service");
REQUIRE(tracer->opts.agent_url == "/path/to/trace-agent.socket");
}
SECTION("DD_TRACE_AGENT_URL overrides configuration") {
::setenv("DD_TRACE_AGENT_URL", "/path/to/trace-agent.socket", 0);
std::string input{R"(
{
"service": "my-service",
"agent_url": "/configured/trace-agent.socket"
}
)"};
std::string error = "";
auto result = factory.MakeTracer(input.c_str(), error);
::unsetenv("DD_TRACE_AGENT_URL");

REQUIRE(error == "");
REQUIRE(result->get() != nullptr);
auto tracer = dynamic_cast<MockTracer *>(result->get());
REQUIRE(tracer->opts.service == "my-service");
REQUIRE(tracer->opts.agent_url == "/path/to/trace-agent.socket");
}

SECTION("DD_TRACE_REPORT_HOSTNAME overrides default") {
::setenv("DD_TRACE_REPORT_HOSTNAME", "true", 0);
std::string input{R"(
Expand Down Expand Up @@ -471,6 +509,7 @@ TEST_CASE("tracer factory") {
auto tracer = dynamic_cast<MockTracer *>(result->get());
REQUIRE(!tracer->opts.report_hostname);
}

SECTION("DD_TRACE_ANALYTICS_ENABLED overrides default") {
::setenv("DD_TRACE_ANALYTICS_ENABLED", "true", 0);
std::string input{R"(
Expand Down

0 comments on commit 7f98201

Please sign in to comment.