From 575c5f5e25319ad46dc13b09fe85566d3851310c Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Mon, 27 Nov 2023 10:14:43 -0800 Subject: [PATCH] [chore] [exporterheper] Fix not-started queue sender shutdown (#8995) Do not panic on the shutdown of a not-started queue sender --- exporter/exporterhelper/queue_sender.go | 7 +++---- exporter/exporterhelper/queue_sender_test.go | 5 +++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index f6369b778ca..2f86c475611 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -81,7 +81,6 @@ type queueSender struct { traceAttribute attribute.KeyValue logger *zap.Logger meter otelmetric.Meter - numConsumers int consumers *internal.QueueConsumers[Request] stopWG sync.WaitGroup requeuingEnabled bool @@ -101,18 +100,19 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co } else { queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize) } - return &queueSender{ + qs := &queueSender{ fullName: set.ID.String(), signal: signal, queue: queue, traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), - numConsumers: config.NumConsumers, stopWG: sync.WaitGroup{}, // TODO: this can be further exposed as a config param rather than relying on a type of queue requeuingEnabled: isPersistent, } + qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume) + return qs } // consume is the function that is executed by the queue consumers to send the data to the next consumerSender. @@ -149,7 +149,6 @@ func (qs *queueSender) consume(ctx context.Context, req Request) { // Start is invoked during service startup. func (qs *queueSender) Start(ctx context.Context, host component.Host) error { - qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume) if err := qs.consumers.Start(ctx, host); err != nil { return err } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 0b76915d20d..75a30226e25 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -431,6 +431,11 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { }, time.Second, 1*time.Millisecond) } +func TestQueueSenderNoStartShutdown(t *testing.T) { + qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil) + assert.NoError(t, qs.Shutdown(context.Background())) +} + type mockHost struct { component.Host ext map[component.ID]component.Component