Skip to content

Commit

Permalink
[feat]: Fix mulit slb node update notification ID bug & Support heart…
Browse files Browse the repository at this point in the history
… beat
  • Loading branch information
647-coder authored and 647(siki.liu) committed Sep 1, 2021
1 parent 93b8dd4 commit 843b64c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 2 deletions.
67 changes: 65 additions & 2 deletions agollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ type agollo struct {

errorsCh chan *LongPollerError

runOnce sync.Once
runOnce sync.Once
runHeartBeat sync.Once

stop bool
stopCh chan struct{}
stopLock sync.Mutex
Expand Down Expand Up @@ -286,9 +288,63 @@ func (a *agollo) Start() <-chan *LongPollerError {
}()
})

if a.opts.EnableHeartBeat {
a.runHeartBeat.Do(func() {
go func() {
timer := time.NewTimer(a.opts.HeartBeatInterval)
defer timer.Stop()
for !a.shouldStop() {
select {
case <-timer.C:
a.heartBeat()
timer.Reset(a.opts.HeartBeatInterval)
case <-a.stopCh:
return
}
}
}()
})
}

return a.errorsCh
}

func (a *agollo) heartBeat() {
var configServerURL string
configServerURL, err := a.opts.Balancer.Select()
if err != nil {
a.log("Action", "BalancerSelect", "Error", err)
return
}

a.releaseKeyMap.Range(func(namespace, cachedReleaseKey interface{}) bool {
var config *Config
namespaceStr := namespace.(string)
status, config, err := a.opts.ApolloClient.GetConfigsFromNonCache(
configServerURL,
a.opts.AppID,
a.opts.Cluster,
namespaceStr,
ReleaseKey(cachedReleaseKey.(string)),
)
if err != nil {
return true
}
if status == http.StatusOK {
oldValue := a.getNamespace(namespaceStr)
a.cache.Store(namespace, config.Configurations)
a.releaseKeyMap.Store(namespace, config.ReleaseKey)
if err = a.backup(namespaceStr, config.Configurations); err != nil {
a.log("BackupFile", a.opts.BackupFile, "Namespace", namespace,
"Action", "Backup", "Error", err)
}
a.sendWatchCh(namespaceStr, oldValue, config.Configurations)
a.notificationMap.Store(namespaceStr, config.ReleaseKey)
}
return true
})
}

func (a *agollo) shouldStop() bool {
select {
case <-a.stopCh:
Expand Down Expand Up @@ -316,8 +372,15 @@ func (a *agollo) longPoll() {
oldValue := a.getNamespace(notification.NamespaceName)

// 更新namespace
_, newValue, err := a.reloadNamespace(notification.NamespaceName)
status, newValue, err := a.reloadNamespace(notification.NamespaceName)

if err == nil {
// Notifications 有更新,但是 GetConfigsFromNonCache 返回 304,
// 可能是请求恰好打在尚未同步的节点上,不更新 NotificationID,等待下次再更新
if status == http.StatusNotModified {
continue
}

// 发送到监听channel
a.sendWatchCh(notification.NamespaceName, oldValue, newValue)

Expand Down
18 changes: 18 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
defaultFailTolerantOnBackupExists = false
defaultEnableSLB = false
defaultLongPollInterval = 1 * time.Second
defaultEnableHeartBeat = false
defaultHeartBeatInterval = 300 * time.Second
)

type Options struct {
Expand All @@ -30,6 +32,8 @@ type Options struct {
EnableSLB bool // 启用ConfigServer负载均衡
RefreshIntervalInSecond time.Duration // ConfigServer刷新间隔
ClientOptions []ApolloClientOption // 设置apollo HTTP api的配置项
EnableHeartBeat bool // 是否允许兜底检查,默认:false
HeartBeatInterval time.Duration // 兜底检查间隔时间,默认:300s
}

func newOptions(configServerURL, appID string, opts ...Option) (Options, error) {
Expand All @@ -43,6 +47,8 @@ func newOptions(configServerURL, appID string, opts ...Option) (Options, error)
BackupFile: defaultBackupFile,
FailTolerantOnBackupExists: defaultFailTolerantOnBackupExists,
EnableSLB: defaultEnableSLB,
EnableHeartBeat: defaultEnableHeartBeat,
HeartBeatInterval: defaultHeartBeatInterval,
}
for _, opt := range opts {
opt(&options)
Expand Down Expand Up @@ -147,6 +153,18 @@ func LongPollerInterval(i time.Duration) Option {
}
}

func EnableHeartBeat(b bool) Option {
return func(o *Options) {
o.EnableHeartBeat = b
}
}

func HeartBeatInterval(i time.Duration) Option {
return func(o *Options) {
o.HeartBeatInterval = i
}
}

func BackupFile(backupFile string) Option {
return func(o *Options) {
o.BackupFile = backupFile
Expand Down
6 changes: 6 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func TestOptions(t *testing.T) {
assert.Equal(t, defaultBackupFile, opts.BackupFile)
assert.Equal(t, defaultFailTolerantOnBackupExists, opts.FailTolerantOnBackupExists)
assert.Equal(t, defaultEnableSLB, opts.EnableSLB)
assert.Equal(t, defaultEnableHeartBeat, opts.EnableHeartBeat)
assert.Equal(t, defaultHeartBeatInterval, opts.HeartBeatInterval)
assert.NotNil(t, opts.Logger)
assert.NotNil(t, opts.ApolloClient)
assert.NotNil(t, opts.Balancer)
Expand All @@ -50,6 +52,8 @@ func TestOptions(t *testing.T) {
BackupFile("test_backup"),
FailTolerantOnBackupExists(),
AccessKey("test_access_key"),
EnableHeartBeat(true),
HeartBeatInterval(time.Second * 120),
},
func(opts Options) {
assert.Equal(t, "test_cluster", opts.Cluster)
Expand All @@ -66,6 +70,8 @@ func TestOptions(t *testing.T) {
ac := &apolloClient{}
ac.Apply(opts.ClientOptions...)
assert.Equal(t, "test_access_key", ac.AccessKey)
assert.Equal(t, true, opts.EnableHeartBeat)
assert.Equal(t, time.Second*120, opts.HeartBeatInterval)
},
},
{
Expand Down

0 comments on commit 843b64c

Please sign in to comment.