bugfix: Graceful shutdown of the node
gameofpointers committed Oct 1, 2024
1 parent 22a17fc commit 4a78e13
Showing 4 changed files with 51 additions and 68 deletions.
3 changes: 2 additions & 1 deletion cmd/go-quai/start.go
@@ -87,7 +87,7 @@ func runStart(cmd *cobra.Command, args []string) error {
}
// Start the hierarchical co-ordinator
var nodeWg sync.WaitGroup
hc := utils.NewHierarchicalCoordinator(node, logLevel, &nodeWg, startingExpansionNumber, quitCh)
hc := utils.NewHierarchicalCoordinator(node, logLevel, &nodeWg, startingExpansionNumber)
err = hc.StartHierarchicalCoordinator()
if err != nil {
log.Global.WithField("error", err).Fatal("error starting hierarchical coordinator")
@@ -110,6 +110,7 @@ func runStart(cmd *cobra.Command, args []string) error {
<-ch
log.Global.Warn("Received 'stop' signal, shutting down gracefully...")
cancel()
node.Close()
// stop the hierarchical co-ordinator
hc.Stop()
if err := node.Stop(); err != nil {
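
Note on the start.go hunk above: the coordinator no longer receives the external quitCh, and the shutdown path now runs cancel(), node.Close(), hc.Stop(), and node.Stop() in that order. Below is a minimal, self-contained sketch of that ordering; the node and coordinator types are hypothetical stand-ins for P2PNode and HierarchicalCoordinator, reduced to just the calls the hunk touches.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

// node and coordinator are hypothetical stand-ins for the P2P node and the
// hierarchical coordinator; only the shutdown calls from the hunk are modeled.
type node struct{}

func (n *node) Close() error { return nil }
func (n *node) Stop() error  { return nil }

type coordinator struct{}

func (c *coordinator) Stop() {}

func main() {
	_, cancel := context.WithCancel(context.Background())
	n, hc := &node{}, &coordinator{}

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	<-ch
	log.Println("Received 'stop' signal, shutting down gracefully...")

	// Ordering after this commit: cancel the shared context, release node
	// resources with Close, stop the coordinator (which waits for its own
	// goroutines), then stop the node.
	cancel()
	if err := n.Close(); err != nil {
		log.Printf("error closing node: %s", err)
	}
	hc.Stop()
	if err := n.Stop(); err != nil {
		log.Printf("error stopping node: %s", err)
	}
}
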
56 changes: 16 additions & 40 deletions cmd/utils/hierarchical_coordinator.go
@@ -263,7 +263,7 @@ func (ns *NodeSet) Copy() NodeSet {
}

// NewHierarchicalCoordinator creates a new instance of the HierarchicalCoordinator
func NewHierarchicalCoordinator(p2p quai.NetworkingAPI, logLevel string, nodeWg *sync.WaitGroup, startingExpansionNumber uint64, quitCh chan struct{}) *HierarchicalCoordinator {
func NewHierarchicalCoordinator(p2p quai.NetworkingAPI, logLevel string, nodeWg *sync.WaitGroup, startingExpansionNumber uint64) *HierarchicalCoordinator {
db, err := OpenBackendDB()
if err != nil {
log.Global.WithField("err", err).Fatal("Error opening the backend db")
@@ -275,7 +275,7 @@ func NewHierarchicalCoordinator(p2p quai.NetworkingAPI, logLevel string, nodeWg
logLevel: logLevel,
slicesRunning: GetRunningZones(),
treeExpansionTriggerStarted: false,
quitCh: quitCh,
quitCh: make(chan struct{}),
recentBlocks: make(map[string]*lru.Cache[common.Hash, Node]),
pendingHeaders: NewPendingHeaders(),
bestEntropy: new(big.Int).Set(common.Big0),
@@ -434,12 +434,12 @@ func (hc *HierarchicalCoordinator) startNode(logPath string, quaiBackend quai.Co
}

func (hc *HierarchicalCoordinator) Stop() {
close(hc.quitCh)
for _, chainEventSub := range hc.chainSubs {
chainEventSub.Unsubscribe()
}
hc.expansionSub.Unsubscribe()
hc.db.Close()
close(hc.quitCh)
hc.wg.Wait()
}

@@ -484,7 +484,8 @@ func (hc *HierarchicalCoordinator) expansionEventLoop() {
}
}
}

case <-hc.quitCh:
return
case <-hc.expansionSub.Err():
return
}
@@ -617,6 +618,8 @@ func (hc *HierarchicalCoordinator) ChainEventLoop(chainEvent chan core.ChainEven
hc.pendingHeaderBackupCh <- struct{}{}
lastUpdateTime = time.Now()
}
case <-hc.quitCh:
return
case <-sub.Err():
return
}
@@ -641,7 +644,6 @@ func (hc *HierarchicalCoordinator) MapConstructProc() {
hc.PendingHeadersMap()
case <-hc.quitCh:
return
default:
}
}
}
@@ -837,44 +839,18 @@ func (hc *HierarchicalCoordinator) ReapplicationLoop(head core.ChainEvent) {
sleepTime := 1

for {
hc.BuildPendingHeaders(head.Block, head.Order, head.Entropy)
time.Sleep(time.Duration(sleepTime) * time.Second)
sleepTime = sleepTime * 2
if sleepTime > 65 {
break
}
}
}

func (hc *HierarchicalCoordinator) GetLock(location common.Location, order int) []*sync.RWMutex {
hc.mutexMapMu.Lock()
defer hc.mutexMapMu.Unlock()
_, exists := hc.pendingHeaderMu[location.Name()]
if !exists {
hc.pendingHeaderMu[location.Name()] = &sync.RWMutex{}
}

regionNum, zoneNum := common.GetHierarchySizeForExpansionNumber(hc.currentExpansionNumber)
var locks []*sync.RWMutex
switch order {
case common.PRIME_CTX:
locks = append(locks, hc.pendingHeaderMu[common.Location{}.Name()])
for i := 0; i < int(regionNum); i++ {
locks = append(locks, hc.pendingHeaderMu[common.Location{byte(i)}.Name()])
for j := 0; j < int(zoneNum); j++ {
locks = append(locks, hc.pendingHeaderMu[common.Location{byte(i), byte(j)}.Name()])
select {
case <-hc.quitCh:
return
default:
hc.BuildPendingHeaders(head.Block, head.Order, head.Entropy)
time.Sleep(time.Duration(sleepTime) * time.Second)
sleepTime = sleepTime * 2
if sleepTime > 65 {
return
}
}
case common.REGION_CTX:
locks = append(locks, hc.pendingHeaderMu[common.Location{byte(location.Region())}.Name()])
for j := 0; j < int(zoneNum); j++ {
locks = append(locks, hc.pendingHeaderMu[common.Location{byte(location.Region()), byte(j)}.Name()])
}
case common.ZONE_CTX:
locks = append(locks, hc.pendingHeaderMu[common.Location{byte(location.Region()), byte(location.Zone())}.Name()])
}

return locks
}

func (hc *HierarchicalCoordinator) CalculateLeaders(badHashes map[common.Hash]bool) []Node {
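
Note on the hierarchical_coordinator.go changes above: ownership of quitCh moves into the coordinator (make(chan struct{}) in the constructor), Stop() now closes it before waiting, and the event loops gain case <-hc.quitCh: return arms (the ReapplicationLoop retry is wrapped in a select as well). Below is a minimal sketch of that quit-channel/WaitGroup pattern, with a hypothetical eventLoop standing in for the real loops; it is not the coordinator's actual code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// coordinator is a hypothetical stand-in for HierarchicalCoordinator,
// reduced to the quit-channel/WaitGroup mechanics this commit touches.
type coordinator struct {
	quitCh chan struct{}
	wg     sync.WaitGroup
}

func newCoordinator() *coordinator {
	// The coordinator now owns its quit channel instead of receiving one.
	return &coordinator{quitCh: make(chan struct{})}
}

func (c *coordinator) eventLoop(events <-chan int) {
	defer c.wg.Done()
	for {
		select {
		case ev := <-events:
			fmt.Println("handling event", ev)
		case <-c.quitCh: // every loop now exits promptly on shutdown
			return
		}
	}
}

func (c *coordinator) Stop() {
	// Close quitCh first so the loops stop consuming, then wait for them.
	close(c.quitCh)
	c.wg.Wait()
}

func main() {
	c := newCoordinator()
	events := make(chan int)
	c.wg.Add(1)
	go c.eventLoop(events)

	events <- 1
	time.Sleep(10 * time.Millisecond)
	c.Stop()
	fmt.Println("coordinator stopped")
}
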
59 changes: 33 additions & 26 deletions p2p/node/api.go
@@ -77,6 +77,8 @@ func (p *P2PNode) Subscribe(location common.Location, datatype interface{}) erro
log.Global.Infof("providing topic %s in %s", reflect.TypeOf(datatype), location.Name())
return
}
case <-p.quitCh:
return
case <-timeout.C:
log.Global.Errorf("unable to provide topic %s in %s", reflect.TypeOf(datatype), location.Name())
return
@@ -160,34 +162,39 @@ func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData inter
}
}()
defer close(resultChan)
peers := p.peerManager.GetPeers(topic)
log.Global.WithFields(log.Fields{
"peers": peers,
"topic": topic,
}).Debug("Requesting data from peers")

var requestWg sync.WaitGroup
for peerID := range peers {
// if we have exceeded the outbound rate limit for this peer, skip them for now
if err := protocol.ProcRequestRate(peerID, false); err != nil {
log.Global.Warnf("Exceeded request rate to peer %s", peerID)
continue
select {
case <-p.ctx.Done():
return
default:
peers := p.peerManager.GetPeers(topic)
log.Global.WithFields(log.Fields{
"peers": peers,
"topic": topic,
}).Debug("Requesting data from peers")

var requestWg sync.WaitGroup
for peerID := range peers {
// if we have exceeded the outbound rate limit for this peer, skip them for now
if err := protocol.ProcRequestRate(peerID, false); err != nil {
log.Global.Warnf("Exceeded request rate to peer %s", peerID)
continue
}
requestWg.Add(1)
go func(peerID peer.ID) {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Error("Go-Quai Panicked")
}
}()
defer requestWg.Done()
p.requestAndWait(peerID, topic, requestData, respDataType, resultChan)
}(peerID)
}
requestWg.Add(1)
go func(peerID peer.ID) {
defer func() {
if r := recover(); r != nil {
log.Global.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Error("Go-Quai Panicked")
}
}()
defer requestWg.Done()
p.requestAndWait(peerID, topic, requestData, respDataType, resultChan)
}(peerID)
requestWg.Wait()
}
requestWg.Wait()
}()
}

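
Note on the api.go hunk above: the request fan-out is now wrapped in a select so it is skipped once the node context is cancelled, and Subscribe also returns on quitCh. A compact sketch of that guard follows; peers and requestAndWait are hypothetical stand-ins for the real peer manager and request path.

package main

import (
	"context"
	"fmt"
	"runtime/debug"
	"sync"
)

// requestFromPeers mirrors the shape of the patched fan-out: bail out early
// if the context is already cancelled, otherwise spawn one goroutine per
// peer (each with a panic guard) and wait for them before returning.
func requestFromPeers(ctx context.Context, peers []string, requestAndWait func(peer string)) {
	select {
	case <-ctx.Done():
		// Node is shutting down; skip the fan-out entirely.
		return
	default:
		var requestWg sync.WaitGroup
		for _, peerID := range peers {
			requestWg.Add(1)
			go func(peerID string) {
				defer func() {
					if r := recover(); r != nil {
						fmt.Printf("panic: %v\n%s\n", r, debug.Stack())
					}
				}()
				defer requestWg.Done()
				requestAndWait(peerID)
			}(peerID)
		}
		requestWg.Wait()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	requestFromPeers(ctx, []string{"peerA", "peerB"}, func(p string) {
		fmt.Println("requested from", p)
	})
	cancel()
	// After cancellation the function returns without contacting any peer.
	requestFromPeers(ctx, []string{"peerC"}, func(p string) {
		fmt.Println("requested from", p)
	})
}
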
1 change: 0 additions & 1 deletion p2p/node/node.go
@@ -244,7 +244,6 @@ func NewNode(ctx context.Context, quitCh chan struct{}) (*P2PNode, error) {

// Close performs cleanup of resources used by P2PNode
func (p *P2PNode) Close() error {
p.cancel()
// Close PubSub manager
if err := p.pubsub.Stop(); err != nil {
log.Global.Errorf("error closing pubsub manager: %s", err)
