Skip to content

Commit

Permalink
Review 2: electric boogaloo
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Jul 4, 2024
1 parent b0b6cf9 commit 9aceb84
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
41 changes: 19 additions & 22 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type Server struct {
limit *rate.Limiter
sem *semaphore.Weighted
queue chan request
started chan struct{}
stopped chan struct{}

metrics *tracker // metrics can be nil

Expand All @@ -182,7 +182,7 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
interval: time.Second,

queue: make(chan request),
started: make(chan struct{}),
stopped: make(chan struct{}),
}
for _, opt := range opts {
opt(srv)
Expand All @@ -204,6 +204,22 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
}
}

srv.h.SetStreamHandler(protocol.ID(srv.protocol), func(stream network.Stream) {
if !srv.sem.TryAcquire(1) {
if srv.metrics != nil {
srv.metrics.dropped.Inc()
}
stream.Close()
return
}
select {
case <-srv.stopped:
srv.sem.Release(1)
stream.Close()
case srv.queue <- request{stream: stream, received: time.Now()}:
// at most s.queueSize requests block here, the others are dropped with the semaphore
}
})
srv.limit = rate.NewLimiter(
rate.Every(srv.interval/time.Duration(srv.requestsPerInterval)),
srv.requestsPerInterval,
Expand All @@ -221,31 +237,12 @@ type request struct {
received time.Time
}

func (s *Server) Ready() <-chan struct{} {
return s.started
}

func (s *Server) Run(ctx context.Context) error {
s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
if !s.sem.TryAcquire(1) {
if s.metrics != nil {
s.metrics.dropped.Inc()
}
stream.Close()
return
}
select {
case <-ctx.Done():
case s.queue <- request{stream: stream, received: time.Now()}:
// at most s.queueSize requests block here, the others are dropped with the semaphore
}
})
close(s.started)

var eg errgroup.Group
for {
select {
case <-ctx.Done():
close(s.stopped)
eg.Wait()
return nil
case req := <-s.queue:
Expand Down
2 changes: 0 additions & 2 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func Test_Queued(t *testing.T) {
eg.Go(func() error {
return srv.Run(ctx)
})
<-srv.Ready()
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
Expand Down Expand Up @@ -254,7 +253,6 @@ func Test_RequestInterval(t *testing.T) {
eg.Go(func() error {
return srv.Run(ctx)
})
<-srv.Ready()
t.Cleanup(func() {
assert.NoError(t, eg.Wait())
})
Expand Down

0 comments on commit 9aceb84

Please sign in to comment.