[Merged by Bors] - Fix flaky TestQueued #6097

Closed
wants to merge 7 commits
Changes from 2 commits
52 changes: 31 additions & 21 deletions p2p/server/server.go
@@ -17,6 +17,7 @@ import (
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"

"github.com/spacemeshos/go-spacemesh/codec"
@@ -156,6 +157,10 @@ type Server struct {
decayingTagSpec *DecayingTagSpec
decayingTag connmgr.DecayingTag

limit *rate.Limiter
sem *semaphore.Weighted
queue chan request

metrics *tracker // metrics can be nil

h Host
@@ -174,6 +179,8 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
queueSize: 1000,
requestsPerInterval: 100,
interval: time.Second,

queue: make(chan request),
}
for _, opt := range opts {
opt(srv)
@@ -195,6 +202,19 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
}
}

srv.limit = rate.NewLimiter(
rate.Every(srv.interval/time.Duration(srv.requestsPerInterval)),
srv.requestsPerInterval,
)
srv.sem = semaphore.NewWeighted(int64(srv.queueSize))

Contributor:
What do you think about using a goroutine pool like https://github.com/panjf2000/ants? It can automatically control how many tasks are executed concurrently, reuse goroutines instead of creating a new one for each request (which is cheap but not free), scale the workers down if not used, etc.

@fasmat (Member Author), Jul 4, 2024:
I took a quick look at ants, but I'm not sure it is a good fit here. We have two behaviours we want to enforce:

  • if the queue is full, incoming requests should be dropped immediately. I don't quite see how to do this with Pool.Submit() - it would have to be WithNonblocking(true), but then we don't know whether the submit actually started a routine or not (and if it didn't, we need to close the stream and update the metrics accordingly)
  • a maximum rate of incoming requests: an ants.Pool will always process as many requests concurrently as it has capacity for, but we actually want to limit processing to at most x requests per timeframe. If requests come in at a higher rate, they should block for as long as necessary to stay below the targeted rate

I don't see ants as a good fit for these requirements - as far as I understand the package, it only ensures that no more than a certain number of requests are in flight at the same time, while any new incoming request either blocks until a running one finishes or fails immediately, without the more fine-grained behaviour we have now 😕

EDIT: the requirements can probably be met with ants and some extra code, but we would then only be replacing the semaphore with it while still needing the rate limiter.
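
For reference, a minimal sketch of the two behaviours described above, using golang.org/x/sync/semaphore for the drop-on-full check and golang.org/x/time/rate for the request rate. This is not the server code from this PR; queueSize, requestsPerInterval, interval and process are placeholders mirroring the server options.

    // Sketch only, not the server implementation. A weighted semaphore bounds how many
    // requests may be pending at once (TryAcquire fails when the queue is full, so the
    // request can be dropped right away), and a rate.Limiter caps accepted requests
    // to requestsPerInterval per interval.
    package limits

    import (
        "context"
        "time"

        "golang.org/x/sync/semaphore"
        "golang.org/x/time/rate"
    )

    func newLimits(queueSize, requestsPerInterval int, interval time.Duration) (*semaphore.Weighted, *rate.Limiter) {
        sem := semaphore.NewWeighted(int64(queueSize))
        lim := rate.NewLimiter(rate.Every(interval/time.Duration(requestsPerInterval)), requestsPerInterval)
        return sem, lim
    }

    func handle(ctx context.Context, sem *semaphore.Weighted, lim *rate.Limiter, process func()) (accepted bool) {
        if !sem.TryAcquire(1) {
            return false // queue full: drop (close the stream, bump the dropped metric)
        }
        defer sem.Release(1)
        if err := lim.Wait(ctx); err != nil {
            return false // context cancelled while waiting for a rate slot
        }
        process()
        return true
    }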

Contributor:
I was thinking of a goroutine pool only to achieve two goals:

  • to reuse goroutines
  • to control how many streams are processed concurrently (via the pool size)

We would still use the pool in blocking mode and keep the rate limiter and the queue (for buffering waiting requests), as we do now.
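
Roughly, that combination might look like the following sketch. It assumes github.com/panjf2000/ants/v2 and is not code from this PR; the queue of func() tasks and the other names are illustrative.

    package pool

    import (
        "context"

        "github.com/panjf2000/ants/v2"
        "golang.org/x/time/rate"
    )

    // Sketch of the suggestion: a fixed-size goroutine pool in ants' default blocking
    // mode, fed from a buffered queue and still throttled by the existing rate limiter.
    func run(ctx context.Context, queue <-chan func(), lim *rate.Limiter, poolSize int) error {
        p, err := ants.NewPool(poolSize) // reuses goroutines; Submit blocks while all workers are busy
        if err != nil {
            return err
        }
        defer p.Release()

        for {
            select {
            case <-ctx.Done():
                return nil
            case task := <-queue:
                if err := lim.Wait(ctx); err != nil { // keep the requests-per-interval cap
                    return nil
                }
                if err := p.Submit(task); err != nil {
                    return err
                }
            }
        }
    }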

if srv.metrics != nil {
srv.metrics.targetQueue.Set(float64(srv.queueSize))
srv.metrics.targetRps.Set(float64(srv.limit.Limit()))
}
srv.h.SetStreamHandler(protocol.ID(srv.protocol), func(stream network.Stream) {
srv.queue <- request{stream: stream, received: time.Now()}

Contributor:
There was a select in the block that was moved. Any particular reason to remove the default case? Ideally there would be a context here that could be selected on. The other problem is the rate limiting, which could result in messages being dropped (and also, goroutines could pile up waiting to write to this channel, which would have other repercussions, like messages timing out on the other end). It might be useful to dwell on this a bit longer.

A bit of food for thought: maybe doing rate limiting at the p2p level is not a good approach - some protocols might be more aggressive than others on resources, which could starve the rest. So rate limiting at the p2p level won't necessarily solve the resource problem, and it also removes the possibility of prioritizing some protocols over others.
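
For comparison, the kind of non-blocking, context-aware send being asked about might look roughly like this. It is a fragment reusing the identifiers from server.go above, assumes the buffered queue from the original code, and takes ctx to be a server-lifetime context; it is not a change proposed in this PR.

    // Sketch only, not part of this PR: non-blocking send that also bails out on shutdown.
    s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
        select {
        case queue <- request{stream: stream, received: time.Now()}:
        case <-ctx.Done(): // server shutting down: don't leave senders blocked
            stream.Close()
        default: // queue full: drop the request immediately
            if s.metrics != nil {
                s.metrics.dropped.Inc()
            }
            stream.Close()
        }
    })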

Member Author:
Check the reading side of the channel - every incoming request is handled immediately, and if it exceeds the queue size it is dropped immediately. I did it this way so that the TryAcquire and Release calls on the semaphore are in the same code block.

I could move the TryAcquire into the SetStreamHandler callback and only send the request to the queue (to be handled in Run) if that check passes, but then the Release would be disconnected from the TryAcquire call 🤷
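
That alternative would look roughly like the sketch below. It is not code from this PR; it only illustrates how the TryAcquire in the callback ends up separated from the matching Release in Run.

    // Sketch of the alternative, not code from this PR: acquire in the stream-handler
    // callback so full queues are rejected before the request ever reaches Run.
    srv.h.SetStreamHandler(protocol.ID(srv.protocol), func(stream network.Stream) {
        if !srv.sem.TryAcquire(1) { // queue full: drop immediately
            if srv.metrics != nil {
                srv.metrics.dropped.Inc()
            }
            stream.Close()
            return
        }
        srv.queue <- request{stream: stream, received: time.Now()}
    })

    // ...while the matching srv.sem.Release(1) would live in Run (or in the
    // per-request goroutine), far away from the TryAcquire above.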

Contributor:
If the reader goes away, the goroutines pile up on shutdown. I'm not sure what repercussions this may or may not have. As a rule of thumb I think we should avoid writing directly to channels (without selects). But that's just me...
It could be that it is inconsequential, but it could potentially have side effects. I'm not that familiar with the code.

Member Author:
I updated the SetStreamHandler callback so that the handling happens on the sender side of the channel instead of the receiver side, see b0b6cf9 (sketched below). It has the advantage of dropping requests on the sending side if a) the context is already done or b) the semaphore is full. But since the callback now has to be set in the Run method, I need an additional check with the Ready method - otherwise the client might send its first request before the SetStreamHandler callback has been set, causing the test to halt.

So it is either:

  • waiting on Ready after calling Run, or
  • handling limiting and context checking in the Run method instead of in the SetStreamHandler callback
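
A sketch of the sender-side variant described above - the behaviour as described for b0b6cf9, not the literal commit. It reuses the identifiers from server.go and assumes ctx is the context passed to Run.

    // Sketch, not the literal commit: the callback set inside Run drops the request
    // if the semaphore is full or the server context is already done.
    s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
        if !s.sem.TryAcquire(1) { // queue full: drop on the sending side
            if s.metrics != nil {
                s.metrics.dropped.Inc()
            }
            stream.Close()
            return
        }
        select {
        case s.queue <- request{stream: stream, received: time.Now()}:
        case <-ctx.Done(): // shutting down: drop on the sending side as well
            s.sem.Release(1)
            stream.Close()
        }
    })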

Member Author:
OK nvm, I found a way to do it without the need for the Ready method 🙂

})

return srv
}

@@ -204,45 +224,35 @@ type request struct {
}

func (s *Server) Run(ctx context.Context) error {
limit := rate.NewLimiter(rate.Every(s.interval/time.Duration(s.requestsPerInterval)), s.requestsPerInterval)
queue := make(chan request, s.queueSize)
if s.metrics != nil {
s.metrics.targetQueue.Set(float64(s.queueSize))
s.metrics.targetRps.Set(float64(limit.Limit()))
}
s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
select {
case queue <- request{stream: stream, received: time.Now()}:
default:
if s.metrics != nil {
s.metrics.dropped.Inc()
}
stream.Close()
}
})

var eg errgroup.Group
eg.SetLimit(s.queueSize * 2)
for {
select {
case <-ctx.Done():
eg.Wait()
return nil
case req := <-queue:
case req := <-s.queue:
if !s.sem.TryAcquire(1) {
if s.metrics != nil {
s.metrics.dropped.Inc()
}
req.stream.Close()
continue
}
if s.metrics != nil {
s.metrics.queue.Set(float64(len(queue)))
s.metrics.queue.Set(float64(s.queueSize))
s.metrics.accepted.Inc()
}
if s.metrics != nil {
s.metrics.inQueueLatency.Observe(time.Since(req.received).Seconds())
}
if err := limit.Wait(ctx); err != nil {
if err := s.limit.Wait(ctx); err != nil {
eg.Wait()
return nil
}
ctx, cancel := context.WithCancel(ctx)
eg.Go(func() error {
<-ctx.Done()
s.sem.Release(1)
req.stream.Close()
return nil
})
44 changes: 24 additions & 20 deletions p2p/server/server_test.go
@@ -3,7 +3,7 @@ package server
import (
"context"
"errors"
"sync/atomic"
"sync"
"testing"
"time"

@@ -172,25 +172,28 @@ func TestServer(t *testing.T) {
})
}

func TestQueued(t *testing.T) {
func Test_Queued(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

var (
total = 100
proto = "test"
success, failure atomic.Int64
wait = make(chan struct{}, total)
queueSize = 10
proto = "test"
stop = make(chan struct{})
wg sync.WaitGroup
)

wg.Add(queueSize)
client := New(wrapHost(t, mesh.Hosts()[0]), proto, nil)
srv := New(
wrapHost(t, mesh.Hosts()[1]),
proto,
WrapHandler(func(_ context.Context, msg []byte) ([]byte, error) {
wg.Done()
<-stop
return msg, nil
}),
WithQueueSize(total/3),
WithQueueSize(queueSize),
WithRequestsPerInterval(50, time.Second),
WithMetrics(),
)
@@ -205,23 +208,24 @@
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
for i := 0; i < total; i++ {
eg.Go(func() error {
if _, err := client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping")); err != nil {
failure.Add(1)
} else {
success.Add(1)
}
wait <- struct{}{}
var reqEq errgroup.Group
for i := 0; i < queueSize; i++ { // fill the queue with requests
reqEq.Go(func() error {
resp, err := client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping"))
require.NoError(t, err)
require.Equal(t, []byte("ping"), resp)
return nil
})
}
for i := 0; i < total; i++ {
<-wait
wg.Wait()

for i := 0; i < queueSize; i++ { // queue is full, requests fail
_, err := client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping"))
require.Error(t, err)
}
require.NotZero(t, failure.Load())
require.Greater(t, int(success.Load()), total/2)
t.Log(success.Load())

close(stop)
require.NoError(t, reqEq.Wait())
}

func FuzzResponseConsistency(f *testing.F) {