Skip to content

Commit

Permalink
[concurrentbatch] skip size==0 requests (#208)
Browse files Browse the repository at this point in the history
Size=0 requests could get stuck in the concurrent batch processor if
there are no other requests traveling through. This bug was able to
cause test failures when race detection is enabled.

Fixes #206

---------

Co-authored-by: moh-osman3 <moh.osman@lightstep.com>
  • Loading branch information
jmacd and moh-osman3 authored Jun 4, 2024
1 parent 367e229 commit 0ed688e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Jitter is applied to once per process, not once per stream. [#199](https://github.com/open-telemetry/otel-arrow/pull/199)
- Network statistics tracing instrumentation simplified. [#201](https://github.com/open-telemetry/otel-arrow/pull/201)
- Protocol includes use of more gRPC codes. [#202](https://github.com/open-telemetry/otel-arrow/pull/202)
- Receiver concurrency bugfix. [#205](https://github.com/open-telemetry/otel-arrow/pull/205)
- Concurrent batch processor size==0 bugfix. [#208](https://github.com/open-telemetry/otel-arrow/pull/208)

## [0.23.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0) - 2024-05-09

Expand Down
26 changes: 15 additions & 11 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,22 +441,28 @@ func (bp *batchProcessor) countRelease(bytes int64) {
}

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
respCh := make(chan error, 1)
item := dataItem{
parentCtx: ctx,
data: data,
responseCh: respCh,
}

var itemCount int
switch telem := data.(type) {
case ptrace.Traces:
item.count = telem.SpanCount()
itemCount = telem.SpanCount()
case pmetric.Metrics:
item.count = telem.DataPointCount()
itemCount = telem.DataPointCount()
case plog.Logs:
item.count = telem.LogRecordCount()
itemCount = telem.LogRecordCount()
}

if itemCount == 0 {
return nil
}

respCh := make(chan error, 1)
item := dataItem{
parentCtx: ctx,
data: data,
responseCh: respCh,
count: itemCount,
}
bytes := int64(b.batch.sizeBytes(data))

if bytes > b.processor.limitBytes {
Expand All @@ -475,7 +481,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
b.processor.countRelease(bytes)
return
}

// context may have timed out before we received all
// responses. Start goroutine to wait and release
// all acquired bytes after the parent thread returns.
Expand Down Expand Up @@ -519,7 +524,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
return err
}
}
return nil
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func TestBatchProcessorSpansPanicRecover(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
err = bp.ConsumeTraces(context.Background(), td)
assert.Contains(t, err.Error(), "testing panic")
consumeErr := bp.ConsumeTraces(context.Background(), td)
assert.Contains(t, consumeErr.Error(), "testing panic")
wg.Done()
}()
}
Expand Down Expand Up @@ -168,8 +168,8 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
wg.Add(1)
go func() {
err = bp.ConsumeMetrics(context.Background(), md)
assert.Contains(t, err.Error(), "testing panic")
consumeErr := bp.ConsumeMetrics(context.Background(), md)
assert.Contains(t, consumeErr.Error(), "testing panic")
wg.Done()
}()
}
Expand Down Expand Up @@ -202,8 +202,8 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
wg.Add(1)
go func() {
err = bp.ConsumeLogs(context.Background(), ld)
assert.Contains(t, err.Error(), "testing panic")
consumeErr := bp.ConsumeLogs(context.Background(), ld)
assert.Contains(t, consumeErr.Error(), "testing panic")
wg.Done()
}()
}
Expand Down Expand Up @@ -307,8 +307,8 @@ func TestBatchProcessorCancelContext(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
err = bp.ConsumeTraces(ctx, td)
assert.Contains(t, err.Error(), "context canceled")
consumeErr := bp.ConsumeTraces(ctx, td)
assert.Contains(t, consumeErr.Error(), "context canceled")
wg.Done()
}()
}
Expand Down Expand Up @@ -1668,3 +1668,32 @@ func TestBatchTooLarge(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "request size exceeds max-in-flight bytes")
}

func TestBatchProcessorEmptyBatch(t *testing.T) {
	// Regression test: requests carrying zero items must complete
	// immediately instead of hanging in the shard queue waiting for
	// other traffic to flush them.
	next := new(consumertest.TracesSink)
	cfg := createDefaultConfig().(*Config)
	const batchSize = 100
	cfg.SendBatchSize = uint32(batchSize)
	cfg.Timeout = 100 * time.Millisecond

	const numRequests = 5

	set := processortest.NewNopCreateSettings()
	set.MetricsLevel = configtelemetry.LevelDetailed
	proc, err := newBatchTracesProcessor(set, next, cfg)
	require.NoError(t, err)
	require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))

	var wg sync.WaitGroup
	wg.Add(numRequests)
	for i := 0; i < numRequests; i++ {
		// Each goroutine submits an empty trace payload; all of them
		// must return without error and without blocking.
		empty := ptrace.NewTraces()
		go func() {
			defer wg.Done()
			assert.NoError(t, proc.ConsumeTraces(context.Background(), empty))
		}()
	}

	wg.Wait()
	require.NoError(t, proc.Shutdown(context.Background()))
}

0 comments on commit 0ed688e

Please sign in to comment.