Skip to content

Commit

Permalink
Fix benchmark test ack counting
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Oct 15, 2024
1 parent 48025b5 commit 10ecfd3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/awss3/acks.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (ah *awsACKHandler) run() {
// Run finalization asynchronously so we don't block the SQS worker
// or the queue by ignoring the ack handler's input channels. Ordering
// is no longer important at this point.
go result.ackCallback()
if result.ackCallback != nil {
go result.ackCallback()
}
}

// If the input is closed and all acks are completed, we're done
Expand Down
22 changes: 16 additions & 6 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,18 @@ func (c constantS3) ListObjectsPaginator(string, string) s3Pager {
var _ beat.Pipeline = (*fakePipeline)(nil)

// fakePipeline returns new ackClients.
type fakePipeline struct{}
type fakePipeline struct {
ackHandler *awsACKHandler
}

func newFakePipeline() *fakePipeline {
return &fakePipeline{ackHandler: newAWSACKHandler()}
}

func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
return &ackClient{
ackHandler: c.ackHandler,
}, nil
}

func (c *fakePipeline) Connect() (beat.Client, error) {
Expand All @@ -177,13 +185,14 @@ func (c *fakePipeline) Connect() (beat.Client, error) {
var _ beat.Client = (*ackClient)(nil)

// ackClient is a fake beat.Client that ACKs the published messages.
type ackClient struct{}
type ackClient struct {
ackHandler *awsACKHandler
}

func (c *ackClient) Close() error { return nil }

func (c *ackClient) Publish(event beat.Event) {
// Fake the ACK handling.
event.Private.(*awscommon.EventACKTracker).ACK()
c.ackHandler.Add(1, nil)
}

func (c *ackClient) PublishAll(event []beat.Event) {
Expand Down Expand Up @@ -216,6 +225,7 @@ func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult {
config.NumberOfWorkers = workerCount
sqsReader := newSQSReaderInput(config, aws.Config{})
sqsReader.log = log.Named("sqs")
sqsReader.pipeline = newFakePipeline()
sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), workerCount)
sqsReader.sqs, err = newConstantSQS()
require.NoError(t, err)
Expand Down Expand Up @@ -306,7 +316,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
event.Private.(*awscommon.EventACKTracker).ACK()
})
pipeline := pubtest.PublisherWithClient(client)
pipeline := newFakePipeline()

defer func() {
_ = client.Close()
Expand Down

0 comments on commit 10ecfd3

Please sign in to comment.