Skip to content

Commit

Permalink
Add support for propagation via C++ streams (#54)
Browse files Browse the repository at this point in the history
* Add support for propagation via C++ streams

* Hardening and additional tests.
  • Loading branch information
cgilmour authored Sep 24, 2018
1 parent c458452 commit 94110cb
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 14 deletions.
88 changes: 88 additions & 0 deletions src/propagation.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "propagation.h"
#include <algorithm>
#include <iostream>
#include <nlohmann/json.hpp>

namespace ot = opentracing;
using json = nlohmann::json;

namespace datadog {
namespace opentracing {
Expand All @@ -17,6 +19,12 @@ const std::string sampling_priority_header = "x-datadog-sampling-priority";
// interop.
const ot::string_view baggage_prefix = "ot-baggage-";

// Key names for binary serialization in JSON
const std::string json_trace_id_key = "trace_id";
const std::string json_parent_id_key = "parent_id";
const std::string json_sampling_priority_key = "sampling_priority";
const std::string json_baggage_key = "baggage";

// Does what it says on the tin. Just looks at each char, so don't try and use this on
// unicode strings, only used for comparing HTTP header names.
// Rolled my own because I don't want to import all of libboost for a couple of functions!
Expand Down Expand Up @@ -139,6 +147,30 @@ SpanContext SpanContext::withId(uint64_t id) const {
return SpanContext{id, trace_id_, std::move(p), std::move(baggage)};
}

ot::expected<void> SpanContext::serialize(std::ostream &writer) const {
// check ostream state
if (!writer.good()) {
return ot::make_unexpected(std::make_error_code(std::errc::io_error));
}

json j;
// JSON numbers only support 64bit IEEE 754, so we encode these as strings.
j[json_trace_id_key] = std::to_string(trace_id_);
j[json_parent_id_key] = std::to_string(id_);
if (sampling_priority_ != nullptr) {
j[json_sampling_priority_key] = static_cast<int>(*sampling_priority_);
}
j[json_baggage_key] = baggage_;

writer << j.dump();
// check ostream state
if (!writer.good()) {
return ot::make_unexpected(std::make_error_code(std::errc::io_error));
}

return {};
}

ot::expected<void> SpanContext::serialize(const ot::TextMapWriter &writer) const {
std::lock_guard<std::mutex> lock{mutex_};
auto result = writer.Set(trace_id_header, std::to_string(trace_id_));
Expand Down Expand Up @@ -176,6 +208,62 @@ ot::expected<void> SpanContext::serialize(const ot::TextMapWriter &writer) const
return result;
}

ot::expected<std::unique_ptr<ot::SpanContext>> SpanContext::deserialize(std::istream &reader) try {
// check istream state
if (!reader.good()) {
return ot::make_unexpected(std::make_error_code(std::errc::io_error));
}

// Check for the case when no span is encoded.
if (reader.eof()) {
return {};
}

uint64_t trace_id, parent_id;
OptionalSamplingPriority sampling_priority = nullptr;
std::unordered_map<std::string, std::string> baggage;
json j;

reader >> j;
bool trace_id_set = j.find(json_trace_id_key) != j.end();
bool parent_id_set = j.find(json_parent_id_key) != j.end();

if (!trace_id_set && !parent_id_set) {
// both ids empty, return empty context
return {};
}
if (!trace_id_set || !parent_id_set) {
// missing one id, return unexpected error
return ot::make_unexpected(ot::span_context_corrupted_error);
}

std::string trace_id_str = j[json_trace_id_key];
std::string parent_id_str = j[json_parent_id_key];
trace_id = std::stoull(trace_id_str);
parent_id = std::stoull(parent_id_str);

if (j.find(json_sampling_priority_key) != j.end()) {
sampling_priority = asSamplingPriority(j[json_sampling_priority_key]);
if (sampling_priority == nullptr) {
// sampling priority value not valid, return unexpected error
return ot::make_unexpected(ot::span_context_corrupted_error);
}
}
if (j.find(json_baggage_key) != j.end()) {
baggage = j[json_baggage_key].get<std::unordered_map<std::string, std::string>>();
}

return std::unique_ptr<ot::SpanContext>(std::make_unique<SpanContext>(
parent_id, trace_id, std::move(sampling_priority), std::move(baggage)));

} catch (const json::parse_error &) {
return ot::make_unexpected(std::make_error_code(std::errc::invalid_argument));
} catch (const std::invalid_argument &ia) {
return ot::make_unexpected(ot::span_context_corrupted_error);
} catch (const std::bad_alloc &) {
return ot::make_unexpected(std::make_error_code(std::errc::not_enough_memory));
}

ot::expected<std::unique_ptr<ot::SpanContext>> SpanContext::deserialize(
const ot::TextMapReader &reader) try {
uint64_t trace_id, parent_id;
Expand Down
2 changes: 2 additions & 0 deletions src/propagation.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ class SpanContext : public ot::SpanContext {
std::string baggageItem(ot::string_view key) const;

// Serializes the context into the given writer.
ot::expected<void> serialize(std::ostream &writer) const;
ot::expected<void> serialize(const ot::TextMapWriter &writer) const;

SpanContext withId(uint64_t id) const;

// Returns a new context from the given reader.
static ot::expected<std::unique_ptr<ot::SpanContext>> deserialize(std::istream &reader);
static ot::expected<std::unique_ptr<ot::SpanContext>> deserialize(
const ot::TextMapReader &reader);

Expand Down
15 changes: 2 additions & 13 deletions src/tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ std::unique_ptr<ot::Span> Tracer::StartSpanWithOptions(ot::string_view operation
}

ot::expected<void> Tracer::Inject(const ot::SpanContext &sc, std::ostream &writer) const {
return ot::make_unexpected(ot::invalid_carrier_error);
return inject(sc, writer);
}

ot::expected<void> Tracer::Inject(const ot::SpanContext &sc,
Expand All @@ -111,19 +111,8 @@ ot::expected<void> Tracer::Inject(const ot::SpanContext &sc,
return inject(sc, writer);
}

ot::expected<void> Tracer::inject(const ot::SpanContext &sc, const ot::TextMapWriter &writer) const
try {
auto span_context = dynamic_cast<const SpanContext *>(&sc);
if (span_context == nullptr) {
return ot::make_unexpected(ot::invalid_span_context_error);
}
return span_context->serialize(writer);
} catch (const std::bad_alloc &) {
return ot::make_unexpected(std::make_error_code(std::errc::not_enough_memory));
}

ot::expected<std::unique_ptr<ot::SpanContext>> Tracer::Extract(std::istream &reader) const {
return ot::make_unexpected(ot::invalid_carrier_error);
return SpanContext::deserialize(reader);
}

ot::expected<std::unique_ptr<ot::SpanContext>> Tracer::Extract(
Expand Down
11 changes: 10 additions & 1 deletion src/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,16 @@ class Tracer : public ot::Tracer, public std::enable_shared_from_this<Tracer> {
void Close() noexcept override;

private:
ot::expected<void> inject(const ot::SpanContext &sc, const ot::TextMapWriter &writer) const;
template <class Writer>
ot::expected<void> inject(const ot::SpanContext &sc, Writer &writer) const try {
auto span_context = dynamic_cast<const SpanContext *>(&sc);
if (span_context == nullptr) {
return ot::make_unexpected(ot::invalid_span_context_error);
}
return span_context->serialize(writer);
} catch (const std::bad_alloc &) {
return ot::make_unexpected(std::make_error_code(std::errc::not_enough_memory));
}

const TracerOptions opts_;
// Keeps finished spans until their entire trace is finished.
Expand Down
63 changes: 63 additions & 0 deletions test/propagation_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,66 @@ TEST_CASE("SpanContext") {
}
}
}

TEST_CASE("Binary Span Context") {
std::stringstream carrier{};
SpanContext context{420,
123,
std::make_unique<SamplingPriority>(SamplingPriority::SamplerKeep),
{{"ayy", "lmao"}, {"hi", "haha"}}};

SECTION("can be serialized") {
REQUIRE(context.serialize(carrier));

SECTION("can be deserialized") {
auto sc = SpanContext::deserialize(carrier);
auto received_context = dynamic_cast<SpanContext*>(sc->get());
REQUIRE(received_context);
REQUIRE(received_context->id() == 420);
REQUIRE(received_context->trace_id() == 123);
REQUIRE(received_context->getSamplingPriority() != nullptr);
REQUIRE(*received_context->getSamplingPriority() == SamplingPriority::SamplerKeep);
REQUIRE(getBaggage(received_context) == dict{{"ayy", "lmao"}, {"hi", "haha"}});
}
}

SECTION("serialise fails") {
SECTION("when the writer is not 'good'") {
carrier.clear(carrier.badbit);
auto err = context.serialize(carrier);
REQUIRE(!err);
REQUIRE(err.error() == std::make_error_code(std::errc::io_error));
carrier.clear(carrier.goodbit);
}
}

SECTION("deserialize fails") {
SECTION("when trace_id is missing") {
carrier << "{ \"parent_id\": \"420\" }";
auto err = SpanContext::deserialize(carrier);
REQUIRE(!err);
REQUIRE(err.error() == ot::span_context_corrupted_error);
}

SECTION("when parent_id is missing") {
carrier << "{ \"trace_id\": \"123\" }";
auto err = SpanContext::deserialize(carrier);
REQUIRE(!err);
REQUIRE(err.error() == ot::span_context_corrupted_error);
}

SECTION("when the sampling priority is whack") {
carrier << "{ \"trace_id\": \"123\", \"parent_id\": \"420\", \"sampling_priority\": 42 }";
auto err = SpanContext::deserialize(carrier);
REQUIRE(!err);
REQUIRE(err.error() == ot::span_context_corrupted_error);
}

SECTION("when given invalid json data") {
carrier << "something that isn't JSON";
auto err = SpanContext::deserialize(carrier);
REQUIRE(!err);
REQUIRE(err.error() == std::make_error_code(std::errc::invalid_argument));
}
}
}

0 comments on commit 94110cb

Please sign in to comment.