diff --git a/controllers/apps/transformer_component_workload.go b/controllers/apps/transformer_component_workload.go
index 8bce908f9d5..fce6da0805f 100644
--- a/controllers/apps/transformer_component_workload.go
+++ b/controllers/apps/transformer_component_workload.go
@@ -618,9 +618,11 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
 
 	// TODO: Move memberLeave to the ITS controller. Instead of performing a switchover, we can directly scale down the non-leader nodes. This is because the pod ordinal is not guaranteed to be continuous.
 	podsToMemberLeave := make([]*corev1.Pod, 0)
+	desiredPods := make([]*corev1.Pod, 0)
 	for _, pod := range pods {
 		// if the pod not exists in the generated pod names, it should be a member that needs to leave
 		if _, ok := r.desiredCompPodNameSet[pod.Name]; ok {
+			desiredPods = append(desiredPods, pod)
 			continue
 		}
 		podsToMemberLeave = append(podsToMemberLeave, pod)
@@ -644,12 +646,18 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
 			return switchoverErr
 		}
 
-		if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx); err2 != nil {
+		if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx, nil); err2 != nil {
 			// For the purpose of upgrade compatibility, if the version of Lorry is 0.7 and
 			// the version of KB is upgraded to 0.8 or newer, lorry client will return an NotImplemented error,
 			// in this case, here just ignore it.
 			if err2 == lorry.NotImplemented {
 				r.reqCtx.Log.Info("lorry leave member api is not implemented")
+			} else if unableToConnect(err2) {
+				r.reqCtx.Log.Info(fmt.Sprintf("when leaving pod %s by lorry, cannot connect to lorry on pod %s, trying to leave member via other pods", pod.Name, pod.Name))
+				err3 := r.leaveMemberByOtherPods(desiredPods, pod)
+				if err == nil {
+					err = err3
+				}
 			} else if err == nil {
 				err = err2
 			}
@@ -658,6 +666,50 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
 	return err // TODO: use requeue-after
 }
 
+func unableToConnect(err error) bool {
+	if err == nil {
+		return false
+	}
+	if strings.Contains(err.Error(), "i/o timeout") {
+		return true
+	}
+	return false
+}
+
+// leaveMemberByOtherPods tries to make `podToLeave` leave the cluster via the lorry of pods in `desiredPods`.
+// Any error that is not caused by `unableToConnect` to one of the desired pods is returned immediately.
+func (r *componentWorkloadOps) leaveMemberByOtherPods(desiredPods []*corev1.Pod, podToLeave *corev1.Pod) error {
+	parameters := make(map[string]any)
+	parameters["podName"] = podToLeave.Spec.Hostname
+
+	for _, pod := range desiredPods {
+		lorryCli, err1 := lorry.NewClient(*pod)
+		if err1 != nil {
+			return fmt.Errorf("leaveMemberByOtherPods: create lorry client for pod %v failed: %v", pod.Name, err1)
+		}
+
+		if intctrlutil.IsNil(lorryCli) {
+			// no lorry in the pod
+			continue
+		}
+
+		if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx, parameters); err2 != nil {
+			// For the purpose of upgrade compatibility, if the version of Lorry is 0.7 and
+			// the version of KB is upgraded to 0.8 or newer, lorry client will return an NotImplemented error,
+			// in this case, here just ignore it.
+			if err2 == lorry.NotImplemented {
+				r.reqCtx.Log.Info("lorry leave member api is not implemented")
+			} else if unableToConnect(err2) {
+				r.reqCtx.Log.Info(fmt.Sprintf("leaveMemberByOtherPods: cannot connect to lorry on pod %s", pod.Name))
+			} else {
+				return fmt.Errorf("leaveMemberByOtherPods: leave pod %v via pod %v failed: %v", podToLeave.Name, pod.Name, err2)
+			}
+		}
+		return nil
+	}
+	return fmt.Errorf("leaveMemberByOtherPods: leaving pod %v via other pods failed", podToLeave.Name)
+}
+
 func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet) error {
 	graphCli := model.NewGraphClient(r.cli)
 	for _, podName := range r.runningItsPodNames {
diff --git a/pkg/lorry/client/client.go b/pkg/lorry/client/client.go
index 1308dabaa4b..78d04202a61 100644
--- a/pkg/lorry/client/client.go
+++ b/pkg/lorry/client/client.go
@@ -210,8 +210,15 @@ func (cli *lorryClient) JoinMember(ctx context.Context) error {
 }
 
 // LeaveMember sends a Leave member operation request to Lorry, located on the target pod that is about to leave.
-func (cli *lorryClient) LeaveMember(ctx context.Context) error {
-	_, err := cli.Request(ctx, string(LeaveMemberOperation), http.MethodPost, nil)
+// If parameters is not nil, the pod that receives the request (the one this lorryClient points to) may differ
+// from the pod to leave (specified by the "podName" key in parameters).
+func (cli *lorryClient) LeaveMember(ctx context.Context, parameters map[string]any) error {
+	req := make(map[string]any)
+	if parameters != nil {
+		req["parameters"] = parameters
+	}
+
+	_, err := cli.Request(ctx, string(LeaveMemberOperation), http.MethodPost, req)
 	return err
 }
 
diff --git a/pkg/lorry/client/client_mock.go b/pkg/lorry/client/client_mock.go
index a607c0ba5c3..a16bbea8330 100644
--- a/pkg/lorry/client/client_mock.go
+++ b/pkg/lorry/client/client_mock.go
@@ -170,7 +170,7 @@ func (mr *MockClientMockRecorder) JoinMember(arg0 interface{}) *gomock.Call {
 }
 
 // LeaveMember mocks base method.
-func (m *MockClient) LeaveMember(arg0 context.Context) error {
+func (m *MockClient) LeaveMember(arg0 context.Context, parameters map[string]any) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "LeaveMember", arg0)
+	ret := m.ctrl.Call(m, "LeaveMember", arg0, parameters)
 	ret0, _ := ret[0].(error)
diff --git a/pkg/lorry/client/httpclient_test.go b/pkg/lorry/client/httpclient_test.go
index bd566af1638..2e97577cd36 100644
--- a/pkg/lorry/client/httpclient_test.go
+++ b/pkg/lorry/client/httpclient_test.go
@@ -451,7 +451,7 @@ var _ = Describe("Lorry HTTP Client", func() {
 			mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
 			mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
 			mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(2)
-			Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
+			Expect(lorryClient.LeaveMember(context.TODO(), nil)).Should(Succeed())
 			Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
 		})
 
@@ -461,10 +461,10 @@ var _ = Describe("Lorry HTTP Client", func() {
 			mockDCSStore.EXPECT().GetCluster().Return(cluster, nil).Times(2)
 			mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(3)
 			// first leave
-			Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
+			Expect(lorryClient.LeaveMember(context.TODO(), nil)).Should(Succeed())
 			Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
 			// second leave
-			Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
+			Expect(lorryClient.LeaveMember(context.TODO(), nil)).Should(Succeed())
 			Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
 		})
 
@@ -473,7 +473,7 @@ var _ = Describe("Lorry HTTP Client", func() {
 			mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf(msg))
 			mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
 			mockDCSStore.EXPECT().UpdateHaConfig().Return(nil)
-			err := lorryClient.LeaveMember(context.TODO())
+			err := lorryClient.LeaveMember(context.TODO(), nil)
 			Expect(err).Should(HaveOccurred())
 			Expect(err.Error()).Should(ContainSubstring(msg))
 		})
@@ -509,7 +509,7 @@ var _ = Describe("Lorry HTTP Client", func() {
 			_ = ops[strings.ToLower(string(util.LeaveMemberOperation))].Init(context.TODO())
 			customManager, _ := custom.NewManager(engines.Properties{})
 			register.SetCustomManager(customManager)
-			err := lorryClient.LeaveMember(context.TODO())
+			err := lorryClient.LeaveMember(context.TODO(), nil)
 			Expect(err).Should(HaveOccurred())
 			Expect(err.Error()).Should(ContainSubstring("executable file not found"))
 		})
diff --git a/pkg/lorry/client/interface.go b/pkg/lorry/client/interface.go
index 0ec73cf5cd0..7fbeb6ba79f 100644
--- a/pkg/lorry/client/interface.go
+++ b/pkg/lorry/client/interface.go
@@ -38,7 +38,8 @@ type Client interface {
 	JoinMember(ctx context.Context) error
 
 	// LeaveMember sends a Leave member operation request to Lorry, located on the target pod that is about to leave.
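+	// If parameters is not nil and carries a "podName" key, the receiving Lorry removes that member instead of its own.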
-	LeaveMember(ctx context.Context) error
+	LeaveMember(ctx context.Context, parameters map[string]any) error
 
 	Switchover(ctx context.Context, primary, candidate string, force bool) error
 	Lock(ctx context.Context) error
diff --git a/pkg/lorry/engines/postgres/apecloudpostgres/manager.go b/pkg/lorry/engines/postgres/apecloudpostgres/manager.go
index 7309e616134..b1782ffc65a 100644
--- a/pkg/lorry/engines/postgres/apecloudpostgres/manager.go
+++ b/pkg/lorry/engines/postgres/apecloudpostgres/manager.go
@@ -22,6 +22,7 @@ package apecloudpostgres
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 
@@ -317,9 +318,20 @@ func (mgr *Manager) LeaveMemberFromCluster(ctx context.Context, cluster *dcs.Cluster, memberName string) error {
 		return nil
 	}
 
-	sql := fmt.Sprintf(`alter system consensus drop follower '%s:%d';`, addr, mgr.Config.GetDBPort())
+	var port int
+	var err error
+	if memberName != mgr.CurrentMemberName {
+		port, err = strconv.Atoi(cluster.GetMemberWithName(memberName).DBPort)
+		if err != nil {
+			mgr.Logger.Error(err, fmt.Sprintf("get port of member %v failed", memberName))
+		}
+	} else {
+		port = mgr.Config.GetDBPort()
+	}
 
-	_, err := mgr.ExecLeader(ctx, sql, cluster)
+	sql := fmt.Sprintf(`alter system consensus drop follower '%s:%d';`, addr, port)
+
+	_, err = mgr.ExecLeader(ctx, sql, cluster)
 	if err != nil {
 		mgr.Logger.Error(err, fmt.Sprintf("exec sql:%s failed", sql))
 		return err
diff --git a/pkg/lorry/operations/replica/leave.go b/pkg/lorry/operations/replica/leave.go
index e363a6f52b9..b00d1fe9d86 100644
--- a/pkg/lorry/operations/replica/leave.go
+++ b/pkg/lorry/operations/replica/leave.go
@@ -86,21 +86,28 @@ func (s *Leave) Do(ctx context.Context, req *operations.OpsRequest) (*operations.OpsResponse, error) {
 		return nil, err
 	}
 
-	currentMember := cluster.GetMemberWithName(manager.GetCurrentMemberName())
-	if !cluster.HaConfig.IsDeleting(currentMember) {
-		cluster.HaConfig.AddMemberToDelete(currentMember)
+	var memberNameToLeave string
+	if req.Parameters != nil && req.GetString("podName") != "" {
+		memberNameToLeave = req.GetString("podName")
+	} else {
+		memberNameToLeave = manager.GetCurrentMemberName()
+	}
+
+	memberToLeave := cluster.GetMemberWithName(memberNameToLeave)
+	if !cluster.HaConfig.IsDeleting(memberToLeave) {
+		cluster.HaConfig.AddMemberToDelete(memberToLeave)
 		_ = s.dcsStore.UpdateHaConfig()
 	}
 
-	// remove current member from db cluster
-	err = manager.LeaveMemberFromCluster(ctx, cluster, manager.GetCurrentMemberName())
+	// remove the member from the db cluster; it may be another pod, depending on whether podName is set in req.Parameters
+	err = manager.LeaveMemberFromCluster(ctx, cluster, memberNameToLeave)
 	if err != nil {
 		s.logger.Error(err, "Leave member from cluster failed")
 		return nil, err
 	}
 
-	if cluster.HaConfig.IsDeleting(currentMember) {
-		cluster.HaConfig.FinishDeleted(currentMember)
+	if cluster.HaConfig.IsDeleting(memberToLeave) {
+		cluster.HaConfig.FinishDeleted(memberToLeave)
 		_ = s.dcsStore.UpdateHaConfig()
 	}