Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sqsReader.Receive: move queue ack waiting and message deleti… #38146

Closed
wants to merge 49 commits into from
Closed
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c2b9325
Refactor sqsReader.Receive: move queue ack waiting and message deleti…
Feb 28, 2024
6734772
background ctx in DeleteMessage, linting
Feb 29, 2024
826b53c
make check
Feb 29, 2024
e1294f4
Refactor benchmark, acker with atomic
Mar 4, 2024
7120e9f
Improve sqsReader.Receive
Mar 4, 2024
edae48f
linting
Mar 4, 2024
a696473
remove ctx from awscommon-EventACKTracker
Mar 4, 2024
0636921
Revert awscommon.EventAckTracker and add awss3.EventACKTracker
Mar 6, 2024
96389b1
Implement usage of new awss3.EventACKTracker
Mar 6, 2024
c6ff95f
fix tests
Mar 6, 2024
c91076b
make check
Mar 6, 2024
802b618
Merge branch 'main' into sqs-s3-input-wait-for-ack-in-a-separated-gor…
Mar 6, 2024
e38a699
linting
Mar 6, 2024
1b587a1
No need to acker in beat.ClientConfig
Mar 6, 2024
6a757ae
Handle race condition that prevents to FlushForSQS, add ack mutex and…
Mar 6, 2024
7bf18a6
fix tests
Mar 6, 2024
60f7c10
Properly handle input beat.Pipeline
Mar 6, 2024
85279eb
fix tests
Mar 6, 2024
a5e818d
Move client.Close() on deletion
Mar 6, 2024
a005078
fix test
Mar 6, 2024
0865222
Fix atomic operation on awss3.EventACKTracker.ackMutexLockedOnInit an…
Mar 7, 2024
445146a
fix tests
Mar 7, 2024
46e92a5
Introduce capped level of workers
Mar 8, 2024
8af4435
fix tests
Mar 8, 2024
8d26b37
lint
Mar 8, 2024
013bec1
Merge branch 'main' into sqs-s3-input-wait-for-ack-in-a-separated-gor…
Mar 8, 2024
3d0b7d1
add benchmarks markdown
Mar 11, 2024
6cb6221
Handle deletion WaitGroup when no events to be acked
Mar 11, 2024
0b75813
fix tests
Mar 11, 2024
826a298
lint
Mar 11, 2024
b0cea24
Merge branch 'main' into sqs-s3-input-wait-for-ack-in-a-separated-gor…
Mar 11, 2024
f589bc5
Merge branch 'main' into sqs-s3-input-wait-for-ack-in-a-separated-gor…
Mar 12, 2024
7413873
revert awscommon.EventACKTracker in S3 polling
Mar 12, 2024
2aedb51
SQS acker with tracker listener and no blocking
Mar 12, 2024
7fb85cb
fix deletionWaiter
Mar 12, 2024
732c97a
fix test
Mar 12, 2024
136f026
lint
Mar 12, 2024
d06021e
fix race in acker goroutine
Mar 12, 2024
d1be46c
lint
Mar 12, 2024
32bf3da
Delete only on fully acked
Mar 12, 2024
01ee8d1
fix tests
Mar 12, 2024
f7dda95
track published, dropped and acked events
Mar 12, 2024
c9699f3
fix tests
Mar 12, 2024
d531efb
Merge branch 'main' into sqs-s3-input-wait-for-ack-in-a-separated-gor…
Mar 12, 2024
895a9d7
wait for acked
Mar 13, 2024
4dbd25b
No need to track dropped and published
Mar 14, 2024
8c3c39a
fix tests
Mar 14, 2024
fe71445
cr fixes and other cleaning
Mar 18, 2024
03cc1d6
Merge branch 'main' into sqs-s3-input-wait-for-ack-in-a-separated-gor…
Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions x-pack/filebeat/input/awss3/benchmarks-TO-BE-DELETED.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/decoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestParquetDecoding(t *testing.T) {
name string
file string
contentType string
numEvents int
numEvents uint64
assertAgainst string
config *readerConfig
}{
Expand Down Expand Up @@ -121,7 +121,7 @@ func readJSONFromFile(t *testing.T, filepath string) []string {
var rawMessages []json.RawMessage
err = json.Unmarshal(fileBytes, &rawMessages)
assert.NoError(t, err)
var data []string
data := make([]string, 0, len(rawMessages))

for _, rawMsg := range rawMessages {
data = append(data, string(rawMsg))
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
EventListener: awscommon.NewEventACKHandler(),
EventListener: NewEventACKHandler(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
// is not required.
Expand Down Expand Up @@ -227,8 +227,8 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
}
in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages)
sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler, pipeline)

return sqsReader, nil
}
Expand Down
Loading
Loading