From 9c89e6d77f2980f8b30583ac0b53a717e51ba385 Mon Sep 17 00:00:00 2001 From: Leon Date: Thu, 13 Jul 2023 14:19:26 +0800 Subject: [PATCH] builtin lock/unlock action for engines --- cmd/probe/internal/binding/base.go | 9 +++++---- cmd/probe/internal/binding/custom/custom.go | 12 ++++++++++++ cmd/probe/internal/binding/etcd/etcd.go | 13 +++++++++++++ cmd/probe/internal/binding/kafka/kafka.go | 13 +++++++++++++ cmd/probe/internal/binding/mongodb/mongodb.go | 12 ++++++++++++ cmd/probe/internal/binding/mysql/mysql.go | 10 ++-------- cmd/probe/internal/binding/postgres/postgres.go | 16 ++++++++++++++++ cmd/probe/internal/binding/redis/redis.go | 12 ++++++++++++ 8 files changed, 85 insertions(+), 12 deletions(-) diff --git a/cmd/probe/internal/binding/base.go b/cmd/probe/internal/binding/base.go index 5d597e2789c..9ee1c4891b6 100644 --- a/cmd/probe/internal/binding/base.go +++ b/cmd/probe/internal/binding/base.go @@ -78,10 +78,11 @@ type BaseOperations struct { Metadata bindings.Metadata InitIfNeed func() bool GetRole func(context.Context, *bindings.InvokeRequest, *bindings.InvokeResponse) (string, error) - LockInstance func(ctx context.Context) error - UnlockInstance func(ctx context.Context) error - LegacyOperations map[bindings.OperationKind]LegacyOperation - Ops map[bindings.OperationKind]Operation + // TODO: need a better way to support the extension for engines. + LockInstance func(ctx context.Context) error + UnlockInstance func(ctx context.Context) error + LegacyOperations map[bindings.OperationKind]LegacyOperation + Ops map[bindings.OperationKind]Operation } func init() { diff --git a/cmd/probe/internal/binding/custom/custom.go b/cmd/probe/internal/binding/custom/custom.go index 91a9ea9e7ff..50797e4eb37 100644 --- a/cmd/probe/internal/binding/custom/custom.go +++ b/cmd/probe/internal/binding/custom/custom.go @@ -78,6 +78,8 @@ func (h *HTTPCustom) Init(metadata bindings.Metadata) error { h.BaseOperations.Init(metadata) h.BaseOperations.GetRole = h.GetRole + h.BaseOperations.LockInstance = h.LockInstance + h.BaseOperations.UnlockInstance = h.UnlockInstance h.LegacyOperations[CheckRoleOperation] = h.CheckRoleOps return nil @@ -114,6 +116,16 @@ func (h *HTTPCustom) GetRoleOps(ctx context.Context, req *bindings.InvokeRequest return opsRes, nil } +func (h *HTTPCustom) LockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + +func (h *HTTPCustom) UnlockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + // callAction performs an HTTP request to local HTTP endpoint specified by actionSvcPort func (h *HTTPCustom) callAction(ctx context.Context, url string) (string, error) { // compose http request diff --git a/cmd/probe/internal/binding/etcd/etcd.go b/cmd/probe/internal/binding/etcd/etcd.go index 985a2ac1856..27742dd98c0 100644 --- a/cmd/probe/internal/binding/etcd/etcd.go +++ b/cmd/probe/internal/binding/etcd/etcd.go @@ -21,6 +21,7 @@ package etcd import ( "context" + "fmt" "strconv" "strings" "sync" @@ -60,6 +61,8 @@ func (e *Etcd) Init(metadata bindings.Metadata) error { e.InitIfNeed = e.initIfNeed e.DBPort = e.GetRunningPort() e.BaseOperations.GetRole = e.GetRole + e.BaseOperations.LockInstance = e.LockInstance + e.BaseOperations.UnlockInstance = e.UnlockInstance e.LegacyOperations[GetRoleOperation] = e.GetRoleOps return nil } @@ -131,6 +134,16 @@ func (e *Etcd) GetRoleOps(ctx context.Context, req *bindings.InvokeRequest, resp return opsRes, nil } +func (e *Etcd) LockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + +func (e *Etcd) UnlockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + func (e *Etcd) GetRunningPort() int { index := strings.Index(e.endpoint, ":") if index < 0 { diff --git a/cmd/probe/internal/binding/kafka/kafka.go b/cmd/probe/internal/binding/kafka/kafka.go index 2f11e21f095..b2eac1d63bd 100644 --- a/cmd/probe/internal/binding/kafka/kafka.go +++ b/cmd/probe/internal/binding/kafka/kafka.go @@ -21,6 +21,7 @@ package kafka import ( "context" + "fmt" "strings" "sync" @@ -64,6 +65,8 @@ func (kafkaOps *KafkaOperations) Init(metadata bindings.Metadata) error { kafkaOps.DBType = "kafka" kafkaOps.InitIfNeed = kafkaOps.initIfNeed // kafkaOps.BaseOperations.GetRole = kafkaOps.GetRole + kafkaOps.BaseOperations.LockInstance = kafkaOps.LockInstance + kafkaOps.BaseOperations.UnlockInstance = kafkaOps.UnlockInstance // kafkaOps.DBPort = kafkaOps.GetRunningPort() // kafkaOps.RegisterOperation(GetRoleOperation, kafkaOps.GetRoleOps) // kafkaOps.RegisterOperation(GetLagOperation, kafkaOps.GetLagOps) @@ -132,3 +135,13 @@ func (kafkaOps *KafkaOperations) CheckStatusOps(ctx context.Context, req *bindin return result, nil } + +func (kafkaOps *KafkaOperations) LockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + +func (kafkaOps *KafkaOperations) UnlockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} diff --git a/cmd/probe/internal/binding/mongodb/mongodb.go b/cmd/probe/internal/binding/mongodb/mongodb.go index 4f214d0ab74..420da2f428c 100644 --- a/cmd/probe/internal/binding/mongodb/mongodb.go +++ b/cmd/probe/internal/binding/mongodb/mongodb.go @@ -146,6 +146,8 @@ func (mongoOps *MongoDBOperations) Init(metadata bindings.Metadata) error { mongoOps.InitIfNeed = mongoOps.initIfNeed mongoOps.DBPort = mongoOps.GetRunningPort() mongoOps.BaseOperations.GetRole = mongoOps.GetRole + mongoOps.BaseOperations.LockInstance = mongoOps.LockInstance + mongoOps.BaseOperations.UnlockInstance = mongoOps.UnlockInstance mongoOps.LegacyOperations[GetRoleOperation] = mongoOps.GetRoleOps return nil } @@ -264,6 +266,16 @@ func (mongoOps *MongoDBOperations) GetRoleOps(ctx context.Context, req *bindings return opsRes, nil } +func (mongoOps *MongoDBOperations) LockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + +func (mongoOps *MongoDBOperations) UnlockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + func (mongoOps *MongoDBOperations) StatusCheck(ctx context.Context, cmd string, response *bindings.InvokeResponse) (OpsResult, error) { // TODO implement me when proposal is passed return nil, nil diff --git a/cmd/probe/internal/binding/mysql/mysql.go b/cmd/probe/internal/binding/mysql/mysql.go index 4fd1e3500f9..3edaa55c039 100644 --- a/cmd/probe/internal/binding/mysql/mysql.go +++ b/cmd/probe/internal/binding/mysql/mysql.go @@ -261,19 +261,13 @@ func (mysqlOps *MysqlOperations) GetRole(ctx context.Context, request *bindings. func (mysqlOps *MysqlOperations) LockInstance(ctx context.Context) error { sql := "set global read_only=1" _, err := mysqlOps.db.ExecContext(ctx, sql) - if err != nil { - return err - } - return nil + return err } func (mysqlOps *MysqlOperations) UnlockInstance(ctx context.Context) error { sql := "set global read_only=0" _, err := mysqlOps.db.ExecContext(ctx, sql) - if err != nil { - return err - } - return nil + return err } func (mysqlOps *MysqlOperations) ExecOps(ctx context.Context, req *bindings.InvokeRequest, resp *bindings.InvokeResponse) (OpsResult, error) { diff --git a/cmd/probe/internal/binding/postgres/postgres.go b/cmd/probe/internal/binding/postgres/postgres.go index 744707bcb7f..28021a08575 100644 --- a/cmd/probe/internal/binding/postgres/postgres.go +++ b/cmd/probe/internal/binding/postgres/postgres.go @@ -113,6 +113,8 @@ func (pgOps *PostgresOperations) Init(metadata bindings.Metadata) error { pgOps.DBType = "postgres" pgOps.InitIfNeed = pgOps.initIfNeed pgOps.BaseOperations.GetRole = pgOps.GetRole + pgOps.BaseOperations.LockInstance = pgOps.LockInstance + pgOps.BaseOperations.UnlockInstance = pgOps.UnlockInstance pgOps.DBPort = pgOps.GetRunningPort() pgOps.RegisterOperation(GetRoleOperation, pgOps.GetRoleOps) // pgOps.RegisterOperation(GetLagOperation, pgOps.GetLagOps) @@ -224,6 +226,20 @@ func (pgOps *PostgresOperations) GetRole(ctx context.Context, request *bindings. return "", errors.Errorf("exec sql %s failed: no data returned", sql) } +func (pgOps *PostgresOperations) LockInstance(ctx context.Context) error { + // sql := "alter system set default_transaction_read_only=on; select pg_reload_conf();" + // _, err := pgOps.exec(ctx, sql) + // return err + return fmt.Errorf("NotSupported") +} + +func (pgOps *PostgresOperations) UnlockInstance(ctx context.Context) error { + // sql := "alter system set default_transaction_read_only=off; select pg_reload_conf();" + // _, err := pgOps.exec(ctx, sql) + // return err + return fmt.Errorf("NotSupported") +} + func (pgOps *PostgresOperations) ExecOps(ctx context.Context, req *bindings.InvokeRequest, resp *bindings.InvokeResponse) (OpsResult, error) { result := OpsResult{} sql, ok := req.Metadata["sql"] diff --git a/cmd/probe/internal/binding/redis/redis.go b/cmd/probe/internal/binding/redis/redis.go index 119e3da7e8e..14627d70c3b 100644 --- a/cmd/probe/internal/binding/redis/redis.go +++ b/cmd/probe/internal/binding/redis/redis.go @@ -92,6 +92,8 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) { r.DBType = "redis" r.InitIfNeed = r.initIfNeed r.BaseOperations.GetRole = r.GetRole + r.BaseOperations.LockInstance = r.LockInstance + r.BaseOperations.UnlockInstance = r.UnlockInstance // register redis operations r.RegisterOperation(bindings.CreateOperation, r.createOps) @@ -606,6 +608,16 @@ func (r *Redis) GetRole(ctx context.Context, request *bindings.InvokeRequest, re return role, nil } +func (r *Redis) LockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + +func (r *Redis) UnlockInstance(ctx context.Context) error { + // TODO: impl + return fmt.Errorf("NotSupported") +} + func defaultRedisEntryParser(req *bindings.InvokeRequest, object *RedisEntry) error { if req == nil || req.Metadata == nil { return fmt.Errorf("no metadata provided")