[Merged by Bors] - Fix flaky TestQueued #6097
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@           Coverage Diff            @@
##           develop   #6097   +/-   ##
=========================================
- Coverage     81.6%   81.5%    -0.1%
=========================================
  Files          302     302
  Lines        32454   32466      +12
=========================================
+ Hits         26487   26492       +5
- Misses        4235    4239       +4
- Partials      1732    1735       +3

☔ View full report in Codecov by Sentry.
force-pushed from f8578c1 to c4c308c
p2p/server/server.go (Outdated)

	srv.metrics.targetRps.Set(float64(srv.limit.Limit()))
}
srv.h.SetStreamHandler(protocol.ID(srv.protocol), func(stream network.Stream) {
	srv.queue <- request{stream: stream, received: time.Now()}
There was a `select` case in the block that was moved. Any particular reason to remove the `default` case? Ideally there would be a context here that could be selected on, but the other problem is the rate limiting, which could result in messages being dropped (but also, goroutines could pile up waiting to write to this channel, which would have other repercussions, like messages timing out on the other end). It might be useful to dwell on this a bit longer.

A bit of food for thought: maybe doing rate-limiting on the p2p level is not a good approach - some protocols might be more aggressive than others on resources, which could result in resource starvation for others. This means that rate-limiting on the p2p level won't necessarily solve the resource problem, and it also removes the possibility of prioritizing some protocols over others.
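For reference, a minimal, self-contained sketch of the non-blocking send pattern referred to above (a `select` with a `default` case); the types and buffer size are illustrative, not the actual server code:

```go
package main

import (
	"fmt"
	"time"
)

type request struct {
	id       int
	received time.Time
}

func main() {
	// Small buffered queue just to demonstrate the behaviour.
	queue := make(chan request, 2)

	for i := 0; i < 5; i++ {
		// Non-blocking send: the default case drops the request
		// immediately when the queue is full instead of blocking
		// the sending goroutine on the channel send.
		select {
		case queue <- request{id: i, received: time.Now()}:
			fmt.Println("queued", i)
		default:
			fmt.Println("dropped", i)
		}
	}
}
```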
Check the reading side of the channel - every incoming request is immediately handled and if it exceeds the queue size it will be dropped immediately. I did this so that the `TryAcquire` and `Release` calls to the semaphore are in the same code block.

I can add the `TryAcquire` to the callback of `SetStreamHandler` and only if it passes that check send the request to the queue to be handled in `Run`, but then the `Release` would be disconnected from the `TryAcquire` call 🤷
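To illustrate the pairing argument, here is a small self-contained sketch (names and capacity are illustrative, not the actual server code) where `TryAcquire` and `Release` stay together in one place:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Illustrative capacity of 2 (stands in for the queue size).
	sem := semaphore.NewWeighted(2)
	var wg sync.WaitGroup

	handle := func(id int) {
		// TryAcquire and Release live next to each other, so the
		// pairing is easy to verify; when the semaphore is full the
		// request is dropped right away instead of piling up.
		if !sem.TryAcquire(1) {
			fmt.Println("dropped", id)
			return
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer sem.Release(1)
			time.Sleep(10 * time.Millisecond) // simulated handling
			fmt.Println("handled", id)
		}()
	}

	for i := 0; i < 5; i++ {
		handle(i)
	}
	wg.Wait()
}
```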
If the reader goes away then the goroutines pile up on shutdown. I'm not sure which repercussions this may have (or not have). As a rule of thumb I think we should try to avoid writing directly to channels (without `select`s). But that's just me...

It could be that it is inconsequential, but it could potentially have some side-effects. I'm not so familiar with the code.
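As an aside, a tiny sketch of a guarded send, so the sender gives up on shutdown instead of blocking forever (purely illustrative):

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan int) // unbuffered: nobody reads after shutdown
	cancel()                // simulate the reader having gone away

	// The send is wrapped in a select so the goroutine gives up once
	// the context is done instead of blocking on the channel forever.
	select {
	case queue <- 1:
		fmt.Println("queued")
	case <-ctx.Done():
		fmt.Println("shutting down, dropping request")
	}
}
```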
I updated the `SetStreamHandler` callback so that it does the handling on the sender side of the channel instead of the receiver side, see b0b6cf9. It has the advantage of dropping requests on the sending side of the channel if a) the context is already done or b) the semaphore is full. But since the callback now has to be set in the `Run` method, I need an additional check with the `Ready` method, because otherwise the first request might be sent from the client to the server before the `SetStreamHandler` callback has been set, causing the test to halt.

So it is either:
- waiting on `Ready` after calling `Run`, or
- handling limiting and context checking in the `Run` method instead of in the `SetStreamHandler` callback
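A rough, generic sketch of this sender-side handling (not the actual commit b0b6cf9; names and wiring are simplified assumptions):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

type request struct {
	id       int
	received time.Time
}

// enqueue limits on the sender side of the channel: the request is dropped
// right away if the context is already done or the semaphore is full. The
// receiver side would call sem.Release(1) once it has handled the request.
func enqueue(ctx context.Context, sem *semaphore.Weighted, queue chan<- request, id int) bool {
	if ctx.Err() != nil {
		return false // a) the context is already done
	}
	if !sem.TryAcquire(1) {
		return false // b) the semaphore (queue capacity) is full
	}
	select {
	case queue <- request{id: id, received: time.Now()}:
		return true
	case <-ctx.Done():
		sem.Release(1)
		return false
	}
}

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(2)
	queue := make(chan request, 2)

	for i := 0; i < 4; i++ {
		fmt.Println("request", i, "accepted:", enqueue(ctx, sem, queue, i))
	}
}
```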
OK nvm, I found a way to do it without the need for the `Ready` method 🙂
force-pushed from 9aceb84 to d4ba8ba
Nice, thanks!
bors merge

Build failed:
	rate.Every(srv.interval/time.Duration(srv.requestsPerInterval)),
	srv.requestsPerInterval,
)
srv.sem = semaphore.NewWeighted(int64(srv.queueSize))
What do you think about using a goroutine pool like https://github.com/panjf2000/ants? It can automatically control how many tasks are executed concurrently, reuse goroutines instead of creating a new one for each request (which is cheap but not free), scale the workers down if not used, etc.
I took a quick look at `ants` but I'm not sure it is a good fit here. We have 2 situations that we want to limit:

- if the queue is full, we want to drop incoming requests immediately. I don't quite see how to make this possible with `Pool.Submit()` - it would have to be `WithNonblocking(true)`, but then we don't know if the submit actually started a routine or not (because if it didn't, we need to close the stream and update metrics accordingly)
- a maximum rate of incoming requests: the `ants.Pool` will always process as many requests concurrently as it has capacity, but we actually want to limit to at most x requests per timeframe. If requests come in at a higher rate, those should block for as long as necessary until the requests per timeframe drop below the targeted rate.

I don't see `ants` as a good fit for these requirements - as far as I understand the package, it just ensures that no more than a certain number of requests are in flight at the same time, while any new incoming request either blocks until a running one is finished or immediately fails, without the more fine-grained behaviour we have now 😕

EDIT: the given requirements can probably be met with `ants` and some extra code, but we would then only replace `semaphore` with it while still needing the rate limiter.
I was thinking of a goroutine pool only to achieve 2 goals:
- to reuse goroutines
- to control how many streams are processed concurrently (via pool size)
We would need to use the pool in the blocking mode + use the rate-limiter and queue (for buffering waiting requests) as we do now.
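For illustration, a hedged sketch of what that combination could look like - a blocking `ants` pool plus a rate limiter; the pool size, rate, and wiring are assumptions, not a tested design:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)

func main() {
	ctx := context.Background()

	// Worker pool in its default blocking mode: Submit waits for a free
	// worker, and goroutines are reused instead of spawned per request.
	pool, err := ants.NewPool(4)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	// The rate limiter is still needed on top of the pool: at most one
	// request per 100ms, regardless of how many workers are idle.
	limiter := rate.NewLimiter(rate.Every(time.Second/10), 1)

	for i := 0; i < 20; i++ {
		if err := limiter.Wait(ctx); err != nil {
			break
		}
		i := i
		_ = pool.Submit(func() {
			fmt.Println("handling request", i)
			time.Sleep(50 * time.Millisecond) // simulated work
		})
	}
	time.Sleep(time.Second) // crude wait for in-flight work in this sketch
}
```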
bors merge

Build failed (retrying...):

Build failed:

bors merge

Build failed:

bors merge

Pull request successfully merged into develop. Build succeeded:
Motivation

This should once and for all fix the flaky `TestQueued` test that keeps preventing merges of PRs. 🙂

Description

I fixed various issues with races in the test that caused it to be so unstable:

- reworked `Test_Queued` to check that if too many requests are in flight at the same time, new requests are not accepted (same goal as the original test)
- added `Test_RequestInterval`, which checks that requests are correctly rate limited (a generic sketch of such a check follows this list)
- the stream handler of the `server.Server` is now set in the `New` function instead of in its `Run` method; this prevents the issue that clients within a test might make a request before the server is even ready to accept that request
- previously the `errgroup.Group` had a limit set; with a semaphore this becomes more straightforward and easier to understand, and it also fixes the off-by-one error
- these changes make running `go test -failfast -count=2000 -run ^Test_Queued$ github.com/spacemeshos/go-spacemesh/p2p/server` much easier 🙂
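As mentioned in the list above, here is a generic, hypothetical sketch of how such a rate-limit check can be expressed with `golang.org/x/time/rate`; it is not the actual `Test_RequestInterval`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One request per 100ms, burst of 1: only the first token is free.
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

	const n = 5
	start := time.Now()
	for i := 0; i < n; i++ {
		_ = limiter.Wait(context.Background())
	}
	elapsed := time.Since(start)

	// n requests at one token per 100ms must take at least (n-1)*100ms,
	// because the first token is available immediately.
	if elapsed < (n-1)*100*time.Millisecond {
		fmt.Println("requests were not rate limited, elapsed:", elapsed)
	} else {
		fmt.Println("rate limiting works, elapsed:", elapsed)
	}
}
```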
Test Plan
TODO