Skip to content

Commit

Permalink
fix broken merge
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Oct 16, 2024
1 parent 0501fce commit 9d1c53b
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 22 deletions.
16 changes: 1 addition & 15 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ var _ beat.Pipeline = (*fakePipeline)(nil)
type fakePipeline struct {
}

<<<<<<< HEAD
func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
=======
func newFakePipeline() *fakePipeline {
return &fakePipeline{}
}
Expand All @@ -175,7 +171,6 @@ func (c *fakePipeline) ConnectWith(config beat.ClientConfig) (beat.Client, error
return &ackClient{
eventListener: config.EventListener,
}, nil
>>>>>>> d2867fdd9f (Add asynchronous ACK handling to S3 and SQS inputs (#40699))
}

func (c *fakePipeline) Connect() (beat.Client, error) {
Expand Down Expand Up @@ -222,23 +217,14 @@ func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult {
return testing.Benchmark(func(b *testing.B) {
var err error

<<<<<<< HEAD
conf := makeBenchmarkConfig(t)
conf.MaxNumberOfMessages = maxMessagesInflight
sqsReader := newSQSReaderInput(conf, aws.Config{})
sqsReader.log = log.Named("sqs")
sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), maxMessagesInflight)
sqsReader.sqs = newConstantSQS()
=======
config := makeBenchmarkConfig(t)
config.NumberOfWorkers = workerCount
sqsReader := newSQSReaderInput(config, aws.Config{})
sqsReader.log = log.Named("sqs")
sqsReader.pipeline = newFakePipeline()
sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), workerCount)
sqsReader.sqs, err = newConstantSQS()
sqsReader.sqs = newConstantSQS()
require.NoError(t, err)
>>>>>>> d2867fdd9f (Add asynchronous ACK handling to S3 and SQS inputs (#40699))
sqsReader.s3 = newConstantS3(t)
sqsReader.msgHandler, err = sqsReader.createEventProcessor()
require.NoError(t, err, "createEventProcessor must succeed")
Expand Down
7 changes: 0 additions & 7 deletions x-pack/filebeat/input/awss3/sqs_s3_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,11 @@ func TestSQSS3EventProcessor(t *testing.T) {

mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil)

<<<<<<< HEAD
p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory)
err := p.ProcessSQS(ctx, &invalidBodyMsg)
require.Error(t, err)
t.Log(err)
=======
p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory)
result := p.ProcessSQS(ctx, &invalidBodyMsg, func(_ beat.Event) {})
require.Error(t, result.processingErr)
t.Log(result.processingErr)
result.Done()
>>>>>>> d2867fdd9f (Add asynchronous ACK handling to S3 and SQS inputs (#40699))
})

t.Run("zero S3 events in body", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/gofrs/uuid/v5"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down

0 comments on commit 9d1c53b

Please sign in to comment.