Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve redis get role #7206

Merged
merged 20 commits into from
May 7, 2024
1 change: 1 addition & 0 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package common
type PodRoleNamePair struct {
PodName string `json:"podName,omitempty"`
RoleName string `json:"roleName,omitempty"`
PodUID string `json:"podUid,omitempty"`
}

// GlobalRoleSnapshot defines a global(leader) perspective of all pods role.
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/instanceset/pod_role_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func handleRoleChangedEvent(cli client.Client, reqCtx intctrlutil.RequestCtx, re
return pair.RoleName, err
}
// event belongs to old pod with the same name, ignore it
if pod.Name == pair.PodName && pod.UID != event.InvolvedObject.UID {
if pod.Name == pair.PodName && string(pod.UID) != pair.PodUID {
return pair.RoleName, nil
}

Expand Down Expand Up @@ -163,6 +163,7 @@ func parseGlobalRoleSnapshot(role string, event *corev1.Event) *common.GlobalRol
pair := common.PodRoleNamePair{
PodName: event.InvolvedObject.Name,
RoleName: role,
PodUID: string(event.InvolvedObject.UID),
}
snapshot.PodRoleNamePairs = append(snapshot.PodRoleNamePairs, pair)
return snapshot
Expand Down
3 changes: 3 additions & 0 deletions pkg/lorry/client/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func newTCPServer(port int) (net.Listener, int) {
}

func StartHTTPServer() {
mockDCSStore.EXPECT().GetClusterFromCache()
mockDBManager.EXPECT().SubscribeRoleChange(gomock.Any(), gomock.Any(), gomock.Any())

ops := opsregister.Operations()
s := httpserver.NewServer(ops)
handler := s.Router()
Expand Down
2 changes: 2 additions & 0 deletions pkg/lorry/engines/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (mgr *DBManagerBase) GetReplicaRole(context.Context, *dcs.Cluster) (string,
return "", errors.New("not implemented")
}

func (mgr *DBManagerBase) SubscribeRoleChange(context.Context, *string, *dcs.Cluster) {}

func (mgr *DBManagerBase) Exec(context.Context, string) (int64, error) {
return 0, errors.New("not implemented")
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/lorry/engines/dbmanager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/lorry/engines/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type DBManager interface {
IsLeaderMember(context.Context, *dcs.Cluster, *dcs.Member) (bool, error)
IsFirstMember() bool
GetReplicaRole(context.Context, *dcs.Cluster) (string, error)
SubscribeRoleChange(ctx context.Context, oriRole *string, cluster *dcs.Cluster)

JoinCurrentMemberToCluster(context.Context, *dcs.Cluster) error
LeaveMemberFromCluster(context.Context, *dcs.Cluster, string) error
Expand Down
48 changes: 28 additions & 20 deletions pkg/lorry/engines/redis/get_replica_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,38 @@ import (
"github.com/apecloud/kubeblocks/pkg/lorry/engines/models"
)

func (mgr *Manager) GetReplicaRole(ctx context.Context, cluster *dcs.Cluster) (string, error) {
section := "Replication"

var role string
result, err := mgr.client.Info(ctx, section).Result()
func (mgr *Manager) GetReplicaRole(ctx context.Context, _ *dcs.Cluster) (string, error) {
// We use the role obtained from Sentinel as the sole source of truth.
masterAddr, err := mgr.sentinelClient.GetMasterAddrByName(ctx, mgr.ClusterCompName).Result()
if err != nil {
mgr.Logger.Error(err, "Role query error")
return role, err
} else {
// split the result into lines
lines := strings.Split(result, "\r\n")
// find the line with role
for _, line := range lines {
if strings.HasPrefix(line, "role:") {
role = strings.Split(line, ":")[1]
break
// when we can't get role from sentinel, we query redis instead
var role string
result, err := mgr.client.Info(ctx, "Replication").Result()
if err != nil {
mgr.Logger.Error(err, "Role query error")
return role, err
} else {
// split the result into lines
lines := strings.Split(result, "\r\n")
// find the line with role
for _, line := range lines {
if strings.HasPrefix(line, "role:") {
role = strings.Split(line, ":")[1]
break
}
}
}
if role == models.MASTER {
return models.PRIMARY, nil
} else {
return models.SECONDARY, nil
}
}
if role == models.MASTER {
return models.PRIMARY, nil
}
if role == models.SLAVE {

masterName := strings.Split(masterAddr[0], ".")[0]
// if current member is not master from sentinel, just return secondary to avoid double master
if masterName != mgr.CurrentMemberName {
return models.SECONDARY, nil
}
return role, nil
return models.PRIMARY, nil
}
62 changes: 61 additions & 1 deletion pkg/lorry/engines/redis/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ package redis

import (
"context"
"encoding/json"
"strings"
"time"

"github.com/redis/go-redis/v9"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/models"
"github.com/apecloud/kubeblocks/pkg/lorry/util"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

Expand All @@ -41,6 +46,7 @@ type Manager struct {
engines.DBManagerBase
client redis.UniversalClient
clientSettings *Settings
sentinelClient *redis.SentinelClient

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -74,11 +80,13 @@ func NewManager(properties engines.Properties) (engines.DBManager, error) {
Password: redisPasswd,
Username: redisUser,
}
mgr.client, mgr.clientSettings, err = ParseClientFromProperties(map[string]string(properties), defaultSettings)
mgr.client, mgr.clientSettings, err = ParseClientFromProperties(properties, defaultSettings)
if err != nil {
return nil, err
}

mgr.sentinelClient = newSentinelClient(mgr.clientSettings, mgr.ClusterCompName)

mgr.ctx, mgr.cancel = context.WithCancel(context.Background())
return mgr, nil
}
Expand Down Expand Up @@ -108,3 +116,55 @@ func tokenizeCmd2Args(cmd string) []interface{} {
}
return redisArgs
}

func (mgr *Manager) SubscribeRoleChange(ctx context.Context, oriRole *string, cluster *dcs.Cluster) {
pubSub := mgr.sentinelClient.Subscribe(ctx, "+switch-master")

// go-redis periodically sends ping messages to test connection health
// and re-subscribes if ping can not receive for 30 seconds.
// so we don't need to retry
ch := pubSub.Channel()
var role, msgRole string
for msg := range ch {
// +switch-master <master name> <old ip> <old port> <new ip> <new port>
masterAddr := strings.Split(msg.Payload, " ")
masterName := strings.Split(masterAddr[3], ".")[0]

// When network partition occurs, the new primary needs to send global role change information to the controller.
if masterName == mgr.CurrentMemberName {
role = models.PRIMARY
roleSnapshot := &common.GlobalRoleSnapshot{}
oldMasterName := strings.Split(masterAddr[1], ".")[0]
roleSnapshot.PodRoleNamePairs = []common.PodRoleNamePair{
{
PodName: oldMasterName,
RoleName: models.SECONDARY,
PodUID: cluster.GetMemberWithName(oldMasterName).UID,
},
{
PodName: masterName,
RoleName: models.PRIMARY,
PodUID: cluster.GetMemberWithName(masterName).UID,
},
}
roleSnapshot.Version = time.Now().Format(time.RFC3339Nano)

b, _ := json.Marshal(roleSnapshot)
msgRole = string(b)
} else {
role = models.SECONDARY
msgRole = models.SECONDARY
}
if role != *oriRole {
err := util.SentEventForProbe(ctx, map[string]any{
"operation": util.CheckRoleOperation,
"originalRole": *oriRole,
"role": msgRole,
"event": util.OperationSuccess,
})
if err == nil {
*oriRole = role
}
}
}
}
29 changes: 29 additions & 0 deletions pkg/lorry/engines/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"time"

"github.com/redis/go-redis/v9"

viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

const (
Expand Down Expand Up @@ -141,3 +143,30 @@ func newClient(s *Settings) redis.UniversalClient {

return redis.NewClient(options)
}

func newSentinelClient(s *Settings, clusterCompName string) *redis.SentinelClient {
// TODO: use headless service directly
sentinelEnv := fmt.Sprintf("%s_SENTINEL_SERVICE", strings.ToUpper(strings.Join(strings.Split(clusterCompName, "-"), "_")))
sentinelHost := viper.GetString(fmt.Sprintf("%s_HOST", sentinelEnv))
sentinelPort := viper.GetString(fmt.Sprintf("%s_PORT", sentinelEnv))

opt := &redis.Options{
DB: s.DB,
Addr: fmt.Sprintf("%s:%s", sentinelHost, sentinelPort),
Password: s.Password,
Username: s.Username,
MaxRetries: s.RedisMaxRetries,
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
DialTimeout: time.Duration(s.DialTimeout),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
PoolSize: s.PoolSize,
MinIdleConns: s.MinIdleConns,
PoolTimeout: time.Duration(s.PoolTimeout),
}

sentinelClient := redis.NewSentinelClient(opt)

return sentinelClient
}
11 changes: 5 additions & 6 deletions pkg/lorry/grpcserver/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
"net/http/httptest"
"strings"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
health "google.golang.org/grpc/health/grpc_health_v1"

"github.com/apecloud/kubeblocks/pkg/lorry/engines/custom"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/register"
"github.com/apecloud/kubeblocks/pkg/lorry/operations"
"github.com/apecloud/kubeblocks/pkg/lorry/operations/replica"
"github.com/apecloud/kubeblocks/pkg/lorry/util"
Expand All @@ -42,6 +41,7 @@ var _ = Describe("GRPC Server", func() {
Context("new GRPC server", func() {
It("fail -- no check role operation", func() {
delete(operations.Operations(), strings.ToLower(string(util.CheckRoleOperation)))
mockDBManager.EXPECT().SubscribeRoleChange(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
_, err := NewGRPCServer()
Expect(err).Should(HaveOccurred())
})
Expand Down Expand Up @@ -71,11 +71,10 @@ var _ = Describe("GRPC Server", func() {
viper.Set("KB_RSM_ACTION_SVC_LIST", "["+portStr+"]")
viper.Set("KB_RSM_ROLE_UPDATE_MECHANISM", "ReadinessProbeEventUpdate")

customManager, err := custom.NewManager(nil)
Expect(err).Should(BeNil())
register.SetDBManager(customManager)

server, _ := NewGRPCServer()

mockDBManager.EXPECT().IsDBStartupReady().Return(true)
mockDBManager.EXPECT().GetReplicaRole(gomock.Any(), gomock.Any()).Return("leader", nil)
check, err := server.Check(context.Background(), nil)

Expect(err).Should(HaveOccurred())
Expand Down
13 changes: 13 additions & 0 deletions pkg/lorry/grpcserver/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ import (

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/register"
)

var mockDBManager *engines.MockDBManager

func init() {
viper.AutomaticEnv()
viper.SetDefault(constant.KBEnvPodName, "pod-test")
Expand All @@ -50,6 +54,9 @@ func TestVolumeOperations(t *testing.T) {
var _ = BeforeSuite(func() {
// Init mock dcs store
InitMockDCSStore()

// Init mock db manager
InitMockDBManager()
})

var _ = AfterSuite(func() {
Expand All @@ -61,3 +68,9 @@ func InitMockDCSStore() {
mockDCSStore.EXPECT().GetClusterFromCache().Return(&dcs.Cluster{}).AnyTimes()
dcs.SetStore(mockDCSStore)
}

func InitMockDBManager() {
ctrl := gomock.NewController(GinkgoT())
mockDBManager = engines.NewMockDBManager(ctrl)
register.SetDBManager(mockDBManager)
}
Loading
Loading