Skip to content

Commit

Permalink
feat: stateless support reconfiguring operation (#3831) (#3864)
Browse files Browse the repository at this point in the history
  • Loading branch information
sophon-zt authored Jun 20, 2023
1 parent 57bc8aa commit f9aeb35
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 139 deletions.
30 changes: 16 additions & 14 deletions apis/apps/v1alpha1/opsrequest_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,22 @@ const (

// condition and event reasons

ReasonReconfigureMerging = "ReconfigureMerging"
ReasonReconfigureMerged = "ReconfigureMerged"
ReasonReconfigureFailed = "ReconfigureFailed"
ReasonReconfigureNoChanged = "ReconfigureNoChanged"
ReasonReconfigureSucceed = "ReconfigureSucceed"
ReasonReconfigureRunning = "ReconfigureRunning"
ReasonClusterPhaseMismatch = "ClusterPhaseMismatch"
ReasonOpsTypeNotSupported = "OpsTypeNotSupported"
ReasonValidateFailed = "ValidateFailed"
ReasonClusterNotFound = "ClusterNotFound"
ReasonOpsRequestFailed = "OpsRequestFailed"
ReasonOpsCanceling = "Canceling"
ReasonOpsCancelFailed = "CancelFailed"
ReasonOpsCancelSucceed = "CancelSucceed"
ReasonReconfigureMerging = "ReconfigureMerging"
ReasonReconfigureMerged = "ReconfigureMerged"
ReasonReconfigureFailed = "ReconfigureFailed"
ReasonReconfigureRestartFailed = "ReconfigureRestartFailed"
ReasonReconfigureRestart = "ReconfigureRestarted"
ReasonReconfigureNoChanged = "ReconfigureNoChanged"
ReasonReconfigureSucceed = "ReconfigureSucceed"
ReasonReconfigureRunning = "ReconfigureRunning"
ReasonClusterPhaseMismatch = "ClusterPhaseMismatch"
ReasonOpsTypeNotSupported = "OpsTypeNotSupported"
ReasonValidateFailed = "ValidateFailed"
ReasonClusterNotFound = "ClusterNotFound"
ReasonOpsRequestFailed = "OpsRequestFailed"
ReasonOpsCanceling = "Canceling"
ReasonOpsCancelFailed = "CancelFailed"
ReasonOpsCancelSucceed = "CancelSucceed"
)

func (r *OpsRequest) SetStatusCondition(condition metav1.Condition) {
Expand Down
66 changes: 46 additions & 20 deletions controllers/apps/configuration/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"fmt"
"reflect"

"github.com/StudioSol/set"
cfgutil "github.com/apecloud/kubeblocks/internal/configuration/util"
"github.com/go-logr/logr"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -325,16 +325,15 @@ func validateConfigTemplate(cli client.Client, ctx intctrlutil.RequestCtx, confi
logger.Error(err, "failed to get config template cm object!")
return nil, err
}

if configSpec.ConfigConstraintRef == "" {
return nil, nil
}

configObj := &appsv1alpha1.ConfigConstraint{}
if err := cli.Get(ctx.Ctx, client.ObjectKey{
configKey := client.ObjectKey{
Namespace: "",
Name: configSpec.ConfigConstraintRef,
}, configObj); err != nil {
}
configObj := &appsv1alpha1.ConfigConstraint{}
if err := cli.Get(ctx.Ctx, configKey, configObj); err != nil {
logger.Error(err, "failed to get template cm object!")
return nil, err
}
Expand Down Expand Up @@ -378,35 +377,62 @@ func updateConfigConstraintStatus(cli client.Client, ctx intctrlutil.RequestCtx,
return cli.Status().Patch(ctx.Ctx, configConstraint, patch)
}

func getAssociatedComponentsByConfigmap(stsList *appv1.StatefulSetList, cfg client.ObjectKey, configSpecName string) ([]appv1.StatefulSet, []string) {
managerContainerName := constant.ConfigSidecarName
stsLen := len(stsList.Items)
if stsLen == 0 {
return nil, nil
func toObjects[T generics.Object, L generics.ObjList[T], PL generics.PObjList[T, L]](compList PL) []T {
return reflect.ValueOf(compList).Elem().FieldByName("Items").Interface().([]T)
}

func toResourceObject(obj any) client.Object {
return obj.(client.Object)
}

func getPodTemplate(obj client.Object) *corev1.PodTemplateSpec {
switch v := obj.(type) {
default:
return nil
case *appv1.StatefulSet:
return &v.Spec.Template
case *appv1.Deployment:
return &v.Spec.Template
}
}

func getRelatedComponentsByConfigmap[T generics.Object, PT generics.PObject[T], L generics.ObjList[T], PL generics.PObjList[T, L]](cli client.Client, ctx context.Context, _ func(T, L), cfg client.ObjectKey, configSpecName string, opts ...client.ListOption) ([]T, []string, error) {
var objList L
if err := cli.List(ctx, PL(&objList), opts...); err != nil {
return nil, nil, err
}

sts := make([]appv1.StatefulSet, 0, stsLen)
containers := set.NewLinkedHashSetString()
objs := make([]T, 0)
containers := cfgutil.NewSet()
configSpecKey := cfgcore.GenerateTPLUniqLabelKeyWithConfig(configSpecName)
for _, s := range stsList.Items {
if !usingComponentConfigSpec(s.GetAnnotations(), configSpecKey, cfg.Name) {
items := toObjects[T, L, PL](&objList)
for i := range items {
obj := toResourceObject(&items[i])
if objs == nil {
return nil, nil, cfgcore.MakeError("failed to convert to resource object")
}
if !usingComponentConfigSpec(obj.GetAnnotations(), configSpecKey, cfg.Name) {
continue
}
podTemplate := getPodTemplate(obj)
if podTemplate == nil {
continue
}
volumeMounted := intctrlutil.GetVolumeMountName(s.Spec.Template.Spec.Volumes, cfg.Name)
volumeMounted := intctrlutil.GetVolumeMountName(podTemplate.Spec.Volumes, cfg.Name)
if volumeMounted == nil {
continue
}
// filter config manager sidecar container
contains := intctrlutil.GetContainersByConfigmap(s.Spec.Template.Spec.Containers,
contains := intctrlutil.GetContainersByConfigmap(podTemplate.Spec.Containers,
volumeMounted.Name, func(containerName string) bool {
return managerContainerName == containerName
return constant.ConfigSidecarName == containerName
})
if len(contains) > 0 {
sts = append(sts, s)
objs = append(objs, items[i])
containers.Add(contains...)
}
}
return sts, containers.AsSlice()
return objs, containers.AsSlice(), nil
}

func createConfigPatch(cfg *corev1.ConfigMap, format appsv1alpha1.CfgFileFormat, cmKeys []string) (*cfgcore.ConfigPatchInfo, bool, error) {
Expand Down
113 changes: 68 additions & 45 deletions controllers/apps/configuration/policy_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"strconv"

"github.com/spf13/viper"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apecloud/kubeblocks/controllers/apps/components/consensus"
"github.com/apecloud/kubeblocks/controllers/apps/components/util"
Expand All @@ -37,51 +39,6 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

type createReconfigureClient func(addr string) (cfgproto.ReconfigureClient, error)

type GetPodsFunc func(params reconfigureParams) ([]corev1.Pod, error)

type RestartContainerFunc func(pod *corev1.Pod, ctx context.Context, containerName []string, createConnFn createReconfigureClient) error
type OnlineUpdatePodFunc func(pod *corev1.Pod, ctx context.Context, createClient createReconfigureClient, configSpec string, updatedParams map[string]string) error

type RollingUpgradeFuncs struct {
GetPodsFunc GetPodsFunc
RestartContainerFunc RestartContainerFunc
OnlineUpdatePodFunc OnlineUpdatePodFunc
}

func GetConsensusRollingUpgradeFuncs() RollingUpgradeFuncs {
return RollingUpgradeFuncs{
GetPodsFunc: getConsensusPods,
RestartContainerFunc: commonStopContainerWithPod,
OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
}
}

func GetStatefulSetRollingUpgradeFuncs() RollingUpgradeFuncs {
return RollingUpgradeFuncs{
GetPodsFunc: getStatefulSetPods,
RestartContainerFunc: commonStopContainerWithPod,
OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
}
}

func GetReplicationRollingUpgradeFuncs() RollingUpgradeFuncs {
return RollingUpgradeFuncs{
GetPodsFunc: getReplicationSetPods,
RestartContainerFunc: commonStopContainerWithPod,
OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
}
}

func GetDeploymentRollingUpgradeFuncs() RollingUpgradeFuncs {
return RollingUpgradeFuncs{
GetPodsFunc: getDeploymentRollingPods,
RestartContainerFunc: commonStopContainerWithPod,
OnlineUpdatePodFunc: commonOnlineUpdateWithPod,
}
}

func getDeploymentRollingPods(params reconfigureParams) ([]corev1.Pod, error) {
// util.GetComponentPodList supports deployment
return getReplicationSetPods(params)
Expand Down Expand Up @@ -246,3 +203,69 @@ func getURLFromPod(pod *corev1.Pod, portPort int) (string, error) {
}
return net.JoinHostPort(ip.String(), strconv.Itoa(portPort)), nil
}

func restartStatelessComponent(client client.Client, ctx intctrlutil.RequestCtx, configKey string, expectedVersion string, deployObjs []client.Object, recordEvent func(obj client.Object)) (client.Object, error) {
cfgAnnotationKey := cfgcore.GenerateUniqKeyWithConfig(constant.UpgradeRestartAnnotationKey, configKey)
deployRestart := func(deploy *appv1.Deployment, expectedVersion string) error {
if deploy.Spec.Template.Annotations == nil {
deploy.Spec.Template.Annotations = map[string]string{}
}
deploy.Spec.Template.Annotations[cfgAnnotationKey] = expectedVersion
if err := client.Update(ctx.Ctx, deploy); err != nil {
return err
}
return nil
}

for _, obj := range deployObjs {
deploy, ok := obj.(*appv1.Deployment)
if !ok {
continue
}
if updatedVersion(&deploy.Spec.Template, cfgAnnotationKey, expectedVersion) {
continue
}
if err := deployRestart(deploy, expectedVersion); err != nil {
return deploy, err
}
if recordEvent != nil {
recordEvent(deploy)
}
}
return nil, nil
}

func restartStatefulComponent(client client.Client, ctx intctrlutil.RequestCtx, configKey string, newVersion string, objs []client.Object, recordEvent func(obj client.Object)) (client.Object, error) {
cfgAnnotationKey := cfgcore.GenerateUniqKeyWithConfig(constant.UpgradeRestartAnnotationKey, configKey)
stsRestart := func(sts *appv1.StatefulSet, expectedVersion string) error {
if sts.Spec.Template.Annotations == nil {
sts.Spec.Template.Annotations = map[string]string{}
}
sts.Spec.Template.Annotations[cfgAnnotationKey] = expectedVersion
if err := client.Update(ctx.Ctx, sts); err != nil {
return err
}
return nil
}

for _, obj := range objs {
sts, ok := obj.(*appv1.StatefulSet)
if !ok {
continue
}
if updatedVersion(&sts.Spec.Template, cfgAnnotationKey, newVersion) {
continue
}
if err := stsRestart(sts, newVersion); err != nil {
return sts, err
}
if recordEvent != nil {
recordEvent(sts)
}
}
return nil, nil
}

func updatedVersion(podTemplate *corev1.PodTemplateSpec, keyPath, expectedVersion string) bool {
return podTemplate.Annotations != nil && podTemplate.Annotations[keyPath] == expectedVersion
}
59 changes: 59 additions & 0 deletions controllers/apps/configuration/policy_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,39 @@ var (
stsSchemaKind = appsv1.SchemeGroupVersion.WithKind("StatefulSet")
)

func newMockDeployments(replicas int, name string, labels map[string]string) appsv1.Deployment {
uid, _ := password.Generate(12, 12, 0, true, false)
return appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
UID: types.UID(uid),
},
Spec: appsv1.DeploymentSpec{
Replicas: func() *int32 { i := int32(replicas); return &i }(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{},
Volumes: []corev1.Volume{{
Name: "for_test",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/tmp",
},
}}},
},
},
},
}
}

func newMockStatefulSet(replicas int, name string, labels map[string]string) appsv1.StatefulSet {
uid, _ := password.Generate(12, 12, 0, true, false)
serviceName, _ := password.Generate(12, 0, 0, true, false)
Expand Down Expand Up @@ -91,6 +124,16 @@ func withMockStatefulSet(replicas int, labels map[string]string) ParamsOps {
}
}

func withMockDeployments(replicas int, labels map[string]string) ParamsOps {
return func(params *reconfigureParams) {
rand, _ := password.Generate(12, 8, 0, true, false)
deployName := "test_" + rand
params.DeploymentUnits = []appsv1.Deployment{
newMockDeployments(replicas, deployName, labels),
}
}
}

func withClusterComponent(replicas int) ParamsOps {
return func(params *reconfigureParams) {
params.ClusterComponent = &appsv1alpha1.ClusterComponentSpec{
Expand Down Expand Up @@ -207,6 +250,22 @@ func newMockPodsWithStatefulSet(sts *appsv1.StatefulSet, replicas int, options .
return pods
}

func newMockPodsWithDeployment(deploy *appsv1.Deployment, replicas int, options ...PodOptions) []corev1.Pod {
pods := make([]corev1.Pod, replicas)
for i := 0; i < replicas; i++ {
pods[i] = newMockPod(deploy.Name+"-"+fmt.Sprint(i), &deploy.Spec.Template.Spec)
pods[i].OwnerReferences = []metav1.OwnerReference{newControllerRef(deploy, stsSchemaKind)}
pods[i].Status.PodIP = "1.1.1.1"
}
for _, customFn := range options {
for i := range pods {
pod := &pods[i]
customFn(pod, i)
}
}
return pods
}

func withReadyPod(rMin, rMax int) PodOptions {
return func(pod *corev1.Pod, index int) {
if index < rMin || index >= rMax {
Expand Down
23 changes: 21 additions & 2 deletions controllers/apps/configuration/reconfigure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
appv1 "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -103,7 +103,10 @@ type reconfigureParams struct {
Component *appsv1alpha1.ClusterComponentDefinition

// List of StatefulSets using this config template.
ComponentUnits []appv1.StatefulSet
ComponentUnits []appsv1.StatefulSet

// List of Deployment using this config template.
DeploymentUnits []appsv1.Deployment
}

var (
Expand Down Expand Up @@ -259,3 +262,19 @@ func makeReturnedStatus(status ExecStatus, ops ...func(status *ReturnedStatus))
}
return ret
}

func fromDeploymentObjects(units []appsv1.Deployment) []client.Object {
r := make([]client.Object, len(units))
for i, unit := range units {
r[i] = &unit
}
return r
}

func fromStatefulSetObjects(units []appsv1.StatefulSet) []client.Object {
r := make([]client.Object, len(units))
for i, unit := range units {
r[i] = &unit
}
return r
}
Loading

0 comments on commit f9aeb35

Please sign in to comment.