Skip to content

Commit

Permalink
Edit according to feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Oct 10, 2024
1 parent 83e80e3 commit e596144
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
}

func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, T, bool) {
func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
return 0, item.req, ok
return 0, item.ctx, item.req, ok
}

// Should be called to remove the item of the given index from the queue once processing is finished.
Expand Down
4 changes: 2 additions & 2 deletions exporter/internal/queue/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
startWG.Done()
defer qc.stopWG.Done()
for {
index, req, ok := qc.queue.Read(context.Background())
index, ctx, req, ok := qc.queue.Read(context.Background())
if !ok {
return
}
consumeErr := qc.consumeFunc(context.Background(), req)
consumeErr := qc.consumeFunc(ctx, req)
qc.queue.OnProcessingFinished(index, consumeErr)
}
}()
Expand Down
6 changes: 3 additions & 3 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
return nil
}

func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, T, bool) {
func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) {
for {
var (
index uint64
Expand All @@ -303,10 +303,10 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, T, bool) {
return size
})
if !ok {
return 0, req, false
return 0, nil, req, false
}
if consumed {
return index, req, true
return index, nil, req, true
}

// If ok && !consumed, it means we are stopped. In this case, we still process all the other events
Expand Down
2 changes: 1 addition & 1 deletion exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Queue[T any] interface {
// finished, the index should be called with OnProcessingFinished to clean up the storage.
// The function blocks until an item is available or if the queue is stopped.
// Returns false if reading failed or if the queue is stopped.
Read(context.Context) (uint64, T, bool)
Read(context.Context) (uint64, context.Context, T, bool)
// Should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
}
Expand Down

0 comments on commit e596144

Please sign in to comment.