Skip to content

Commit

Permalink
fix pool-coordinator running error when some error happened and abort…
Browse files Browse the repository at this point in the history
… parameter init (openyurtio#1312)
  • Loading branch information
JameKeal authored Mar 31, 2023
1 parent eafe3c0 commit 6996b92
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions pkg/yurthub/poolcoordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -81,6 +82,11 @@ type Coordinator interface {
IsHealthy() (cachemanager.CacheManager, bool)
}

type statusInfo struct {
electorStatus int32
currentCnt uint64
}

type coordinator struct {
sync.Mutex
ctx context.Context
Expand All @@ -95,6 +101,8 @@ type coordinator struct {
etcdStorage storage.Store
hubElector *HubElector
electStatus int32
cnt uint64
statusInfoChan chan statusInfo
isPoolCacheSynced bool
certMgr *certmanager.CertManager
// cloudCAFilePath is the file path of cloud kubernetes cluster CA cert.
Expand Down Expand Up @@ -155,6 +163,7 @@ func NewCoordinator(
serializerMgr: cfg.SerializerManager,
restMapperMgr: cfg.RESTMapperManager,
hubElector: elector,
statusInfoChan: make(chan statusInfo, 10),
}

poolCacheSyncedDetector := &poolCacheSyncedDetector{
Expand Down Expand Up @@ -231,7 +240,37 @@ func (coordinator *coordinator) Run() {
}
metrics.Metrics.ObservePoolCoordinatorYurthubRole(electorStatus)

switch electorStatus {
if coordinator.cnt == math.MaxUint64 {
// cnt will overflow, reset it.
coordinator.cnt = 0
// if statusInfoChan channel also has data, clean it.
length := len(coordinator.statusInfoChan)
if length > 0 {
i := 0
for v := range coordinator.statusInfoChan {
klog.Infof("clean statusInfo data %+v when coordinator.cnt is reset", v)
i++
if i == length {
break
}
}
}
}
coordinator.cnt++
coordinator.statusInfoChan <- statusInfo{
electorStatus: electorStatus,
currentCnt: coordinator.cnt,
}
case electorStatusInfo, ok := <-coordinator.statusInfoChan:
if !ok {
return
}
if electorStatusInfo.currentCnt < coordinator.cnt {
klog.Infof("electorStatusInfo %+v is behind of current cnt %d", electorStatusInfo, coordinator.cnt)
continue
}

switch electorStatusInfo.electorStatus {
case PendingHub:
coordinator.poolCacheSyncManager.EnsureStop()
coordinator.delegateNodeLeaseManager.EnsureStop()
Expand All @@ -245,20 +284,24 @@ func (coordinator *coordinator) Run() {
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
coordinator.statusInfoChan <- electorStatusInfo
continue
}

nodeLeaseProxyClient, err := coordinator.newNodeLeaseProxyClient()
if err != nil {
klog.Errorf("cloud not get cloud lease client when becoming leader yurthub, %v", err)
continue
}
klog.Infof("coordinator newCloudLeaseClient success.")
if err := coordinator.poolCacheSyncManager.EnsureStart(); err != nil {
klog.Errorf("failed to sync pool-scoped resource, %v", err)
coordinator.statusInfoChan <- electorStatusInfo
continue
}
klog.Infof("coordinator poolCacheSyncManager has ensure started")

nodeLeaseProxyClient, err := coordinator.newNodeLeaseProxyClient()
if err != nil {
klog.Errorf("cloud not get cloud lease client when becoming leader yurthub, %v", err)
coordinator.statusInfoChan <- electorStatusInfo
continue
}
coordinator.delegateNodeLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
FilterFunc: ifDelegateHeartBeat,
Handler: cache.ResourceEventHandlerFuncs{
Expand All @@ -270,6 +313,7 @@ func (coordinator *coordinator) Run() {
},
},
})

coordinator.poolCacheSyncedDetector.EnsureStart()

if coordinator.needUploadLocalCache {
Expand All @@ -283,6 +327,7 @@ func (coordinator *coordinator) Run() {
poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
if err != nil {
klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
coordinator.statusInfoChan <- electorStatusInfo
continue
}

Expand All @@ -306,7 +351,7 @@ func (coordinator *coordinator) Run() {
if needCancelEtcdStorage {
cancelEtcdStorage()
}
coordinator.electStatus = electorStatus
coordinator.electStatus = electorStatusInfo.electorStatus
coordinator.poolCacheManager = poolCacheManager
coordinator.etcdStorage = etcdStorage
coordinator.cancelEtcdStorage = cancelEtcdStorage
Expand Down

0 comments on commit 6996b92

Please sign in to comment.