Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: volume protection from space exhaustion #3988

Merged
merged 32 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fab09df
feat: volume protection from space exhaustion
leon-inf Jun 27, 2023
0bf0210
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jun 27, 2023
4e93a0c
fix compile
leon-inf Jun 27, 2023
28efa50
chore: auto update cli doc changes
leon-inf Jun 27, 2023
66c6bb1
fix compile
leon-inf Jun 27, 2023
2941953
Merge branch 'feature/volume-protection-from-exhaustion' of https://g…
leon-inf Jun 27, 2023
1bd2561
fix
leon-inf Jun 28, 2023
8e49f8d
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jun 28, 2023
759a2e6
fix
leon-inf Jun 29, 2023
5c50cc4
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jun 29, 2023
849534f
create and delete cluster role and binding
leon-inf Jun 29, 2023
27c6298
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jul 3, 2023
f1fffff
remove volume protection probe from cluster definition
leon-inf Jul 3, 2023
781561c
per volume threshold setting
leon-inf Jul 3, 2023
d23a0f5
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jul 4, 2023
7927f1d
fix nil pointer
leon-inf Jul 4, 2023
6e8bcb3
add test
leon-inf Jul 5, 2023
5dfdcaa
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jul 12, 2023
0df5df9
chore: auto update cli doc changes
leon-inf Jul 12, 2023
c9c9690
revert
leon-inf Jul 12, 2023
7c77381
add cluster role for node
leon-inf Jul 12, 2023
4486ce7
enable volume protection when rbac manager is enabled
leon-inf Jul 12, 2023
c9ffa8c
fix typo
leon-inf Jul 12, 2023
0dede0f
fix default and disable value for volume high watermark
leon-inf Jul 12, 2023
c0c60ac
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jul 12, 2023
b108cbe
fix static check error
leon-inf Jul 12, 2023
1dd0b87
add test for build probe container
leon-inf Jul 13, 2023
14e25e2
add rules to kblit rbac tpl
leon-inf Jul 13, 2023
3db6069
Revert "add rules to kblit rbac tpl"
leon-inf Jul 13, 2023
bcc09e7
enable all cases
leon-inf Jul 13, 2023
9c89e6d
builtin lock/unlock action for engines
leon-inf Jul 13, 2023
2834c5c
Merge branch 'main' into feature/volume-protection-from-exhaustion
leon-inf Jul 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions apis/apps/v1alpha1/clusterdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,36 @@ type VolumeTypeSpec struct {
Type VolumeType `json:"type,omitempty"`
}

type VolumeProtectionSpec struct {
// The high watermark threshold for volume space usage.
// If there is any specified volumes who's space usage is over the threshold, the pre-defined "LOCK" action
// will be triggered to degrade the service to protect volume from space exhaustion, such as to set the instance
// as read-only. And after that, if all volumes' space usage drops under the threshold later, the pre-defined
// "UNLOCK" action will be performed to recover the service normally.
// +kubebuilder:validation:Maximum=100
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=90
// +optional
HighWatermark int `json:"highWatermark,omitempty"`

// Volumes to protect.
// +optional
Volumes []ProtectedVolume `json:"volumes,omitempty"`
}

type ProtectedVolume struct {
// Name of volume to protect.
// +optional
Name string `json:"name,omitempty"`

// Volume specified high watermark threshold, it will override the component level threshold.
// If the value is invalid, it will be ignored and the component level threshold will be used.
// +kubebuilder:validation:Maximum=100
// +kubebuilder:validation:Minimum=0
// +optional
HighWatermark *int `json:"highWatermark,omitempty"`
}

// ClusterComponentDefinition provides a workload component specification template,
// with attributes that strongly work with stateful workloads and day-2 operations
// behaviors.
Expand Down Expand Up @@ -367,6 +397,10 @@ type ClusterComponentDefinition struct {
// in particular, when workloadType=Replication, the command defined in switchoverSpec will only be executed under the condition of cluster.componentSpecs[x].SwitchPolicy.type=Noop.
// +optional
SwitchoverSpec *SwitchoverSpec `json:"switchoverSpec,omitempty"`

// +optional
VolumeProtectionSpec *VolumeProtectionSpec `json:"volumeProtectionSpec,omitempty"`

// componentDefRef is used to inject values from other components into the current component.
// values will be saved and updated in a configmap and mounted to the current component.
// +patchMergeKey=componentDefName
Expand Down
47 changes: 47 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func init() {

viper.SetDefault(constant.CfgKeyCtrlrReconcileRetryDurationMS, 1000)
viper.SetDefault("CERT_DIR", "/tmp/k8s-webhook-server/serving-certs")
viper.SetDefault("ENABLE_RBAC_MANAGER", true)
viper.SetDefault(constant.EnableRBACManager, true)
viper.SetDefault("VOLUMESNAPSHOT", false)
viper.SetDefault("VOLUMESNAPSHOT_API_BETA", false)
viper.SetDefault(constant.KBToolsImage, "apecloud/kubeblocks-tools:latest")
Expand Down
64 changes: 51 additions & 13 deletions cmd/probe/internal/binding/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ import (
. "github.com/apecloud/kubeblocks/internal/sqlchannel/util"
)

type Operation func(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (OpsResult, error)
type LegacyOperation func(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (OpsResult, error)

type OpsResult map[string]interface{}

type Operation interface {
Kind() bindings.OperationKind
Init(metadata bindings.Metadata) error
Invoke(ctx context.Context, req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) error
}

// AccessMode defines SVC access mode enums.
// +enum
type AccessMode string
Expand Down Expand Up @@ -72,7 +78,11 @@ type BaseOperations struct {
Metadata bindings.Metadata
InitIfNeed func() bool
GetRole func(context.Context, *bindings.InvokeRequest, *bindings.InvokeResponse) (string, error)
OperationMap map[bindings.OperationKind]Operation
// TODO: need a better way to support the extension for engines.
LockInstance func(ctx context.Context) error
UnlockInstance func(ctx context.Context) error
LegacyOperations map[bindings.OperationKind]LegacyOperation
Ops map[bindings.OperationKind]Operation
}

func init() {
Expand Down Expand Up @@ -102,34 +112,45 @@ func (ops *BaseOperations) Init(metadata bindings.Metadata) {
}
}
ops.Metadata = metadata
ops.OperationMap = map[bindings.OperationKind]Operation{
ops.LegacyOperations = map[bindings.OperationKind]LegacyOperation{
CheckRunningOperation: ops.CheckRunningOps,
CheckRoleOperation: ops.CheckRoleOps,
GetRoleOperation: ops.GetRoleOps,
VolumeProtection: ops.volumeProtection,
}
ops.Ops = map[bindings.OperationKind]Operation{
VolumeProtection: newVolumeProtectionOperation(ops.Logger, ops),
}
ops.DBAddress = ops.getAddress()

for kind, op := range ops.Ops {
if err := op.Init(metadata); err != nil {
ops.Logger.Warnf("init operation %s error: %s", kind, err.Error())
// panic(fmt.Sprintf("init operation %s error: %s", kind, err.Error()))
}
}
}

func (ops *BaseOperations) RegisterOperation(opsKind bindings.OperationKind, operation Operation) {
if ops.OperationMap == nil {
ops.OperationMap = map[bindings.OperationKind]Operation{}
func (ops *BaseOperations) RegisterOperation(opsKind bindings.OperationKind, operation LegacyOperation) {
if ops.LegacyOperations == nil {
ops.LegacyOperations = map[bindings.OperationKind]LegacyOperation{}
}
ops.OperationMap[opsKind] = operation
ops.LegacyOperations[opsKind] = operation
}

// Operations returns list of operations supported by the binding.
func (ops *BaseOperations) Operations() []bindings.OperationKind {
opsKinds := make([]bindings.OperationKind, len(ops.OperationMap))
opsKinds := make([]bindings.OperationKind, len(ops.LegacyOperations))
i := 0
for opsKind := range ops.OperationMap {
for opsKind := range ops.LegacyOperations {
opsKinds[i] = opsKind
i++
}
return opsKinds
}

// getAddress returns component service address, if component is not listening on
// 127.0.0.1, the Operation needs to overwrite this function and set ops.DBAddress
// 127.0.0.1, the LegacyOperation needs to overwrite this function and set ops.DBAddress
func (ops *BaseOperations) getAddress() string {
return "127.0.0.1"
}
Expand All @@ -155,7 +176,7 @@ func (ops *BaseOperations) Invoke(ctx context.Context, req *bindings.InvokeReque
return resp, nil
}

operation, ok := ops.OperationMap[req.Operation]
operation, ok := ops.LegacyOperations[req.Operation]
opsRes := OpsResult{}
if !ok {
message := fmt.Sprintf("%v operation is not implemented for %v", req.Operation, ops.DBType)
Expand All @@ -180,8 +201,10 @@ func (ops *BaseOperations) Invoke(ctx context.Context, req *bindings.InvokeReque
if err != nil {
return nil, err
}
res, _ := json.Marshal(opsRes)
resp.Data = res
if opsRes != nil {
res, _ := json.Marshal(opsRes)
resp.Data = res
}

return updateRespMetadata()
}
Expand Down Expand Up @@ -268,6 +291,21 @@ func (ops *BaseOperations) GetRoleOps(ctx context.Context, req *bindings.InvokeR
return opsRes, nil
}

func (ops *BaseOperations) volumeProtection(ctx context.Context, req *bindings.InvokeRequest,
rsp *bindings.InvokeResponse) (OpsResult, error) {
return ops.fwdLegacyOperationCall(VolumeProtection, ctx, req, rsp)
}

func (ops *BaseOperations) fwdLegacyOperationCall(kind bindings.OperationKind, ctx context.Context,
req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) (OpsResult, error) {
op, ok := ops.Ops[kind]
if !ok {
panic(fmt.Sprintf("unknown operation kind: %s", kind))
}
// since the rsp.Data has been set properly, it doesn't need to return a OpsResult here.
return nil, op.Invoke(ctx, req, rsp)
}

// Component may have some internal roles that needn't be exposed to end user,
// and not configured in cluster definition, e.g. ETCD's Candidate.
// roleValidate is used to filter the internal roles and decrease the number
Expand Down
4 changes: 2 additions & 2 deletions cmd/probe/internal/binding/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func TestOperations(t *testing.T) {
p.Init(bindings.Metadata{})
ops := p.Operations()

if len(ops) != 4 {
t.Errorf("p.OperationMap init failed: %s", p.OriRole)
if len(ops) != 5 {
t.Errorf("p.LegacyOperations init failed: %s", p.OriRole)
}
}

Expand Down
14 changes: 13 additions & 1 deletion cmd/probe/internal/binding/custom/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func (h *HTTPCustom) Init(metadata bindings.Metadata) error {

h.BaseOperations.Init(metadata)
h.BaseOperations.GetRole = h.GetRole
h.OperationMap[CheckRoleOperation] = h.CheckRoleOps
h.BaseOperations.LockInstance = h.LockInstance
h.BaseOperations.UnlockInstance = h.UnlockInstance
h.LegacyOperations[CheckRoleOperation] = h.CheckRoleOps

return nil
}
Expand Down Expand Up @@ -114,6 +116,16 @@ func (h *HTTPCustom) GetRoleOps(ctx context.Context, req *bindings.InvokeRequest
return opsRes, nil
}

func (h *HTTPCustom) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (h *HTTPCustom) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

// callAction performs an HTTP request to local HTTP endpoint specified by actionSvcPort
func (h *HTTPCustom) callAction(ctx context.Context, url string) (string, error) {
// compose http request
Expand Down
15 changes: 14 additions & 1 deletion cmd/probe/internal/binding/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package etcd

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -60,7 +61,9 @@ func (e *Etcd) Init(metadata bindings.Metadata) error {
e.InitIfNeed = e.initIfNeed
e.DBPort = e.GetRunningPort()
e.BaseOperations.GetRole = e.GetRole
e.OperationMap[GetRoleOperation] = e.GetRoleOps
e.BaseOperations.LockInstance = e.LockInstance
e.BaseOperations.UnlockInstance = e.UnlockInstance
e.LegacyOperations[GetRoleOperation] = e.GetRoleOps
return nil
}

Expand Down Expand Up @@ -131,6 +134,16 @@ func (e *Etcd) GetRoleOps(ctx context.Context, req *bindings.InvokeRequest, resp
return opsRes, nil
}

func (e *Etcd) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (e *Etcd) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (e *Etcd) GetRunningPort() int {
index := strings.Index(e.endpoint, ":")
if index < 0 {
Expand Down
13 changes: 13 additions & 0 deletions cmd/probe/internal/binding/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package kafka

import (
"context"
"fmt"
"strings"
"sync"

Expand Down Expand Up @@ -64,6 +65,8 @@ func (kafkaOps *KafkaOperations) Init(metadata bindings.Metadata) error {
kafkaOps.DBType = "kafka"
kafkaOps.InitIfNeed = kafkaOps.initIfNeed
// kafkaOps.BaseOperations.GetRole = kafkaOps.GetRole
kafkaOps.BaseOperations.LockInstance = kafkaOps.LockInstance
kafkaOps.BaseOperations.UnlockInstance = kafkaOps.UnlockInstance
// kafkaOps.DBPort = kafkaOps.GetRunningPort()
// kafkaOps.RegisterOperation(GetRoleOperation, kafkaOps.GetRoleOps)
// kafkaOps.RegisterOperation(GetLagOperation, kafkaOps.GetLagOps)
Expand Down Expand Up @@ -132,3 +135,13 @@ func (kafkaOps *KafkaOperations) CheckStatusOps(ctx context.Context, req *bindin

return result, nil
}

func (kafkaOps *KafkaOperations) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (kafkaOps *KafkaOperations) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}
2 changes: 1 addition & 1 deletion cmd/probe/internal/binding/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestInit(t *testing.T) {

assert.Equal(t, "kafka", kafkaOps.DBType)
assert.NotNil(t, kafkaOps.InitIfNeed)
assert.NotNil(t, kafkaOps.OperationMap[CheckStatusOperation])
assert.NotNil(t, kafkaOps.LegacyOperations[CheckStatusOperation])
}

func TestCheckStatusOps(t *testing.T) {
Expand Down
14 changes: 13 additions & 1 deletion cmd/probe/internal/binding/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ func (mongoOps *MongoDBOperations) Init(metadata bindings.Metadata) error {
mongoOps.InitIfNeed = mongoOps.initIfNeed
mongoOps.DBPort = mongoOps.GetRunningPort()
mongoOps.BaseOperations.GetRole = mongoOps.GetRole
mongoOps.OperationMap[GetRoleOperation] = mongoOps.GetRoleOps
mongoOps.BaseOperations.LockInstance = mongoOps.LockInstance
mongoOps.BaseOperations.UnlockInstance = mongoOps.UnlockInstance
mongoOps.LegacyOperations[GetRoleOperation] = mongoOps.GetRoleOps
return nil
}

Expand Down Expand Up @@ -264,6 +266,16 @@ func (mongoOps *MongoDBOperations) GetRoleOps(ctx context.Context, req *bindings
return opsRes, nil
}

func (mongoOps *MongoDBOperations) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (mongoOps *MongoDBOperations) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (mongoOps *MongoDBOperations) StatusCheck(ctx context.Context, cmd string, response *bindings.InvokeResponse) (OpsResult, error) {
// TODO implement me when proposal is passed
return nil, nil
Expand Down
Loading