diff --git a/cmd/go-quai/start.go b/cmd/go-quai/start.go
index 089c32782c..da82f2fb50 100644
--- a/cmd/go-quai/start.go
+++ b/cmd/go-quai/start.go
@@ -87,7 +87,7 @@ func runStart(cmd *cobra.Command, args []string) error {
 	}
 	// Start the hierarchical co-ordinator
 	var nodeWg sync.WaitGroup
-	hc := utils.NewHierarchicalCoordinator(node, logLevel, &nodeWg, startingExpansionNumber, quitCh)
+	hc := utils.NewHierarchicalCoordinator(node, logLevel, &nodeWg, startingExpansionNumber)
 	err = hc.StartHierarchicalCoordinator()
 	if err != nil {
 		log.Global.WithField("error", err).Fatal("error starting hierarchical coordinator")
@@ -110,6 +110,7 @@ func runStart(cmd *cobra.Command, args []string) error {
 	<-ch
 	log.Global.Warn("Received 'stop' signal, shutting down gracefully...")
 	cancel()
+	node.Close()
 	// stop the hierarchical co-ordinator
 	hc.Stop()
 	if err := node.Stop(); err != nil {
diff --git a/cmd/utils/hierarchical_coordinator.go b/cmd/utils/hierarchical_coordinator.go
index 0c8f0ec37c..bf1d2fb2e7 100644
--- a/cmd/utils/hierarchical_coordinator.go
+++ b/cmd/utils/hierarchical_coordinator.go
@@ -263,7 +263,7 @@ func (ns *NodeSet) Copy() NodeSet {
 }
 
 // NewHierarchicalCoordinator creates a new instance of the HierarchicalCoordinator
-func NewHierarchicalCoordinator(p2p quai.NetworkingAPI, logLevel string, nodeWg *sync.WaitGroup, startingExpansionNumber uint64, quitCh chan struct{}) *HierarchicalCoordinator {
+func NewHierarchicalCoordinator(p2p quai.NetworkingAPI, logLevel string, nodeWg *sync.WaitGroup, startingExpansionNumber uint64) *HierarchicalCoordinator {
 	db, err := OpenBackendDB()
 	if err != nil {
 		log.Global.WithField("err", err).Fatal("Error opening the backend db")
@@ -275,7 +275,7 @@ func NewHierarchicalCoordinator(p2p quai.NetworkingAPI, logLevel string, nodeWg
 		logLevel:                    logLevel,
 		slicesRunning:               GetRunningZones(),
 		treeExpansionTriggerStarted: false,
-		quitCh:                      quitCh,
+		quitCh:                      make(chan struct{}),
 		recentBlocks:                make(map[string]*lru.Cache[common.Hash, Node]),
 		pendingHeaders:              NewPendingHeaders(),
 		bestEntropy:                 new(big.Int).Set(common.Big0),
@@ -434,12 +434,12 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co
 }
 
 func (hc *HierarchicalCoordinator) Stop() {
+	close(hc.quitCh)
 	for _, chainEventSub := range hc.chainSubs {
 		chainEventSub.Unsubscribe()
 	}
 	hc.expansionSub.Unsubscribe()
 	hc.db.Close()
-	close(hc.quitCh)
 	hc.wg.Wait()
 }
 
@@ -484,7 +484,8 @@ func (hc *HierarchicalCoordinator) expansionEventLoop() {
 					}
 				}
 			}
-
+		case <-hc.quitCh:
+			return
 		case <-hc.expansionSub.Err():
 			return
 		}
@@ -617,6 +618,8 @@ func (hc *HierarchicalCoordinator) ChainEventLoop(chainEvent chan core.ChainEven
 				hc.pendingHeaderBackupCh <- struct{}{}
 				lastUpdateTime = time.Now()
 			}
+		case <-hc.quitCh:
+			return
 		case <-sub.Err():
 			return
 		}
@@ -641,7 +644,6 @@ func (hc *HierarchicalCoordinator) MapConstructProc() {
 			hc.PendingHeadersMap()
 		case <-hc.quitCh:
 			return
-		default:
 		}
 	}
 }
@@ -837,44 +839,18 @@ func (hc *HierarchicalCoordinator) ReapplicationLoop(head core.ChainEvent) {
 
 	sleepTime := 1
 	for {
-		hc.BuildPendingHeaders(head.Block, head.Order, head.Entropy)
-		time.Sleep(time.Duration(sleepTime) * time.Second)
-		sleepTime = sleepTime * 2
-		if sleepTime > 65 {
-			break
-		}
-	}
-}
-
-func (hc *HierarchicalCoordinator) GetLock(location common.Location, order int) []*sync.RWMutex {
-	hc.mutexMapMu.Lock()
-	defer hc.mutexMapMu.Unlock()
-	_, exists := hc.pendingHeaderMu[location.Name()]
-	if !exists {
-		hc.pendingHeaderMu[location.Name()] = &sync.RWMutex{}
-	}
-
-	regionNum, zoneNum := common.GetHierarchySizeForExpansionNumber(hc.currentExpansionNumber)
-	var locks []*sync.RWMutex
-	switch order {
-	case common.PRIME_CTX:
-		locks = append(locks, hc.pendingHeaderMu[common.Location{}.Name()])
-		for i := 0; i < int(regionNum); i++ {
-			locks = append(locks, hc.pendingHeaderMu[common.Location{byte(i)}.Name()])
-			for j := 0; j < int(zoneNum); j++ {
-				locks = append(locks, hc.pendingHeaderMu[common.Location{byte(i), byte(j)}.Name()])
+		select {
+		case <-hc.quitCh:
+			return
+		default:
+			hc.BuildPendingHeaders(head.Block, head.Order, head.Entropy)
+			time.Sleep(time.Duration(sleepTime) * time.Second)
+			sleepTime = sleepTime * 2
+			if sleepTime > 65 {
+				return
 			}
 		}
-	case common.REGION_CTX:
-		locks = append(locks, hc.pendingHeaderMu[common.Location{byte(location.Region())}.Name()])
-		for j := 0; j < int(zoneNum); j++ {
-			locks = append(locks, hc.pendingHeaderMu[common.Location{byte(location.Region()), byte(j)}.Name()])
-		}
-	case common.ZONE_CTX:
-		locks = append(locks, hc.pendingHeaderMu[common.Location{byte(location.Region()), byte(location.Zone())}.Name()])
-	}
-
-	return locks
+	}
 }
 
 func (hc *HierarchicalCoordinator) CalculateLeaders(badHashes map[common.Hash]bool) []Node {
diff --git a/p2p/node/api.go b/p2p/node/api.go
index c09d40d7db..f692e39926 100644
--- a/p2p/node/api.go
+++ b/p2p/node/api.go
@@ -77,6 +77,8 @@ func (p *P2PNode) Subscribe(location common.Location, datatype interface{}) erro
 					log.Global.Infof("providing topic %s in %s", reflect.TypeOf(datatype), location.Name())
 					return
 				}
+			case <-p.quitCh:
+				return
 			case <-timeout.C:
 				log.Global.Errorf("unable to provide topic %s in %s", reflect.TypeOf(datatype), location.Name())
 				return
@@ -160,34 +162,39 @@ func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData inter
 			}
 		}()
 		defer close(resultChan)
-		peers := p.peerManager.GetPeers(topic)
-		log.Global.WithFields(log.Fields{
-			"peers": peers,
-			"topic": topic,
-		}).Debug("Requesting data from peers")
-
-		var requestWg sync.WaitGroup
-		for peerID := range peers {
-			// if we have exceeded the outbound rate limit for this peer, skip them for now
-			if err := protocol.ProcRequestRate(peerID, false); err != nil {
-				log.Global.Warnf("Exceeded request rate to peer %s", peerID)
-				continue
+		select {
+		case <-p.ctx.Done():
+			return
+		default:
+			peers := p.peerManager.GetPeers(topic)
+			log.Global.WithFields(log.Fields{
+				"peers": peers,
+				"topic": topic,
+			}).Debug("Requesting data from peers")
+
+			var requestWg sync.WaitGroup
+			for peerID := range peers {
+				// if we have exceeded the outbound rate limit for this peer, skip them for now
+				if err := protocol.ProcRequestRate(peerID, false); err != nil {
+					log.Global.Warnf("Exceeded request rate to peer %s", peerID)
+					continue
+				}
+				requestWg.Add(1)
+				go func(peerID peer.ID) {
+					defer func() {
+						if r := recover(); r != nil {
+							log.Global.WithFields(log.Fields{
+								"error":      r,
+								"stacktrace": string(debug.Stack()),
+							}).Error("Go-Quai Panicked")
+						}
+					}()
+					defer requestWg.Done()
+					p.requestAndWait(peerID, topic, requestData, respDataType, resultChan)
+				}(peerID)
 			}
-			requestWg.Add(1)
-			go func(peerID peer.ID) {
-				defer func() {
-					if r := recover(); r != nil {
-						log.Global.WithFields(log.Fields{
-							"error":      r,
-							"stacktrace": string(debug.Stack()),
-						}).Error("Go-Quai Panicked")
-					}
-				}()
-				defer requestWg.Done()
-				p.requestAndWait(peerID, topic, requestData, respDataType, resultChan)
-			}(peerID)
+			requestWg.Wait()
 		}
-		requestWg.Wait()
 	}()
 }
 
diff --git a/p2p/node/node.go b/p2p/node/node.go
index 6c244302d9..0091c214d3 100644
--- a/p2p/node/node.go
+++ b/p2p/node/node.go
@@ -244,7 +244,6 @@ func NewNode(ctx context.Context, quitCh chan struct{}) (*P2PNode, error) {
 
 // Close performs cleanup of resources used by P2PNode
 func (p *P2PNode) Close() error {
-	p.cancel()
 	// Close PubSub manager
 	if err := p.pubsub.Stop(); err != nil {
 		log.Global.Errorf("error closing pubsub manager: %s", err)