Skip to content

Commit

Permalink
feat: volume protection from space exhaustion
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Jun 27, 2023
1 parent 869cd4d commit 8edeb2c
Show file tree
Hide file tree
Showing 25 changed files with 805 additions and 212 deletions.
27 changes: 27 additions & 0 deletions apis/apps/v1alpha1/clusterdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,24 @@ type VolumeTypeSpec struct {
Type VolumeType `json:"type,omitempty"`
}

type VolumeProtectionSpec struct {
// +kubebuilder:validation:Maximum=100
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=90
// +optional
LowWatermark int `json:"lowWatermark,omitempty"`

// +kubebuilder:validation:Maximum=100
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=90
// +optional
HighWatermark int `json:"highWatermark,omitempty"`

// Volumes to monitor.
// +optional
Volumes []string `json:"volumes,omitempty"`
}

// ClusterComponentDefinition provides a workload component specification template,
// with attributes that strongly work with stateful workloads and day-2 operations
// behaviors.
Expand Down Expand Up @@ -364,6 +382,9 @@ type ClusterComponentDefinition struct {
// in particular, when workloadType=Replication, the command defined in switchoverSpec will only be executed under the condition of cluster.componentSpecs[x].SwitchPolicy.type=Noop.
// +optional
SwitchoverSpec *SwitchoverSpec `json:"switchoverSpec,omitempty"`

// +optional
VolumeProtectionSpec *VolumeProtectionSpec `json:"volumeProtectionSpec,omitempty"`
}

func (r *ClusterComponentDefinition) GetStatefulSetWorkload() StatefulSetWorkload {
Expand Down Expand Up @@ -621,6 +642,12 @@ type ClusterDefinitionProbes struct {
// +optional
RoleProbe *ClusterDefinitionProbe `json:"roleProbe,omitempty"`

// TODO: we should not provide this option for user.
// Probe to monitor the volume space usage and prohibit the instance from writing if the volume space usage is over the defined threshold.
// Reference ClusterDefinition.ClusterComponentDefinition.VolumeProtectionSpec for the threshold and volumes to monitor.
// +optional
VolumeProtectionProbe *ClusterDefinitionProbe `json:"volumeProtectionProbe,omitempty"`

// roleProbeTimeoutAfterPodsReady(in seconds), when all pods of the component are ready,
// it will detect whether the application is available in the pod.
// if pods exceed the InitializationTimeoutSeconds time without a role label,
Expand Down
60 changes: 47 additions & 13 deletions cmd/probe/internal/binding/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ import (
. "github.com/apecloud/kubeblocks/internal/sqlchannel/util"
)

type Operation func(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (OpsResult, error)
type LegacyOperation func(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (OpsResult, error)

type OpsResult map[string]interface{}

type Operation interface {
Kind() bindings.OperationKind
Init(metadata bindings.Metadata) error
Invoke(ctx context.Context, req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) error
}

// AccessMode defines SVC access mode enums.
// +enum
type AccessMode string
Expand Down Expand Up @@ -72,7 +78,8 @@ type BaseOperations struct {
Metadata bindings.Metadata
InitIfNeed func() bool
GetRole func(context.Context, *bindings.InvokeRequest, *bindings.InvokeResponse) (string, error)
OperationMap map[bindings.OperationKind]Operation
LegacyOperations map[bindings.OperationKind]LegacyOperation
Ops map[bindings.OperationKind]Operation
}

func init() {
Expand Down Expand Up @@ -102,34 +109,44 @@ func (ops *BaseOperations) Init(metadata bindings.Metadata) {
}
}
ops.Metadata = metadata
ops.OperationMap = map[bindings.OperationKind]Operation{
ops.LegacyOperations = map[bindings.OperationKind]LegacyOperation{
CheckRunningOperation: ops.CheckRunningOps,
CheckRoleOperation: ops.CheckRoleOps,
GetRoleOperation: ops.GetRoleOps,
VolumeProtection: ops.volumeProtection,
}
ops.Ops = map[bindings.OperationKind]Operation{
VolumeProtection: &operationVolumeProtection{},
}
ops.DBAddress = ops.getAddress()

for kind, op := range ops.Ops {
if err := op.Init(metadata); err != nil {
panic(fmt.Sprintf("init operation %s error: %s", kind, err.Error()))
}
}
}

func (ops *BaseOperations) RegisterOperation(opsKind bindings.OperationKind, operation Operation) {
if ops.OperationMap == nil {
ops.OperationMap = map[bindings.OperationKind]Operation{}
func (ops *BaseOperations) RegisterOperation(opsKind bindings.OperationKind, operation LegacyOperation) {
if ops.LegacyOperations == nil {
ops.LegacyOperations = map[bindings.OperationKind]LegacyOperation{}
}
ops.OperationMap[opsKind] = operation
ops.LegacyOperations[opsKind] = operation
}

// Operations returns list of operations supported by the binding.
func (ops *BaseOperations) Operations() []bindings.OperationKind {
opsKinds := make([]bindings.OperationKind, len(ops.OperationMap))
opsKinds := make([]bindings.OperationKind, len(ops.LegacyOperations))
i := 0
for opsKind := range ops.OperationMap {
for opsKind := range ops.LegacyOperations {
opsKinds[i] = opsKind
i++
}
return opsKinds
}

// getAddress returns component service address, if component is not listening on
// 127.0.0.1, the Operation needs to overwrite this function and set ops.DBAddress
// 127.0.0.1, the LegacyOperation needs to overwrite this function and set ops.DBAddress
func (ops *BaseOperations) getAddress() string {
return "127.0.0.1"
}
Expand All @@ -155,7 +172,7 @@ func (ops *BaseOperations) Invoke(ctx context.Context, req *bindings.InvokeReque
return resp, nil
}

operation, ok := ops.OperationMap[req.Operation]
operation, ok := ops.LegacyOperations[req.Operation]
opsRes := OpsResult{}
if !ok {
message := fmt.Sprintf("%v operation is not implemented for %v", req.Operation, ops.DBType)
Expand All @@ -180,8 +197,10 @@ func (ops *BaseOperations) Invoke(ctx context.Context, req *bindings.InvokeReque
if err != nil {
return nil, err
}
res, _ := json.Marshal(opsRes)
resp.Data = res
if opsRes != nil {
res, _ := json.Marshal(opsRes)
resp.Data = res
}

return updateRespMetadata()
}
Expand Down Expand Up @@ -268,6 +287,21 @@ func (ops *BaseOperations) GetRoleOps(ctx context.Context, req *bindings.InvokeR
return opsRes, nil
}

func (ops *BaseOperations) volumeProtection(ctx context.Context, req *bindings.InvokeRequest,
rsp *bindings.InvokeResponse) (OpsResult, error) {
return ops.fwdLegacyOperationCall(VolumeProtection, ctx, req, rsp)
}

func (ops *BaseOperations) fwdLegacyOperationCall(kind bindings.OperationKind, ctx context.Context,
req *bindings.InvokeRequest, rsp *bindings.InvokeResponse) (OpsResult, error) {
op, ok := ops.Ops[kind]
if !ok {
panic(fmt.Sprintf("unknown operation kind: %s", kind))
}
// since the rsp.Data has been set properly, it doesn't need to return a OpsResult here.
return nil, op.Invoke(ctx, req, rsp)
}

// Component may have some internal roles that needn't be exposed to end user,
// and not configured in cluster definition, e.g. ETCD's Candidate.
// roleValidate is used to filter the internal roles and decrease the number
Expand Down
2 changes: 1 addition & 1 deletion cmd/probe/internal/binding/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestOperations(t *testing.T) {
ops := p.Operations()

if len(ops) != 4 {
t.Errorf("p.OperationMap init failed: %s", p.OriRole)
t.Errorf("p.LegacyOperations init failed: %s", p.OriRole)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/probe/internal/binding/custom/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (h *HTTPCustom) Init(metadata bindings.Metadata) error {

h.BaseOperations.Init(metadata)
h.BaseOperations.GetRole = h.GetRole
h.OperationMap[CheckRoleOperation] = h.CheckRoleOps
h.LegacyOperations[CheckRoleOperation] = h.CheckRoleOps

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/probe/internal/binding/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (e *Etcd) Init(metadata bindings.Metadata) error {
e.DBType = "etcd"
e.InitIfNeed = e.initIfNeed
e.DBPort = e.GetRunningPort()
e.OperationMap[GetRoleOperation] = e.GetRoleOps
e.LegacyOperations[GetRoleOperation] = e.GetRoleOps
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/probe/internal/binding/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestInit(t *testing.T) {

assert.Equal(t, "kafka", kafkaOps.DBType)
assert.NotNil(t, kafkaOps.InitIfNeed)
assert.NotNil(t, kafkaOps.OperationMap[CheckStatusOperation])
assert.NotNil(t, kafkaOps.LegacyOperations[CheckStatusOperation])
}

func TestCheckStatusOps(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/probe/internal/binding/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (mongoOps *MongoDBOperations) Init(metadata bindings.Metadata) error {
mongoOps.InitIfNeed = mongoOps.initIfNeed
mongoOps.DBPort = mongoOps.GetRunningPort()
mongoOps.BaseOperations.GetRole = mongoOps.GetRole
mongoOps.OperationMap[GetRoleOperation] = mongoOps.GetRoleOps
mongoOps.LegacyOperations[GetRoleOperation] = mongoOps.GetRoleOps
return nil
}

Expand Down
24 changes: 12 additions & 12 deletions cmd/probe/internal/binding/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ func TestInit(t *testing.T) {
assert.NotNil(t, mysqlOps.InitIfNeed)
assert.NotNil(t, mysqlOps.GetRole)
assert.Equal(t, 3306, mysqlOps.DBPort)
assert.NotNil(t, mysqlOps.OperationMap[GetRoleOperation])
assert.NotNil(t, mysqlOps.OperationMap[CheckStatusOperation])
assert.NotNil(t, mysqlOps.OperationMap[GetLagOperation])
assert.NotNil(t, mysqlOps.OperationMap[ExecOperation])
assert.NotNil(t, mysqlOps.OperationMap[QueryOperation])

assert.NotNil(t, mysqlOps.OperationMap[ListUsersOp])
assert.NotNil(t, mysqlOps.OperationMap[CreateUserOp])
assert.NotNil(t, mysqlOps.OperationMap[DeleteUserOp])
assert.NotNil(t, mysqlOps.OperationMap[DescribeUserOp])
assert.NotNil(t, mysqlOps.OperationMap[GrantUserRoleOp])
assert.NotNil(t, mysqlOps.OperationMap[RevokeUserRoleOp])
assert.NotNil(t, mysqlOps.LegacyOperations[GetRoleOperation])
assert.NotNil(t, mysqlOps.LegacyOperations[CheckStatusOperation])
assert.NotNil(t, mysqlOps.LegacyOperations[GetLagOperation])
assert.NotNil(t, mysqlOps.LegacyOperations[ExecOperation])
assert.NotNil(t, mysqlOps.LegacyOperations[QueryOperation])

assert.NotNil(t, mysqlOps.LegacyOperations[ListUsersOp])
assert.NotNil(t, mysqlOps.LegacyOperations[CreateUserOp])
assert.NotNil(t, mysqlOps.LegacyOperations[DeleteUserOp])
assert.NotNil(t, mysqlOps.LegacyOperations[DescribeUserOp])
assert.NotNil(t, mysqlOps.LegacyOperations[GrantUserRoleOp])
assert.NotNil(t, mysqlOps.LegacyOperations[RevokeUserRoleOp])
// Clear out previously set viper variables
viper.Reset()
}
Expand Down
Loading

0 comments on commit 8edeb2c

Please sign in to comment.