
Refactor sqsReader.Receive: move queue ack waiting and message deleti… #38146

Closed

wants to merge 49 commits into from

Conversation

@aspacca aspacca commented Feb 29, 2024

…on in a separated goroutine

Enhancement

Proposed commit message

Refactor AWS S3-SQS input in order to decouple the number of messages processed from waiting for a flush from the publishing queue.

WHAT:
Each SQS message in the AWS S3-SQS input is handled in a separate goroutine, where operations are sequential. Instead of waiting inside each goroutine for a flush from the publishing queue, we return as soon as all events are sent to the queue and start a new goroutine for a new SQS message.

Waiting for the flush of the events of each SQS message happens in separate goroutines; the message is kept in flight until its flush is done. Only then can we decide whether to send the message back to the queue or delete it.

This implementation decouples the number of messages processed from waiting for a flush from the publishing queue: only message deletion/return to the queue waits for the flush.
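A minimal sketch of this flow in Go (names such as deletionRequest, process, and deleteMessage are illustrative, not the identifiers used in this PR):

package sqsflow

import (
    "context"
    "sync"
)

// deletionRequest carries what the deletion stage needs once every event of
// an SQS message has been handed to the publishing queue.
type deletionRequest struct {
    receiptHandle string
    // waitForFlush blocks until the publishing queue has acked every event
    // of the message; it returns false if the wait is aborted.
    waitForFlush func(ctx context.Context) bool
}

// receive handles each SQS message in its own goroutine and returns from that
// goroutine as soon as the events are sent to the publishing queue; waiting
// for the flush and deleting the message happens in a separate stage.
func receive(ctx context.Context, messages <-chan string,
    process func(ctx context.Context, msg string) deletionRequest,
    deleteMessage func(receiptHandle string)) {

    deletions := make(chan deletionRequest)
    var deletionWg sync.WaitGroup

    // Single goroutine draining deletion requests; each request gets its own
    // goroutine so a slow queue flush never throttles message processing.
    go func() {
        for req := range deletions {
            deletionWg.Add(1)
            go func(req deletionRequest) {
                defer deletionWg.Done()
                if req.waitForFlush(ctx) {
                    deleteMessage(req.receiptHandle)
                }
                // Otherwise the message stays in flight and goes back to the queue.
            }(req)
        }
    }()

    var processWg sync.WaitGroup
    for msg := range messages {
        processWg.Add(1)
        go func(msg string) {
            defer processWg.Done()
            // Return as soon as the events are published; the flush is not awaited here.
            deletions <- process(ctx, msg)
        }(msg)
    }

    processWg.Wait()
    close(deletions)
    deletionWg.Wait()
}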

WHY:
In the existing implementation, waiting for the flush from the publishing queue implicitly throttles SQS message processing to the throughput of the publishing queue flush.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Benchmarks comparison

Benchmark scenarios:

  1. New implementation: after all the events from an SQS message are published to the queue (same goroutine behaviour as now) we send the data needed to process the deletion to a channel; a single goroutine reads from this channel and starts a goroutine for each element read in order to delete the message. awscommon.EventACKTracker changed (see below).
  2. Same as above, but capping the number of concurrent deletion goroutines to 3200 (currently hardcoded; chosen to match the queue size I’ve tested). The reason is to cap memory usage and goroutine spawning (see the sketch after this list).
  3. Current codebase, with only a change in awscommon.EventACKTracker (brought from this PR) to handle a race condition where PendingACKs could reach 0 and then increase again when not all the Add calls (increasing PendingACKs) arrive before all the ACK calls (decreasing PendingACKs): the existing implementation stops waiting for all events to be acked by the queue even though not all of them were actually acked.
  4. Same as before, but in addition to the logic change in awscommon.EventACKTracker, switching from a mutex guarding PendingACKs against race conditions to the atomic package.
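A minimal sketch of the cap in scenario 2, using a buffered channel as a semaphore (the constant and the names are illustrative, not the PR's actual code):

package sqsflow

import "sync"

// maxDeletionGoroutines caps concurrent deletions; 3200 matches the queue
// size used in the benchmarks and is illustrative only.
const maxDeletionGoroutines = 3200

// runDeletions executes each deletion job in its own goroutine, but never
// more than maxDeletionGoroutines at once, bounding memory usage and
// goroutine spawning.
func runDeletions(jobs <-chan func()) {
    sem := make(chan struct{}, maxDeletionGoroutines)
    var wg sync.WaitGroup
    for job := range jobs {
        sem <- struct{}{} // blocks while the cap is reached
        wg.Add(1)
        go func(job func()) {
            defer func() { <-sem; wg.Done() }()
            job()
        }(job)
    }
    wg.Wait()
}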

For each scenario I’ve tested three different types of load:

  1. Dynamic number of messages: I generate 1.1x max_number_of_messages, testing from 1 to 1024 max_number_of_messages in power-of-2 steps. The size of each message, the number of S3 notifications in each, and the number of events for each S3 object are generated randomly with the same seed, so different max_number_of_messages values produce different loads.
  2. A fixed set of 71 messages that I identified as a particularly performant benchmark load during development; I test this exact load in every scenario.
  3. The "1 SQS message : 1 S3 object : 1 Event" load, again randomized according to the max_number_of_messages tested. This is the original load type that prompted the refactoring of the input.

See https://github.com/elastic/beats/blob/01ee8d18fcb523586883cf946914643902b01631/x-pack/filebeat/input/awss3/benchmarks-TO-BE-DELETED.md for results

@aspacca aspacca self-assigned this Feb 29, 2024
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 29, 2024
Contributor

mergify bot commented Feb 29, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @aspacca? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@aspacca aspacca added the Team:obs-ds-hosted-services Label for the Observability Hosted Services team label Feb 29, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 29, 2024
@elasticmachine
Collaborator

elasticmachine commented Feb 29, 2024

💚 Build Succeeded


Build stats

  • Duration: 140 min 35 sec

❕ Flaky test report

No test was executed to be analysed.

🤖 GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

x-pack/filebeat/input/awss3/sqs.go (outdated review thread, resolved)
x-pack/libbeat/common/aws/acker.go (outdated review thread, resolved)
x-pack/filebeat/input/awss3/sqs.go (outdated review thread, resolved)
@aspacca
Author

aspacca commented Mar 12, 2024

@cmacknz @faec

Benchmark from last commit:
Roughly 4 minutes for full ingestion, 290 MB RAM, ~5000 EPS on average, with tens of messages in flight at any time.

Memory decreases after ingestion ends.

@faec faec (Contributor) left a comment

I'm about to be on PTO (back Wednesday 3/20) so I can't make the sync tomorrow. I've gone through the new revisions in more detail, hopefully these comments can keep things progressing in the meantime.

"elapsed_time_ns", time.Since(a.start))
}
} else {
a.log.Infow("Skipping deleting SQS message, not all events acked.",
Contributor

Messages should still be deleted even if some events are dropped. For example a user might configure their processors to filter messages with a particular tag, but this doesn't mean that anything is wrong or that processing of that SQS message is not really "done." Until the client itself is closed, the Beats pipeline will only drop events that should be permanently dropped based on user settings.

@aspacca aspacca (Author) Mar 14, 2024

Messages should still be deleted even if some events are dropped. For example a user might configure their processors to filter messages with a particular tag, but this doesn't mean that anything is wrong or that processing of that SQS message is not really "done." Until the client itself is closed, the Beats pipeline will only drop events that should be permanently dropped based on user settings.

Can you please clarify between "dropped", "published" and "acked"?

I assumed the following

  • dropped: discarded by a processor in beats. not sent to the output
  • published: sent to the output
  • acked: output returned a positive response of receiving the event


// This is eating its own tail: we should check for dropped+published, but then we won't wait for acked.
// Acked might not be equal to published?
return a.EventsDropped.Load()+a.EventsAcked.Load() == eventsToBeTracked
Contributor

EventsDropped shouldn't be included here, and probably shouldn't be tracked at all since it has no effect that the input can act on (see comments on (*eventListener).AddEvent and FlushForSQS)

Author

I see we still need to track it.
We know that an SQS message has 10 events:
3 of them were dropped,
7 were acked.

Summing the two gives 7+3 == 10, so we know every event in the SQS message was taken care of by the queue and we can delete the message without waiting for anything else.
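A minimal sketch of that bookkeeping, here with the standard library's sync/atomic (field and method names are illustrative; the PR's EventACKTracker differs in detail):

package sqsflow

import "sync/atomic"

// ackTracker counts, per SQS message, how many events the pipeline has
// finished with, either by acking them or by dropping them in a processor.
type ackTracker struct {
    eventsAcked   atomic.Uint64
    eventsDropped atomic.Uint64
    eventsTotal   atomic.Uint64
    totalKnown    atomic.Bool // false until the whole S3 object has been read
}

func (a *ackTracker) ACK()  { a.eventsAcked.Add(1) }
func (a *ackTracker) Drop() { a.eventsDropped.Add(1) }

// MarkProcessed records the final event count once the S3 object is fully read.
func (a *ackTracker) MarkProcessed(total uint64) {
    a.eventsTotal.Store(total)
    a.totalKnown.Store(true)
}

// FullyTracked reports whether every event of the message is accounted for:
// the total is known and acked + dropped == total (e.g. 7 + 3 == 10).
func (a *ackTracker) FullyTracked() bool {
    return a.totalKnown.Load() &&
        a.eventsAcked.Load()+a.eventsDropped.Load() == a.eventsTotal.Load()
}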


func (a *eventListener) ClientClosed() {}

func (a *eventListener) AddEvent(event beat.Event, published bool) {
Contributor

EventPrivateReporter already triggers your callback in NewEventACKHandler for every event, whether it is published or not, so the current Drop handling leads to double counting. I believe you can remove eventListener entirely, along with EventsDropped and EventsPublished -- there should be no reason to alter behavior based on whether something is published or dropped, since you just need to compare the final event count for a message with the callback invocations from EventPrivateReporter.

@aspacca aspacca (Author) Mar 14, 2024

I can see that in AddEvent I have the published param, which tells me whether the event was published or dropped.

EventPrivateReporter already triggers your callback in NewEventACKHandler for every event, whether it is published or not

OK, I thought it triggered my callback only for every "acked" event (in the sense of "the output returned a positive response of receiving the event").

If "acked" means that the queue handled the event, either publishing it or dropping it, then yes: I can just use EventPrivateReporter and LastEventPrivateReporter.

In EventPrivateReporter I will just call acker.ACK() for every event, and in LastEventPrivateReporter:

if acker.FullyTracked() {
    acker.FlushForSQS()
}

But I need to introduce one client for every acker

It seems that this way we don't have the possibility to ensure at-least-once delivery, at least in the sense that the output ingested the event.
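For reference, a sketch of that wiring, assuming libbeat's acker.EventPrivateReporter and acker.LastEventPrivateReporter helpers; FullyTracked and FlushForSQS are the methods discussed in this thread, not an existing libbeat API:

package sqsflow

import (
    "github.com/elastic/beats/v7/libbeat/beat"
    "github.com/elastic/beats/v7/libbeat/common/acker"
)

// sqsACKer is the per-message tracker discussed above; FullyTracked and
// FlushForSQS are hypothetical here.
type sqsACKer interface {
    ACK()
    FullyTracked() bool
    FlushForSQS()
}

// newEventACKHandler acks every event carrying an sqsACKer in its private
// data and, once the last event of a publish batch is acked and the message
// is fully tracked, triggers the delete/return decision for the SQS message.
func newEventACKHandler() beat.EventListener {
    return acker.Combine(
        acker.EventPrivateReporter(func(_ int, privates []interface{}) {
            for _, private := range privates {
                if a, ok := private.(sqsACKer); ok {
                    a.ACK()
                }
            }
        }),
        acker.LastEventPrivateReporter(func(_ int, private interface{}) {
            if a, ok := private.(sqsACKer); ok && a.FullyTracked() {
                a.FlushForSQS()
            }
        }),
    )
}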

x-pack/filebeat/input/awss3/sqs_acker.go (outdated review thread, resolved)
EventsToBeTracked: atomic.NewUint64(0),
}

go func() {
Contributor

Creating a looping goroutine for each message isn't needed. It should be enough to check FullyTracked:

  • In (*EventACKTracker).ACK
  • In (*EventACKTracker).MarkSQSProcessedWithData

since these are the only two places where the acked count and/or EventsToBeTracked can be modified. (This could be done safely with just atomics given some careful ordering, but it's also fine to use a mutex for the initial implementation, that's still a lot better than an extra goroutine for every message.)
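A minimal sketch of what that suggestion would look like, with the completion check done under a mutex in both mutation points (names are illustrative):

package sqsflow

import "sync"

// eventACKTracker sketches the suggestion: completion is checked both when an
// event is acked and when the final event count becomes known, so the action
// fires no matter which of the two happens last.
type eventACKTracker struct {
    mu         sync.Mutex
    acked      uint64
    total      uint64
    totalKnown bool
    done       bool
    onComplete func() // e.g. delete the SQS message or send it back to the queue
}

func (t *eventACKTracker) finishIfDone() {
    if !t.done && t.totalKnown && t.acked == t.total {
        t.done = true
        go t.onComplete() // keep the pipeline's ack callback from blocking
    }
}

func (t *eventACKTracker) ACK() {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.acked++
    t.finishIfDone()
}

func (t *eventACKTracker) MarkSQSProcessedWithData(total uint64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.total = total
    t.totalKnown = true
    t.finishIfDone()
}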

Author

Creating a looping goroutine for each message isn't needed. It should be enough to check FullyTracked:

  • In (*EventACKTracker).ACK
  • In (*EventACKTracker).MarkSQSProcessedWithData

that's what we wanted to remove, because it requires a mutex and it would block the listener callback.

Author

In EventPrivateReporter I will just call acker.ACK() for every event, and in LastEventPrivateReporter:

if acker.FullyTracked() {
    acker.FlushForSQS()
}

Beware that also with this approach we can end up with the following timeline:
. T1 client publishes event 1
. T2 client publishes event 2
. T3 private event listener acks event 1
. T4 private event listener acks event 2
. T5 last event listener checks how many events have to be acked (0, because T6 hasn't happened yet)
. T6 sqs event processor marks the sqs message as processed, informing the acker that there are only 2 events

The last event listener will not be invoked again. We don't delete the message and the input will hang on shutdown because we have deletionWg.Wait().

So I guess a goroutine is needed indeed

Member

The core problem being that the S3 objects are read and published as a stream, where you don't know how many events to expect (and hence when the message can be deleted) until you have read them all?

https://github.com/aspacca/beats/blob/8c3c39a69efe506763aaad50efc5e8280cc0f7c5/x-pack/filebeat/input/awss3/s3_objects.go#L398-L403

		message, err := reader.Next()
		if len(message.Content) > 0 {
			event := p.createEvent(string(message.Content), offset)
			event.Fields.DeepUpdate(message.Fields)
			offset += int64(message.Bytes)
			p.publish(&event)
		}

		if errors.Is(err, io.EOF) {
			// No more lines
			break
		}
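A sketch of how the total only becomes known once the stream hits io.EOF, at which point the acker can be told how many events to expect (the loop mirrors the excerpt above; the reader interface, publish, and markProcessed callbacks are stand-ins for the real code, e.g. MarkSQSProcessedWithData):

package sqsflow

import (
    "errors"
    "io"
)

// lineReader stands in for the reader used in s3_objects.go: it returns one
// message per call and io.EOF when the object is exhausted.
type lineReader interface {
    Next() (content []byte, err error)
}

// publishAll streams an S3 object, publishing one event per line; only after
// io.EOF do we know how many events the SQS message produced, so only then
// can the acker be told the total it has to wait for.
func publishAll(r lineReader, publish func(content []byte), markProcessed func(total uint64)) error {
    var total uint64
    for {
        content, err := r.Next()
        if len(content) > 0 {
            publish(content)
            total++
        }
        if errors.Is(err, io.EOF) {
            break // no more lines
        }
        if err != nil {
            return err
        }
    }
    markProcessed(total) // e.g. acker.MarkSQSProcessedWithData(total)
    return nil
}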

Comment on lines 86 to 88
clientsMutex.Lock()
clients[id] = client
clientsMutex.Unlock()
Member

Is concurrent access to the map actually possible here?

Author

no indeed

acker := NewEventACKTracker(ctx, deletionWg)

deletionWg.Add(1)
deletionWaiter.Swap(false)
Member

Is the bool ever actually set to true? The default value of deletionWaiter := new(atomic.Bool) is false, isn't it?

Author

good catch!

// an event has been ACKed an output. If the event contains a private metadata
// pointing to an eventACKTracker then it will invoke the trackers ACK() method
// to decrement the number of pending ACKs.
func NewEventACKHandler() beat.EventListener {
Contributor

Can the NewEventACKHandler function stay inside libbeat?

Author

We already have awscommon.EventACKTracker in github.com/elastic/beats/v7/x-pack/libbeat/common/aws.
This acker is specific to SQS-S3 notifications.

event.Private = ack
func (p *s3ObjectProcessor) publish(event *beat.Event) {
if p.acker != nil {
event.Private = p.acker
Contributor

also need p.acker.Add() first?

Author

p.acker is awss3.EventACKTracker: it has no Add()
p.ackerForPollin is awscommon.EventACKTracker: it has Add()

@elasticmachine

💚 Build Succeeded

cc @aspacca

@elasticmachine
Collaborator

elasticmachine commented Mar 19, 2024

💔 Build Failed

Failed CI Steps

History

cc @aspacca

@aspacca aspacca closed this Mar 20, 2024
@aspacca aspacca deleted the sqs-s3-input-wait-for-ack-in-a-separated-goroutine branch March 20, 2024 03:35