Skip to content

Commit

Permalink
Removing the request and stream manager dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Jun 17, 2024
1 parent f77a069 commit f42fb09
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 97 deletions.
22 changes: 12 additions & 10 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package node

import (
"context"
"math/big"
"reflect"
"runtime/debug"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p"
"github.com/dominant-strategies/go-quai/p2p/node/pubsubManager"
"github.com/dominant-strategies/go-quai/p2p/node/streamManager"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/quai"
"github.com/dominant-strategies/go-quai/trie"
Expand Down Expand Up @@ -134,7 +134,7 @@ func (p *P2PNode) Stop() error {
}
}

func (p *P2PNode) requestFromPeers(ctx context.Context, topic *pubsubManager.Topic, requestData interface{}, respDataType interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData interface{}, respDataType interface{}, resultChan chan interface{}) {
go func() {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -164,14 +164,14 @@ func (p *P2PNode) requestFromPeers(ctx context.Context, topic *pubsubManager.Top
}).Error("Go-Quai Panicked")
}
}()
p.requestAndWait(ctx, peerID, topic, requestData, respDataType, resultChan)
p.requestAndWait(peerID, topic, requestData, respDataType, resultChan)
}(peerID)
}
requestWg.Wait()
}()
}

func (p *P2PNode) requestAndWait(ctx context.Context, peerID peer.ID, topic *pubsubManager.Topic, reqData interface{}, respDataType interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestAndWait(peerID peer.ID, topic *pubsubManager.Topic, reqData interface{}, respDataType interface{}, resultChan chan interface{}) {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
Expand All @@ -182,6 +182,8 @@ func (p *P2PNode) requestAndWait(ctx context.Context, peerID peer.ID, topic *pub
}()
var recvd interface{}
var err error
requestTimer := time.NewTimer(requestTimeout)
defer requestTimer.Stop()
if recvd, err = p.requestFromPeer(peerID, topic, reqData, respDataType); err == nil {
log.Global.WithFields(log.Fields{
"peerId": peerID,
Expand All @@ -193,13 +195,12 @@ func (p *P2PNode) requestAndWait(ctx context.Context, peerID peer.ID, topic *pub
select {
case resultChan <- recvd:
// Data sent successfully
case <-ctx.Done():
case <-requestTimer.C:
// Request timed out, return
log.Global.WithFields(log.Fields{
"peerId": peerID,
"message": "Request timed out, data not sent",
}).Warning("Missed data request")

default:
// Optionally log the missed send or handle it in another way
log.Global.WithFields(log.Fields{
Expand All @@ -208,6 +209,9 @@ func (p *P2PNode) requestAndWait(ctx context.Context, peerID peer.ID, topic *pub
}).Warning("Missed data send")
}
} else {
if err.Error() == streamManager.ErrStreamNotFound.Error() {
return
}
log.Global.WithFields(log.Fields{
"peerId": peerID,
"topic": topic.String(),
Expand All @@ -231,8 +235,6 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
}

resultChan := make(chan interface{}, 10)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
// If it is a hash, first check to see if it is contained in the caches
if hash, ok := requestData.(common.Hash); ok {
result, ok := p.cacheGet(hash, responseDataType, location)
Expand All @@ -242,7 +244,7 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
}
}

p.requestFromPeers(ctx, topic, requestData, responseDataType, resultChan)
p.requestFromPeers(topic, requestData, responseDataType, resultChan)
// TODO: optimize with waitgroups or a doneChan to only query if no peers responded
// Right now this creates too many streams, so don't call this until we have a better solution
// p.queryDHT(location, requestData, responseDataType, resultChan)
Expand Down Expand Up @@ -312,7 +314,7 @@ func (p *P2PNode) BanPeer(peer p2p.PeerID) {
}

// Opens a new stream to the given peer using the given protocol ID
func (p *P2PNode) NewStream(peerID peer.ID) (network.Stream, error) {
func (p *P2PNode) GetStream(peerID peer.ID) (network.Stream, error) {
return p.peerManager.GetStream(peerID)
}

Expand Down
2 changes: 2 additions & 0 deletions p2p/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func NewNode(ctx context.Context, quitCh chan struct{}) (*P2PNode, error) {
if err != nil {
return nil, err
}
sm.Start()

p2p.peerManager.SetStreamManager(sm)

return p2p, nil
Expand Down
7 changes: 2 additions & 5 deletions p2p/node/p2p_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, topic *pubsubManager.Topic, re
"peerId": peerID,
"topic": topic,
}).Trace("Requesting the data from peer")
stream, err := p.NewStream(peerID)
stream, err := p.GetStream(peerID)
if err != nil {
log.Global.WithFields(log.Fields{
"peerId": peerID,
"error": err,
}).Error("Failed to open stream to peer")
return nil, err
}

Expand All @@ -65,6 +61,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, topic *pubsubManager.Topic, re
if err != nil {
return nil, err
}

var recvdType interface{}
select {
case recvdType = <-dataChan:
Expand Down
98 changes: 66 additions & 32 deletions p2p/node/streamManager/streamManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package streamManager
import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"

Expand All @@ -23,14 +24,17 @@ const (
c_stream_timeout = 10 * time.Second

// The amount of redundancy for open streams
c_streamCacheSize = 9
c_streamCacheSize = 30

// c_newStreamRequestChanSize is the size of the channel to handle new stream request
c_newStreamRequestChanSize = 10

// The maximum number of concurrent requests before a stream is considered failed
c_maxPendingRequests = 100
)

var (
errStreamNotFound = errors.New("stream not found")
ErrStreamNotFound = errors.New("stream not found")
)

type StreamManager interface {
Expand All @@ -56,6 +60,8 @@ type basicStreamManager struct {
streamCache *expireLru.LRU[p2p.PeerID, streamWrapper]
p2pBackend quaiprotocol.QuaiP2PNode

newStreamRequestChan chan p2p.PeerID

host host.Host
mu sync.Mutex
}
Expand All @@ -73,12 +79,16 @@ func NewStreamManager(node quaiprotocol.QuaiP2PNode, host host.Host) (*basicStre
0,
)

return &basicStreamManager{
ctx: context.Background(),
streamCache: lruCache,
p2pBackend: node,
host: host,
}, nil
sm := &basicStreamManager{
ctx: context.Background(),
streamCache: lruCache,
p2pBackend: node,
host: host,
newStreamRequestChan: make(chan p2p.PeerID, c_newStreamRequestChanSize),
}

return sm, nil

}

// Expects a key as peerID and value of *streamWrapper
Expand All @@ -93,6 +103,48 @@ func severStream(key p2p.PeerID, wrappedStream streamWrapper) {
}
}

func (sm *basicStreamManager) Start() {
go sm.listenForNewStreamRequest()
}

func (sm *basicStreamManager) listenForNewStreamRequest() {
for {
select {
case peerID := <-sm.newStreamRequestChan:
err := sm.OpenStream(peerID)
if err != nil {
log.Global.WithFields(log.Fields{"peerId": peerID, "err": err}).Warn("Error opening new strean into peer")
}
case <-sm.ctx.Done():
return
}
}
}

func (sm *basicStreamManager) OpenStream(peerID p2p.PeerID) error {
// check if there is an existing stream
if _, ok := sm.streamCache.Get(peerID); ok {
return nil
}
// Create a new stream to the peer and register it in the cache
stream, err := sm.host.NewStream(sm.ctx, peerID, quaiprotocol.ProtocolVersion)
if err != nil {
return fmt.Errorf("error opening new stream with peer %s", peerID)
}
wrappedStream := streamWrapper{
stream: stream,
semaphore: make(chan struct{}, c_maxPendingRequests),
errCount: 0,
}
sm.streamCache.Add(peerID, wrappedStream)
go quaiprotocol.QuaiProtocolHandler(stream, sm.p2pBackend)
log.Global.WithField("PeerID", peerID).Info("Had to create new stream")
if streamMetrics != nil {
streamMetrics.WithLabelValues("NumStreams").Inc()
}
return nil
}

func (sm *basicStreamManager) CloseStream(peerID p2p.PeerID) error {
sm.mu.Lock()
defer sm.mu.Unlock()
Expand All @@ -103,37 +155,19 @@ func (sm *basicStreamManager) CloseStream(peerID p2p.PeerID) error {
log.Global.WithField("peerID", peerID).Debug("Pruned connection with peer")
return nil
}
return errStreamNotFound
return ErrStreamNotFound
}

func (sm *basicStreamManager) GetStream(peerID p2p.PeerID) (network.Stream, error) {
wrappedStream, ok := sm.streamCache.Get(peerID)
var err error
if !ok {
// Create a new stream to the peer and register it in the cache
stream, err := sm.host.NewStream(sm.ctx, peerID, quaiprotocol.ProtocolVersion)
if err != nil {
// Explicitly return nil here to avoid casting a nil later
return nil, err
}
if existingStream, ok := sm.streamCache.Get(peerID); !ok {
wrappedStream = streamWrapper{
stream: stream,
semaphore: make(chan struct{}, c_maxPendingRequests),
errCount: 0,
}
sm.streamCache.Add(peerID, wrappedStream)
} else {
// Close the stream if someone already opened a stream and reuse the
// existing stream
stream.Close()
return existingStream.stream, nil
}
go quaiprotocol.QuaiProtocolHandler(stream, sm.p2pBackend)
log.Global.WithField("PeerID", peerID).Info("Had to create new stream")
if streamMetrics != nil {
streamMetrics.WithLabelValues("NumStreams").Inc()
select {
case sm.newStreamRequestChan <- peerID:
default:
log.Global.Error("sm.newPeers is full with new stream creation requests")
}
return nil, ErrStreamNotFound
} else {
log.Global.WithField("PeerID", peerID).Info("Requested stream was found in cache")
}
Expand Down
51 changes: 38 additions & 13 deletions p2p/pb/proto_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/dominant-strategies/go-quai/log"
)

var EmptyResponse = errors.New("received empty reponse from peer")

func DecodeQuaiMessage(data []byte) (*QuaiMessage, error) {
msg := &QuaiMessage{} // Assuming QuaiMessage is the struct generated by protoc
if err := proto.Unmarshal(data, msg); err != nil {
Expand Down Expand Up @@ -93,30 +95,44 @@ func DecodeQuaiRequest(reqMsg *QuaiRequestMessage) (uint32, interface{}, common.

// EncodeResponse creates a marshaled protobuf message for a Quai Response.
// Returns the serialized protobuf message.
func EncodeQuaiResponse(id uint32, location common.Location, data interface{}) ([]byte, error) {
func EncodeQuaiResponse(id uint32, location common.Location, respDataType interface{}, data interface{}) ([]byte, error) {

respMsg := QuaiResponseMessage{
Id: id,
Location: location.ProtoEncode(),
}

switch data := data.(type) {
var err error
switch respDataType.(type) {
case *types.WorkObjectBlockView:
protoWorkObjectBlock, err := data.ProtoEncode()
if err != nil {
return nil, err
if data == nil {
respMsg.Response = &QuaiResponseMessage_WorkObjectBlockView{}
} else {
protoWorkObjectBlock, err := data.(*types.WorkObjectBlockView).ProtoEncode()
if err != nil {
return nil, err
}
respMsg.Response = &QuaiResponseMessage_WorkObjectBlockView{WorkObjectBlockView: protoWorkObjectBlock}
}
respMsg.Response = &QuaiResponseMessage_WorkObjectBlockView{WorkObjectBlockView: protoWorkObjectBlock}

case *types.WorkObjectHeaderView:
protoWorkObjectHeader, err := data.ProtoEncode()
if err != nil {
return nil, err
protoWorkObjectHeader := &types.ProtoWorkObjectHeaderView{}
if data == nil {
respMsg.Response = &QuaiResponseMessage_WorkObjectHeaderView{}
} else {
protoWorkObjectHeader, err = data.(*types.WorkObjectHeaderView).ProtoEncode()
if err != nil {
return nil, err
}
respMsg.Response = &QuaiResponseMessage_WorkObjectHeaderView{WorkObjectHeaderView: protoWorkObjectHeader}
}
respMsg.Response = &QuaiResponseMessage_WorkObjectHeaderView{WorkObjectHeaderView: protoWorkObjectHeader}

case *common.Hash:
respMsg.Response = &QuaiResponseMessage_BlockHash{BlockHash: data.ProtoEncode()}
if data == nil {
respMsg.Response = &QuaiResponseMessage_BlockHash{}
} else {
respMsg.Response = &QuaiResponseMessage_BlockHash{BlockHash: data.(common.Hash).ProtoEncode()}
}

default:
return nil, errors.Errorf("unsupported response data type: %T", data)
Expand All @@ -143,7 +159,10 @@ func DecodeQuaiResponse(respMsg *QuaiResponseMessage) (uint32, interface{}, erro
case *QuaiResponseMessage_WorkObjectHeaderView:
protoWorkObject := respMsg.GetWorkObjectHeaderView()
if protoWorkObject == nil {
return id, nil, errors.New("work object header is nil")
return id, nil, errors.New("nil response, and is not valid")
}
if protoWorkObject.WorkObject == nil {
return id, nil, EmptyResponse
}
block := &types.WorkObjectHeaderView{
WorkObject: &types.WorkObject{},
Expand All @@ -159,7 +178,10 @@ func DecodeQuaiResponse(respMsg *QuaiResponseMessage) (uint32, interface{}, erro
case *QuaiResponseMessage_WorkObjectBlockView:
protoWorkObject := respMsg.GetWorkObjectBlockView()
if protoWorkObject == nil {
return id, nil, errors.New("work object block is nil")
return id, nil, errors.New("nil response, and is not valid")
}
if protoWorkObject.WorkObject == nil {
return id, nil, EmptyResponse
}
block := &types.WorkObjectBlockView{
WorkObject: &types.WorkObject{},
Expand All @@ -174,6 +196,9 @@ func DecodeQuaiResponse(respMsg *QuaiResponseMessage) (uint32, interface{}, erro
return id, block, nil
case *QuaiResponseMessage_BlockHash:
blockHash := respMsg.GetBlockHash()
if blockHash == nil {
return id, nil, EmptyResponse
}
hash := common.Hash{}
hash.ProtoDecode(blockHash)
return id, hash, nil
Expand Down
Loading

0 comments on commit f42fb09

Please sign in to comment.