diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go
index 092f71a1886..cf85e8be299 100644
--- a/pkg/yurthub/poolcoordinator/coordinator.go
+++ b/pkg/yurthub/poolcoordinator/coordinator.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math"
 	"strconv"
 	"sync"
 	"time"
@@ -81,6 +82,11 @@ type Coordinator interface {
 	IsHealthy() (cachemanager.CacheManager, bool)
 }
 
+type statusInfo struct {
+	electorStatus int32
+	currentCnt    uint64
+}
+
 type coordinator struct {
 	sync.Mutex
 	ctx               context.Context
@@ -95,6 +101,8 @@ type coordinator struct {
 	etcdStorage       storage.Store
 	hubElector        *HubElector
 	electStatus       int32
+	cnt               uint64
+	statusInfoChan    chan statusInfo
 	isPoolCacheSynced bool
 	certMgr           *certmanager.CertManager
 	// cloudCAFilePath is the file path of cloud kubernetes cluster CA cert.
@@ -155,6 +163,7 @@ func NewCoordinator(
 		serializerMgr:  cfg.SerializerManager,
 		restMapperMgr:  cfg.RESTMapperManager,
 		hubElector:     elector,
+		statusInfoChan: make(chan statusInfo, 10),
 	}
 
 	poolCacheSyncedDetector := &poolCacheSyncedDetector{
@@ -231,7 +240,37 @@ func (coordinator *coordinator) Run() {
 			}
 			metrics.Metrics.ObservePoolCoordinatorYurthubRole(electorStatus)
 
-			switch electorStatus {
+			if coordinator.cnt == math.MaxUint64 {
+				// cnt will overflow, reset it.
+				coordinator.cnt = 0
+				// if statusInfoChan channel also has data, clean it.
+				length := len(coordinator.statusInfoChan)
+				if length > 0 {
+					i := 0
+					for v := range coordinator.statusInfoChan {
+						klog.Infof("clean statusInfo data %+v when coordinator.cnt is reset", v)
+						i++
+						if i == length {
+							break
+						}
+					}
+				}
+			}
+			coordinator.cnt++
+			coordinator.statusInfoChan <- statusInfo{
+				electorStatus: electorStatus,
+				currentCnt:    coordinator.cnt,
+			}
+		case electorStatusInfo, ok := <-coordinator.statusInfoChan:
+			if !ok {
+				return
+			}
+			if electorStatusInfo.currentCnt < coordinator.cnt {
+				klog.Infof("electorStatusInfo %+v is behind of current cnt %d", electorStatusInfo, coordinator.cnt)
+				continue
+			}
+
+			switch electorStatusInfo.electorStatus {
 			case PendingHub:
 				coordinator.poolCacheSyncManager.EnsureStop()
 				coordinator.delegateNodeLeaseManager.EnsureStop()
@@ -245,20 +284,24 @@ func (coordinator *coordinator) Run() {
 			poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
 			if err != nil {
 				klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
+				coordinator.statusInfoChan <- electorStatusInfo
 				continue
 			}
 
-			nodeLeaseProxyClient, err := coordinator.newNodeLeaseProxyClient()
-			if err != nil {
-				klog.Errorf("cloud not get cloud lease client when becoming leader yurthub, %v", err)
-				continue
-			}
 			klog.Infof("coordinator newCloudLeaseClient success.")
 			if err := coordinator.poolCacheSyncManager.EnsureStart(); err != nil {
 				klog.Errorf("failed to sync pool-scoped resource, %v", err)
+				coordinator.statusInfoChan <- electorStatusInfo
 				continue
 			}
 			klog.Infof("coordinator poolCacheSyncManager has ensure started")
+
+			nodeLeaseProxyClient, err := coordinator.newNodeLeaseProxyClient()
+			if err != nil {
+				klog.Errorf("could not get cloud lease client when becoming leader yurthub, %v", err)
+				coordinator.statusInfoChan <- electorStatusInfo
+				continue
+			}
 			coordinator.delegateNodeLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
 				FilterFunc: ifDelegateHeartBeat,
 				Handler: cache.ResourceEventHandlerFuncs{
@@ -270,6 +313,7 @@ func (coordinator *coordinator) Run() {
 					},
 				},
 			})
+
 			coordinator.poolCacheSyncedDetector.EnsureStart()
 
 			if coordinator.needUploadLocalCache {
@@ -283,6 +327,7 @@ func (coordinator *coordinator) Run() {
 			poolCacheManager, etcdStorage, cancelEtcdStorage, err = coordinator.buildPoolCacheStore()
 			if err != nil {
 				klog.Errorf("failed to create pool scoped cache store and manager, %v", err)
+				coordinator.statusInfoChan <- electorStatusInfo
 				continue
 			}
 
@@ -306,7 +351,7 @@ func (coordinator *coordinator) Run() {
 			if needCancelEtcdStorage {
 				cancelEtcdStorage()
 			}
-			coordinator.electStatus = electorStatus
+			coordinator.electStatus = electorStatusInfo.electorStatus
 			coordinator.poolCacheManager = poolCacheManager
 			coordinator.etcdStorage = etcdStorage
 			coordinator.cancelEtcdStorage = cancelEtcdStorage
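
The diff replaces direct handling of elector status with a queued, counter-stamped scheme: every status from the elector is stamped with a monotonically increasing `cnt` and pushed onto the buffered `statusInfoChan`; the consumer drops any queued entry whose stamp is older than the latest one issued, and re-queues the current entry whenever a setup step (pool cache store, pool cache sync manager, node lease proxy client) fails, so transient errors are retried instead of being lost on `continue`. Below is a minimal, self-contained sketch of that pattern (illustrative only; the `enqueue` and `process` helpers are hypothetical and not part of yurthub):

```go
// Sketch of the counter-stamped status queue introduced in the diff.
package main

import (
	"fmt"
	"math"
)

type statusInfo struct {
	electorStatus int32
	currentCnt    uint64
}

func main() {
	statusCh := make(chan statusInfo, 10)
	var cnt uint64

	// enqueue stamps a status with the next counter value and queues it,
	// mirroring the overflow reset the diff performs at math.MaxUint64.
	enqueue := func(status int32) {
		if cnt == math.MaxUint64 {
			cnt = 0
		}
		cnt++
		statusCh <- statusInfo{electorStatus: status, currentCnt: cnt}
	}

	// process simulates a handler whose first attempt fails transiently.
	failedOnce := false
	process := func(s statusInfo) error {
		if s.electorStatus == 2 && !failedOnce {
			failedOnce = true
			return fmt.Errorf("transient error")
		}
		return nil
	}

	enqueue(1) // superseded before it is consumed ...
	enqueue(2) // ... by this newer status

	for len(statusCh) > 0 {
		s := <-statusCh
		if s.currentCnt < cnt {
			// A newer status was stamped after this one; drop it.
			fmt.Printf("skip stale %+v, latest cnt is %d\n", s, cnt)
			continue
		}
		if err := process(s); err != nil {
			// Re-queue and retry, as the LeaderHub error paths do.
			fmt.Printf("requeue %+v after error: %v\n", s, err)
			statusCh <- s
			continue
		}
		fmt.Printf("handled %+v\n", s)
	}
}
```

The stamp is what keeps a re-queued entry from shadowing newer state: once a fresh status is enqueued, any retried older entry fails the `currentCnt < cnt` check and is skipped, so the coordinator always converges on the most recent elector status.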