Skip to content

Commit

Permalink
Add additional test for RequestInterval
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Jul 3, 2024
1 parent cde7d8c commit f8578c1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
4 changes: 4 additions & 0 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte, extraProt
if err := s.StreamRequest(ctx, pid, req, func(ctx context.Context, stream io.ReadWriter) error {
rd := bufio.NewReader(stream)
if _, err := codec.DecodeFrom(rd, &r); err != nil {
if errors.Is(err, io.ErrClosedPipe) && ctx.Err() != nil {
// ensure that a canceled context is returned as the right error
return ctx.Err()
}
return fmt.Errorf("peer %s: %w", pid, err)
}
if r.Error != "" {
Expand Down
44 changes: 42 additions & 2 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ func Test_Queued(t *testing.T) {
return msg, nil
}),
WithQueueSize(queueSize),
WithRequestsPerInterval(50, time.Second),
WithMetrics(),
)
var (
eg errgroup.Group
Expand Down Expand Up @@ -228,6 +226,48 @@ func Test_Queued(t *testing.T) {
require.NoError(t, reqEq.Wait())
}

func Test_RequestInterval(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

var (
maxReqPerMin = 10
proto = "test"
)

client := New(wrapHost(t, mesh.Hosts()[0]), proto, nil)
srv := New(
wrapHost(t, mesh.Hosts()[1]),
proto,
WrapHandler(func(_ context.Context, msg []byte) ([]byte, error) {
return msg, nil
}),
WithRequestsPerInterval(maxReqPerMin, time.Minute),
)
var (
eg errgroup.Group
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
eg.Go(func() error {
return srv.Run(ctx)
})
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})

for i := 0; i < maxReqPerMin; i++ {
resp, err := client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping"))
require.NoError(t, err)
require.Equal(t, []byte("ping"), resp)
}

ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err = client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping"))
require.ErrorIs(t, err, context.DeadlineExceeded)
}

func FuzzResponseConsistency(f *testing.F) {
tester.FuzzConsistency[Response](f)
}
Expand Down

0 comments on commit f8578c1

Please sign in to comment.