Skip to content

Commit

Permalink
Consume trace iterator trusting the v2 storage api contracts
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Emonueje Ebenezer <eebenezer949@gmail.com>
  • Loading branch information
ekefan committed Dec 24, 2024
1 parent e954c17 commit 4727e37
Showing 1 changed file with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
package v1adapter

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
)

// PTracesSeq2ToModel consumes an otel trace iterator and returns a jaeger model trace.
// PTracesSeq2ToModel consumes an otel trace iterator and returns a model trace.
//
// When necessary, it groups chunks of traces into one single trace
// When necessary, it groups chunks of a trace into a single model trace
func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
var (
jaegerTraces []*model.Trace
err error
tracesByID map[pcommon.TraceID]*model.Trace
err error
lastTraceID *model.TraceID
lastTrace *model.Trace
)
jaegerTraces := []*model.Trace{}

seqTrace(func(otelTraces []ptrace.Traces, e error) bool {
if e != nil {
Expand All @@ -30,14 +31,17 @@ func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Tr

for _, otelTrace := range otelTraces {
spans := modelSpansFromOtelTrace(otelTrace)
for _, span := range spans {
traceId := span.TraceID.ToOTELTraceID()
if _, exists := tracesByID[traceId]; !exists {
tracesByID[traceId] = &model.Trace{}
}
trace := tracesByID[traceId]
trace.Spans = append(trace.Spans, span)
tracesByID[traceId] = trace
if len(spans) == 0 {
continue
}
currentTraceID := spans[0].TraceID
if lastTraceID != nil && *lastTraceID == currentTraceID {
lastTrace.Spans = append(lastTrace.Spans, spans...)
} else {
newTrace := &model.Trace{Spans: spans}
lastTraceID = &currentTraceID
lastTrace = newTrace
jaegerTraces = append(jaegerTraces, lastTrace)
}
}
return true
Expand All @@ -47,8 +51,8 @@ func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Tr
return nil, err
}

for _, trace := range tracesByID {
jaegerTraces = append(jaegerTraces, trace)
if len(jaegerTraces) == 0 {
return nil, spanstore.ErrTraceNotFound
}
return jaegerTraces, nil
}
Expand All @@ -58,7 +62,10 @@ func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span {
spans := []*model.Span{}
batches := otel2model.ProtoFromTraces(otelTrace)
for _, batch := range batches {
spans = append(spans, batch.Spans...)
for _, span := range batch.Spans {
span.Process = batch.Process
spans = append(spans, span)
}
}
return spans
}

0 comments on commit 4727e37

Please sign in to comment.