Skip to content

Commit

Permalink
Close pending semaphore slots after fulfillment or expiry
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed May 16, 2024
1 parent 716bf96 commit 8ca52b8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 0 deletions.
2 changes: 2 additions & 0 deletions p2p/node/p2p_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
var recvdType interface{}
select {
case recvdType = <-dataChan:
p.GetPeerManager().ClosePendingRequest(peerID)
break
case <-time.After(requestManager.C_requestTimeout):
log.Global.WithFields(log.Fields{
"peerId": peerID,
}).Warn("Peer did not respond in time")
p.peerManager.MarkUnresponsivePeer(peerID, location)
p.GetPeerManager().ClosePendingRequest(peerID)
return nil, errors.New("peer did not respond in time")
}

Expand Down
4 changes: 4 additions & 0 deletions p2p/node/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,3 +608,7 @@ func (pm *BasicPeerManager) CloseStream(peerID p2p.PeerID) error {
func (pm *BasicPeerManager) WriteMessageToStream(peerId p2p.PeerID, stream network.Stream, msg []byte) error {
return pm.streamManager.WriteMessageToStream(peerId, stream, msg)
}

func (pm *BasicPeerManager) ClosePendingRequest(peer p2p.PeerID) error {
return pm.streamManager.ClosePendingRequest(peer)
}
13 changes: 13 additions & 0 deletions p2p/node/streamManager/streamManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type StreamManager interface {

// WriteMessageToStream writes the given message into the given stream
WriteMessageToStream(peerID p2p.PeerID, stream network.Stream, msg []byte) error

// Releases a semaphore slot for the given peerID
ClosePendingRequest(peerID p2p.PeerID) error
}

type basicStreamManager struct {
Expand Down Expand Up @@ -187,5 +190,15 @@ func (sm *basicStreamManager) WriteMessageToStream(peerID p2p.PeerID, stream net
if err != nil {
return errors.Wrap(err, "failed to write message to stream")
}

return nil
}

func (sm *basicStreamManager) ClosePendingRequest(peerID p2p.PeerID) error {
wrappedStream, found := sm.streamCache.Get(peerID)
if !found {
return errors.New("stream not found")
}
<-wrappedStream.(*streamWrapper).semaphore
return nil
}

0 comments on commit 8ca52b8

Please sign in to comment.