Skip to content

Commit

Permalink
fix: improve redis get role (#7206)
Browse files Browse the repository at this point in the history
  • Loading branch information
kizuna-lek authored May 7, 2024
1 parent d3ada23 commit 25d2cf4
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 33 deletions.
12 changes: 6 additions & 6 deletions cmd/lorry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ func main() {
}
ctrl.SetLogger(kzap.New(kopts...))

// Initialize DB Manager
err = register.InitDBManager(configDir)
if err != nil {
panic(errors.Wrap(err, "DB manager initialize failed"))
}

// Initialize DCS (Distributed Control System)
err = dcs.InitStore()
if err != nil {
panic(errors.Wrap(err, "DCS initialize failed"))
}

// Initialize DB Manager
err = register.InitDBManager(configDir)
if err != nil {
panic(errors.Wrap(err, "DB manager initialize failed"))
}

// Start HA
characterType := viper.GetString(constant.KBEnvCharacterType)
if viper.IsSet(constant.KBEnvBuiltinHandler) {
Expand Down
1 change: 1 addition & 0 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package common
type PodRoleNamePair struct {
PodName string `json:"podName,omitempty"`
RoleName string `json:"roleName,omitempty"`
PodUID string `json:"podUid,omitempty"`
}

// GlobalRoleSnapshot defines a global(leader) perspective of all pods role.
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const (
// KBEnvRsmRoleUpdateMechanism defines the method to send events: DirectAPIServerEventUpdate(through lorry service), ReadinessProbeEventUpdate(through kubelet service)
KBEnvRsmRoleUpdateMechanism = "KB_RSM_ROLE_UPDATE_MECHANISM"
KBEnvRoleProbeTimeout = "KB_RSM_ROLE_PROBE_TIMEOUT"
KBEnvRoleProbePeriod = "KB_RSM_ROLE_PROBE_PERIOD"

KBEnvVolumeProtectionSpec = "KB_VOLUME_PROTECTION_SPEC"
)
4 changes: 4 additions & 0 deletions pkg/controller/component/lorry_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func buildRoleProbeContainer(roleChangedContainer *corev1.Container, roleProbe *
probe.TimeoutSeconds = roleProbe.TimeoutSeconds
probe.FailureThreshold = 3
roleChangedContainer.ReadinessProbe = probe
roleChangedContainer.Env = append(roleChangedContainer.Env, corev1.EnvVar{
Name: constant.KBEnvRoleProbePeriod,
Value: strconv.Itoa(int(roleProbe.PeriodSeconds)),
})
}

func volumeProtectionEnabled(component *SynthesizedComponent) bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/instanceset/pod_role_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func handleRoleChangedEvent(cli client.Client, reqCtx intctrlutil.RequestCtx, re
return pair.RoleName, err
}
// event belongs to old pod with the same name, ignore it
if pod.Name == pair.PodName && pod.UID != event.InvolvedObject.UID {
if pod.Name == pair.PodName && string(pod.UID) != pair.PodUID {
return pair.RoleName, nil
}

Expand Down Expand Up @@ -163,6 +163,7 @@ func parseGlobalRoleSnapshot(role string, event *corev1.Event) *common.GlobalRol
pair := common.PodRoleNamePair{
PodName: event.InvolvedObject.Name,
RoleName: role,
PodUID: string(event.InvolvedObject.UID),
}
snapshot.PodRoleNamePairs = append(snapshot.PodRoleNamePairs, pair)
return snapshot
Expand Down
93 changes: 74 additions & 19 deletions pkg/lorry/engines/redis/get_replica_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,91 @@ package redis

import (
"context"
"encoding/json"
"strings"
"time"

"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/models"
)

func (mgr *Manager) GetReplicaRole(ctx context.Context, cluster *dcs.Cluster) (string, error) {
section := "Replication"
func (mgr *Manager) GetReplicaRole(ctx context.Context, _ *dcs.Cluster) (string, error) {
// To ensure that the role information obtained through subscription is always delivered.
if mgr.role != "" && mgr.roleSubscribeUpdateTime+mgr.roleProbePeriod*2 < time.Now().Unix() {
return mgr.role, nil
}

var role string
result, err := mgr.client.Info(ctx, section).Result()
// We use the role obtained from Sentinel as the sole source of truth.
masterAddr, err := mgr.sentinelClient.GetMasterAddrByName(ctx, mgr.ClusterCompName).Result()
if err != nil {
mgr.Logger.Error(err, "Role query error")
return role, err
} else {
// split the result into lines
lines := strings.Split(result, "\r\n")
// find the line with role
for _, line := range lines {
if strings.HasPrefix(line, "role:") {
role = strings.Split(line, ":")[1]
break
// when we can't get role from sentinel, we query redis instead
var role string
result, err := mgr.client.Info(ctx, "Replication").Result()
if err != nil {
mgr.Logger.Error(err, "Role query error")
return role, err
} else {
// split the result into lines
lines := strings.Split(result, "\r\n")
// find the line with role
for _, line := range lines {
if strings.HasPrefix(line, "role:") {
role = strings.Split(line, ":")[1]
break
}
}
}
if role == models.MASTER {
return models.PRIMARY, nil
} else {
return models.SECONDARY, nil
}
}
if role == models.MASTER {
return models.PRIMARY, nil
}
if role == models.SLAVE {

masterName := strings.Split(masterAddr[0], ".")[0]
// if current member is not master from sentinel, just return secondary to avoid double master
if masterName != mgr.CurrentMemberName {
return models.SECONDARY, nil
}
return role, nil
return models.PRIMARY, nil
}

func (mgr *Manager) SubscribeRoleChange(ctx context.Context, cluster *dcs.Cluster) {
pubSub := mgr.sentinelClient.Subscribe(ctx, "+switch-master")

// go-redis periodically sends ping messages to test connection health
// and re-subscribes if ping can not receive for 30 seconds.
// so we don't need to retry
ch := pubSub.Channel()
for msg := range ch {
// +switch-master <master name> <old ip> <old port> <new ip> <new port>
masterAddr := strings.Split(msg.Payload, " ")
masterName := strings.Split(masterAddr[3], ".")[0]

// When network partition occurs, the new primary needs to send global role change information to the controller.
if masterName == mgr.CurrentMemberName {
roleSnapshot := &common.GlobalRoleSnapshot{}
oldMasterName := strings.Split(masterAddr[1], ".")[0]
roleSnapshot.PodRoleNamePairs = []common.PodRoleNamePair{
{
PodName: oldMasterName,
RoleName: models.SECONDARY,
PodUID: cluster.GetMemberWithName(oldMasterName).UID,
},
{
PodName: masterName,
RoleName: models.PRIMARY,
PodUID: cluster.GetMemberWithName(masterName).UID,
},
}
roleSnapshot.Version = time.Now().Format(time.RFC3339Nano)

b, _ := json.Marshal(roleSnapshot)
mgr.role = string(b)
} else {
mgr.role = models.SECONDARY
}
mgr.roleSubscribeUpdateTime = time.Now().Unix()
}
}
22 changes: 16 additions & 6 deletions pkg/lorry/engines/redis/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)
Expand All @@ -41,10 +42,14 @@ type Manager struct {
engines.DBManagerBase
client redis.UniversalClient
clientSettings *Settings

ctx context.Context
cancel context.CancelFunc
startAt time.Time
sentinelClient *redis.SentinelClient

ctx context.Context
cancel context.CancelFunc
startAt time.Time
role string
roleSubscribeUpdateTime int64
roleProbePeriod int64
}

var _ engines.DBManager = &Manager{}
Expand All @@ -65,7 +70,8 @@ func NewManager(properties engines.Properties) (engines.DBManager, error) {
return nil, err
}
mgr := &Manager{
DBManagerBase: *managerBase,
DBManagerBase: *managerBase,
roleProbePeriod: int64(viper.GetInt(constant.KBEnvRoleProbePeriod)),
}

mgr.startAt = time.Now()
Expand All @@ -74,12 +80,16 @@ func NewManager(properties engines.Properties) (engines.DBManager, error) {
Password: redisPasswd,
Username: redisUser,
}
mgr.client, mgr.clientSettings, err = ParseClientFromProperties(map[string]string(properties), defaultSettings)
mgr.client, mgr.clientSettings, err = ParseClientFromProperties(properties, defaultSettings)
if err != nil {
return nil, err
}

mgr.sentinelClient = newSentinelClient(mgr.clientSettings, mgr.ClusterCompName)

mgr.ctx, mgr.cancel = context.WithCancel(context.Background())

go mgr.SubscribeRoleChange(mgr.ctx, dcs.GetStore().GetClusterFromCache())
return mgr, nil
}

Expand Down
29 changes: 29 additions & 0 deletions pkg/lorry/engines/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"time"

"github.com/redis/go-redis/v9"

viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

const (
Expand Down Expand Up @@ -141,3 +143,30 @@ func newClient(s *Settings) redis.UniversalClient {

return redis.NewClient(options)
}

func newSentinelClient(s *Settings, clusterCompName string) *redis.SentinelClient {
// TODO: use headless service directly
sentinelEnv := fmt.Sprintf("%s_SENTINEL_SERVICE", strings.ToUpper(strings.Join(strings.Split(clusterCompName, "-"), "_")))
sentinelHost := viper.GetString(fmt.Sprintf("%s_HOST", sentinelEnv))
sentinelPort := viper.GetString(fmt.Sprintf("%s_PORT", sentinelEnv))

opt := &redis.Options{
DB: s.DB,
Addr: fmt.Sprintf("%s:%s", sentinelHost, sentinelPort),
Password: s.Password,
Username: s.Username,
MaxRetries: s.RedisMaxRetries,
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
DialTimeout: time.Duration(s.DialTimeout),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
PoolSize: s.PoolSize,
MinIdleConns: s.MinIdleConns,
PoolTimeout: time.Duration(s.PoolTimeout),
}

sentinelClient := redis.NewSentinelClient(opt)

return sentinelClient
}
1 change: 0 additions & 1 deletion pkg/lorry/operations/replica/checkrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func (s *CheckRole) Init(ctx context.Context) error {
}
}
return nil

}

func (s *CheckRole) IsReadonly(ctx context.Context) bool {
Expand Down

0 comments on commit 25d2cf4

Please sign in to comment.