From cf8df7326e2a450413101705fda45ce931cdfe6a Mon Sep 17 00:00:00 2001
From: Leon
Date: Tue, 27 Jun 2023 16:58:15 +0800
Subject: [PATCH] feat: volume protection from space exhaustion

---
 apis/apps/v1alpha1/clusterdefinition_types.go                |  27 ++
 cmd/probe/internal/binding/base.go                           |  60 ++-
 cmd/probe/internal/binding/base_test.go                      |   2 +-
 cmd/probe/internal/binding/custom/custom.go                  |   2 +-
 cmd/probe/internal/binding/etcd/etcd.go                      |   2 +-
 cmd/probe/internal/binding/kafka/kafka_test.go               |   2 +-
 cmd/probe/internal/binding/mongodb/mongodb.go                |   2 +-
 cmd/probe/internal/binding/mysql/mysql_test.go               |  24 +-
 cmd/probe/internal/binding/operation_volume_protection.go    | 399 ++++++++++++++++++
 cmd/probe/internal/binding/postgres/postgres_test.go         |  22 +-
 cmd/probe/internal/binding/redis/redis_test.go               |  12 +-
 cmd/probe/internal/binding/utils.go                          |  51 ++-
 controllers/k8score/event_controller.go                      | 127 ------
 controllers/k8score/event_handler.go                         |  40 ++
 controllers/k8score/event_utils.go                           |  15 +
 controllers/k8score/role_change_event_handler.go             | 121 ++++++
 controllers/k8score/types.go                                 |  30 ++
 go.mod                                                       |  11 +-
 go.sum                                                       |  12 +
 internal/cli/cmd/bench/template/bench_sysbench_template.cue  |   4 +-
 internal/constant/const.go                                   |  11 +-
 internal/controller/component/component.go                   |   1 +
 internal/controller/component/probe_utils.go                 |  39 +-
 internal/controller/component/type.go                        |   1 +
 internal/sqlchannel/util/types.go                            |   1 +
 25 files changed, 806 insertions(+), 212 deletions(-)
 create mode 100644 cmd/probe/internal/binding/operation_volume_protection.go
 create mode 100644 controllers/k8score/event_handler.go
 create mode 100644 controllers/k8score/role_change_event_handler.go
 create mode 100644 controllers/k8score/types.go

diff --git a/apis/apps/v1alpha1/clusterdefinition_types.go b/apis/apps/v1alpha1/clusterdefinition_types.go
index 98f7759e8cbf..1de455001c55 100644
--- a/apis/apps/v1alpha1/clusterdefinition_types.go
+++ b/apis/apps/v1alpha1/clusterdefinition_types.go
@@ -239,6 +239,24 @@ type VolumeTypeSpec struct {
 	Type VolumeType `json:"type,omitempty"`
 }
 
+type VolumeProtectionSpec struct {
+	// The low watermark of volume space usage (as a percentage of the volume
+	// capacity); a read-only instance is restored to read-write once the space
+	// usage of all monitored volumes drops below this threshold.
+	// +kubebuilder:validation:Maximum=100
+	// +kubebuilder:validation:Minimum=0
+	// +kubebuilder:default=90
+	// +optional
+	LowWatermark int `json:"lowWatermark,omitempty"`
+
+	// The high watermark of volume space usage (as a percentage of the volume
+	// capacity); the instance is set to read-only once the space usage of any
+	// monitored volume exceeds this threshold.
+	// +kubebuilder:validation:Maximum=100
+	// +kubebuilder:validation:Minimum=0
+	// +kubebuilder:default=90
+	// +optional
+	HighWatermark int `json:"highWatermark,omitempty"`
+
+	// Volumes to monitor.
+	// +optional
+	Volumes []string `json:"volumes,omitempty"`
+}
+
 // ClusterComponentDefinition provides a workload component specification template,
 // with attributes that strongly work with stateful workloads and day-2 operations
 // behaviors.
 type ClusterComponentDefinition struct {
@@ -364,6 +382,9 @@ type ClusterComponentDefinition struct {
 	// in particular, when workloadType=Replication, the command defined in switchoverSpec will only be executed under the condition of cluster.componentSpecs[x].SwitchPolicy.type=Noop.
 	// +optional
 	SwitchoverSpec *SwitchoverSpec `json:"switchoverSpec,omitempty"`
+
+	// +optional
+	VolumeProtectionSpec *VolumeProtectionSpec `json:"volumeProtectionSpec,omitempty"`
 }
 
 func (r *ClusterComponentDefinition) GetStatefulSetWorkload() StatefulSetWorkload {
@@ -621,6 +642,12 @@ type ClusterDefinitionProbes struct {
 	// +optional
 	RoleProbe *ClusterDefinitionProbe `json:"roleProbe,omitempty"`
 
+	// TODO: we should not expose this option to users.
+	// Probe to monitor the volume space usage, which prohibits the instance from writing when the usage exceeds the defined threshold.
+ // Reference ClusterDefinition.ClusterComponentDefinition.VolumeProtectionSpec for the threshold and volumes to monitor. + // +optional + VolumeProtectionProbe *ClusterDefinitionProbe `json:"volumeProtectionProbe,omitempty"` + // roleProbeTimeoutAfterPodsReady(in seconds), when all pods of the component are ready, // it will detect whether the application is available in the pod. // if pods exceed the InitializationTimeoutSeconds time without a role label, diff --git a/cmd/probe/internal/binding/base.go b/cmd/probe/internal/binding/base.go index ef4a123b2145..2976202fbbce 100644 --- a/cmd/probe/internal/binding/base.go +++ b/cmd/probe/internal/binding/base.go @@ -36,10 +36,16 @@ import ( . "github.com/apecloud/kubeblocks/internal/sqlchannel/util" ) -type Operation func(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (OpsResult, error) +type LegacyOperation func(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (OpsResult, error) type OpsResult map[string]interface{} +type Operation interface { + Kind() bindings.OperationKind + Init(metadata bindings.Metadata) error + Invoke(ctx context.Context, req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) error +} + // AccessMode defines SVC access mode enums. // +enum type AccessMode string @@ -72,7 +78,8 @@ type BaseOperations struct { Metadata bindings.Metadata InitIfNeed func() bool GetRole func(context.Context, *bindings.InvokeRequest, *bindings.InvokeResponse) (string, error) - OperationMap map[bindings.OperationKind]Operation + LegacyOperations map[bindings.OperationKind]LegacyOperation + Ops map[bindings.OperationKind]Operation } func init() { @@ -102,26 +109,36 @@ func (ops *BaseOperations) Init(metadata bindings.Metadata) { } } ops.Metadata = metadata - ops.OperationMap = map[bindings.OperationKind]Operation{ + ops.LegacyOperations = map[bindings.OperationKind]LegacyOperation{ CheckRunningOperation: ops.CheckRunningOps, CheckRoleOperation: ops.CheckRoleOps, GetRoleOperation: ops.GetRoleOps, + VolumeProtection: ops.volumeProtection, + } + ops.Ops = map[bindings.OperationKind]Operation{ + VolumeProtection: &operationVolumeProtection{}, } ops.DBAddress = ops.getAddress() + + for kind, op := range ops.Ops { + if err := op.Init(metadata); err != nil { + panic(fmt.Sprintf("init operation %s error: %s", kind, err.Error())) + } + } } -func (ops *BaseOperations) RegisterOperation(opsKind bindings.OperationKind, operation Operation) { - if ops.OperationMap == nil { - ops.OperationMap = map[bindings.OperationKind]Operation{} +func (ops *BaseOperations) RegisterOperation(opsKind bindings.OperationKind, operation LegacyOperation) { + if ops.LegacyOperations == nil { + ops.LegacyOperations = map[bindings.OperationKind]LegacyOperation{} } - ops.OperationMap[opsKind] = operation + ops.LegacyOperations[opsKind] = operation } // Operations returns list of operations supported by the binding. 
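+// Kinds backed by the new Operation interface (e.g. volumeProtection) are reported
+// as well, since Init registers a legacy forwarder for each of them in LegacyOperations.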
func (ops *BaseOperations) Operations() []bindings.OperationKind { - opsKinds := make([]bindings.OperationKind, len(ops.OperationMap)) + opsKinds := make([]bindings.OperationKind, len(ops.LegacyOperations)) i := 0 - for opsKind := range ops.OperationMap { + for opsKind := range ops.LegacyOperations { opsKinds[i] = opsKind i++ } @@ -129,7 +146,7 @@ func (ops *BaseOperations) Operations() []bindings.OperationKind { } // getAddress returns component service address, if component is not listening on -// 127.0.0.1, the Operation needs to overwrite this function and set ops.DBAddress +// 127.0.0.1, the LegacyOperation needs to overwrite this function and set ops.DBAddress func (ops *BaseOperations) getAddress() string { return "127.0.0.1" } @@ -155,7 +172,7 @@ func (ops *BaseOperations) Invoke(ctx context.Context, req *bindings.InvokeReque return resp, nil } - operation, ok := ops.OperationMap[req.Operation] + operation, ok := ops.LegacyOperations[req.Operation] opsRes := OpsResult{} if !ok { message := fmt.Sprintf("%v operation is not implemented for %v", req.Operation, ops.DBType) @@ -180,8 +197,10 @@ func (ops *BaseOperations) Invoke(ctx context.Context, req *bindings.InvokeReque if err != nil { return nil, err } - res, _ := json.Marshal(opsRes) - resp.Data = res + if opsRes != nil { + res, _ := json.Marshal(opsRes) + resp.Data = res + } return updateRespMetadata() } @@ -268,6 +287,21 @@ func (ops *BaseOperations) GetRoleOps(ctx context.Context, req *bindings.InvokeR return opsRes, nil } +func (ops *BaseOperations) volumeProtection(ctx context.Context, req *bindings.InvokeRequest, + rsp *bindings.InvokeResponse) (OpsResult, error) { + return ops.fwdLegacyOperationCall(VolumeProtection, ctx, req, rsp) +} + +func (ops *BaseOperations) fwdLegacyOperationCall(kind bindings.OperationKind, ctx context.Context, + req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) (OpsResult, error) { + op, ok := ops.Ops[kind] + if !ok { + panic(fmt.Sprintf("unknown operation kind: %s", kind)) + } + // since the rsp.Data has been set properly, it doesn't need to return a OpsResult here. + return nil, op.Invoke(ctx, req, rsp) +} + // Component may have some internal roles that needn't be exposed to end user, // and not configured in cluster definition, e.g. ETCD's Candidate. 
// roleValidate is used to filter the internal roles and decrease the number diff --git a/cmd/probe/internal/binding/base_test.go b/cmd/probe/internal/binding/base_test.go index 2070b4c6634d..932a3b1b41fe 100644 --- a/cmd/probe/internal/binding/base_test.go +++ b/cmd/probe/internal/binding/base_test.go @@ -75,7 +75,7 @@ func TestOperations(t *testing.T) { ops := p.Operations() if len(ops) != 4 { - t.Errorf("p.OperationMap init failed: %s", p.OriRole) + t.Errorf("p.LegacyOperations init failed: %s", p.OriRole) } } diff --git a/cmd/probe/internal/binding/custom/custom.go b/cmd/probe/internal/binding/custom/custom.go index 57ae370367b8..91a9ea9e7ff9 100644 --- a/cmd/probe/internal/binding/custom/custom.go +++ b/cmd/probe/internal/binding/custom/custom.go @@ -78,7 +78,7 @@ func (h *HTTPCustom) Init(metadata bindings.Metadata) error { h.BaseOperations.Init(metadata) h.BaseOperations.GetRole = h.GetRole - h.OperationMap[CheckRoleOperation] = h.CheckRoleOps + h.LegacyOperations[CheckRoleOperation] = h.CheckRoleOps return nil } diff --git a/cmd/probe/internal/binding/etcd/etcd.go b/cmd/probe/internal/binding/etcd/etcd.go index e877f01601e6..0faa228f8441 100644 --- a/cmd/probe/internal/binding/etcd/etcd.go +++ b/cmd/probe/internal/binding/etcd/etcd.go @@ -59,7 +59,7 @@ func (e *Etcd) Init(metadata bindings.Metadata) error { e.DBType = "etcd" e.InitIfNeed = e.initIfNeed e.DBPort = e.GetRunningPort() - e.OperationMap[GetRoleOperation] = e.GetRoleOps + e.LegacyOperations[GetRoleOperation] = e.GetRoleOps return nil } diff --git a/cmd/probe/internal/binding/kafka/kafka_test.go b/cmd/probe/internal/binding/kafka/kafka_test.go index 67442c11bd04..7dca6d16df11 100644 --- a/cmd/probe/internal/binding/kafka/kafka_test.go +++ b/cmd/probe/internal/binding/kafka/kafka_test.go @@ -41,7 +41,7 @@ func TestInit(t *testing.T) { assert.Equal(t, "kafka", kafkaOps.DBType) assert.NotNil(t, kafkaOps.InitIfNeed) - assert.NotNil(t, kafkaOps.OperationMap[CheckStatusOperation]) + assert.NotNil(t, kafkaOps.LegacyOperations[CheckStatusOperation]) } func TestCheckStatusOps(t *testing.T) { diff --git a/cmd/probe/internal/binding/mongodb/mongodb.go b/cmd/probe/internal/binding/mongodb/mongodb.go index d31430644235..4f214d0ab74b 100644 --- a/cmd/probe/internal/binding/mongodb/mongodb.go +++ b/cmd/probe/internal/binding/mongodb/mongodb.go @@ -146,7 +146,7 @@ func (mongoOps *MongoDBOperations) Init(metadata bindings.Metadata) error { mongoOps.InitIfNeed = mongoOps.initIfNeed mongoOps.DBPort = mongoOps.GetRunningPort() mongoOps.BaseOperations.GetRole = mongoOps.GetRole - mongoOps.OperationMap[GetRoleOperation] = mongoOps.GetRoleOps + mongoOps.LegacyOperations[GetRoleOperation] = mongoOps.GetRoleOps return nil } diff --git a/cmd/probe/internal/binding/mysql/mysql_test.go b/cmd/probe/internal/binding/mysql/mysql_test.go index 2225cf0ee9a8..29b4046017c9 100644 --- a/cmd/probe/internal/binding/mysql/mysql_test.go +++ b/cmd/probe/internal/binding/mysql/mysql_test.go @@ -65,18 +65,18 @@ func TestInit(t *testing.T) { assert.NotNil(t, mysqlOps.InitIfNeed) assert.NotNil(t, mysqlOps.GetRole) assert.Equal(t, 3306, mysqlOps.DBPort) - assert.NotNil(t, mysqlOps.OperationMap[GetRoleOperation]) - assert.NotNil(t, mysqlOps.OperationMap[CheckStatusOperation]) - assert.NotNil(t, mysqlOps.OperationMap[GetLagOperation]) - assert.NotNil(t, mysqlOps.OperationMap[ExecOperation]) - assert.NotNil(t, mysqlOps.OperationMap[QueryOperation]) - - assert.NotNil(t, mysqlOps.OperationMap[ListUsersOp]) - assert.NotNil(t, mysqlOps.OperationMap[CreateUserOp]) - 
assert.NotNil(t, mysqlOps.OperationMap[DeleteUserOp]) - assert.NotNil(t, mysqlOps.OperationMap[DescribeUserOp]) - assert.NotNil(t, mysqlOps.OperationMap[GrantUserRoleOp]) - assert.NotNil(t, mysqlOps.OperationMap[RevokeUserRoleOp]) + assert.NotNil(t, mysqlOps.LegacyOperations[GetRoleOperation]) + assert.NotNil(t, mysqlOps.LegacyOperations[CheckStatusOperation]) + assert.NotNil(t, mysqlOps.LegacyOperations[GetLagOperation]) + assert.NotNil(t, mysqlOps.LegacyOperations[ExecOperation]) + assert.NotNil(t, mysqlOps.LegacyOperations[QueryOperation]) + + assert.NotNil(t, mysqlOps.LegacyOperations[ListUsersOp]) + assert.NotNil(t, mysqlOps.LegacyOperations[CreateUserOp]) + assert.NotNil(t, mysqlOps.LegacyOperations[DeleteUserOp]) + assert.NotNil(t, mysqlOps.LegacyOperations[DescribeUserOp]) + assert.NotNil(t, mysqlOps.LegacyOperations[GrantUserRoleOp]) + assert.NotNil(t, mysqlOps.LegacyOperations[RevokeUserRoleOp]) // Clear out previously set viper variables viper.Reset() } diff --git a/cmd/probe/internal/binding/operation_volume_protection.go b/cmd/probe/internal/binding/operation_volume_protection.go new file mode 100644 index 000000000000..b651b2c951f7 --- /dev/null +++ b/cmd/probe/internal/binding/operation_volume_protection.go @@ -0,0 +1,399 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package binding + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/kit/logger" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + statsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + . 
"github.com/apecloud/kubeblocks/internal/sqlchannel/util" +) + +const ( + kubeletStatsSummaryURL = "https://%s:%s/stats/summary" + + certFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" + + envNamespace = "KB_NAMESPACE" + envHostIp = "KB_HOST_IP" + envNodeName = "KB_NODENAME" + envPodName = "KB_POD_NAME" + envPodUid = "KB_POD_UID" + envVolumesToProbe = "KB_VOLUME_PROTECTION_SPEC" + + reasonLock = "HighVolumeWatermark" + reasonUnlock = "LowVolumeWatermark" // TODO +) + +type operationVolumeProtection struct { + Logger logger.Logger + Client *http.Client + Request *http.Request + Pod string + VolumeProtectionSpec appsv1alpha1.VolumeProtectionSpec + VolumeStats map[string]statsv1alpha1.VolumeStats + Readonly bool +} + +func (o *operationVolumeProtection) Kind() bindings.OperationKind { + return VolumeProtection +} + +func (o *operationVolumeProtection) Init(metadata bindings.Metadata) error { + var err error + if o.Client, err = httpClient(); err != nil { + o.Logger.Warnf("build HTTP client error at setup: %s", err.Error()) + return err + } + + if o.Request, err = httpRequest(context.Background()); err != nil { + o.Logger.Warnf("build HTTP request error at setup, will try it later: %s", err.Error()) + } + + o.Pod = os.Getenv(envPodName) + if err = o.initVolumes(); err != nil { + o.Logger.Warnf("init volumes to monitor error: %s", err.Error()) + return err + } + return nil +} + +func (o *operationVolumeProtection) initVolumes() error { + spec := &o.VolumeProtectionSpec + if err := json.Unmarshal([]byte(os.Getenv(envVolumesToProbe)), spec); err != nil { + o.Logger.Warnf("unmarshal volume protection spec error: %s", err.Error()) + return err + } + normalizeWatermarks(&o.VolumeProtectionSpec.LowWatermark, &o.VolumeProtectionSpec.HighWatermark) + + if o.VolumeStats == nil { + o.VolumeStats = make(map[string]statsv1alpha1.VolumeStats) + } + for _, name := range spec.Volumes { + o.VolumeStats[name] = statsv1alpha1.VolumeStats{ + Name: name, + } + } + return nil +} + +func (o *operationVolumeProtection) Invoke(ctx context.Context, req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) error { + if o.disabled() { + return nil + } + if o.Client == nil { + return fmt.Errorf("HTTP client for kubelet is unavailable") + } + if o.Request == nil { + // try to build http request again + var err error + o.Request, err = httpRequest(ctx) + if err != nil { + o.Logger.Warnf("build HTTP request to query kubelet error: %s", err.Error()) + return err + } + } + + summary, err := o.request(ctx) + if err != nil { + o.Logger.Warnf("request stats summary from kubelet error: %s", err.Error()) + return err + } + + if err = o.updateVolumeStats(summary); err != nil { + return err + } + + // TODO: fillin the @rsp. + return o.checkUsage(ctx) +} + +func (o *operationVolumeProtection) disabled() bool { + skip := func(watermark int) bool { + return watermark <= 0 || watermark >= 100 + } + // TODO: check the role and skip secondary instances. 
+	return len(o.Pod) == 0 || len(o.VolumeProtectionSpec.Volumes) == 0 ||
+		skip(o.VolumeProtectionSpec.LowWatermark) || skip(o.VolumeProtectionSpec.HighWatermark)
+}
+
+func (o *operationVolumeProtection) request(ctx context.Context) ([]byte, error) {
+	req := o.Request.WithContext(ctx)
+	rsp, err := o.Client.Do(req)
+	if err != nil {
+		o.Logger.Warnf("issue request to kubelet error: %s", err.Error())
+		return nil, err
+	}
+	defer rsp.Body.Close()
+	if rsp.StatusCode != http.StatusOK {
+		o.Logger.Warnf("HTTP response from kubelet error: %s", rsp.Status)
+		return nil, fmt.Errorf("%s", rsp.Status)
+	}
+	return io.ReadAll(rsp.Body)
+}
+
+func (o *operationVolumeProtection) updateVolumeStats(payload []byte) error {
+	summary := &statsv1alpha1.Summary{}
+	if err := json.Unmarshal(payload, summary); err != nil {
+		o.Logger.Warnf("unmarshal stats summary from kubelet error: %s", err.Error())
+		return err
+	}
+	for _, pod := range summary.Pods {
+		if pod.PodRef.Name == o.Pod {
+			for _, stats := range pod.VolumeStats {
+				if _, ok := o.VolumeStats[stats.Name]; !ok {
+					continue
+				}
+				o.VolumeStats[stats.Name] = stats
+			}
+			break
+		}
+	}
+	return nil
+}
+
+func (o *operationVolumeProtection) checkUsage(ctx context.Context) error {
+	lower := make([]string, 0)
+	higher := make([]string, 0)
+	for name, stats := range o.VolumeStats {
+		ret := o.checkVolumeWatermark(stats)
+		if ret == 0 {
+			continue
+		}
+		if ret < 0 {
+			lower = append(lower, name)
+		} else {
+			higher = append(higher, name)
+		}
+	}
+
+	readonly := o.Readonly
+	// the instance is running normally, but some volume(s) are over the space usage threshold
+	if !readonly && len(higher) > 0 {
+		if err := o.highWatermark(ctx, higher); err != nil {
+			return err
+		}
+	}
+	// the instance is protected in read-only mode, and the space usage of all volumes is back under the threshold
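+	// (note the asymmetry: locking triggers on any single volume crossing the high
+	// watermark, while unlocking requires every monitored volume to be below the low one)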
+	if readonly && len(lower) == len(o.VolumeStats) {
+		if err := o.lowWatermark(ctx, lower); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// checkVolumeWatermark returns -1 if the volume space usage is below the low
+// watermark, 1 if it is above the high watermark, and 0 otherwise (including
+// when the stats are incomplete). Watermarks are percentages of the capacity.
+func (o *operationVolumeProtection) checkVolumeWatermark(stats statsv1alpha1.VolumeStats) int {
+	if stats.CapacityBytes == nil || stats.UsedBytes == nil || *stats.CapacityBytes == 0 {
+		return 0
+	}
+
+	lowThresholdBytes := *stats.CapacityBytes / 100 * uint64(o.VolumeProtectionSpec.LowWatermark)
+	if *stats.UsedBytes < lowThresholdBytes {
+		return -1
+	}
+	highThresholdBytes := *stats.CapacityBytes / 100 * uint64(o.VolumeProtectionSpec.HighWatermark)
+	if *stats.UsedBytes > highThresholdBytes {
+		return 1
+	}
+	return 0
+}
+
+func (o *operationVolumeProtection) highWatermark(ctx context.Context, volumes []string) error {
+	if o.Readonly { // double check
+		return nil
+	}
+	msg := o.buildEventMsg(volumes)
+	if err := o.lockInstance(ctx); err != nil {
+		o.Logger.Warnf("set instance to read-only error: %s, volumes: %s", err.Error(), msg)
+		return err
+	}
+	o.Logger.Infof("set instance to read-only OK: %s", msg)
+	if err := o.sendEvent(ctx, reasonLock, msg); err != nil {
+		o.Logger.Warnf("send volume protection (lock) event error: %s, volumes: %s", err.Error(), msg)
+		return err
+	}
+	o.Readonly = true
+	return nil
+}
+
+func (o *operationVolumeProtection) lowWatermark(ctx context.Context, volumes []string) error {
+	if !o.Readonly { // double check
+		return nil
+	}
+	msg := o.buildEventMsg(volumes)
+	if err := o.unlockInstance(ctx); err != nil {
+		o.Logger.Warnf("reset instance to read-write error: %s, volumes: %s", err.Error(), msg)
+		return err
+	}
+	o.Logger.Infof("reset instance to read-write OK: %s", msg)
+	if err := o.sendEvent(ctx, reasonUnlock, msg); err != nil {
+		o.Logger.Warnf("send volume protection (unlock) event error: %s, volumes: %s", err.Error(), msg)
+		return err
+	}
+	o.Readonly = false
+	return nil
+}
+
+func (o *operationVolumeProtection) lockInstance(ctx context.Context) error {
+	// TODO: impl, only set read-only for the primary instance.
+	return nil
+}
+
+func (o *operationVolumeProtection) unlockInstance(ctx context.Context) error {
+	// TODO: impl, only set read-write for the primary instance.
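+	//
+	// A minimal sketch of what an engine-specific implementation might look like,
+	// assuming a MySQL-flavored engine and a SQL session exposed as o.db (both are
+	// assumptions, not provided by this patch); lockInstance would run the inverse
+	// statement ("SET GLOBAL read_only = ON"):
+	//
+	//	if _, err := o.db.ExecContext(ctx, "SET GLOBAL read_only = OFF"); err != nil {
+	//		return err
+	//	}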
+	return nil
+}
+
+func (o *operationVolumeProtection) buildEventMsg(volumes []string) string {
+	usages := make(map[string]string)
+	for _, v := range volumes {
+		stats := o.VolumeStats[v]
+		if stats.CapacityBytes == nil || stats.UsedBytes == nil || *stats.CapacityBytes == 0 {
+			continue
+		}
+		usages[v] = fmt.Sprintf("%d%%", int(*stats.UsedBytes*100 / *stats.CapacityBytes))
+	}
+	msg, _ := json.Marshal(usages)
+	return string(msg)
+}
+
+func (o *operationVolumeProtection) sendEvent(ctx context.Context, reason, msg string) error {
+	return sendEvent(ctx, o.Logger, o.createEvent(reason, msg))
+}
+
+func (o *operationVolumeProtection) createEvent(reason, msg string) *corev1.Event {
+	return &corev1.Event{
+		ObjectMeta: metav1.ObjectMeta{
+			// the events API requires a name, let the API server generate a unique one
+			GenerateName: os.Getenv(envPodName) + ".",
+			Namespace:    os.Getenv(envNamespace),
+		},
+		InvolvedObject: corev1.ObjectReference{
+			Kind:      "Pod",
+			Namespace: os.Getenv(envNamespace),
+			Name:      os.Getenv(envPodName),
+			UID:       types.UID(os.Getenv(envPodUid)),
+			FieldPath: "spec.containers{sqlchannel}",
+		},
+		Reason:  reason,
+		Message: msg,
+		Source: corev1.EventSource{
+			Component: "sqlchannel",
+			Host:      os.Getenv(envNodeName),
+		},
+		FirstTimestamp: metav1.Now(),
+		LastTimestamp:  metav1.Now(),
+		Type:           "Normal",
+	}
+}
+
+func httpClient() (*http.Client, error) {
+	cert, err := os.ReadFile(certFile)
+	if err != nil {
+		return nil, err
+	}
+	certPool := x509.NewCertPool()
+	certPool.AppendCertsFromPEM(cert)
+	return &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{
+				RootCAs: certPool,
+			},
+		},
+	}, nil
+}
+
+func httpRequest(ctx context.Context) (*http.Request, error) {
+	host, err := kubeletEndpointHost(ctx)
+	if err != nil {
+		return nil, err
+	}
+	port, err := kubeletEndpointPort(ctx)
+	if err != nil {
+		return nil, err
+	}
+	url := fmt.Sprintf(kubeletStatsSummaryURL, host, port)
+
+	accessToken, err := os.ReadFile(tokenFile)
+	if err != nil {
+		return nil, err
+	}
+
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	if len(accessToken) > 0 {
+		req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", accessToken))
+	}
+	return req, nil
+}
+
+func kubeletEndpointHost(ctx context.Context) (string, error) {
+	return os.Getenv(envHostIp), nil
+}
+
+func kubeletEndpointPort(ctx context.Context) (string, error) {
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		return "", err
+	}
+
+	cliset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return "", err
+	}
+	node, err := cliset.CoreV1().Nodes().Get(ctx, os.Getenv(envNodeName), metav1.GetOptions{})
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("%d", node.Status.DaemonEndpoints.KubeletEndpoint.Port), nil
+}
+
+func normalizeWatermarks(low, high *int) {
+	if *low < 0 || *low > 100 {
+		*low = 0
+	}
+	if *high < 0 || *high > 100 {
+		*high = 0
+	}
+	if *low == 0 && *high != 0 {
+		*low = *high
+	}
+	if *low != 0 && *high == 0 {
+		*high = *low
+	}
+	if *low > *high {
+		*low = *high
+	}
+}
diff --git a/cmd/probe/internal/binding/postgres/postgres_test.go b/cmd/probe/internal/binding/postgres/postgres_test.go
index 19a1b136c9ba..0aaa07b29bf3 100644
--- a/cmd/probe/internal/binding/postgres/postgres_test.go
+++ b/cmd/probe/internal/binding/postgres/postgres_test.go
@@ -61,17 +61,17 @@ func TestOperations(t *testing.T) {
 	assert.NotNil(t, pgOps.InitIfNeed)
 	assert.NotNil(t, pgOps.GetRole)
 	assert.Equal(t, 5432, pgOps.DBPort)
-	assert.NotNil(t, pgOps.OperationMap[GetRoleOperation])
-	assert.NotNil(t, pgOps.OperationMap[ExecOperation])
-	assert.NotNil(t, pgOps.OperationMap[QueryOperation])
-	assert.NotNil(t, pgOps.OperationMap[CheckStatusOperation])
-
-	assert.NotNil(t, pgOps.OperationMap[ListUsersOp])
-	assert.NotNil(t,
pgOps.OperationMap[CreateUserOp]) - assert.NotNil(t, pgOps.OperationMap[DeleteUserOp]) - assert.NotNil(t, pgOps.OperationMap[DescribeUserOp]) - assert.NotNil(t, pgOps.OperationMap[GrantUserRoleOp]) - assert.NotNil(t, pgOps.OperationMap[RevokeUserRoleOp]) + assert.NotNil(t, pgOps.LegacyOperations[GetRoleOperation]) + assert.NotNil(t, pgOps.LegacyOperations[ExecOperation]) + assert.NotNil(t, pgOps.LegacyOperations[QueryOperation]) + assert.NotNil(t, pgOps.LegacyOperations[CheckStatusOperation]) + + assert.NotNil(t, pgOps.LegacyOperations[ListUsersOp]) + assert.NotNil(t, pgOps.LegacyOperations[CreateUserOp]) + assert.NotNil(t, pgOps.LegacyOperations[DeleteUserOp]) + assert.NotNil(t, pgOps.LegacyOperations[DescribeUserOp]) + assert.NotNil(t, pgOps.LegacyOperations[GrantUserRoleOp]) + assert.NotNil(t, pgOps.LegacyOperations[RevokeUserRoleOp]) } diff --git a/cmd/probe/internal/binding/redis/redis_test.go b/cmd/probe/internal/binding/redis/redis_test.go index 49e21a3f6f60..895121eff8c1 100644 --- a/cmd/probe/internal/binding/redis/redis_test.go +++ b/cmd/probe/internal/binding/redis/redis_test.go @@ -59,12 +59,12 @@ func TestRedisInit(t *testing.T) { defer r.Close() // make sure operations are inited assert.NotNil(t, r.client) - assert.NotNil(t, r.OperationMap[ListUsersOp]) - assert.NotNil(t, r.OperationMap[CreateUserOp]) - assert.NotNil(t, r.OperationMap[DeleteUserOp]) - assert.NotNil(t, r.OperationMap[DescribeUserOp]) - assert.NotNil(t, r.OperationMap[GrantUserRoleOp]) - assert.NotNil(t, r.OperationMap[RevokeUserRoleOp]) + assert.NotNil(t, r.LegacyOperations[ListUsersOp]) + assert.NotNil(t, r.LegacyOperations[CreateUserOp]) + assert.NotNil(t, r.LegacyOperations[DeleteUserOp]) + assert.NotNil(t, r.LegacyOperations[DescribeUserOp]) + assert.NotNil(t, r.LegacyOperations[GrantUserRoleOp]) + assert.NotNil(t, r.LegacyOperations[RevokeUserRoleOp]) } func TestRedisInvokeCreate(t *testing.T) { r, mock := mockRedisOps(t) diff --git a/cmd/probe/internal/binding/utils.go b/cmd/probe/internal/binding/utils.go index cbc2015a8031..f3be39202dfb 100644 --- a/cmd/probe/internal/binding/utils.go +++ b/cmd/probe/internal/binding/utils.go @@ -227,26 +227,7 @@ func SentProbeEvent(ctx context.Context, opsResult OpsResult, log logger.Logger) return } - config, err := rest.InClusterConfig() - if err != nil { - log.Errorf("get k8s client config failed: %v", err) - return - } - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - log.Infof("k8s client create failed: %v", err) - return - } - namespace := os.Getenv("KB_NAMESPACE") - for i := 0; i < 30; i++ { - _, err = clientset.CoreV1().Events(namespace).Create(ctx, event, metav1.CreateOptions{}) - if err == nil { - break - } - log.Errorf("send event failed: %v", err) - time.Sleep(10 * time.Second) - } + sendEvent(ctx, log, event) } func createProbeEvent(opsResult OpsResult) (*corev1.Event, error) { @@ -254,13 +235,13 @@ func createProbeEvent(opsResult OpsResult) (*corev1.Event, error) { apiVersion: v1 kind: Event metadata: - name: {{ .PodName }}.{{ .EventSeq }} + name: {{ .Pod }}.{{ .EventSeq }} namespace: {{ .Namespace }} involvedObject: apiVersion: v1 fieldPath: spec.containers{sqlchannel} kind: Pod - name: {{ .PodName }} + name: {{ .Pod }} namespace: {{ .Namespace }} reason: RoleChanged type: Normal @@ -276,7 +257,7 @@ source: msg, _ := json.Marshal(opsResult) seq := rand.String(16) roleValue := map[string]string{ - "PodName": podName, + "Pod": podName, "Namespace": namespace, "EventSeq": seq, } @@ -304,3 +285,27 @@ source: return event, nil } + 
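+// sendEvent creates the given event through the in-cluster events API, retrying up
+// to 30 times at 10s intervals; the error of the last attempt, if any, is returned.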
+func sendEvent(ctx context.Context, log logger.Logger, event *corev1.Event) error { + config, err := rest.InClusterConfig() + if err != nil { + log.Errorf("get k8s client config failed: %v", err) + return err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Infof("k8s client create failed: %v", err) + return err + } + namespace := os.Getenv("KB_NAMESPACE") + for i := 0; i < 30; i++ { + _, err = clientset.CoreV1().Events(namespace).Create(ctx, event, metav1.CreateOptions{}) + if err == nil { + break + } + log.Errorf("send event failed: %v", err) + time.Sleep(10 * time.Second) + } + return err +} diff --git a/controllers/k8score/event_controller.go b/controllers/k8score/event_controller.go index f544bb0e0e0c..1f24314d1802 100644 --- a/controllers/k8score/event_controller.go +++ b/controllers/k8score/event_controller.go @@ -21,45 +21,18 @@ package k8score import ( "context" - "encoding/json" - "strings" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" - "github.com/apecloud/kubeblocks/controllers/apps/components/consensus" - "github.com/apecloud/kubeblocks/controllers/apps/components/replication" - componentutil "github.com/apecloud/kubeblocks/controllers/apps/components/util" - "github.com/apecloud/kubeblocks/internal/constant" - "github.com/apecloud/kubeblocks/internal/controller/rsm" intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil" - probeutil "github.com/apecloud/kubeblocks/internal/sqlchannel/util" ) -type EventHandler interface { - Handle(client.Client, intctrlutil.RequestCtx, record.EventRecorder, *corev1.Event) error -} - -// RoleChangeEventHandler is the event handler for the role change event -type RoleChangeEventHandler struct{} - -// ProbeEventType defines the type of probe event. -type ProbeEventType string - -type ProbeMessage struct { - Event ProbeEventType `json:"event,omitempty"` - Message string `json:"message,omitempty"` - OriginalRole string `json:"originalRole,omitempty"` - Role string `json:"role,omitempty"` -} - // EventReconciler reconciles an Event object type EventReconciler struct { client.Client @@ -70,15 +43,6 @@ type EventReconciler struct { // events API only allows ready-only, create, patch // +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch -var EventHandlerMap = map[string]EventHandler{} - -var _ EventHandler = &RoleChangeEventHandler{} - -func init() { - EventHandlerMap["role-change-handler"] = &RoleChangeEventHandler{} - EventHandlerMap["rsm-event-handler"] = &rsm.PodRoleEventHandler{} -} - // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // @@ -114,94 +78,3 @@ func (r *EventReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&corev1.Event{}). Complete(r) } - -// Handle handles role changed event. 
-func (r *RoleChangeEventHandler) Handle(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) error { - if event.Reason != string(probeutil.CheckRoleOperation) { - return nil - } - var ( - err error - annotations = event.GetAnnotations() - ) - // filter role changed event that has been handled - if annotations != nil && annotations[roleChangedAnnotKey] == trueStr { - return nil - } - - if _, err = handleRoleChangedEvent(cli, reqCtx, recorder, event); err != nil { - return err - } - - // event order is crucial in role probing, but it's not guaranteed when controller restarted, so we have to mark them to be filtered - patch := client.MergeFrom(event.DeepCopy()) - if event.Annotations == nil { - event.Annotations = make(map[string]string, 0) - } - event.Annotations[roleChangedAnnotKey] = trueStr - return cli.Patch(reqCtx.Ctx, event, patch) -} - -// handleRoleChangedEvent handles role changed event and return role. -func handleRoleChangedEvent(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) (string, error) { - // parse probe event message - message := ParseProbeEventMessage(reqCtx, event) - if message == nil { - reqCtx.Log.Info("parse probe event message failed", "message", event.Message) - return "", nil - } - - // if probe event operation is not implemented, check role failed or invalid, ignore it - if message.Event == ProbeEventOperationNotImpl || message.Event == ProbeEventCheckRoleFailed || message.Event == ProbeEventRoleInvalid { - reqCtx.Log.Info("probe event failed", "message", message.Message) - return "", nil - } - role := strings.ToLower(message.Role) - - podName := types.NamespacedName{ - Namespace: event.InvolvedObject.Namespace, - Name: event.InvolvedObject.Name, - } - // get pod - pod := &corev1.Pod{} - if err := cli.Get(reqCtx.Ctx, podName, pod); err != nil { - return role, err - } - // event belongs to old pod with the same name, ignore it - if pod.UID != event.InvolvedObject.UID { - return role, nil - } - - // get cluster obj of the pod - cluster := &appsv1alpha1.Cluster{} - if err := cli.Get(reqCtx.Ctx, types.NamespacedName{ - Namespace: pod.Namespace, - Name: pod.Labels[constant.AppInstanceLabelKey], - }, cluster); err != nil { - return role, err - } - reqCtx.Log.V(1).Info("handle role changed event", "event uid", event.UID, "cluster", cluster.Name, "pod", pod.Name, "role", role, "originalRole", message.OriginalRole) - compName, componentDef, err := componentutil.GetComponentInfoByPod(reqCtx.Ctx, cli, *cluster, pod) - if err != nil { - return role, err - } - switch componentDef.WorkloadType { - case appsv1alpha1.Consensus: - return role, consensus.UpdateConsensusSetRoleLabel(cli, reqCtx, componentDef, pod, role) - case appsv1alpha1.Replication: - return role, replication.HandleReplicationSetRoleChangeEvent(cli, reqCtx, cluster, compName, pod, role) - } - return role, nil -} - -// ParseProbeEventMessage parses probe event message. 
-func ParseProbeEventMessage(reqCtx intctrlutil.RequestCtx, event *corev1.Event) *ProbeMessage { - message := &ProbeMessage{} - err := json.Unmarshal([]byte(event.Message), message) - if err != nil { - // not role related message, ignore it - reqCtx.Log.Info("not role message", "message", event.Message, "error", err) - return nil - } - return message -} diff --git a/controllers/k8score/event_handler.go b/controllers/k8score/event_handler.go new file mode 100644 index 000000000000..21ec2b1cbeab --- /dev/null +++ b/controllers/k8score/event_handler.go @@ -0,0 +1,40 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package k8score + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/apecloud/kubeblocks/internal/controller/rsm" + intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil" +) + +type EventHandler interface { + Handle(client.Client, intctrlutil.RequestCtx, record.EventRecorder, *corev1.Event) error +} + +var EventHandlerMap = map[string]EventHandler{} + +func init() { + EventHandlerMap["role-change-handler"] = &RoleChangeEventHandler{} + EventHandlerMap["rsm-event-handler"] = &rsm.PodRoleEventHandler{} +} diff --git a/controllers/k8score/event_utils.go b/controllers/k8score/event_utils.go index c7e39afb9348..4e8d9ea59c56 100644 --- a/controllers/k8score/event_utils.go +++ b/controllers/k8score/event_utils.go @@ -20,9 +20,12 @@ along with this program. If not, see . package k8score import ( + "encoding/json" "time" corev1 "k8s.io/api/core/v1" + + intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil" ) // IsOvertimeEvent checks whether the duration of warning event reaches the threshold. @@ -33,3 +36,15 @@ func IsOvertimeEvent(event *corev1.Event, timeout time.Duration) bool { // Note: LastTimestamp/FirstTimestamp/Count/Source of event are deprecated in k8s v1.25 return event.LastTimestamp.After(event.FirstTimestamp.Add(timeout)) } + +// ParseProbeEventMessage parses probe event message. 
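+// It returns nil when the event message is not a well-formed JSON probe message
+// (e.g. an arbitrary plain-text event), so callers can simply skip such events.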
+func ParseProbeEventMessage(reqCtx intctrlutil.RequestCtx, event *corev1.Event) *ProbeMessage { + message := &ProbeMessage{} + err := json.Unmarshal([]byte(event.Message), message) + if err != nil { + // not role related message, ignore it + reqCtx.Log.Info("not role message", "message", event.Message, "error", err) + return nil + } + return message +} diff --git a/controllers/k8score/role_change_event_handler.go b/controllers/k8score/role_change_event_handler.go new file mode 100644 index 000000000000..3ed7d8de1e9a --- /dev/null +++ b/controllers/k8score/role_change_event_handler.go @@ -0,0 +1,121 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package k8score + +import ( + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + "github.com/apecloud/kubeblocks/controllers/apps/components/consensus" + "github.com/apecloud/kubeblocks/controllers/apps/components/replication" + componentutil "github.com/apecloud/kubeblocks/controllers/apps/components/util" + "github.com/apecloud/kubeblocks/internal/constant" + intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil" + probeutil "github.com/apecloud/kubeblocks/internal/sqlchannel/util" +) + +// RoleChangeEventHandler is the event handler for the role change event +type RoleChangeEventHandler struct{} + +var _ EventHandler = &RoleChangeEventHandler{} + +// Handle handles role changed event. +func (r *RoleChangeEventHandler) Handle(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) error { + if event.Reason != string(probeutil.CheckRoleOperation) { + return nil + } + var ( + err error + annotations = event.GetAnnotations() + ) + // filter role changed event that has been handled + if annotations != nil && annotations[roleChangedAnnotKey] == trueStr { + return nil + } + + if _, err = handleRoleChangedEvent(cli, reqCtx, recorder, event); err != nil { + return err + } + + // event order is crucial in role probing, but it's not guaranteed when controller restarted, so we have to mark them to be filtered + patch := client.MergeFrom(event.DeepCopy()) + if event.Annotations == nil { + event.Annotations = make(map[string]string, 0) + } + event.Annotations[roleChangedAnnotKey] = trueStr + return cli.Patch(reqCtx.Ctx, event, patch) +} + +// handleRoleChangedEvent handles role changed event and return role. 
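+// A nil error with an empty role means the event was ignored, either because the
+// message could not be parsed or because it reported a failed/unimplemented probe.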
+func handleRoleChangedEvent(cli client.Client, reqCtx intctrlutil.RequestCtx, recorder record.EventRecorder, event *corev1.Event) (string, error) { + // parse probe event message + message := ParseProbeEventMessage(reqCtx, event) + if message == nil { + reqCtx.Log.Info("parse probe event message failed", "message", event.Message) + return "", nil + } + + // if probe event operation is not implemented, check role failed or invalid, ignore it + if message.Event == ProbeEventOperationNotImpl || message.Event == ProbeEventCheckRoleFailed || message.Event == ProbeEventRoleInvalid { + reqCtx.Log.Info("probe event failed", "message", message.Message) + return "", nil + } + role := strings.ToLower(message.Role) + + podName := types.NamespacedName{ + Namespace: event.InvolvedObject.Namespace, + Name: event.InvolvedObject.Name, + } + // get pod + pod := &corev1.Pod{} + if err := cli.Get(reqCtx.Ctx, podName, pod); err != nil { + return role, err + } + // event belongs to old pod with the same name, ignore it + if pod.UID != event.InvolvedObject.UID { + return role, nil + } + + // get cluster obj of the pod + cluster := &appsv1alpha1.Cluster{} + if err := cli.Get(reqCtx.Ctx, types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Labels[constant.AppInstanceLabelKey], + }, cluster); err != nil { + return role, err + } + reqCtx.Log.V(1).Info("handle role changed event", "event uid", event.UID, "cluster", cluster.Name, "pod", pod.Name, "role", role, "originalRole", message.OriginalRole) + compName, componentDef, err := componentutil.GetComponentInfoByPod(reqCtx.Ctx, cli, *cluster, pod) + if err != nil { + return role, err + } + switch componentDef.WorkloadType { + case appsv1alpha1.Consensus: + return role, consensus.UpdateConsensusSetRoleLabel(cli, reqCtx, componentDef, pod, role) + case appsv1alpha1.Replication: + return role, replication.HandleReplicationSetRoleChangeEvent(cli, reqCtx, cluster, compName, pod, role) + } + return role, nil +} diff --git a/controllers/k8score/types.go b/controllers/k8score/types.go new file mode 100644 index 000000000000..00b93a161b07 --- /dev/null +++ b/controllers/k8score/types.go @@ -0,0 +1,30 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package k8score + +// ProbeEventType defines the type of probe event. 
+type ProbeEventType string + +type ProbeMessage struct { + Event ProbeEventType `json:"event,omitempty"` + Message string `json:"message,omitempty"` + OriginalRole string `json:"originalRole,omitempty"` + Role string `json:"role,omitempty"` +} diff --git a/go.mod b/go.mod index 10833cf30c2c..fed5ec88411b 100644 --- a/go.mod +++ b/go.mod @@ -86,19 +86,20 @@ require ( gopkg.in/inf.v0 v0.9.1 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.11.1 - k8s.io/api v0.26.1 + k8s.io/api v0.27.3 k8s.io/apiextensions-apiserver v0.26.1 - k8s.io/apimachinery v0.26.1 + k8s.io/apimachinery v0.27.3 k8s.io/cli-runtime v0.26.1 - k8s.io/client-go v0.26.1 + k8s.io/client-go v0.27.3 k8s.io/code-generator v0.26.1 - k8s.io/component-base v0.26.1 + k8s.io/component-base v0.27.3 k8s.io/cri-api v0.25.0 k8s.io/gengo v0.0.0-20220913193501-391367153a38 k8s.io/klog v1.0.0 k8s.io/klog/v2 v2.90.1 - k8s.io/kube-openapi v0.0.0-20230308215209-15aac26d736a + k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f k8s.io/kubectl v0.26.0 + k8s.io/kubelet v0.27.3 k8s.io/metrics v0.26.0 k8s.io/utils v0.0.0-20230209194617-a36077c30491 sigs.k8s.io/controller-runtime v0.14.4 diff --git a/go.sum b/go.sum index e093220e8e0c..70377d7f1cd7 100644 --- a/go.sum +++ b/go.sum @@ -2560,6 +2560,8 @@ k8s.io/api v0.20.4/go.mod h1:++lNL1AJMkDymriNniQsWRkMDzRaX2Y/POTUi8yvqYQ= k8s.io/api v0.20.6/go.mod h1:X9e8Qag6JV/bL5G6bU8sdVRltWKmdHsFUGS3eVndqE8= k8s.io/api v0.26.1 h1:f+SWYiPd/GsiWwVRz+NbFyCgvv75Pk9NK6dlkZgpCRQ= k8s.io/api v0.26.1/go.mod h1:xd/GBNgR0f707+ATNyPmQ1oyKSgndzXij81FzWGsejg= +k8s.io/api v0.27.3 h1:yR6oQXXnUEBWEWcvPWS0jQL575KoAboQPfJAuKNrw5Y= +k8s.io/api v0.27.3/go.mod h1:C4BNvZnQOF7JA/0Xed2S+aUyJSfTGkGFxLXz9MnpIpg= k8s.io/apiextensions-apiserver v0.26.1 h1:cB8h1SRk6e/+i3NOrQgSFij1B2S0Y0wDoNl66bn8RMI= k8s.io/apiextensions-apiserver v0.26.1/go.mod h1:AptjOSXDGuE0JICx/Em15PaoO7buLwTs0dGleIHixSM= k8s.io/apimachinery v0.19.0/go.mod h1:DnPGDnARWFvYa3pMHgSxtbZb7gpzzAZ1pTfaUNDVlmA= @@ -2568,6 +2570,8 @@ k8s.io/apimachinery v0.20.4/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRp k8s.io/apimachinery v0.20.6/go.mod h1:ejZXtW1Ra6V1O5H8xPBGz+T3+4gfkTCeExAHKU57MAc= k8s.io/apimachinery v0.26.1 h1:8EZ/eGJL+hY/MYCNwhmDzVqq2lPl3N3Bo8rvweJwXUQ= k8s.io/apimachinery v0.26.1/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74= +k8s.io/apimachinery v0.27.3 h1:Ubye8oBufD04l9QnNtW05idcOe9Z3GQN8+7PqmuVcUM= +k8s.io/apimachinery v0.27.3/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E= k8s.io/apiserver v0.20.1/go.mod h1:ro5QHeQkgMS7ZGpvf4tSMx6bBOgPfE+f52KwvXfScaU= k8s.io/apiserver v0.20.4/go.mod h1:Mc80thBKOyy7tbvFtB4kJv1kbdD0eIH8k8vianJcbFM= k8s.io/apiserver v0.20.6/go.mod h1:QIJXNt6i6JB+0YQRNcS0hdRHJlMhflFmsBDeSgT1r8Q= @@ -2581,6 +2585,8 @@ k8s.io/client-go v0.20.4/go.mod h1:LiMv25ND1gLUdBeYxBIwKpkSC5IsozMMmOOeSJboP+k= k8s.io/client-go v0.20.6/go.mod h1:nNQMnOvEUEsOzRRFIIkdmYOjAZrC8bgq0ExboWSU1I0= k8s.io/client-go v0.26.1 h1:87CXzYJnAMGaa/IDDfRdhTzxk/wzGZ+/HUQpqgVSZXU= k8s.io/client-go v0.26.1/go.mod h1:IWNSglg+rQ3OcvDkhY6+QLeasV4OYHDjdqeWkDQZwGE= +k8s.io/client-go v0.27.3 h1:7dnEGHZEJld3lYwxvLl7WoehK6lAq7GvgjxpA3nv1E8= +k8s.io/client-go v0.27.3/go.mod h1:2MBEKuTo6V1lbKy3z1euEGnhPfGZLKTS9tiJ2xodM48= k8s.io/code-generator v0.19.0/go.mod h1:moqLn7w0t9cMs4+5CQyxnfA/HV8MF6aAVENF+WZZhgk= k8s.io/code-generator v0.19.7/go.mod h1:lwEq3YnLYb/7uVXLorOJfxg+cUu2oihFhHZ0n9NIla0= k8s.io/code-generator v0.26.1 h1:dusFDsnNSKlMFYhzIM0jAO1OlnTN5WYwQQ+Ai12IIlo= @@ -2590,6 +2596,8 @@ k8s.io/component-base v0.20.4/go.mod h1:t4p9EdiagbVCJKrQ1RsA5/V4rFQNDfRlevJajlGw 
k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMYnrM= k8s.io/component-base v0.26.1 h1:4ahudpeQXHZL5kko+iDHqLj/FSGAEUnSVO0EBbgDd+4= k8s.io/component-base v0.26.1/go.mod h1:VHrLR0b58oC035w6YQiBSbtsf0ThuSwXP+p5dD/kAWU= +k8s.io/component-base v0.27.3 h1:g078YmdcdTfrCE4fFobt7qmVXwS8J/3cI1XxRi/2+6k= +k8s.io/component-base v0.27.3/go.mod h1:JNiKYcGImpQ44iwSYs6dysxzR9SxIIgQalk4HaCNVUY= k8s.io/component-helpers v0.26.0 h1:KNgwqs3EUdK0HLfW4GhnbD+q/Zl9U021VfIU7qoVYFk= k8s.io/component-helpers v0.26.0/go.mod h1:jHN01qS/Jdj95WCbTe9S2VZ9yxpxXNY488WjF+yW4fo= k8s.io/cri-api v0.17.3/go.mod h1:X1sbHmuXhwaHs9xxYffLqJogVsnI+f6cPRcgPel7ywM= @@ -2615,8 +2623,12 @@ k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM= k8s.io/kube-openapi v0.0.0-20230308215209-15aac26d736a h1:gmovKNur38vgoWfGtP5QOGNOA7ki4n6qNYoFAgMlNvg= k8s.io/kube-openapi v0.0.0-20230308215209-15aac26d736a/go.mod h1:y5VtZWM9sHHc2ZodIH/6SHzXj+TPU5USoA8lcIeKEKY= +k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= +k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= k8s.io/kubectl v0.26.0 h1:xmrzoKR9CyNdzxBmXV7jW9Ln8WMrwRK6hGbbf69o4T0= k8s.io/kubectl v0.26.0/go.mod h1:eInP0b+U9XUJWSYeU9XZnTA+cVYuWyl3iYPGtru0qhQ= +k8s.io/kubelet v0.27.3 h1:5WhTV1iiBu9q/rr+gvy65LQ+K/e7dmgcaYjys5ipLqY= +k8s.io/kubelet v0.27.3/go.mod h1:Mz42qgZZgWgPmOJEYaR5evmh+EoSwFzEvPBozA2y9mg= k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/metrics v0.26.0 h1:U/NzZHKDrIVGL93AUMRkqqXjOah3wGvjSnKmG/5NVCs= k8s.io/metrics v0.26.0/go.mod h1:cf5MlG4ZgWaEFZrR9+sOImhZ2ICMpIdNurA+D8snIs8= diff --git a/internal/cli/cmd/bench/template/bench_sysbench_template.cue b/internal/cli/cmd/bench/template/bench_sysbench_template.cue index 9f59cc8d8fb1..77bd43507395 100644 --- a/internal/cli/cmd/bench/template/bench_sysbench_template.cue +++ b/internal/cli/cmd/bench/template/bench_sysbench_template.cue @@ -17,11 +17,11 @@ // required, command line input options for parameters and flags options: { - flag: int + flag: int mode: string driver: string database: string - value: string + value: string } // required, k8s api resource content diff --git a/internal/constant/const.go b/internal/constant/const.go index 6db298d17fb3..79433c2f2beb 100644 --- a/internal/constant/const.go +++ b/internal/constant/const.go @@ -198,11 +198,12 @@ const ( const ( // Container port name - ProbeHTTPPortName = "probe-http-port" - ProbeGRPCPortName = "probe-grpc-port" - RoleProbeContainerName = "kb-checkrole" - StatusProbeContainerName = "kb-checkstatus" - RunningProbeContainerName = "kb-checkrunning" + ProbeHTTPPortName = "probe-http-port" + ProbeGRPCPortName = "probe-grpc-port" + RoleProbeContainerName = "kb-checkrole" + StatusProbeContainerName = "kb-checkstatus" + RunningProbeContainerName = "kb-checkrunning" + VolumeProtectionProbeContainerName = "kb-volume-protection" // the filedpath name used in event.InvolvedObject.FieldPath ProbeCheckRolePath = "spec.containers{" + RoleProbeContainerName + "}" diff --git a/internal/controller/component/component.go b/internal/controller/component/component.go index c0211f9af84b..bb79c465dd97 100644 --- a/internal/controller/component/component.go +++ b/internal/controller/component/component.go @@ -93,6 +93,7 @@ func buildComponent(reqCtx intctrlutil.RequestCtx, 
ConfigTemplates: clusterCompDefObj.ConfigSpecs, ScriptTemplates: clusterCompDefObj.ScriptSpecs, VolumeTypes: clusterCompDefObj.VolumeTypes, + VolumeProtection: clusterCompDefObj.VolumeProtectionSpec, CustomLabelSpecs: clusterCompDefObj.CustomLabelSpecs, SwitchoverSpec: clusterCompDefObj.SwitchoverSpec, StatefulSetWorkload: clusterCompDefObj.GetStatefulSetWorkload(), diff --git a/internal/controller/component/probe_utils.go b/internal/controller/component/probe_utils.go index f23d15bc0a2e..d13527892eee 100644 --- a/internal/controller/component/probe_utils.go +++ b/internal/controller/component/probe_utils.go @@ -38,9 +38,10 @@ import ( const ( // http://localhost:/v1.0/bindings/ - checkRoleURIFormat = "/v1.0/bindings/%s?operation=checkRole" - checkRunningURIFormat = "/v1.0/bindings/%s?operation=checkRunning" - checkStatusURIFormat = "/v1.0/bindings/%s?operation=checkStatus" + checkRoleURIFormat = "/v1.0/bindings/%s?operation=checkRole" + checkRunningURIFormat = "/v1.0/bindings/%s?operation=checkRunning" + checkStatusURIFormat = "/v1.0/bindings/%s?operation=checkStatus" + volumeProtectionURIFormat = "/v1.0/bindings/%s?operation=volumeProtection" ) var ( @@ -89,6 +90,13 @@ func buildProbeContainers(reqCtx intctrlutil.RequestCtx, component *SynthesizedC probeContainers = append(probeContainers, *runningProbeContainer) } + if componentProbes.VolumeProtectionProbe != nil && component.VolumeProtection != nil { + c := container.DeepCopy() + buildVolumeProtectionProbeContainer(component.CharacterType, c, *componentProbes.VolumeProtectionProbe, + *component.VolumeProtection, int(probeSvcHTTPPort)) + probeContainers = append(probeContainers, *c) + } + if len(probeContainers) >= 1 { container := &probeContainers[0] buildProbeServiceContainer(component, container, int(probeSvcHTTPPort), int(probeSvcGRPCPort)) @@ -228,3 +236,28 @@ func buildRunningProbeContainer(characterType string, runningProbeContainer *cor probe.FailureThreshold = probeSetting.FailureThreshold runningProbeContainer.StartupProbe.TCPSocket.Port = intstr.FromInt(probeSvcHTTPPort) } + +func buildVolumeProtectionProbeContainer(characterType string, c *corev1.Container, + probeSetting appsv1alpha1.ClusterDefinitionProbe, spec appsv1alpha1.VolumeProtectionSpec, probeSvcHTTPPort int) { + c.Name = constant.VolumeProtectionProbeContainerName + probe := c.ReadinessProbe + httpGet := &corev1.HTTPGetAction{} + httpGet.Path = fmt.Sprintf(volumeProtectionURIFormat, characterType) + httpGet.Port = intstr.FromInt(probeSvcHTTPPort) + probe.Exec = nil + probe.HTTPGet = httpGet + probe.PeriodSeconds = probeSetting.PeriodSeconds + probe.TimeoutSeconds = probeSetting.TimeoutSeconds + probe.FailureThreshold = probeSetting.FailureThreshold + c.StartupProbe.TCPSocket.Port = intstr.FromInt(probeSvcHTTPPort) + + // pass the volume protection spec to probe container through env. 
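+	// The probe runtime reads this env back as KB_VOLUME_PROTECTION_SPEC (see
+	// envVolumesToProbe in operation_volume_protection.go) and unmarshals it into
+	// the same VolumeProtectionSpec type in initVolumes.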
+ value, err := json.Marshal(spec) + if err != nil { + panic(fmt.Sprintf("marshal volume protection spec error: %s", err.Error())) + } + c.Env = append(c.Env, corev1.EnvVar{ + Name: "KB_VOLUME_PROTECTION_SPEC", + Value: string(value), + }) +} diff --git a/internal/controller/component/type.go b/internal/controller/component/type.go index a8af05dac832..e09cf0db6d74 100644 --- a/internal/controller/component/type.go +++ b/internal/controller/component/type.go @@ -60,6 +60,7 @@ type SynthesizedComponent struct { TLS bool `json:"tls"` Issuer *v1alpha1.Issuer `json:"issuer,omitempty"` VolumeTypes []v1alpha1.VolumeTypeSpec `json:"volumeTypes,omitempty"` + VolumeProtection *v1alpha1.VolumeProtectionSpec `json:"volumeProtection,omitempty"` CustomLabelSpecs []v1alpha1.CustomLabelSpec `json:"customLabelSpecs,omitempty"` SwitchoverSpec *v1alpha1.SwitchoverSpec `json:"switchoverSpec,omitempty"` ComponentDef string `json:"componentDef,omitempty"` diff --git a/internal/sqlchannel/util/types.go b/internal/sqlchannel/util/types.go index 270530ec3d16..61a5ed5c687a 100644 --- a/internal/sqlchannel/util/types.go +++ b/internal/sqlchannel/util/types.go @@ -41,6 +41,7 @@ const ( ExecOperation bindings.OperationKind = "exec" QueryOperation bindings.OperationKind = "query" CloseOperation bindings.OperationKind = "close" + VolumeProtection bindings.OperationKind = "volumeProtection" // actions for cluster accounts management ListUsersOp bindings.OperationKind = "listUsers"