Skip to content

Commit

Permalink
Simplify code. Add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Sep 10, 2024
1 parent 0ace6ec commit eee87d3
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 58 deletions.
118 changes: 63 additions & 55 deletions tracing/deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,32 @@ type DeferredTracer struct {
}

type DeferredSpan struct {
name string
err error
start time.Time
end time.Time
opts []trace.SpanStartOption
children []*DeferredSpan
flushed bool
name string
err error
end time.Time
opts []trace.SpanStartOption
children []*DeferredSpan
spanContext trace.SpanContext
flushed bool
}

// NewTracer creates a DeferredTracer instance.
func NewTracer() *DeferredTracer {
// NewDeferredTracer creates a DeferredTracer instance.
func NewDeferredTracer() *DeferredTracer {
return new(DeferredTracer)
}

func (t *DeferredTracer) StartSpan(opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
name, fileTag := getCallerSpanName(1)
opts = append(opts, trace.WithAttributes(
attribute.String("file", fileTag),
))
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
span := &DeferredSpan{
name: name,
start: start,
opts: opts,
name: name,
opts: opts,
}
t.spans = append(t.spans, span)
return span
Expand All @@ -54,46 +56,51 @@ func (t *DeferredTracer) StartSpan(opts ...trace.SpanStartOption) *DeferredSpan
func (t *DeferredTracer) StartNamedSpan(name string, opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
fileTag := getFileTag(1)
opts = append(opts, trace.WithAttributes(
attribute.String("file", fileTag),
))
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
span := &DeferredSpan{
name: name,
start: start,
opts: opts,
name: name,
opts: opts,
}
t.spans = append(t.spans, span)
return span
}

// Flush sends all deferred events to OpenTelemetry.
// Deferred spans will be created as children to span assigned to ctx.
// Returns true if any spans are still open.
func (t *DeferredTracer) Flush(ctx context.Context) bool {
// Returns true if any spans still remain open.
func (t *DeferredTracer) Flush(ctx context.Context) (remaining bool) {
var keepSpans []*DeferredSpan

for _, span := range t.spans {
remaining := t.flushSpan(ctx, span)
if len(remaining) > 0 {
keepSpans = append(keepSpans, remaining...)
if t.flushSpan(ctx, span) {
keepSpans = append(keepSpans, span)
}
}

t.spans = keepSpans
return len(t.spans) > 0
}

// flushSpan sends a span and its children to OpenTelemetry. Any unclosed
// spans are returned in remaining return value.
func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (remaining []*DeferredSpan) {
// flushSpan sends a span and its children to OpenTelemetry.
// If parent span is closed, traverse child spans.
// Idempotent: flushes spans only once. May call repeatedly until all spans
// are flushed.
// Returns true if any spans still remain open.
func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (remaining bool) {
if span.end.IsZero() {
// Preserve unclosed spans.
remaining = append(remaining, span)
} else if !span.flushed {
// Return if not closed. Do not traverse children.
return true
}
if !span.flushed {
// Create real OpenTelemetry span.
opts := append(span.opts, trace.WithTimestamp(span.start))
ctx2 := StartNamedScope(ctx, span.name, opts...)
ctx2 := StartNamedScope(ctx, span.name, span.opts...)
realspan := trace.SpanFromContext(ctx2)
span.spanContext = realspan.SpanContext()
if span.err != nil {
realspan.RecordError(span.err)
realspan.SetStatus(codes.Error, span.err.Error())
Expand All @@ -103,19 +110,16 @@ func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (rem
}

// Traverse children.
childrenRemaining := false
ctx2 := trace.ContextWithSpanContext(ctx, span.spanContext)
var keepChildren []*DeferredSpan
for _, childSpan := range span.children {
childRemaining := t.flushSpan(ctx, childSpan)
if len(childRemaining) > 0 {
remaining = append(remaining, childRemaining...)
childrenRemaining = true
if t.flushSpan(ctx2, childSpan) {
keepChildren = append(keepChildren, childSpan)
}
}
if !childrenRemaining {
span.children = span.children[:0]
}
span.children = keepChildren

return
return len(keepChildren) > 0
}

func (span *DeferredSpan) EndSpan(err error, opts ...trace.SpanStartOption) {
Expand All @@ -129,13 +133,15 @@ func (span *DeferredSpan) EndSpan(err error, opts ...trace.SpanStartOption) {
func (span *DeferredSpan) StartChildSpan(opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
name, fileTag := getCallerSpanName(1)
opts = append(opts, trace.WithAttributes(
attribute.String("file", fileTag),
))
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
childSpan := &DeferredSpan{
name: name,
start: start,
opts: opts,
name: name,
opts: opts,
}
span.children = append(span.children, childSpan)
return childSpan
Expand All @@ -144,13 +150,15 @@ func (span *DeferredSpan) StartChildSpan(opts ...trace.SpanStartOption) *Deferre
func (span *DeferredSpan) StartNamedChildSpan(name string, opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
fileTag := getFileTag(1)
opts = append(opts, trace.WithAttributes(
attribute.String("file", fileTag),
))
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
childSpan := &DeferredSpan{
name: name,
start: start,
opts: opts,
name: name,
opts: opts,
}
span.children = append(span.children, childSpan)
return childSpan
Expand Down
117 changes: 117 additions & 0 deletions tracing/deferred_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package tracing_test

import (
"context"
"os"
"testing"

"github.com/mailgun/holster/v4/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/trace"
)

func TestDeferredTracer(t *testing.T) {
ctx := context.Background()
initTracing := func(t *testing.T) {
os.Setenv("OTEL_TRACES_EXPORTER", "test")
defer os.Unsetenv("OTEL_TRACES_EXPORTER")
err := tracing.InitTracing(ctx, t.Name())
require.NoError(t, err)
}

t.Run("Single span", func(t *testing.T) {
// Given
initTracing(t)
prefix := t.Name() + "_"

// When
dtracer := tracing.NewDeferredTracer()
span1 := dtracer.StartNamedSpan(prefix + "Span 1")
span1.EndSpan(nil)
remaining := dtracer.Flush(ctx)
err := tracing.CloseTracing(ctx)
require.NoError(t, err)

// Then
assert.Zero(t, remaining)
count := tracing.GlobalTestExporter.Count()
assert.Equal(t, 1, count)
reader := tracing.GlobalTestExporter.NewSpanReader()
spans := make([]trace.ReadOnlySpan, count)
n, err := reader.Read(spans)
require.NoError(t, err)
assert.Equal(t, count, n)
assert.Equal(t, prefix+"Span 1", spans[0].Name())
})

t.Run("Multiple spans", func(t *testing.T) {
// Given
initTracing(t)
prefix := t.Name() + "_"

// When
dtracer := tracing.NewDeferredTracer()
span1 := dtracer.StartNamedSpan(prefix + "Span 1")
span1.EndSpan(nil)
span2 := dtracer.StartNamedSpan(prefix + "Span 2")
span2.EndSpan(nil)
span3 := dtracer.StartNamedSpan(prefix + "Span 3")
span3.EndSpan(nil)
remaining := dtracer.Flush(ctx)
err := tracing.CloseTracing(ctx)
require.NoError(t, err)

// Then
assert.Zero(t, remaining)
count := tracing.GlobalTestExporter.Count()
assert.Equal(t, 3, count)
reader := tracing.GlobalTestExporter.NewSpanReader()
spans := make([]trace.ReadOnlySpan, count)
n, err := reader.Read(spans)
require.NoError(t, err)
assert.Equal(t, count, n)
assert.Equal(t, prefix+"Span 1", spans[0].Name())
assert.Equal(t, prefix+"Span 2", spans[1].Name())
assert.Equal(t, prefix+"Span 3", spans[2].Name())

// Check same parent span IDs.
assert.Equal(t, spans[0].Parent().SpanID(), spans[1].Parent().SpanID())
assert.Equal(t, spans[0].Parent().SpanID(), spans[2].Parent().SpanID())
})

t.Run("Nested spans", func(t *testing.T) {
// Given
initTracing(t)
prefix := t.Name() + "_"

// When
dtracer := tracing.NewDeferredTracer()
span1 := dtracer.StartNamedSpan(prefix + "Span 1")
span2 := span1.StartNamedChildSpan(prefix + "Span 2")
span3 := span2.StartNamedChildSpan(prefix + "Span 3")
span3.EndSpan(nil)
span2.EndSpan(nil)
span1.EndSpan(nil)
remaining := dtracer.Flush(ctx)
err := tracing.CloseTracing(ctx)
require.NoError(t, err)

// Then
assert.Zero(t, remaining)
count := tracing.GlobalTestExporter.Count()
assert.Equal(t, 3, count)
reader := tracing.GlobalTestExporter.NewSpanReader()
spans := make([]trace.ReadOnlySpan, count)
n, err := reader.Read(spans)
require.NoError(t, err)
assert.Equal(t, count, n)
assert.Equal(t, prefix+"Span 1", spans[0].Name())
assert.Equal(t, prefix+"Span 2", spans[1].Name())
assert.Equal(t, prefix+"Span 3", spans[2].Name())

// Check parent/child span IDs.
assert.Equal(t, spans[1].SpanContext().SpanID(), spans[2].Parent().SpanID())
assert.Equal(t, spans[0].SpanContext().SpanID(), spans[1].Parent().SpanID())
})
}
Loading

0 comments on commit eee87d3

Please sign in to comment.