diff --git a/apis/apps/v1alpha1/opsrequest_conditions.go b/apis/apps/v1alpha1/opsrequest_conditions.go
index f31af7c4002..73e1b8ec538 100644
--- a/apis/apps/v1alpha1/opsrequest_conditions.go
+++ b/apis/apps/v1alpha1/opsrequest_conditions.go
@@ -42,20 +42,22 @@ const (
// condition and event reasons
- ReasonReconfigureMerging = "ReconfigureMerging"
- ReasonReconfigureMerged = "ReconfigureMerged"
- ReasonReconfigureFailed = "ReconfigureFailed"
- ReasonReconfigureNoChanged = "ReconfigureNoChanged"
- ReasonReconfigureSucceed = "ReconfigureSucceed"
- ReasonReconfigureRunning = "ReconfigureRunning"
- ReasonClusterPhaseMismatch = "ClusterPhaseMismatch"
- ReasonOpsTypeNotSupported = "OpsTypeNotSupported"
- ReasonValidateFailed = "ValidateFailed"
- ReasonClusterNotFound = "ClusterNotFound"
- ReasonOpsRequestFailed = "OpsRequestFailed"
- ReasonOpsCanceling = "Canceling"
- ReasonOpsCancelFailed = "CancelFailed"
- ReasonOpsCancelSucceed = "CancelSucceed"
+ ReasonReconfigureMerging = "ReconfigureMerging"
+ ReasonReconfigureMerged = "ReconfigureMerged"
+ ReasonReconfigureFailed = "ReconfigureFailed"
+ ReasonReconfigureRestartFailed = "ReconfigureRestartFailed"
+ ReasonReconfigureRestart = "ReconfigureRestarted"
+ ReasonReconfigureNoChanged = "ReconfigureNoChanged"
+ ReasonReconfigureSucceed = "ReconfigureSucceed"
+ ReasonReconfigureRunning = "ReconfigureRunning"
+ ReasonClusterPhaseMismatch = "ClusterPhaseMismatch"
+ ReasonOpsTypeNotSupported = "OpsTypeNotSupported"
+ ReasonValidateFailed = "ValidateFailed"
+ ReasonClusterNotFound = "ClusterNotFound"
+ ReasonOpsRequestFailed = "OpsRequestFailed"
+ ReasonOpsCanceling = "Canceling"
+ ReasonOpsCancelFailed = "CancelFailed"
+ ReasonOpsCancelSucceed = "CancelSucceed"
)
func (r *OpsRequest) SetStatusCondition(condition metav1.Condition) {
diff --git a/controllers/apps/configuration/config_util.go b/controllers/apps/configuration/config_util.go
index 4d862dfe872..f3482751675 100644
--- a/controllers/apps/configuration/config_util.go
+++ b/controllers/apps/configuration/config_util.go
@@ -24,7 +24,7 @@ import (
"fmt"
"reflect"
- "github.com/StudioSol/set"
+ cfgutil "github.com/apecloud/kubeblocks/internal/configuration/util"
"github.com/go-logr/logr"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -325,16 +325,15 @@ func validateConfigTemplate(cli client.Client, ctx intctrlutil.RequestCtx, confi
logger.Error(err, "failed to get config template cm object!")
return nil, err
}
-
if configSpec.ConfigConstraintRef == "" {
return nil, nil
}
-
- configObj := &appsv1alpha1.ConfigConstraint{}
- if err := cli.Get(ctx.Ctx, client.ObjectKey{
+ configKey := client.ObjectKey{
Namespace: "",
Name: configSpec.ConfigConstraintRef,
- }, configObj); err != nil {
+ }
+ configObj := &appsv1alpha1.ConfigConstraint{}
+ if err := cli.Get(ctx.Ctx, configKey, configObj); err != nil {
logger.Error(err, "failed to get template cm object!")
return nil, err
}
@@ -378,35 +377,62 @@ func updateConfigConstraintStatus(cli client.Client, ctx intctrlutil.RequestCtx,
return cli.Status().Patch(ctx.Ctx, configConstraint, patch)
}
-func getAssociatedComponentsByConfigmap(stsList *appv1.StatefulSetList, cfg client.ObjectKey, configSpecName string) ([]appv1.StatefulSet, []string) {
- managerContainerName := constant.ConfigSidecarName
- stsLen := len(stsList.Items)
- if stsLen == 0 {
- return nil, nil
+func toObjects[T generics.Object, L generics.ObjList[T], PL generics.PObjList[T, L]](compList PL) []T {
+ return reflect.ValueOf(compList).Elem().FieldByName("Items").Interface().([]T)
+}
+
+func toResourceObject(obj any) client.Object {
+ return obj.(client.Object)
+}
+
+func getPodTemplate(obj client.Object) *corev1.PodTemplateSpec {
+ switch v := obj.(type) {
+ default:
+ return nil
+ case *appv1.StatefulSet:
+ return &v.Spec.Template
+ case *appv1.Deployment:
+ return &v.Spec.Template
+ }
+}
+
+func getRelatedComponentsByConfigmap[T generics.Object, PT generics.PObject[T], L generics.ObjList[T], PL generics.PObjList[T, L]](cli client.Client, ctx context.Context, _ func(T, L), cfg client.ObjectKey, configSpecName string, opts ...client.ListOption) ([]T, []string, error) {
+ var objList L
+ if err := cli.List(ctx, PL(&objList), opts...); err != nil {
+ return nil, nil, err
}
- sts := make([]appv1.StatefulSet, 0, stsLen)
- containers := set.NewLinkedHashSetString()
+ objs := make([]T, 0)
+ containers := cfgutil.NewSet()
configSpecKey := cfgcore.GenerateTPLUniqLabelKeyWithConfig(configSpecName)
- for _, s := range stsList.Items {
- if !usingComponentConfigSpec(s.GetAnnotations(), configSpecKey, cfg.Name) {
+ items := toObjects[T, L, PL](&objList)
+ for i := range items {
+ obj := toResourceObject(&items[i])
+ if objs == nil {
+ return nil, nil, cfgcore.MakeError("failed to convert to resource object")
+ }
+ if !usingComponentConfigSpec(obj.GetAnnotations(), configSpecKey, cfg.Name) {
+ continue
+ }
+ podTemplate := getPodTemplate(obj)
+ if podTemplate == nil {
continue
}
- volumeMounted := intctrlutil.GetVolumeMountName(s.Spec.Template.Spec.Volumes, cfg.Name)
+ volumeMounted := intctrlutil.GetVolumeMountName(podTemplate.Spec.Volumes, cfg.Name)
if volumeMounted == nil {
continue
}
// filter config manager sidecar container
- contains := intctrlutil.GetContainersByConfigmap(s.Spec.Template.Spec.Containers,
+ contains := intctrlutil.GetContainersByConfigmap(podTemplate.Spec.Containers,
volumeMounted.Name, func(containerName string) bool {
- return managerContainerName == containerName
+ return constant.ConfigSidecarName == containerName
})
if len(contains) > 0 {
- sts = append(sts, s)
+ objs = append(objs, items[i])
containers.Add(contains...)
}
}
- return sts, containers.AsSlice()
+ return objs, containers.AsSlice(), nil
}
func createConfigPatch(cfg *corev1.ConfigMap, format appsv1alpha1.CfgFileFormat, cmKeys []string) (*cfgcore.ConfigPatchInfo, bool, error) {
diff --git a/controllers/apps/configuration/policy_util.go b/controllers/apps/configuration/policy_util.go
index ebaace75c96..a07940118bb 100644
--- a/controllers/apps/configuration/policy_util.go
+++ b/controllers/apps/configuration/policy_util.go
@@ -27,7 +27,9 @@ import (
"strconv"
"github.com/spf13/viper"
+ appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apecloud/kubeblocks/controllers/apps/components/consensus"
"github.com/apecloud/kubeblocks/controllers/apps/components/util"
@@ -37,51 +39,6 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)
-type createReconfigureClient func(addr string) (cfgproto.ReconfigureClient, error)
-
-type GetPodsFunc func(params reconfigureParams) ([]corev1.Pod, error)
-
-type RestartContainerFunc func(pod *corev1.Pod, ctx context.Context, containerName []string, createConnFn createReconfigureClient) error
-type OnlineUpdatePodFunc func(pod *corev1.Pod, ctx context.Context, createClient createReconfigureClient, configSpec string, updatedParams map[string]string) error
-
-type RollingUpgradeFuncs struct {
- GetPodsFunc GetPodsFunc
- RestartContainerFunc RestartContainerFunc
- OnlineUpdatePodFunc OnlineUpdatePodFunc
-}
-
-func GetConsensusRollingUpgradeFuncs() RollingUpgradeFuncs {
- return RollingUpgradeFuncs{
- GetPodsFunc: getConsensusPods,
- RestartContainerFunc: commonStopContainerWithPod,
- OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
- }
-}
-
-func GetStatefulSetRollingUpgradeFuncs() RollingUpgradeFuncs {
- return RollingUpgradeFuncs{
- GetPodsFunc: getStatefulSetPods,
- RestartContainerFunc: commonStopContainerWithPod,
- OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
- }
-}
-
-func GetReplicationRollingUpgradeFuncs() RollingUpgradeFuncs {
- return RollingUpgradeFuncs{
- GetPodsFunc: getReplicationSetPods,
- RestartContainerFunc: commonStopContainerWithPod,
- OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
- }
-}
-
-func GetDeploymentRollingUpgradeFuncs() RollingUpgradeFuncs {
- return RollingUpgradeFuncs{
- GetPodsFunc: getDeploymentRollingPods,
- RestartContainerFunc: commonStopContainerWithPod,
- OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
- }
-}
-
func getDeploymentRollingPods(params reconfigureParams) ([]corev1.Pod, error) {
// util.GetComponentPodList supports deployment
return getReplicationSetPods(params)
@@ -246,3 +203,69 @@ func getURLFromPod(pod *corev1.Pod, portPort int) (string, error) {
}
return net.JoinHostPort(ip.String(), strconv.Itoa(portPort)), nil
}
+
+func restartStatelessComponent(client client.Client, ctx intctrlutil.RequestCtx, configKey string, expectedVersion string, deployObjs []client.Object, recordEvent func(obj client.Object)) (client.Object, error) {
+ cfgAnnotationKey := cfgcore.GenerateUniqKeyWithConfig(constant.UpgradeRestartAnnotationKey, configKey)
+ deployRestart := func(deploy *appv1.Deployment, expectedVersion string) error {
+ if deploy.Spec.Template.Annotations == nil {
+ deploy.Spec.Template.Annotations = map[string]string{}
+ }
+ deploy.Spec.Template.Annotations[cfgAnnotationKey] = expectedVersion
+ if err := client.Update(ctx.Ctx, deploy); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ for _, obj := range deployObjs {
+ deploy, ok := obj.(*appv1.Deployment)
+ if !ok {
+ continue
+ }
+ if updatedVersion(&deploy.Spec.Template, cfgAnnotationKey, expectedVersion) {
+ continue
+ }
+ if err := deployRestart(deploy, expectedVersion); err != nil {
+ return deploy, err
+ }
+ if recordEvent != nil {
+ recordEvent(deploy)
+ }
+ }
+ return nil, nil
+}
+
+func restartStatefulComponent(client client.Client, ctx intctrlutil.RequestCtx, configKey string, newVersion string, objs []client.Object, recordEvent func(obj client.Object)) (client.Object, error) {
+ cfgAnnotationKey := cfgcore.GenerateUniqKeyWithConfig(constant.UpgradeRestartAnnotationKey, configKey)
+ stsRestart := func(sts *appv1.StatefulSet, expectedVersion string) error {
+ if sts.Spec.Template.Annotations == nil {
+ sts.Spec.Template.Annotations = map[string]string{}
+ }
+ sts.Spec.Template.Annotations[cfgAnnotationKey] = expectedVersion
+ if err := client.Update(ctx.Ctx, sts); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ for _, obj := range objs {
+ sts, ok := obj.(*appv1.StatefulSet)
+ if !ok {
+ continue
+ }
+ if updatedVersion(&sts.Spec.Template, cfgAnnotationKey, newVersion) {
+ continue
+ }
+ if err := stsRestart(sts, newVersion); err != nil {
+ return sts, err
+ }
+ if recordEvent != nil {
+ recordEvent(sts)
+ }
+ }
+ return nil, nil
+}
+
+func updatedVersion(podTemplate *corev1.PodTemplateSpec, keyPath, expectedVersion string) bool {
+ return podTemplate.Annotations != nil && podTemplate.Annotations[keyPath] == expectedVersion
+}
diff --git a/controllers/apps/configuration/policy_util_test.go b/controllers/apps/configuration/policy_util_test.go
index d9125a80b76..95bb887e7b1 100644
--- a/controllers/apps/configuration/policy_util_test.go
+++ b/controllers/apps/configuration/policy_util_test.go
@@ -44,6 +44,39 @@ var (
stsSchemaKind = appsv1.SchemeGroupVersion.WithKind("StatefulSet")
)
+func newMockDeployments(replicas int, name string, labels map[string]string) appsv1.Deployment {
+ uid, _ := password.Generate(12, 12, 0, true, false)
+ return appsv1.Deployment{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "StatefulSet",
+ APIVersion: "apps/v1",
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ UID: types.UID(uid),
+ },
+ Spec: appsv1.DeploymentSpec{
+ Replicas: func() *int32 { i := int32(replicas); return &i }(),
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels: labels,
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{},
+ Volumes: []corev1.Volume{{
+ Name: "for_test",
+ VolumeSource: corev1.VolumeSource{
+ HostPath: &corev1.HostPathVolumeSource{
+ Path: "/tmp",
+ },
+ }}},
+ },
+ },
+ },
+ }
+}
+
func newMockStatefulSet(replicas int, name string, labels map[string]string) appsv1.StatefulSet {
uid, _ := password.Generate(12, 12, 0, true, false)
serviceName, _ := password.Generate(12, 0, 0, true, false)
@@ -91,6 +124,16 @@ func withMockStatefulSet(replicas int, labels map[string]string) ParamsOps {
}
}
+func withMockDeployments(replicas int, labels map[string]string) ParamsOps {
+ return func(params *reconfigureParams) {
+ rand, _ := password.Generate(12, 8, 0, true, false)
+ deployName := "test_" + rand
+ params.DeploymentUnits = []appsv1.Deployment{
+ newMockDeployments(replicas, deployName, labels),
+ }
+ }
+}
+
func withClusterComponent(replicas int) ParamsOps {
return func(params *reconfigureParams) {
params.ClusterComponent = &appsv1alpha1.ClusterComponentSpec{
@@ -207,6 +250,22 @@ func newMockPodsWithStatefulSet(sts *appsv1.StatefulSet, replicas int, options .
return pods
}
+func newMockPodsWithDeployment(deploy *appsv1.Deployment, replicas int, options ...PodOptions) []corev1.Pod {
+ pods := make([]corev1.Pod, replicas)
+ for i := 0; i < replicas; i++ {
+ pods[i] = newMockPod(deploy.Name+"-"+fmt.Sprint(i), &deploy.Spec.Template.Spec)
+ pods[i].OwnerReferences = []metav1.OwnerReference{newControllerRef(deploy, stsSchemaKind)}
+ pods[i].Status.PodIP = "1.1.1.1"
+ }
+ for _, customFn := range options {
+ for i := range pods {
+ pod := &pods[i]
+ customFn(pod, i)
+ }
+ }
+ return pods
+}
+
func withReadyPod(rMin, rMax int) PodOptions {
return func(pod *corev1.Pod, index int) {
if index < rMin || index >= rMax {
diff --git a/controllers/apps/configuration/reconfigure_policy.go b/controllers/apps/configuration/reconfigure_policy.go
index 7c2be9df476..b710a300bd0 100644
--- a/controllers/apps/configuration/reconfigure_policy.go
+++ b/controllers/apps/configuration/reconfigure_policy.go
@@ -25,7 +25,7 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
- appv1 "k8s.io/api/apps/v1"
+ appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -103,7 +103,10 @@ type reconfigureParams struct {
Component *appsv1alpha1.ClusterComponentDefinition
// List of StatefulSets using this config template.
- ComponentUnits []appv1.StatefulSet
+ ComponentUnits []appsv1.StatefulSet
+
+ // List of Deployment using this config template.
+ DeploymentUnits []appsv1.Deployment
}
var (
@@ -259,3 +262,19 @@ func makeReturnedStatus(status ExecStatus, ops ...func(status *ReturnedStatus))
}
return ret
}
+
+func fromDeploymentObjects(units []appsv1.Deployment) []client.Object {
+ r := make([]client.Object, len(units))
+ for i, unit := range units {
+ r[i] = &unit
+ }
+ return r
+}
+
+func fromStatefulSetObjects(units []appsv1.StatefulSet) []client.Object {
+ r := make([]client.Object, len(units))
+ for i, unit := range units {
+ r[i] = &unit
+ }
+ return r
+}
diff --git a/controllers/apps/configuration/reconfigurerequest_controller.go b/controllers/apps/configuration/reconfigurerequest_controller.go
index 9a77a264a44..b34e36d0b70 100644
--- a/controllers/apps/configuration/reconfigurerequest_controller.go
+++ b/controllers/apps/configuration/reconfigurerequest_controller.go
@@ -26,6 +26,7 @@ import (
"strings"
"time"
+ "github.com/apecloud/kubeblocks/internal/generics"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -137,7 +138,6 @@ func checkConfigurationObject(object client.Object) bool {
func (r *ReconfigureRequestReconciler) sync(reqCtx intctrlutil.RequestCtx, config *corev1.ConfigMap, tpl *appsv1alpha1.ConfigConstraint) (ctrl.Result, error) {
var (
- stsLists = appv1.StatefulSetList{}
cluster = appsv1alpha1.Cluster{}
clusterKey = client.ObjectKey{
Namespace: config.GetNamespace(),
@@ -217,19 +217,13 @@ func (r *ReconfigureRequestReconciler) sync(reqCtx intctrlutil.RequestCtx, confi
return intctrlutil.Reconciled()
}
- // find STS CR
- if err := r.Client.List(reqCtx.Ctx, &stsLists, client.InNamespace(config.Namespace), client.MatchingLabels(componentLabels)); err != nil {
- return intctrlutil.RequeueWithErrorAndRecordEvent(config,
- r.Recorder,
- cfgcore.WrapError(err,
- "failed to get component. configmap[%s] label[%s]",
- reqCtx.Req.NamespacedName, componentLabels),
+ sts, deploys, containersList, err := r.syncRelatedComponents(reqCtx, config, component, componentLabels, configKey, configSpecName)
+ if err != nil {
+ return intctrlutil.RequeueWithErrorAndRecordEvent(config, r.Recorder,
+ cfgcore.WrapError(err, "failed to get component. configmap[%s] label[%s]", reqCtx.Req.NamespacedName, componentLabels),
reqCtx.Log)
}
-
- // configmap has never been used
- sts, containersList := getAssociatedComponentsByConfigmap(&stsLists, configKey, configSpecName)
- if len(sts) == 0 {
+ if len(sts) == 0 && len(deploys) == 0 {
reqCtx.Recorder.Eventf(config,
corev1.EventTypeWarning,
appsv1alpha1.ReasonReconfigureFailed,
@@ -247,6 +241,7 @@ func (r *ReconfigureRequestReconciler) sync(reqCtx intctrlutil.RequestCtx, confi
Cluster: &cluster,
ContainerNames: containersList,
ComponentUnits: sts,
+ DeploymentUnits: deploys,
Component: component,
ClusterComponent: clusterComponent,
Restart: forceRestart || !cfgcm.IsSupportReload(tpl.Spec.ReloadOptions),
@@ -333,6 +328,7 @@ func (r *ReconfigureRequestReconciler) handleConfigEvent(params reconfigureParam
ConfigConstraint: params.ConfigConstraint,
ConfigMap: params.ConfigMap,
ComponentUnits: params.ComponentUnits,
+ DeploymentUnits: params.DeploymentUnits,
PolicyStatus: status,
}
@@ -344,6 +340,22 @@ func (r *ReconfigureRequestReconciler) handleConfigEvent(params reconfigureParam
return nil
}
+func (r *ReconfigureRequestReconciler) syncRelatedComponents(reqCtx intctrlutil.RequestCtx, config *corev1.ConfigMap, component *appsv1alpha1.ClusterComponentDefinition, matchingLabels client.MatchingLabels, configKey client.ObjectKey, configSpecName string) ([]appv1.StatefulSet, []appv1.Deployment, []string, error) {
+ var err error
+ var containers []string
+ var statefuls []appv1.StatefulSet
+ var deploys []appv1.Deployment
+
+ if component.WorkloadType == appsv1alpha1.Stateless {
+ deploys, containers, err = getRelatedComponentsByConfigmap(r.Client, reqCtx.Ctx, generics.DeploymentSignature,
+ configKey, configSpecName, client.InNamespace(config.Namespace), matchingLabels)
+ } else {
+ statefuls, containers, err = getRelatedComponentsByConfigmap(r.Client, reqCtx.Ctx, generics.StatefulSetSignature,
+ configKey, configSpecName, client.InNamespace(config.Namespace), matchingLabels)
+ }
+ return statefuls, deploys, containers, err
+}
+
func fromReconfigureStatus(status ExecStatus) appsv1alpha1.OpsPhase {
switch status {
case ESFailed:
diff --git a/controllers/apps/configuration/simple_policy.go b/controllers/apps/configuration/simple_policy.go
index 3d6a24c1167..8939beb4085 100644
--- a/controllers/apps/configuration/simple_policy.go
+++ b/controllers/apps/configuration/simple_policy.go
@@ -20,14 +20,11 @@ along with this program. If not, see .
package configuration
import (
- appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
cfgcore "github.com/apecloud/kubeblocks/internal/configuration"
- "github.com/apecloud/kubeblocks/internal/constant"
- intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)
type simplePolicy struct {
@@ -40,22 +37,34 @@ func init() {
func (s *simplePolicy) Upgrade(params reconfigureParams) (ReturnedStatus, error) {
params.Ctx.Log.V(1).Info("simple policy begin....")
+ var funcs RollingUpgradeFuncs
+ var compLists []client.Object
+
switch params.WorkloadType() {
- case appsv1alpha1.Stateful, appsv1alpha1.Consensus, appsv1alpha1.Replication:
- return rollingStatefulSets(params)
default:
return makeReturnedStatus(ESNotSupport), cfgcore.MakeError("not supported component workload type:[%s]", params.WorkloadType())
+ case appsv1alpha1.Consensus:
+ funcs = GetConsensusRollingUpgradeFuncs()
+ compLists = fromStatefulSetObjects(params.ComponentUnits)
+ case appsv1alpha1.Stateful:
+ funcs = GetStatefulSetRollingUpgradeFuncs()
+ compLists = fromStatefulSetObjects(params.ComponentUnits)
+ case appsv1alpha1.Replication:
+ funcs = GetReplicationRollingUpgradeFuncs()
+ compLists = fromStatefulSetObjects(params.ComponentUnits)
+ case appsv1alpha1.Stateless:
+ funcs = GetDeploymentRollingUpgradeFuncs()
+ compLists = fromDeploymentObjects(params.DeploymentUnits)
}
+ return restartAndCheckComponent(params, funcs, compLists)
}
func (s *simplePolicy) GetPolicyName() string {
return string(appsv1alpha1.NormalPolicy)
}
-func rollingStatefulSets(param reconfigureParams) (ReturnedStatus, error) {
+func restartAndCheckComponent(param reconfigureParams, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) {
var (
- units = param.ComponentUnits
- client = param.Client
newVersion = param.getTargetVersionHash()
configKey = param.getConfigKey()
@@ -63,14 +72,19 @@ func rollingStatefulSets(param reconfigureParams) (ReturnedStatus, error) {
progress = cfgcore.NotStarted
)
- for _, sts := range units {
- if err := restartStsWithRolling(client, param.Ctx, sts, configKey, newVersion, param.Cluster.Name, param.ClusterComponent.Name); err != nil {
- param.Ctx.Log.Error(err, "failed to restart statefulSet.", "stsName", sts.GetName())
- return makeReturnedStatus(ESAndRetryFailed), err
- }
+ recordEvent := func(obj client.Object) {
+ param.Ctx.Recorder.Eventf(obj,
+ corev1.EventTypeNormal, appsv1alpha1.ReasonReconfigureRestart,
+ "restarting component[%s] in cluster[%s], version: %s", param.ClusterComponent.Name, param.Cluster.Name, newVersion)
+ }
+ if obj, err := funcs.RestartComponent(param.Client, param.Ctx, configKey, newVersion, objs, recordEvent); err != nil {
+ param.Ctx.Recorder.Eventf(obj,
+ corev1.EventTypeWarning, appsv1alpha1.ReasonReconfigureRestartFailed,
+ "failed to restart component[%s] in cluster[%s], version: %s", client.ObjectKeyFromObject(obj), param.Cluster.Name, newVersion)
+ return makeReturnedStatus(ESAndRetryFailed), err
}
- pods, err := GetComponentPods(param)
+ pods, err := funcs.GetPodsFunc(param)
if err != nil {
return makeReturnedStatus(ESAndRetryFailed), err
}
@@ -82,30 +96,3 @@ func rollingStatefulSets(param reconfigureParams) (ReturnedStatus, error) {
}
return makeReturnedStatus(retStatus, withExpected(int32(len(pods))), withSucceed(progress)), nil
}
-
-func restartStsWithRolling(cli client.Client, ctx intctrlutil.RequestCtx, sts appsv1.StatefulSet, configKey string, newVersion string, clusterName, componentName string) error {
- cfgAnnotationKey := cfgcore.GenerateUniqKeyWithConfig(constant.UpgradeRestartAnnotationKey, configKey)
- if sts.Spec.Template.Annotations == nil {
- sts.Spec.Template.Annotations = map[string]string{}
- }
-
- lastVersion := ""
- if updatedVersion, ok := sts.Spec.Template.Annotations[cfgAnnotationKey]; ok {
- lastVersion = updatedVersion
- }
-
- // updated UpgradeRestartAnnotationKey
- if lastVersion == newVersion {
- return nil
- }
-
- ctx.Recorder.Eventf(&sts,
- corev1.EventTypeNormal, "Restarting",
- "restarting component[%s] in cluster[%s], version: %s", componentName, clusterName, newVersion)
- sts.Spec.Template.Annotations[cfgAnnotationKey] = newVersion
- if err := cli.Update(ctx.Ctx, &sts); err != nil {
- return err
- }
-
- return nil
-}
diff --git a/controllers/apps/configuration/simple_policy_test.go b/controllers/apps/configuration/simple_policy_test.go
index 3d07f9fdf5c..330b638ff03 100644
--- a/controllers/apps/configuration/simple_policy_test.go
+++ b/controllers/apps/configuration/simple_policy_test.go
@@ -159,8 +159,8 @@ var _ = Describe("Reconfigure simplePolicy", func() {
Context("simple reconfigure policy test for not supported component", func() {
It("Should failed", func() {
// not support type
- mockParam := newMockReconfigureParams("simplePolicy", nil,
- withMockStatefulSet(2, nil),
+ mockParam := newMockReconfigureParams("simplePolicy", k8sMockClient.Client(),
+ withMockDeployments(2, nil),
withConfigSpec("for_test", map[string]string{
"key": "value",
}),
@@ -170,10 +170,53 @@ var _ = Describe("Reconfigure simplePolicy", func() {
Name: "for_test",
VolumeName: "test_volume",
}}}))
+
+ updateErr := cfgcore.MakeError("update failed!")
+ k8sMockClient.MockUpdateMethod(
+ testutil.WithFailed(updateErr, testutil.WithTimes(1)),
+ testutil.WithSucceed(testutil.WithAnyTimes()))
+ k8sMockClient.MockListMethod(testutil.WithListReturned(
+ testutil.WithConstructListSequenceResult([][]runtime.Object{
+ fromPodObjectList(newMockPodsWithDeployment(&mockParam.DeploymentUnits[0], 2)),
+ fromPodObjectList(newMockPodsWithDeployment(&mockParam.DeploymentUnits[0], 2, withReadyPod(0, 2), func(pod *corev1.Pod, index int) {
+ // mock pod-1 restart
+ if index == 1 {
+ updatePodCfgVersion(pod, mockParam.getConfigKey(), mockParam.getTargetVersionHash())
+ }
+ })),
+ fromPodObjectList(newMockPodsWithDeployment(&mockParam.DeploymentUnits[0], 2, withReadyPod(0, 2), func(pod *corev1.Pod, index int) {
+ // mock all pod restart
+ updatePodCfgVersion(pod, mockParam.getConfigKey(), mockParam.getTargetVersionHash())
+ })),
+ }),
+ testutil.WithTimes(3),
+ ))
+
status, err := simplePolicy.Upgrade(mockParam)
- Expect(err).ShouldNot(Succeed())
- Expect(err.Error()).Should(ContainSubstring("not supported component workload type"))
- Expect(status.Status).Should(BeEquivalentTo(ESNotSupport))
+ Expect(err).Should(BeEquivalentTo(updateErr))
+ Expect(status.Status).Should(BeEquivalentTo(ESAndRetryFailed))
+
+ // first upgrade, not pod is ready
+ status, err = simplePolicy.Upgrade(mockParam)
+ Expect(err).Should(Succeed())
+ Expect(status.Status).Should(BeEquivalentTo(ESRetry))
+ Expect(status.SucceedCount).Should(BeEquivalentTo(int32(0)))
+ Expect(status.ExpectedCount).Should(BeEquivalentTo(int32(2)))
+
+ // only one pod ready
+ status, err = simplePolicy.Upgrade(mockParam)
+ Expect(err).Should(Succeed())
+ Expect(status.Status).Should(BeEquivalentTo(ESRetry))
+ Expect(status.SucceedCount).Should(BeEquivalentTo(int32(1)))
+ Expect(status.ExpectedCount).Should(BeEquivalentTo(int32(2)))
+
+ // succeed update pod
+ status, err = simplePolicy.Upgrade(mockParam)
+ Expect(err).Should(Succeed())
+ Expect(status.Status).Should(BeEquivalentTo(ESNone))
+ Expect(status.SucceedCount).Should(BeEquivalentTo(int32(2)))
+ Expect(status.ExpectedCount).Should(BeEquivalentTo(int32(2)))
+
})
})
diff --git a/controllers/apps/configuration/types.go b/controllers/apps/configuration/types.go
new file mode 100644
index 00000000000..4afaa420afc
--- /dev/null
+++ b/controllers/apps/configuration/types.go
@@ -0,0 +1,81 @@
+/*
+Copyright (C) 2022-2023 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package configuration
+
+import (
+ "context"
+
+ corev1 "k8s.io/api/core/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ cfgproto "github.com/apecloud/kubeblocks/internal/configuration/proto"
+ intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
+)
+
+type createReconfigureClient func(addr string) (cfgproto.ReconfigureClient, error)
+
+type GetPodsFunc func(params reconfigureParams) ([]corev1.Pod, error)
+type RestartComponent func(client client.Client, ctx intctrlutil.RequestCtx, key string, version string, objs []client.Object, recordEvent func(obj client.Object)) (client.Object, error)
+
+type RestartContainerFunc func(pod *corev1.Pod, ctx context.Context, containerName []string, createConnFn createReconfigureClient) error
+type OnlineUpdatePodFunc func(pod *corev1.Pod, ctx context.Context, createClient createReconfigureClient, configSpec string, updatedParams map[string]string) error
+
+type RollingUpgradeFuncs struct {
+ GetPodsFunc GetPodsFunc
+ RestartContainerFunc RestartContainerFunc
+ OnlineUpdatePodFunc OnlineUpdatePodFunc
+ RestartComponent RestartComponent
+}
+
+func GetConsensusRollingUpgradeFuncs() RollingUpgradeFuncs {
+ return RollingUpgradeFuncs{
+ GetPodsFunc: getConsensusPods,
+ RestartContainerFunc: commonStopContainerWithPod,
+ OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
+ RestartComponent: restartStatefulComponent,
+ }
+}
+
+func GetStatefulSetRollingUpgradeFuncs() RollingUpgradeFuncs {
+ return RollingUpgradeFuncs{
+ GetPodsFunc: getStatefulSetPods,
+ RestartContainerFunc: commonStopContainerWithPod,
+ OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
+ RestartComponent: restartStatefulComponent,
+ }
+}
+
+func GetReplicationRollingUpgradeFuncs() RollingUpgradeFuncs {
+ return RollingUpgradeFuncs{
+ GetPodsFunc: getReplicationSetPods,
+ RestartContainerFunc: commonStopContainerWithPod,
+ OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
+ RestartComponent: restartStatefulComponent,
+ }
+}
+
+func GetDeploymentRollingUpgradeFuncs() RollingUpgradeFuncs {
+ return RollingUpgradeFuncs{
+ GetPodsFunc: getDeploymentRollingPods,
+ RestartContainerFunc: commonStopContainerWithPod,
+ OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
+ RestartComponent: restartStatelessComponent,
+ }
+}
diff --git a/internal/configuration/config.go b/internal/configuration/config.go
index f3ba5c22fe1..5e8060a2b49 100644
--- a/internal/configuration/config.go
+++ b/internal/configuration/config.go
@@ -62,6 +62,7 @@ type ConfigEventContext struct {
ClusterComponent *appsv1alpha1.ClusterComponentSpec
Component *appsv1alpha1.ClusterComponentDefinition
ComponentUnits []appv1.StatefulSet
+ DeploymentUnits []appv1.Deployment
ConfigSpecName string
ConfigPatch *ConfigPatchInfo