diff --git a/x-pack/filebeat/input/awss3/acks.go b/x-pack/filebeat/input/awss3/acks.go
index f4817f06db81..a3850c01e87a 100644
--- a/x-pack/filebeat/input/awss3/acks.go
+++ b/x-pack/filebeat/input/awss3/acks.go
@@ -93,7 +93,9 @@ func (ah *awsACKHandler) run() {
 			// Run finalization asynchronously so we don't block the SQS worker
 			// or the queue by ignoring the ack handler's input channels. Ordering
 			// is no longer important at this point.
-			go result.ackCallback()
+			if result.ackCallback != nil {
+				go result.ackCallback()
+			}
 		}

 		// If the input is closed and all acks are completed, we're done
diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go
index ce3708fb16be..6d8288ceafad 100644
--- a/x-pack/filebeat/input/awss3/input_benchmark_test.go
+++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -164,10 +164,18 @@ func (c constantS3) ListObjectsPaginator(string, string) s3Pager {
 var _ beat.Pipeline = (*fakePipeline)(nil)

 // fakePipeline returns new ackClients.
-type fakePipeline struct{}
+type fakePipeline struct {
+	ackHandler *awsACKHandler
+}
+
+func newFakePipeline() *fakePipeline {
+	return &fakePipeline{ackHandler: newAWSACKHandler()}
+}

 func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
-	return &ackClient{}, nil
+	return &ackClient{
+		ackHandler: c.ackHandler,
+	}, nil
 }

 func (c *fakePipeline) Connect() (beat.Client, error) {
@@ -177,13 +185,14 @@ func (c *fakePipeline) Connect() (beat.Client, error) {
 var _ beat.Client = (*ackClient)(nil)

 // ackClient is a fake beat.Client that ACKs the published messages.
-type ackClient struct{}
+type ackClient struct {
+	ackHandler *awsACKHandler
+}

 func (c *ackClient) Close() error { return nil }

 func (c *ackClient) Publish(event beat.Event) {
-	// Fake the ACK handling.
-	event.Private.(*awscommon.EventACKTracker).ACK()
+	c.ackHandler.Add(1, nil)
 }

 func (c *ackClient) PublishAll(event []beat.Event) {
@@ -216,6 +225,7 @@ func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult {
 	config.NumberOfWorkers = workerCount
 	sqsReader := newSQSReaderInput(config, aws.Config{})
 	sqsReader.log = log.Named("sqs")
+	sqsReader.pipeline = newFakePipeline()
 	sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), workerCount)
 	sqsReader.sqs, err = newConstantSQS()
 	require.NoError(t, err)
@@ -306,7 +316,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
 	client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
 		event.Private.(*awscommon.EventACKTracker).ACK()
 	})
-	pipeline := pubtest.PublisherWithClient(client)
+	pipeline := newFakePipeline()

 	defer func() {
 		_ = client.Close()