From e596144c1fa951be30330b66d2c70d748c123016 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 9 Oct 2024 23:30:21 -0700 Subject: [PATCH] Edit according to feedback --- exporter/internal/queue/bounded_memory_queue.go | 4 ++-- exporter/internal/queue/consumers.go | 4 ++-- exporter/internal/queue/persistent_queue.go | 6 +++--- exporter/internal/queue/queue.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 0b495f343b6..43b6baf7d02 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -40,9 +40,9 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } -func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, T, bool) { +func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) - return 0, item.req, ok + return 0, item.ctx, item.req, ok } // Should be called to remove the item of the given index from the queue once processing is finished. diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go index 2d3288d898e..8eab2db6bbe 100644 --- a/exporter/internal/queue/consumers.go +++ b/exporter/internal/queue/consumers.go @@ -40,11 +40,11 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { startWG.Done() defer qc.stopWG.Done() for { - index, req, ok := qc.queue.Read(context.Background()) + index, ctx, req, ok := qc.queue.Read(context.Background()) if !ok { return } - consumeErr := qc.consumeFunc(context.Background(), req) + consumeErr := qc.consumeFunc(ctx, req) qc.queue.OnProcessingFinished(index, consumeErr) } }() diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index a23d1e15cb7..e5dc5d5b72f 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -287,7 +287,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { return nil } -func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, T, bool) { +func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) { for { var ( index uint64 @@ -303,10 +303,10 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, T, bool) { return size }) if !ok { - return 0, req, false + return 0, nil, req, false } if consumed { - return index, req, true + return index, nil, req, true } // If ok && !consumed, it means we are stopped. In this case, we still process all the other events diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 1ba3da69d0c..ee79408361a 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -33,7 +33,7 @@ type Queue[T any] interface { // finished, the index should be called with OnProcessingFinished to clean up the storage. // The function blocks until an item is available or if the queue is stopped. // Returns false if reading failed or if the queue is stopped. - Read(context.Context) (uint64, T, bool) + Read(context.Context) (uint64, context.Context, T, bool) // Should be called to remove the item of the given index from the queue once processing is finished. OnProcessingFinished(index uint64, consumeErr error) }