Skip to content

Commit

Permalink
builtin lock/unlock action for engines
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Jul 13, 2023
1 parent bcc09e7 commit 9c89e6d
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 12 deletions.
9 changes: 5 additions & 4 deletions cmd/probe/internal/binding/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ type BaseOperations struct {
Metadata bindings.Metadata
InitIfNeed func() bool
GetRole func(context.Context, *bindings.InvokeRequest, *bindings.InvokeResponse) (string, error)
LockInstance func(ctx context.Context) error
UnlockInstance func(ctx context.Context) error
LegacyOperations map[bindings.OperationKind]LegacyOperation
Ops map[bindings.OperationKind]Operation
// TODO: need a better way to support the extension for engines.
LockInstance func(ctx context.Context) error
UnlockInstance func(ctx context.Context) error
LegacyOperations map[bindings.OperationKind]LegacyOperation
Ops map[bindings.OperationKind]Operation
}

func init() {
Expand Down
12 changes: 12 additions & 0 deletions cmd/probe/internal/binding/custom/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (h *HTTPCustom) Init(metadata bindings.Metadata) error {

h.BaseOperations.Init(metadata)
h.BaseOperations.GetRole = h.GetRole
h.BaseOperations.LockInstance = h.LockInstance
h.BaseOperations.UnlockInstance = h.UnlockInstance
h.LegacyOperations[CheckRoleOperation] = h.CheckRoleOps

return nil
Expand Down Expand Up @@ -114,6 +116,16 @@ func (h *HTTPCustom) GetRoleOps(ctx context.Context, req *bindings.InvokeRequest
return opsRes, nil
}

func (h *HTTPCustom) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (h *HTTPCustom) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

// callAction performs an HTTP request to local HTTP endpoint specified by actionSvcPort
func (h *HTTPCustom) callAction(ctx context.Context, url string) (string, error) {
// compose http request
Expand Down
13 changes: 13 additions & 0 deletions cmd/probe/internal/binding/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package etcd

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -60,6 +61,8 @@ func (e *Etcd) Init(metadata bindings.Metadata) error {
e.InitIfNeed = e.initIfNeed
e.DBPort = e.GetRunningPort()
e.BaseOperations.GetRole = e.GetRole
e.BaseOperations.LockInstance = e.LockInstance
e.BaseOperations.UnlockInstance = e.UnlockInstance
e.LegacyOperations[GetRoleOperation] = e.GetRoleOps
return nil
}
Expand Down Expand Up @@ -131,6 +134,16 @@ func (e *Etcd) GetRoleOps(ctx context.Context, req *bindings.InvokeRequest, resp
return opsRes, nil
}

func (e *Etcd) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (e *Etcd) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (e *Etcd) GetRunningPort() int {
index := strings.Index(e.endpoint, ":")
if index < 0 {
Expand Down
13 changes: 13 additions & 0 deletions cmd/probe/internal/binding/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package kafka

import (
"context"
"fmt"
"strings"
"sync"

Expand Down Expand Up @@ -64,6 +65,8 @@ func (kafkaOps *KafkaOperations) Init(metadata bindings.Metadata) error {
kafkaOps.DBType = "kafka"
kafkaOps.InitIfNeed = kafkaOps.initIfNeed
// kafkaOps.BaseOperations.GetRole = kafkaOps.GetRole
kafkaOps.BaseOperations.LockInstance = kafkaOps.LockInstance
kafkaOps.BaseOperations.UnlockInstance = kafkaOps.UnlockInstance
// kafkaOps.DBPort = kafkaOps.GetRunningPort()
// kafkaOps.RegisterOperation(GetRoleOperation, kafkaOps.GetRoleOps)
// kafkaOps.RegisterOperation(GetLagOperation, kafkaOps.GetLagOps)
Expand Down Expand Up @@ -132,3 +135,13 @@ func (kafkaOps *KafkaOperations) CheckStatusOps(ctx context.Context, req *bindin

return result, nil
}

func (kafkaOps *KafkaOperations) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (kafkaOps *KafkaOperations) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}
12 changes: 12 additions & 0 deletions cmd/probe/internal/binding/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func (mongoOps *MongoDBOperations) Init(metadata bindings.Metadata) error {
mongoOps.InitIfNeed = mongoOps.initIfNeed
mongoOps.DBPort = mongoOps.GetRunningPort()
mongoOps.BaseOperations.GetRole = mongoOps.GetRole
mongoOps.BaseOperations.LockInstance = mongoOps.LockInstance
mongoOps.BaseOperations.UnlockInstance = mongoOps.UnlockInstance
mongoOps.LegacyOperations[GetRoleOperation] = mongoOps.GetRoleOps
return nil
}
Expand Down Expand Up @@ -264,6 +266,16 @@ func (mongoOps *MongoDBOperations) GetRoleOps(ctx context.Context, req *bindings
return opsRes, nil
}

func (mongoOps *MongoDBOperations) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (mongoOps *MongoDBOperations) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (mongoOps *MongoDBOperations) StatusCheck(ctx context.Context, cmd string, response *bindings.InvokeResponse) (OpsResult, error) {
// TODO implement me when proposal is passed
return nil, nil
Expand Down
10 changes: 2 additions & 8 deletions cmd/probe/internal/binding/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,13 @@ func (mysqlOps *MysqlOperations) GetRole(ctx context.Context, request *bindings.
func (mysqlOps *MysqlOperations) LockInstance(ctx context.Context) error {
sql := "set global read_only=1"
_, err := mysqlOps.db.ExecContext(ctx, sql)
if err != nil {
return err
}
return nil
return err
}

func (mysqlOps *MysqlOperations) UnlockInstance(ctx context.Context) error {
sql := "set global read_only=0"
_, err := mysqlOps.db.ExecContext(ctx, sql)
if err != nil {
return err
}
return nil
return err
}

func (mysqlOps *MysqlOperations) ExecOps(ctx context.Context, req *bindings.InvokeRequest, resp *bindings.InvokeResponse) (OpsResult, error) {
Expand Down
16 changes: 16 additions & 0 deletions cmd/probe/internal/binding/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (pgOps *PostgresOperations) Init(metadata bindings.Metadata) error {
pgOps.DBType = "postgres"
pgOps.InitIfNeed = pgOps.initIfNeed
pgOps.BaseOperations.GetRole = pgOps.GetRole
pgOps.BaseOperations.LockInstance = pgOps.LockInstance
pgOps.BaseOperations.UnlockInstance = pgOps.UnlockInstance
pgOps.DBPort = pgOps.GetRunningPort()
pgOps.RegisterOperation(GetRoleOperation, pgOps.GetRoleOps)
// pgOps.RegisterOperation(GetLagOperation, pgOps.GetLagOps)
Expand Down Expand Up @@ -224,6 +226,20 @@ func (pgOps *PostgresOperations) GetRole(ctx context.Context, request *bindings.
return "", errors.Errorf("exec sql %s failed: no data returned", sql)
}

func (pgOps *PostgresOperations) LockInstance(ctx context.Context) error {
// sql := "alter system set default_transaction_read_only=on; select pg_reload_conf();"
// _, err := pgOps.exec(ctx, sql)
// return err
return fmt.Errorf("NotSupported")
}

func (pgOps *PostgresOperations) UnlockInstance(ctx context.Context) error {
// sql := "alter system set default_transaction_read_only=off; select pg_reload_conf();"
// _, err := pgOps.exec(ctx, sql)
// return err
return fmt.Errorf("NotSupported")
}

func (pgOps *PostgresOperations) ExecOps(ctx context.Context, req *bindings.InvokeRequest, resp *bindings.InvokeResponse) (OpsResult, error) {
result := OpsResult{}
sql, ok := req.Metadata["sql"]
Expand Down
12 changes: 12 additions & 0 deletions cmd/probe/internal/binding/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) {
r.DBType = "redis"
r.InitIfNeed = r.initIfNeed
r.BaseOperations.GetRole = r.GetRole
r.BaseOperations.LockInstance = r.LockInstance
r.BaseOperations.UnlockInstance = r.UnlockInstance

// register redis operations
r.RegisterOperation(bindings.CreateOperation, r.createOps)
Expand Down Expand Up @@ -606,6 +608,16 @@ func (r *Redis) GetRole(ctx context.Context, request *bindings.InvokeRequest, re
return role, nil
}

func (r *Redis) LockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func (r *Redis) UnlockInstance(ctx context.Context) error {
// TODO: impl
return fmt.Errorf("NotSupported")
}

func defaultRedisEntryParser(req *bindings.InvokeRequest, object *RedisEntry) error {
if req == nil || req.Metadata == nil {
return fmt.Errorf("no metadata provided")
Expand Down

0 comments on commit 9c89e6d

Please sign in to comment.