diff --git a/p2p/node/api.go b/p2p/node/api.go index 763bc4f55b..4f560ddae1 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -52,7 +52,12 @@ func (p *P2PNode) Subscribe(location common.Location, datatype interface{}) erro return err } - go p.peerManager.Provide(p.ctx, location, datatype) + go func() { + err := p.peerManager.Provide(p.ctx, location, datatype) + if err != nil { + log.Global.Errorf("error providing topic %s in %s: %s", reflect.TypeOf(datatype), location.Name(), err.Error()) + } + }() return nil } diff --git a/p2p/node/peerManager/peerManager.go b/p2p/node/peerManager/peerManager.go index 05a5331121..0d0f0dd94c 100644 --- a/p2p/node/peerManager/peerManager.go +++ b/p2p/node/peerManager/peerManager.go @@ -208,6 +208,14 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data logger := log.NewLogger("peers.log", viper.GetString(utils.PeersLogLevelFlag.Name)) go func() { + defer func() { + if r := recover(); r != nil { + log.Global.WithFields(log.Fields{ + "error": r, + "stacktrace": string(debug.Stack()), + }).Fatal("Go-Quai Panicked") + } + }() q := query.Query{} ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() diff --git a/p2p/node/pubsubManager/gossipsub.go b/p2p/node/pubsubManager/gossipsub.go index 7c63c31844..5c3e8bafcd 100644 --- a/p2p/node/pubsubManager/gossipsub.go +++ b/p2p/node/pubsubManager/gossipsub.go @@ -131,24 +131,36 @@ func (g *PubsubManager) Subscribe(location common.Location, datatype interface{} // close the msgChan if we exit this function defer close(msgChan) full := 0 - // Start worker goroutines - for i := 0; i < numWorkers; i++ { - go func(location common.Location) { - for msg := range msgChan { // This should exit when msgChan is closed - var data interface{} - // unmarshal the received data depending on the topic's type - err = pb.UnmarshalAndConvert(msg.Data, location, &data, datatype) - if err != nil { - log.Global.Errorf("error unmarshalling data: %s", err) - continue - } - - // handle the received data - if g.onReceived != nil { - g.onReceived(msg.ReceivedFrom, *msg.Topic, data, location) - } + // maintain a number of worker threads to handle messages + var msgWorker func(location common.Location) + msgWorker = func(location common.Location) { + defer func() { + if r := recover(); r != nil { + log.Global.WithFields(log.Fields{ + "error": r, + "stacktrace": string(debug.Stack()), + "location": location.Name(), + }).Errorf("Go-Quai Panicked") + } + go msgWorker(location) // If this worker exits, start a new one + }() + for msg := range msgChan { // This should exit when msgChan is closed + var data interface{} + // unmarshal the received data depending on the topic's type + err = pb.UnmarshalAndConvert(msg.Data, location, &data, datatype) + if err != nil { + log.Global.Errorf("error unmarshalling data: %s", err) + continue + } + + // handle the received data + if g.onReceived != nil { + g.onReceived(msg.ReceivedFrom, *msg.Topic, data, location) } - }(location) + } + } + for i := 0; i < numWorkers; i++ { + go msgWorker(location) } log.Global.WithField("topic", topic.String()).Debugf("Subscribed to topic") for {