From 843b64c0b730a0accd8b1cc0cdf373a2f2c09fc6 Mon Sep 17 00:00:00 2001 From: 647 Date: Wed, 1 Sep 2021 18:47:23 +0800 Subject: [PATCH] [feat]: Fix mulit slb node update notification ID bug & Support heart beat --- agollo.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++-- options.go | 18 +++++++++++++ options_test.go | 6 +++++ 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/agollo.go b/agollo.go index 24410c1..dfcdafb 100644 --- a/agollo.go +++ b/agollo.go @@ -59,7 +59,9 @@ type agollo struct { errorsCh chan *LongPollerError - runOnce sync.Once + runOnce sync.Once + runHeartBeat sync.Once + stop bool stopCh chan struct{} stopLock sync.Mutex @@ -286,9 +288,63 @@ func (a *agollo) Start() <-chan *LongPollerError { }() }) + if a.opts.EnableHeartBeat { + a.runHeartBeat.Do(func() { + go func() { + timer := time.NewTimer(a.opts.HeartBeatInterval) + defer timer.Stop() + for !a.shouldStop() { + select { + case <-timer.C: + a.heartBeat() + timer.Reset(a.opts.HeartBeatInterval) + case <-a.stopCh: + return + } + } + }() + }) + } + return a.errorsCh } +func (a *agollo) heartBeat() { + var configServerURL string + configServerURL, err := a.opts.Balancer.Select() + if err != nil { + a.log("Action", "BalancerSelect", "Error", err) + return + } + + a.releaseKeyMap.Range(func(namespace, cachedReleaseKey interface{}) bool { + var config *Config + namespaceStr := namespace.(string) + status, config, err := a.opts.ApolloClient.GetConfigsFromNonCache( + configServerURL, + a.opts.AppID, + a.opts.Cluster, + namespaceStr, + ReleaseKey(cachedReleaseKey.(string)), + ) + if err != nil { + return true + } + if status == http.StatusOK { + oldValue := a.getNamespace(namespaceStr) + a.cache.Store(namespace, config.Configurations) + a.releaseKeyMap.Store(namespace, config.ReleaseKey) + if err = a.backup(namespaceStr, config.Configurations); err != nil { + a.log("BackupFile", a.opts.BackupFile, "Namespace", namespace, + "Action", "Backup", "Error", err) + } + a.sendWatchCh(namespaceStr, oldValue, config.Configurations) + a.notificationMap.Store(namespaceStr, config.ReleaseKey) + } + return true + }) +} + func (a *agollo) shouldStop() bool { select { case <-a.stopCh: @@ -316,8 +372,15 @@ func (a *agollo) longPoll() { oldValue := a.getNamespace(notification.NamespaceName) // 更新namespace - _, newValue, err := a.reloadNamespace(notification.NamespaceName) + status, newValue, err := a.reloadNamespace(notification.NamespaceName) + if err == nil { + // Notifications 有更新,但是 GetConfigsFromNonCache 返回 304, + // 可能是请求恰好打在尚未同步的节点上,不更新 NotificationID,等待下次再更新 + if status == http.StatusNotModified { + continue + } + // 发送到监听channel a.sendWatchCh(notification.NamespaceName, oldValue, newValue) diff --git a/options.go b/options.go index 695fe3d..dc3b061 100644 --- a/options.go +++ b/options.go @@ -13,6 +13,8 @@ var ( defaultFailTolerantOnBackupExists = false defaultEnableSLB = false defaultLongPollInterval = 1 * time.Second + defaultEnableHeartBeat = false + defaultHeartBeatInterval = 300 * time.Second ) type Options struct { @@ -30,6 +32,8 @@ type Options struct { EnableSLB bool // 启用ConfigServer负载均衡 RefreshIntervalInSecond time.Duration // ConfigServer刷新间隔 ClientOptions []ApolloClientOption // 设置apollo HTTP api的配置项 + EnableHeartBeat bool // 是否允许兜底检查,默认:false + HeartBeatInterval time.Duration // 兜底检查间隔时间,默认:300s } func newOptions(configServerURL, appID string, opts ...Option) (Options, error) { @@ -43,6 +47,8 @@ func newOptions(configServerURL, appID string, opts ...Option) (Options, error) BackupFile: defaultBackupFile, FailTolerantOnBackupExists: defaultFailTolerantOnBackupExists, EnableSLB: defaultEnableSLB, + EnableHeartBeat: defaultEnableHeartBeat, + HeartBeatInterval: defaultHeartBeatInterval, } for _, opt := range opts { opt(&options) @@ -147,6 +153,18 @@ func LongPollerInterval(i time.Duration) Option { } } +func EnableHeartBeat(b bool) Option { + return func(o *Options) { + o.EnableHeartBeat = b + } +} + +func HeartBeatInterval(i time.Duration) Option { + return func(o *Options) { + o.HeartBeatInterval = i + } +} + func BackupFile(backupFile string) Option { return func(o *Options) { o.BackupFile = backupFile diff --git a/options_test.go b/options_test.go index 3ae7cc8..49f90ce 100644 --- a/options_test.go +++ b/options_test.go @@ -26,6 +26,8 @@ func TestOptions(t *testing.T) { assert.Equal(t, defaultBackupFile, opts.BackupFile) assert.Equal(t, defaultFailTolerantOnBackupExists, opts.FailTolerantOnBackupExists) assert.Equal(t, defaultEnableSLB, opts.EnableSLB) + assert.Equal(t, defaultEnableHeartBeat, opts.EnableHeartBeat) + assert.Equal(t, defaultHeartBeatInterval, opts.HeartBeatInterval) assert.NotNil(t, opts.Logger) assert.NotNil(t, opts.ApolloClient) assert.NotNil(t, opts.Balancer) @@ -50,6 +52,8 @@ func TestOptions(t *testing.T) { BackupFile("test_backup"), FailTolerantOnBackupExists(), AccessKey("test_access_key"), + EnableHeartBeat(true), + HeartBeatInterval(time.Second * 120), }, func(opts Options) { assert.Equal(t, "test_cluster", opts.Cluster) @@ -66,6 +70,8 @@ func TestOptions(t *testing.T) { ac := &apolloClient{} ac.Apply(opts.ClientOptions...) assert.Equal(t, "test_access_key", ac.AccessKey) + assert.Equal(t, true, opts.EnableHeartBeat) + assert.Equal(t, time.Second*120, opts.HeartBeatInterval) }, }, {