Refactor sqsReader.Receive: move queue ack waiting and message deletion in a separated goroutine #38146
Conversation
I'm about to be on PTO (back Wednesday 3/20) so I can't make the sync tomorrow. I've gone through the new revisions in more detail, hopefully these comments can keep things progressing in the meantime.
"elapsed_time_ns", time.Since(a.start)) | ||
} | ||
} else { | ||
a.log.Infow("Skipping deleting SQS message, not all events acked.", |
Messages should still be deleted even if some events are dropped. For example a user might configure their processors to filter messages with a particular tag, but this doesn't mean that anything is wrong or that processing of that SQS message is not really "done." Until the client itself is closed, the Beats pipeline will only drop events that should be permanently dropped based on user settings.
Can you please clarify the difference between "dropped", "published" and "acked"?
I assumed the following:
- dropped: discarded by a processor in Beats; not sent to the output
- published: sent to the output
- acked: the output returned a positive response that it received the event
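For context, these signals arrive through libbeat's beat.EventListener interface, the one implemented by the eventListener in the snippets below. A minimal counting sketch over this PR's counters (EventsDropped/EventsPublished/EventsAcked are the PR's fields; the tracker wiring here is an assumption for illustration):

```go
// Sketch only: a beat.EventListener feeding the PR's counters.
type eventListener struct {
	tracker *EventACKTracker // assumed wiring detail
}

// AddEvent fires for every event entering the pipeline; published
// distinguishes published (true) from dropped (false) events.
func (a *eventListener) AddEvent(event beat.Event, published bool) {
	if published {
		a.tracker.EventsPublished.Inc()
	} else {
		a.tracker.EventsDropped.Inc()
	}
}

// ACKEvents fires once the output has acknowledged n events.
func (a *eventListener) ACKEvents(n int) {
	a.tracker.EventsAcked.Add(uint64(n))
}

func (a *eventListener) ClientClosed() {}
```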
// This is eating its own tail: we should check for dropped+published, but then we won't wait for acked.
// Acked might not be equal to published?
return a.EventsDropped.Load()+a.EventsAcked.Load() == eventsToBeTracked
EventsDropped shouldn't be included here, and probably shouldn't be tracked at all, since it has no effect that the input can act on (see comments on (*eventListener).AddEvent and FlushForSQS).
I see, we still need to track it. Say an SQS message has 10 events: 3 of them were dropped and 7 acked. Summing the two, 7+3 == 10, so we know every event in the SQS message was taken care of by the queue, and we can delete the message without waiting for anything else.
func (a *eventListener) ClientClosed() {}

func (a *eventListener) AddEvent(event beat.Event, published bool) {
EventPrivateReporter already triggers your callback in NewEventACKHandler for every event, whether it is published or not, so the current Drop handling leads to double counting. I believe you can remove eventListener entirely, along with EventsDropped and EventsPublished -- there should be no reason to alter behavior based on whether something is published or dropped, since you just need to compare the final event count for a message with the callback invocations from EventPrivateReporter.
I can see that in AddEvent I have the published param, which tells me whether the event was published or dropped.

> EventPrivateReporter already triggers your callback in NewEventACKHandler for every event, whether it is published or not

OK, I thought it triggered my callback only for every "acked" event (as in "the output returned a positive response of receiving the event"). If "acked" means the pipeline handled the event, either publishing or dropping it, then yes: I can just use EventPrivateReporter and LastEventPrivateReporter. In EventPrivateReporter I will just call acker.ACK() for every event, and in LastEventPrivateReporter:
if acker.FullyTracked() {
	acker.FlushForSQS()
}
But I need to introduce one client for every acker. It seems this way we don't have the possibility to ensure at-least-once delivery, at least in the sense that the output ingested the event.
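For reference, a minimal sketch of that wiring using libbeat's common/acker package (acker.Combine, acker.EventPrivateReporter, and acker.LastEventPrivateReporter come from libbeat; EventACKTracker, FullyTracked, and FlushForSQS are this PR's names):

```go
// Sketch only: a per-event reporter counts every event the pipeline
// has handled; the last-event reporter flushes once the whole SQS
// message is tracked.
listener := acker.Combine(
	acker.EventPrivateReporter(func(acked int, data []interface{}) {
		for _, private := range data {
			if t, ok := private.(*EventACKTracker); ok {
				t.ACK()
			}
		}
	}),
	acker.LastEventPrivateReporter(func(acked int, data interface{}) {
		if t, ok := data.(*EventACKTracker); ok && t.FullyTracked() {
			t.FlushForSQS()
		}
	}),
)
```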
	EventsToBeTracked: atomic.NewUint64(0),
}

go func() {
Creating a looping goroutine for each message isn't needed. It should be enough to check FullyTracked:
- In (*EventACKTracker).ACK
- In (*EventACKTracker).MarkSQSProcessedWithData

since these are the only two places where the acked count and/or EventsToBeTracked can be modified. (This could be done safely with just atomics given some careful ordering, but it's also fine to use a mutex for the initial implementation; that's still a lot better than an extra goroutine for every message.)
> Creating a looping goroutine for each message isn't needed. It should be enough to check FullyTracked: in (*EventACKTracker).ACK and in (*EventACKTracker).MarkSQSProcessedWithData

That's what we wanted to remove, because it required a mutex and it will block the listener callback.
> In EventPrivateReporter I will just call acker.ACK() for every event, and in LastEventPrivateReporter: if acker.FullyTracked() { acker.FlushForSQS() }

Beware that with this we can end up with the following timeline:
- T1: client publishes event 1
- T2: client publishes event 2
- T3: private event listener acks event 1
- T4: private event listener acks event 2
- T5: last event listener checks how many events have to be acked (0, because T6 has yet to come)
- T6: SQS event processor marks the SQS message as processed, informing the acker that there are only 2 events

The last event listener will not be invoked again. We don't delete the message, and the input will get stuck on shutdown because we have deletionWg.Wait(). So I guess a goroutine is needed indeed.
The core problem being that the S3 objects are read and published as a stream, so you don't know how many events to expect, and hence when the object can be deleted, until you have read them all?
message, err := reader.Next()
if len(message.Content) > 0 {
	event := p.createEvent(string(message.Content), offset)
	event.Fields.DeepUpdate(message.Fields)
	offset += int64(message.Bytes)
	p.publish(&event)
}
if errors.Is(err, io.EOF) {
	// No more lines
	break
}
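One hedged way to reconcile that stream with the acker, assuming this PR's MarkSQSProcessedWithData accepts the final event count (the counting loop below is illustrative, not the actual input code):

```go
// Count events while streaming the S3 object; the total is only known
// at EOF, at which point the acker learns how many events to wait for.
// (Non-EOF error handling omitted for brevity.)
var eventCount uint64
for {
	message, err := reader.Next()
	if len(message.Content) > 0 {
		event := p.createEvent(string(message.Content), offset)
		offset += int64(message.Bytes)
		p.publish(&event)
		eventCount++
	}
	if errors.Is(err, io.EOF) {
		break
	}
}
acker.MarkSQSProcessedWithData(eventCount)
```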
x-pack/filebeat/input/awss3/sqs.go
clientsMutex.Lock()
clients[id] = client
clientsMutex.Unlock()
Is concurrent access to the map actually possible here?
No, indeed it isn't.
acker := NewEventACKTracker(ctx, deletionWg)

deletionWg.Add(1)
deletionWaiter.Swap(false)
Is the bool ever actually set to true? The default value of deletionWaiter := new(atomic.Bool) is false, isn't it?
good catch!
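To illustrate the point with the standard library's sync/atomic (the PR's atomic Bool has matching semantics): the zero value of an atomic Bool is false, so Swap(false) on a fresh value is effectively a no-op.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	deletionWaiter := new(atomic.Bool)
	fmt.Println(deletionWaiter.Load())      // false: the zero value, never set to true
	fmt.Println(deletionWaiter.Swap(false)) // false: returns the old value; stays false
}
```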
// an event has been ACKed by an output. If the event contains private metadata
// pointing to an eventACKTracker then it will invoke the tracker's ACK() method
// to decrement the number of pending ACKs.
func NewEventACKHandler() beat.EventListener {
Can the NewEventACKHandler function stay inside libbeat?
We already have awscommon.EventACKTracker in github.com/elastic/beats/v7/x-pack/libbeat/common/aws. This acker is specific to SQS-S3 notifications.
func (p *s3ObjectProcessor) publish(event *beat.Event) {
	if p.acker != nil {
		event.Private = p.acker
Also need p.acker.Add() first?
p.acker is awss3.EventACKTracker: it has no Add().
p.ackerForPollin is awscommon.EventACKTracker: it has Add().
Enhancement
Proposed commit message
Refactor AWS S3-SQS input in order to decouple the number of messages processed from waiting for a flush from the publishing queue.
WHAT:
Each SQS message in the AWS S3-SQS input is handled in a separate goroutine, where operations are sequential. Instead of waiting inside each goroutine for a flush from the publishing queue, we return as soon as all events are sent to the queue and start a new goroutine for a new SQS message.

We wait for the flush of each SQS message handled by the previous goroutines in separate ones; the message is kept in flight until this is done. Only then can we decide whether to send the message back to the queue or delete it.

This implementation decouples the number of messages processed from waiting for a flush from the publishing queue; only message deletion/sending back to the queue waits for that.
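A simplified sketch of the flow described above (not the PR's actual code; publishEvents, WaitForFlush, deleteMessage, and returnToQueue are hypothetical stand-ins):

```go
for _, msg := range messages {
	msg := msg
	acker := NewEventACKTracker(ctx, deletionWg)
	deletionWg.Add(1)

	// Goroutine 1: publish and return as soon as events are queued,
	// freeing this slot for the next SQS message.
	go publishEvents(msg, acker)

	// Goroutine 2: wait for the queue flush, then delete or return the
	// message; only this part is throttled by the publishing queue.
	go func() {
		defer deletionWg.Done()
		if acker.WaitForFlush() {
			deleteMessage(msg)
		} else {
			returnToQueue(msg)
		}
	}()
}
```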
WHY:
In the existing implementation, the coupling between processing and waiting for a flush from the publishing queue implicitly throttles the rate of SQS message processing to the throughput of the publishing queue flush.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding changes to the default configuration files
- [ ] I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc

Benchmarks comparison
Benchmark scenarios:
- awscommon.EventACKTracker changed (see below)
- awscommon.EventACKTracker (brought from this PR) to manage some kind of race condition where PendingACKs could reach 0 and then increase again, in case not all the Add calls (increasing PendingACKs) come before all the ACK calls (decreasing PendingACKs): the existing implementation will stop waiting for all events to be acked by the queue even if not all of them were actually acked
- awscommon.EventACKTracker, but also switching from a mutex guarding PendingACKs against race conditions to the atomic package

For each scenario I've tested three different types of load:

- max_number_of_messages, testing from 1 to 1024 in power-of-2 steps. The size of each message, the number of S3 notifications in each, and the number of events for each S3 object are generated randomly with the same seed, and according to max_number_of_messages this will generate different values.
- max_number_of_messages tested. This is the original load type that urged the refactoring of the input.

See https://github.com/elastic/beats/blob/01ee8d18fcb523586883cf946914643902b01631/x-pack/filebeat/input/awss3/benchmarks-TO-BE-DELETED.md for results.