[Merged by Bors] - Fix flaky TestQueued #6097
Changes from 2 commits
@@ -17,6 +17,7 @@ import (
     dto "github.com/prometheus/client_model/go"
     "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
+    "golang.org/x/sync/semaphore"
     "golang.org/x/time/rate"

     "github.com/spacemeshos/go-spacemesh/codec"
@@ -156,6 +157,10 @@ type Server struct {
     decayingTagSpec *DecayingTagSpec
     decayingTag     connmgr.DecayingTag

+    limit *rate.Limiter
+    sem   *semaphore.Weighted
+    queue chan request
+
     metrics *tracker // metrics can be nil

     h Host
@@ -174,6 +179,8 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
        queueSize:           1000,
        requestsPerInterval: 100,
        interval:            time.Second,
+
+       queue: make(chan request),
    }
    for _, opt := range opts {
        opt(srv)
@@ -195,6 +202,19 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
        }
    }

+   srv.limit = rate.NewLimiter(
+       rate.Every(srv.interval/time.Duration(srv.requestsPerInterval)),
+       srv.requestsPerInterval,
+   )
+   srv.sem = semaphore.NewWeighted(int64(srv.queueSize))
+   if srv.metrics != nil {
+       srv.metrics.targetQueue.Set(float64(srv.queueSize))
+       srv.metrics.targetRps.Set(float64(srv.limit.Limit()))
+   }
+   srv.h.SetStreamHandler(protocol.ID(srv.protocol), func(stream network.Stream) {
+       srv.queue <- request{stream: stream, received: time.Now()}
+   })
+
    return srv
 }

A bit of food for thought: maybe doing rate-limiting at the p2p level is not a good approach - some protocols might be more aggressive than others with resources, which could result in resource starvation for others. This means that rate-limiting at the p2p level won't necessarily solve the resource problem, and it also removes the possibility of prioritizing some protocols over others.

Check the reading side of the channel - every incoming request is handled immediately, and if it exceeds the queue size it will be dropped immediately. I did this so that I can add the

If the reader goes away then the goroutines pile up on shutdown. I'm not sure which repercussions this may have (or not have). As a rule of thumb I think we should try to avoid writing directly to channels (without

I updated the So it is either:

OK nvm, I found a way to do it without the need for the
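To make the concern about direct channel writes concrete, here is a small, self-contained sketch - hypothetical names, not code from this PR - of a producer that submits to an unbuffered channel inside a select on ctx.Done(), so that blocked senders do not pile up once the reader has gone away:

```go
// Standalone sketch of the "don't write directly to channels" suggestion above.
// The names (submit, request, queue) are illustrative only.
package main

import (
	"context"
	"fmt"
	"time"
)

type request struct {
	id       int
	received time.Time
}

// submit hands a request to the consumer, but gives up if the consumer's
// context is already canceled instead of blocking forever on the send.
func submit(ctx context.Context, queue chan<- request, r request) bool {
	select {
	case queue <- r:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan request) // unbuffered, like srv.queue in the diff

	// Consumer: reads requests until the context is canceled.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case r := <-queue:
				fmt.Println("handled request", r.id)
			}
		}
	}()

	for i := 0; i < 3; i++ {
		fmt.Println("accepted:", submit(ctx, queue, request{id: i, received: time.Now()}))
	}

	cancel()
	<-done // consumer has exited; a plain `queue <- r` here would block forever
	fmt.Println("accepted after shutdown:", submit(ctx, queue, request{id: 99, received: time.Now()}))
}
```

The handler in the diff instead sends to srv.queue unconditionally and relies on the Run loop always being the reader; the select variant trades that assumption for an explicit shutdown path.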
@@ -204,45 +224,35 @@ type request struct {
 }

 func (s *Server) Run(ctx context.Context) error {
-   limit := rate.NewLimiter(rate.Every(s.interval/time.Duration(s.requestsPerInterval)), s.requestsPerInterval)
-   queue := make(chan request, s.queueSize)
-   if s.metrics != nil {
-       s.metrics.targetQueue.Set(float64(s.queueSize))
-       s.metrics.targetRps.Set(float64(limit.Limit()))
-   }
-   s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
-       select {
-       case queue <- request{stream: stream, received: time.Now()}:
-       default:
-           if s.metrics != nil {
-               s.metrics.dropped.Inc()
-           }
-           stream.Close()
-       }
-   })
-
    var eg errgroup.Group
    eg.SetLimit(s.queueSize * 2)
    for {
        select {
        case <-ctx.Done():
            eg.Wait()
            return nil
-       case req := <-queue:
+       case req := <-s.queue:
+           if !s.sem.TryAcquire(1) {
+               if s.metrics != nil {
+                   s.metrics.dropped.Inc()
+               }
+               req.stream.Close()
+               continue
+           }
            if s.metrics != nil {
-               s.metrics.queue.Set(float64(len(queue)))
+               s.metrics.queue.Set(float64(s.queueSize))
                s.metrics.accepted.Inc()
            }
            if s.metrics != nil {
                s.metrics.inQueueLatency.Observe(time.Since(req.received).Seconds())
            }
-           if err := limit.Wait(ctx); err != nil {
+           if err := s.limit.Wait(ctx); err != nil {
                eg.Wait()
                return nil
            }
            ctx, cancel := context.WithCancel(ctx)
            eg.Go(func() error {
                <-ctx.Done()
+               s.sem.Release(1)
                req.stream.Close()
                return nil
            })
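Taken together, the new fields implement two separate limits: the weighted semaphore bounds how many requests are in flight at once (the queue size), while the rate.Limiter bounds how quickly new requests may start. Here is a minimal, self-contained sketch of that combination, using made-up numbers rather than the server's defaults:

```go
// Sketch of combining a weighted semaphore (in-flight cap) with a rate.Limiter
// (requests per interval), as the Run loop in the diff does. Hypothetical
// values; not code from this PR.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
	"golang.org/x/time/rate"
)

func main() {
	const (
		queueSize           = 2
		requestsPerInterval = 5
		interval            = time.Second
	)

	// Same construction as in the diff: one token every
	// interval/requestsPerInterval, with a burst of requestsPerInterval.
	limit := rate.NewLimiter(rate.Every(interval/time.Duration(requestsPerInterval)), requestsPerInterval)
	sem := semaphore.NewWeighted(queueSize)

	ctx := context.Background()
	for i := 0; i < 4; i++ {
		// Drop immediately when more than queueSize requests are in flight.
		if !sem.TryAcquire(1) {
			fmt.Println("request", i, "dropped: queue full")
			continue
		}
		// Block until the limiter allows another request to start.
		if err := limit.Wait(ctx); err != nil {
			return
		}
		go func(i int) {
			defer sem.Release(1)
			time.Sleep(100 * time.Millisecond) // pretend to handle the request
			fmt.Println("request", i, "handled")
		}(i)
	}
	time.Sleep(time.Second)
}
```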
What do you think about using a goroutine pool like https://github.com/panjf2000/ants? It can automatically control how many tasks are executed concurrently, reuse goroutines instead of creating a new one for each request (which is cheap but not free), scale the workers down if not used, etc.
I took a quick look at ants but I'm not sure it is a good fit here. We have 2 situations that we want to limit:

1. Pool.Submit() - it would have to be WithNonBlocking(true), but then we don't know if the submit actually started a routine or not (because if it didn't, we need to close the stream and update metrics accordingly).
2. ant.Pool will always process as many requests concurrently as it has capacity, but we actually want to limit to at most x requests per timeframe. If requests come in at a higher rate, those should block for the time necessary until the requests per timeframe is below the targeted rate.

I don't see ants as a good fit for these requirements - as far as I understand the package, it just ensures that no more than a certain number of requests are in flight at the same time, while any new incoming request either blocks until a running one is finished or immediately fails, without the more fine-grained behaviour we have now 😕

EDIT: probably the given requirements can be met with ants and some extra code, but we would then only replace semaphore with it while still needing the rate limiter.
with it while still needing the rate limiter.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of a goroutine pool only to achieve 2 goals:
We would need to use the pool in the blocking mode + use the rate-limiter and queue (for buffering waiting requests) as we do now.
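For comparison, a rough sketch of what that could look like: an ants pool in blocking mode combined with the existing rate limiter. This is hypothetical glue code illustrating the suggestion above, not something from this PR:

```go
// Hedged sketch: ants worker pool (blocking mode) + rate.Limiter. The pool
// replaces only the semaphore's role of capping concurrency; per-interval
// throttling is still done by the limiter.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
	"golang.org/x/time/rate"
)

func main() {
	const (
		queueSize           = 1000
		requestsPerInterval = 100
		interval            = time.Second
	)

	// The pool caps concurrent handlers and reuses goroutines; by default
	// Submit blocks when the pool is saturated (the "blocking mode" above).
	pool, err := ants.NewPool(queueSize)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	limit := rate.NewLimiter(rate.Every(interval/time.Duration(requestsPerInterval)), requestsPerInterval)

	ctx := context.Background()
	for i := 0; i < 5; i++ {
		if err := limit.Wait(ctx); err != nil {
			return
		}
		i := i
		if err := pool.Submit(func() {
			fmt.Println("handling request", i) // stand-in for the stream handler
		}); err != nil {
			fmt.Println("dropped request", i, ":", err)
		}
	}
	time.Sleep(100 * time.Millisecond)
}
```

Even in this variant the rate limiter does the per-interval throttling, which matches the point made in the earlier reply: the pool alone does not provide the fine-grained behaviour the current semaphore-plus-limiter setup has.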