diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf872d48..65c7fa9a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Jitter is applied to once per process, not once per stream. [#199](https://github.com/open-telemetry/otel-arrow/pull/199)
 - Network statistics tracing instrumentation simplified. [#201](https://github.com/open-telemetry/otel-arrow/pull/201)
 - Protocol includes use of more gRPC codes. [#202](https://github.com/open-telemetry/otel-arrow/pull/202)
+- Receiver concurrency bugfix. [#205](https://github.com/open-telemetry/otel-arrow/pull/205)
+- Concurrent batch processor size==0 bugfix. [#208](https://github.com/open-telemetry/otel-arrow/pull/208)
 
 ## [0.23.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0) - 2024-05-09
 
diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go
index 5a018596..cf73edb9 100644
--- a/collector/processor/concurrentbatchprocessor/batch_processor.go
+++ b/collector/processor/concurrentbatchprocessor/batch_processor.go
@@ -441,22 +441,28 @@ func (bp *batchProcessor) countRelease(bytes int64) {
 }
 
 func (b *shard) consumeAndWait(ctx context.Context, data any) error {
-	respCh := make(chan error, 1)
-	item := dataItem{
-		parentCtx:  ctx,
-		data:       data,
-		responseCh: respCh,
-	}
+	var itemCount int
 	switch telem := data.(type) {
 	case ptrace.Traces:
-		item.count = telem.SpanCount()
+		itemCount = telem.SpanCount()
 	case pmetric.Metrics:
-		item.count = telem.DataPointCount()
+		itemCount = telem.DataPointCount()
 	case plog.Logs:
-		item.count = telem.LogRecordCount()
+		itemCount = telem.LogRecordCount()
+	}
+
+	if itemCount == 0 {
+		return nil
 	}
 
+	respCh := make(chan error, 1)
+	item := dataItem{
+		parentCtx:  ctx,
+		data:       data,
+		responseCh: respCh,
+		count:      itemCount,
+	}
 	bytes := int64(b.batch.sizeBytes(data))
 
 	if bytes > b.processor.limitBytes {
@@ -475,7 +481,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 			b.processor.countRelease(bytes)
 			return
 		}
-
 		// context may have timed out before we received all
 		// responses. Start goroutine to wait and release
 		// all acquired bytes after the parent thread returns.
@@ -519,7 +524,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 			return err
 		}
 	}
-
 	return nil
 }
 
 // singleShardBatcher is used when metadataKeys is empty, to avoid the
diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go
index 8abd3042..c1ab697e 100644
--- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go
+++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go
@@ -134,8 +134,8 @@ func TestBatchProcessorSpansPanicRecover(t *testing.T) {
 		// until batch size reached to unblock.
 		wg.Add(1)
 		go func() {
-			err = bp.ConsumeTraces(context.Background(), td)
-			assert.Contains(t, err.Error(), "testing panic")
+			consumeErr := bp.ConsumeTraces(context.Background(), td)
+			assert.Contains(t, consumeErr.Error(), "testing panic")
 			wg.Done()
 		}()
 	}
@@ -168,8 +168,8 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
 		md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
 		wg.Add(1)
 		go func() {
-			err = bp.ConsumeMetrics(context.Background(), md)
-			assert.Contains(t, err.Error(), "testing panic")
+			consumeErr := bp.ConsumeMetrics(context.Background(), md)
+			assert.Contains(t, consumeErr.Error(), "testing panic")
 			wg.Done()
 		}()
 	}
@@ -202,8 +202,8 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
 		ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
 		wg.Add(1)
 		go func() {
-			err = bp.ConsumeLogs(context.Background(), ld)
-			assert.Contains(t, err.Error(), "testing panic")
+			consumeErr := bp.ConsumeLogs(context.Background(), ld)
+			assert.Contains(t, consumeErr.Error(), "testing panic")
 			wg.Done()
 		}()
 	}
@@ -307,8 +307,8 @@ func TestBatchProcessorCancelContext(t *testing.T) {
 		// until batch size reached to unblock.
 		wg.Add(1)
 		go func() {
-			err = bp.ConsumeTraces(ctx, td)
-			assert.Contains(t, err.Error(), "context canceled")
+			consumeErr := bp.ConsumeTraces(ctx, td)
+			assert.Contains(t, consumeErr.Error(), "context canceled")
 			wg.Done()
 		}()
 	}
@@ -1668,3 +1668,32 @@ func TestBatchTooLarge(t *testing.T) {
 	assert.Error(t, err)
 	assert.Contains(t, err.Error(), "request size exceeds max-in-flight bytes")
 }
+
+func TestBatchProcessorEmptyBatch(t *testing.T) {
+	sink := new(consumertest.TracesSink)
+	cfg := createDefaultConfig().(*Config)
+	sendBatchSize := 100
+	cfg.SendBatchSize = uint32(sendBatchSize)
+	cfg.Timeout = 100 * time.Millisecond
+
+	requestCount := 5
+
+	creationSet := processortest.NewNopCreateSettings()
+	creationSet.MetricsLevel = configtelemetry.LevelDetailed
+	batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
+	require.NoError(t, err)
+	require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
+
+	var wg sync.WaitGroup
+	for requestNum := 0; requestNum < requestCount; requestNum++ {
+		td := ptrace.NewTraces()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
+		}()
+	}
+
+	wg.Wait()
+	require.NoError(t, batcher.Shutdown(context.Background()))
+}
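Reviewer note, not part of the patch: the test edits above are more than cosmetic. Assigning to the test's shared `err` variable from inside several goroutines is a data race; the writes race with each other and with any read in the test body, and `go test -race` flags them. Declaring a goroutine-local `consumeErr` removes the shared write. Below is a minimal, self-contained sketch of the before/after pattern, where `consume` is a hypothetical stand-in for calls such as `bp.ConsumeTraces`, not part of the otel-arrow API:

```go
package main

import (
	"fmt"
	"sync"
)

// consume is a hypothetical stand-in for bp.ConsumeTraces and friends.
func consume(i int) error {
	return fmt.Errorf("testing panic: request %d", i)
}

func main() {
	var wg sync.WaitGroup

	// Racy (the old test pattern): every goroutine assigns to one
	// variable captured from the enclosing scope, so the writes race
	// with each other and with readers:
	//
	//	var err error
	//	go func() { err = consume(0) }() // unsynchronized shared write
	//
	// Fixed (the new test pattern): each goroutine owns its error.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) { // i passed as a parameter to avoid loop-variable capture pre-Go 1.22
			defer wg.Done()
			consumeErr := consume(i) // goroutine-local; no shared write
			fmt.Println(consumeErr)
		}(i)
	}
	wg.Wait()
}
```

The production change in `consumeAndWait` is complementary: by returning `nil` before any channel or semaphore work when `itemCount == 0`, an empty request now short-circuits instead of entering the batching machinery, and the new `TestBatchProcessorEmptyBatch` exercises exactly this path by sending empty `ptrace.NewTraces()` payloads concurrently.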