Skip to content

Commit

Permalink
linter / CI fixes
Browse files — browse the repository at this point in the history.
faec committed Oct 15, 2024
1 parent a743c08 commit 48025b5
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 23 deletions.
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (c config) getBucketARN() string {
// Should be provided as a parameter to s3.NewFromConfig.
func (c config) s3ConfigModifier(o *s3.Options) {
if c.NonAWSBucketName != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,6 @@ type nonAWSBucketResolver struct {
}

func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {

Check failure on line 118 in x-pack/filebeat/input/awss3/s3.go

View workflow job for this annotation

GitHub Actions / lint (windows)

SA1019: awssdk.Endpoint is deprecated: This structure was used with the global [EndpointResolver] interface, which has been deprecated in favor of service-specific endpoint resolution. See the deprecation docs on that interface for more information. (staticcheck)
//nolint:staticcheck // haven't migrated to the new interface yet
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
for dec.next() {
val, err := dec.decodeValue()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
return nil
}
break
Expand Down
23 changes: 7 additions & 16 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -187,22 +186,20 @@ func TestS3ObjectProcessor(t *testing.T) {
ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()
mockS3API := NewMockS3API(ctrl)
mockPublisher := NewMockBeatClient(ctrl)
s3Event, s3Resp := newS3Object(t, "testdata/log.txt", "")

var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Do(func(event beat.Event) { events = append(events, event) }).
Times(2),
)

var events []beat.Event
s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, nil, backupConfig{})
err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object(logp.NewLogger(inputName), func(_ beat.Event) {})
err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object(logp.NewLogger(inputName), func(event beat.Event) {
events = append(events, event)
})
assert.Equal(t, 2, len(events))
require.NoError(t, err)
})

Expand Down Expand Up @@ -309,29 +306,23 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int,
ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()
mockS3API := NewMockS3API(ctrl)
mockPublisher := NewMockBeatClient(ctrl)

s3Event, s3Resp := newS3Object(t, file, contentType)
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Do(func(event beat.Event) { events = append(events, event) }).
Times(numEvents),
)

s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3API, selectors, backupConfig{})
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, s3Event).ProcessS3Object(
logp.NewLogger(inputName), func(_ beat.Event) {})
logp.NewLogger(inputName),
func(event beat.Event) { events = append(events, event) })

if !expectErr {
require.NoError(t, err)
assert.Equal(t, numEvents, len(events))
assert.EqualValues(t, numEvents, ack.PendingACKs)
} else {
require.Error(t, err)
}
Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/awss3/sqs_s3_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func newSQSS3EventProcessor(
}

type sqsProcessingResult struct {
log *logp.Logger
processor *sqsS3EventProcessor
msg *types.Message
receiveCount int // How many times this SQS object has been read
Expand Down Expand Up @@ -201,7 +200,6 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message
})

return sqsProcessingResult{
log: p.log,
msg: msg,
processor: p,
receiveCount: receiveCount,
Expand All @@ -228,7 +226,11 @@ func (r sqsProcessingResult) Done() {
p.log.Errorf("failed deleting message from SQS queue (it may be reprocessed): %v", msgDelErr.Error())
return
}
p.metrics.sqsMessagesDeletedTotal.Inc()
if p.metrics != nil {
// This nil check always passes in production, but it's nice when unit
// tests don't have to initialize irrelevant fields
p.metrics.sqsMessagesDeletedTotal.Inc()
}
// SQS message finished and deleted, finalize s3 objects
if finalizeErr := r.finalizeS3Objects(); finalizeErr != nil {
p.log.Errorf("failed finalizing message from SQS queue (manual cleanup is required): %v", finalizeErr.Error())
Expand Down
32 changes: 29 additions & 3 deletions x-pack/filebeat/input/awss3/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -72,19 +73,43 @@ func TestSQSReceiver(t *testing.T) {
return map[string]string{sqsApproximateNumberOfMessages: "10000"}, nil
}).AnyTimes()

mockSQS.EXPECT().
DeleteMessage(gomock.Any(), gomock.Any()).Times(1).Do(
func(_ context.Context, _ *types.Message) {
cancel()
})

logger := logp.NewLogger(inputName)

// Expect the one message returned to have been processed.
mockMsgHandler.EXPECT().
ProcessSQS(gomock.Any(), gomock.Eq(&msg), gomock.Any()).
Times(1).
Return(nil)
DoAndReturn(
func(_ context.Context, _ *types.Message, _ func(e beat.Event)) sqsProcessingResult {
return sqsProcessingResult{
keepaliveCancel: func() {},
processor: &sqsS3EventProcessor{
log: logger,
sqs: mockSQS,
},
}
})

// Execute sqsReader and verify calls/state.
sqsReader := newSQSReaderInput(config{NumberOfWorkers: workerCount}, aws.Config{})
sqsReader.log = logp.NewLogger(inputName)
sqsReader.log = logger
sqsReader.sqs = mockSQS
sqsReader.msgHandler = mockMsgHandler
sqsReader.metrics = newInputMetrics("", nil, 0)
sqsReader.pipeline = &fakePipeline{}
sqsReader.msgHandler = mockMsgHandler
sqsReader.run(ctx)

select {
case <-ctx.Done():
case <-time.After(time.Second):
require.Fail(t, "Never observed SQS DeleteMessage call")
}
})

t.Run("retry after ReceiveMessage error", func(t *testing.T) {
Expand Down Expand Up @@ -125,6 +150,7 @@ func TestSQSReceiver(t *testing.T) {
sqsReader.sqs = mockSQS
sqsReader.msgHandler = mockMsgHandler
sqsReader.metrics = newInputMetrics("", nil, 0)
sqsReader.pipeline = &fakePipeline{}
sqsReader.run(ctx)
})
}
Expand Down

0 comments on commit 48025b5

Please sign in to comment.