Skip to content

Commit

Permalink
Remove consume
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Oct 9, 2024
1 parent 05e9dd3 commit 5834f7a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 33 deletions.
13 changes: 0 additions & 13 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,6 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
_, item, ok := q.Read(context.Background())
if !ok {
return false
}
// the memory queue doesn't handle consume errors
_ = consumeFunc(context.Background(), item)
return true
}

func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, T, bool) {
item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
return 0, item.req, ok
Expand Down
5 changes: 4 additions & 1 deletion exporter/internal/queue/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
startWG.Done()
defer qc.stopWG.Done()
for {
if !qc.queue.Consume(qc.consumeFunc) {
index, req, ok := qc.queue.Read(context.Background())
if !ok {
return
}
consumeErr := qc.consumeFunc(context.Background(), req)
qc.queue.OnProcessingFinished(index, consumeErr)
}
}()
}
Expand Down
40 changes: 25 additions & 15 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func unmarshalTracesRequest(bytes []byte) (tracesRequest, error) {
return tracesRequest{traces: traces}, err
}

func consume(pq *persistentQueue[tracesRequest], consumeFunc func(context.Context, tracesRequest) error) bool {
index, req, ok := pq.Read(context.Background())
if !ok {
return false
}
consumeErr := consumeFunc(context.Background(), req)
pq.OnProcessingFinished(index, consumeErr)
return true
}

type mockHost struct {
component.Host
ext map[component.ID]component.Component
Expand Down Expand Up @@ -564,7 +574,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) {
go func() {
defer conWg.Done()
for i := 0; i < 10; i++ {
assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil }))
}
}()
}
Expand Down Expand Up @@ -856,7 +866,7 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) {
require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull)
assert.Equal(t, 100, pq.Size())

assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
Expand All @@ -874,13 +884,13 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) {
// Check the combined queue size.
assert.Equal(t, 70, newPQ.Size())

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
assert.Equal(t, 30, newPQ.Size())

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 20, traces.traces.SpanCount())
return nil
}))
Expand All @@ -901,7 +911,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5)))
assert.Equal(t, 3, pq.Size())

assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
Expand All @@ -920,14 +930,14 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
assert.Equal(t, 12, newPQ.Size())

// Consuming a restored request should reduce the restored size by 20 but it should not go to below zero
assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 20, traces.traces.SpanCount())
return nil
}))
assert.Equal(t, 0, newPQ.Size())

// Consuming another restored request should not affect the restored size since it's already dropped to 0.
assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 25, traces.traces.SpanCount())
return nil
}))
Expand All @@ -937,7 +947,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5)))
assert.Equal(t, 25, newPQ.Size())

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 10, traces.traces.SpanCount())
return nil
}))
Expand All @@ -961,7 +971,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {

// Read the first request just to populate the read index in the storage.
// Otherwise, the write index won't be restored either.
assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
Expand All @@ -979,7 +989,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
// Queue is full
require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 20, traces.traces.SpanCount())
return nil
}))
Expand All @@ -988,7 +998,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
// Still full
require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 25, traces.traces.SpanCount())
return nil
}))
Expand All @@ -1015,7 +1025,7 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) {

// Consume 30 items
for i := 0; i < 3; i++ {
assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil }))
}
// The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes.
assert.Equal(t, 30, pq.Size())
Expand All @@ -1027,12 +1037,12 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) {
// In reality the size should be 30. Once the queue is drained, it will be updated to the correct size.
assert.Equal(t, 50, newPQ.Size())

assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil }))
assert.Equal(t, 30, newPQ.Size())

// Now the size must be correctly reflected
assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil }))
assert.Equal(t, 0, newPQ.Size())

assert.NoError(t, newPQ.Shutdown(context.Background()))
Expand Down
4 changes: 0 additions & 4 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ type Queue[T any] interface {
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item T) error
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
Consume(func(ctx context.Context, item T) error) bool
// Size returns the current Size of the queue
Size() int
// Capacity returns the capacity of the queue.
Expand Down

0 comments on commit 5834f7a

Please sign in to comment.