Skip to content

Commit

Permalink
Review 2: electric boogaloo
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Jul 4, 2024
1 parent b0b6cf9 commit d4ba8ba
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
41 changes: 19 additions & 22 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type Server struct {
limit *rate.Limiter
sem *semaphore.Weighted
queue chan request
started chan struct{}
stopped chan struct{}

metrics *tracker // metrics can be nil

Expand All @@ -182,7 +182,7 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
interval: time.Second,

queue: make(chan request),
started: make(chan struct{}),
stopped: make(chan struct{}),
}
for _, opt := range opts {
opt(srv)
Expand All @@ -209,6 +209,22 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
srv.requestsPerInterval,
)
srv.sem = semaphore.NewWeighted(int64(srv.queueSize))
srv.h.SetStreamHandler(protocol.ID(srv.protocol), func(stream network.Stream) {
if !srv.sem.TryAcquire(1) {
if srv.metrics != nil {
srv.metrics.dropped.Inc()

Check warning on line 215 in p2p/server/server.go

View check run for this annotation

Codecov / codecov/patch

p2p/server/server.go#L215

Added line #L215 was not covered by tests
}
stream.Close()
return
}
select {
case <-srv.stopped:
srv.sem.Release(1)
stream.Close()

Check warning on line 223 in p2p/server/server.go

View check run for this annotation

Codecov / codecov/patch

p2p/server/server.go#L221-L223

Added lines #L221 - L223 were not covered by tests
case srv.queue <- request{stream: stream, received: time.Now()}:
// at most s.queueSize requests block here, the others are dropped with the semaphore
}
})
if srv.metrics != nil {
srv.metrics.targetQueue.Set(float64(srv.queueSize))
srv.metrics.targetRps.Set(float64(srv.limit.Limit()))
Expand All @@ -221,31 +237,12 @@ type request struct {
received time.Time
}

func (s *Server) Ready() <-chan struct{} {
return s.started
}

func (s *Server) Run(ctx context.Context) error {
s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
if !s.sem.TryAcquire(1) {
if s.metrics != nil {
s.metrics.dropped.Inc()
}
stream.Close()
return
}
select {
case <-ctx.Done():
case s.queue <- request{stream: stream, received: time.Now()}:
// at most s.queueSize requests block here, the others are dropped with the semaphore
}
})
close(s.started)

var eg errgroup.Group
for {
select {
case <-ctx.Done():
close(s.stopped)
eg.Wait()
return nil
case req := <-s.queue:
Expand Down
2 changes: 0 additions & 2 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func Test_Queued(t *testing.T) {
eg.Go(func() error {
return srv.Run(ctx)
})
<-srv.Ready()
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
Expand Down Expand Up @@ -254,7 +253,6 @@ func Test_RequestInterval(t *testing.T) {
eg.Go(func() error {
return srv.Run(ctx)
})
<-srv.Ready()
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
Expand Down

0 comments on commit d4ba8ba

Please sign in to comment.