Skip to content

Commit

Permalink
[AWS] [S3] Remove url.QueryUnescape() from aws-s3 input in polling mode (elastic#38125)
Browse files Browse the repository at this point in the history

* Remove url.QueryUnescape()

We introduced [^1] the `url.QueryUnescape()` function to unescape
object keys from S3 notifications in SQS messages.

However, the object keys in the S3 list object responses do not
require unescaping [^2].

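We must remove the unescaping step in polling mode to avoid unintended
changes to the S3 object key (for example, `url.QueryUnescape()` decodes
a literal "+" in a key as a space).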

[^1]: elastic#18370
[^2]: elastic#38012 (comment)

---------

Co-authored-by: Andrea Spacca <andrea.spacca@elastic.co>
  • Loading branch information
zmoog and Andrea Spacca authored Mar 4, 2024
1 parent 27cde87 commit 5f1e656
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ fields added to events containing the Beats version. {pull}37553[37553]
- [threatintel] MISP pagination fixes {pull}37898[37898]
- Fix file handle leak when handling errors in filestream {pull}37973[37973]
- Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116]
- Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125]

*Heartbeat*

Expand Down
12 changes: 2 additions & 10 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"time"

Expand Down Expand Up @@ -208,14 +207,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
// Metrics
p.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
for _, object := range page.Contents {
// Unescape s3 key name. For example, convert "%3D" back to "=".
filename, err := url.QueryUnescape(*object.Key)
if err != nil {
p.log.Errorw("Error when unescaping object key, skipping.", "error", err, "s3_object", *object.Key)
continue
}

state := newState(bucketName, filename, *object.ETag, p.listPrefix, *object.LastModified)
state := newState(bucketName, *object.Key, *object.ETag, p.listPrefix, *object.LastModified)
if p.states.MustSkip(state, p.store) {
p.log.Debugw("skipping state.", "state", state)
continue
Expand All @@ -240,7 +232,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
s3ObjectHandler: s3Processor,
s3ObjectInfo: s3ObjectInfo{
name: bucketName,
key: filename,
key: *object.Key,
etag: *object.ETag,
lastModified: *object.LastModified,
listingID: listingID.String(),
Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/awss3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func TestS3Poller(t *testing.T) {
Key: aws.String("key5"),
LastModified: aws.Time(time.Now()),
},
{
ETag: aws.String("etag6"),
Key: aws.String("2024-02-08T08:35:00+00:02.json.gz"),
LastModified: aws.Time(time.Now()),
},
},
}, nil
})
Expand Down Expand Up @@ -124,6 +129,10 @@ func TestS3Poller(t *testing.T) {
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")).
Return(nil, errFakeConnectivityFailure)

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval)
require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx))
Expand Down

0 comments on commit 5f1e656

Please sign in to comment.