Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drainer support plugin framework #911

Open
wants to merge 65 commits into
base: plugin
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
19b6895
add two args
tsthght Feb 5, 2020
4e0a4de
support plugin
tsthght Feb 5, 2020
2cc57b8
refine
tsthght Feb 7, 2020
b583108
optimize the plugin management structure
tsthght Feb 7, 2020
76c6be6
modify plugin framework
tsthght Feb 25, 2020
27989e2
modify plugin framework
tsthght Feb 25, 2020
4811764
modify plugin demo
tsthght Feb 25, 2020
a5c5572
gofmt
tsthght Feb 25, 2020
73c7f0b
add testcase
tsthght Feb 25, 2020
c2ac0ec
modify testcase
tsthght Feb 25, 2020
f9a74e3
modify go.mod
tsthght Feb 25, 2020
ccfd79b
modify
tsthght Feb 25, 2020
861e5df
fix a problem
tsthght Feb 25, 2020
2af5039
modify plugin
tsthght Feb 26, 2020
f4b62a2
modify plugin demo
tsthght Feb 26, 2020
3ca4924
modify plugin pos
tsthght Feb 27, 2020
6ea9197
modify FilterTxn ret
tsthght Feb 27, 2020
c085535
merge master
tsthght Feb 27, 2020
e4917f3
modify value: index
tsthght Feb 27, 2020
8f24b8a
add RecordId
tsthght Feb 27, 2020
24eaa9e
modify plugin
tsthght Feb 27, 2020
c97f3f2
support configure the mark database name and mark table name
tsthght Mar 2, 2020
536943a
add some testcase
tsthght Mar 4, 2020
af05581
add testcase
tsthght Mar 4, 2020
1d4e857
add testcase
tsthght Mar 4, 2020
fc3644f
modify some error
tsthght Mar 4, 2020
8525698
add some tests
tsthght Mar 4, 2020
3aa0f2e
modify test
tsthght Mar 4, 2020
9f0ae6a
add some log
tsthght Mar 4, 2020
b04f256
modify log
tsthght Mar 4, 2020
f0cbd85
refine code
tsthght Mar 5, 2020
7782196
add some log
tsthght Mar 5, 2020
ce64ee4
trim string
tsthght Mar 5, 2020
2df8841
should not conflict with LoopbackControl
tsthght Mar 5, 2020
0c764b1
refine comment
tsthght Mar 5, 2020
76061b5
add comment
tsthght Mar 5, 2020
78e9153
replace errors.New(fmt.Sprintf(...)) with fmt.Errorf(...)
tsthght Mar 5, 2020
59ad9ee
add comment
tsthght Mar 5, 2020
8859fd7
change RecordId to RecordID
tsthght Mar 5, 2020
f06e03f
change SetPlugin to setPlugin
tsthght Mar 5, 2020
355b679
add comment
tsthght Mar 5, 2020
9d8f6e3
refine
tsthght Mar 5, 2020
7a12571
modify comment
tsthght Mar 5, 2020
5e2e3ac
refine
tsthght Mar 5, 2020
d385b89
refine code
tsthght Mar 5, 2020
f102f7c
handle ci error
tsthght Mar 5, 2020
90c5126
add init interface
tsthght Mar 11, 2020
fbd2748
modify SyncerPlugin to
tsthght Mar 11, 2020
7df02f6
modify SyncerPlugin to SyncerFilter
tsthght Mar 11, 2020
2893d2f
modify Loopback to SyncerFilter
tsthght Mar 11, 2020
ed5852f
modify
tsthght Mar 11, 2020
518415d
add
tsthght Mar 11, 2020
6ac7d53
refine
tsthght Mar 11, 2020
0929e8d
add LoaderInit
tsthght Mar 11, 2020
124a885
add loaderdestroy
tsthght Mar 11, 2020
ce067e3
refine
tsthght Mar 11, 2020
8b5fbd9
modify
tsthght Mar 11, 2020
8a2d0a7
refine
tsthght Mar 11, 2020
8c71d4a
reine
tsthght Mar 11, 2020
1d33874
refine
tsthght Mar 11, 2020
c99c28c
modify
tsthght Mar 11, 2020
cd06a34
refine
tsthght Mar 12, 2020
613567d
add log
tsthght Mar 12, 2020
edecd47
add logs
tsthght Mar 12, 2020
016b67f
Get rid of useless parameters
tsthght Mar 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type SyncerConfig struct {
EnableDispatch bool `toml:"enable-dispatch" json:"enable-dispatch"`
SafeMode bool `toml:"safe-mode" json:"safe-mode"`
EnableCausality bool `toml:"enable-detect" json:"enable-detect"`
PluginPath string `toml:"plugin-path" json:"plugin-path"`
PluginNames []string `toml:"plugin-names" json:"plugin-names"`
SupportPlugin bool `toml:"support-plugin" json:"support-plugin"`
}

// Config holds the configuration of drainer
Expand All @@ -106,6 +109,22 @@ type Config struct {
tls *tls.Config
}

// sliceNames implements flag.Getter so a comma-separated command-line
// value (e.g. "p1,p2") can populate a []string configuration field.
type sliceNames []string

// newSliceNames binds *p to vals and returns p wrapped as a flag value,
// so flag parsing writes directly into the caller's slice.
func newSliceNames(vals []string, p *[]string) *sliceNames {
	*p = vals
	return (*sliceNames)(p)
}

// Set parses a comma-separated list of names, trimming surrounding
// whitespace from each element so "p1, p2" yields {"p1", "p2"}.
// It always succeeds and replaces any previously stored names.
func (s *sliceNames) Set(val string) error {
	names := strings.Split(val, ",")
	for i := range names {
		names[i] = strings.TrimSpace(names[i])
	}
	*s = sliceNames(names)
	return nil
}

// Get returns the stored names as a plain []string (flag.Getter).
func (s *sliceNames) Get() interface{} { return []string(*s) }

// String joins the stored names with commas; it is the inverse of Set
// for inputs without embedded whitespace.
func (s *sliceNames) String() string { return strings.Join([]string(*s), ",") }

tsthght marked this conversation as resolved.
Show resolved Hide resolved
// NewConfig return an instance of configuration
func NewConfig() *Config {
cfg := &Config{
Expand Down Expand Up @@ -148,6 +167,9 @@ func NewConfig() *Config {
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
fs.StringVar(&cfg.SyncerCfg.PluginPath, "plugin-path", "", "The path of the plugins")
fs.Var(newSliceNames([]string{}, &cfg.SyncerCfg.PluginNames), "plugin-names", "The names of the plugins")
fs.BoolVar(&cfg.SyncerCfg.SupportPlugin, "support-plugin", false, "Whether plugin is supported,default: false")

return cfg
}
Expand Down
6 changes: 6 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (t *testDrainerSuite) TestConfig(c *C) {
"-config", "../cmd/drainer/drainer.toml",
"-addr", "192.168.15.10:8257",
"-advertise-addr", "192.168.15.10:8257",
"-plugin-path", "/opt/drainer/plugin",
"-plugin-names", "p1, p2",
"-support-plugin",
}

cfg := NewConfig()
Expand All @@ -68,6 +71,9 @@ func (t *testDrainerSuite) TestConfig(c *C) {
var strSQLMode *string
c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode)
c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0))
c.Assert(cfg.SyncerCfg.PluginPath, Equals, "/opt/drainer/plugin")
c.Assert(len(cfg.SyncerCfg.PluginNames), Equals, 2)
c.Assert(cfg.SyncerCfg.SupportPlugin, Equals, true)
}

func (t *testDrainerSuite) TestValidateFilter(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions drainer/filter_txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package drainer

import (
"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"
)

// LoopBack is the interface a syncer filter plugin must implement to
// take part in transaction filtering.
type LoopBack interface {
	// FilterTxn reports whether txn should be filtered out: a true
	// return causes the syncer to skip replicating the transaction
	// (the caller stops consulting further plugins once one returns
	// true). info carries the loopback-sync configuration.
	FilterTxn(txn *loader.Txn, info *loopbacksync.LoopBackSync) bool
}
11 changes: 10 additions & 1 deletion drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package loopbacksync

import "github.com/pingcap/tidb-binlog/pkg/plugin"

const (
//MarkTableName mark table name
MarkTableName = "retl._drainer_repl_mark"
Expand All @@ -29,14 +31,21 @@ type LoopBackSync struct {
ChannelID int64
LoopbackControl bool
SyncDDL bool
PluginPath string
PluginNames []string
Hooks []*plugin.EventHooks
SupportPlugin bool
}

//NewLoopBackSyncInfo return LoopBackSyncInfo objec
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
// NewLoopBackSyncInfo returns a LoopBackSync carrying the loopback-control
// settings and the plugin configuration.
//
// channelID identifies the replication channel; loopbackControl enables
// loopback detection; syncDDL controls whether DDL is replicated; path and
// names locate the plugins to load; supportPlugin toggles the plugin
// framework as a whole. The Hooks field is left nil for callers to set up.
func NewLoopBackSyncInfo(channelID int64, loopbackControl, syncDDL bool, path string, names []string, supportPlugin bool) *LoopBackSync {
	return &LoopBackSync{
		ChannelID:       channelID,
		LoopbackControl: loopbackControl,
		SyncDDL:         syncDDL,
		PluginPath:      path,
		PluginNames:     names,
		SupportPlugin:   supportPlugin,
	}
}
2 changes: 1 addition & 1 deletion drainer/loopbacksync/loopbacksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestNewLoopBackSyncInfo(t *testing.T) {
var ChannelID int64 = 1
var LoopbackControl = true
var SyncDDL = false
l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL)
l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL, "", nil, false)
if l == nil {
t.Error("alloc loopBackSyncInfo objec failed ")
}
Expand Down
226 changes: 141 additions & 85 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/plugin"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/drainer/checkpoint"
Expand Down Expand Up @@ -77,7 +77,22 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
ignoreDBs = strings.Split(cfg.IgnoreSchemas, ",")
}
syncer.filter = filter.NewFilter(ignoreDBs, cfg.IgnoreTables, cfg.DoDBs, cfg.DoTables)
syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL)
syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL, cfg.PluginPath, cfg.PluginNames, cfg.SupportPlugin)
if syncer.loopbackSync.SupportPlugin {
for _, name := range syncer.loopbackSync.PluginNames {
sym, err := plugin.LoadPlugin(syncer.loopbackSync.Hooks[plugin.SyncerPlugin],
syncer.loopbackSync.PluginPath, name)
if err != nil {
return nil, err
}
newPlugin, ok := sym.(func() LoopBack)
if !ok {
continue
}
plugin.RegisterPlugin(syncer.loopbackSync.Hooks[plugin.SyncerPlugin],
name, newPlugin())
}
tsthght marked this conversation as resolved.
Show resolved Hide resolved
}

var err error
// create schema
Expand Down Expand Up @@ -333,110 +348,151 @@ ForLoop:
if startTS == commitTS {
fakeBinlogs = append(fakeBinlogs, binlog)
fakeBinlogPreAddTS = append(fakeBinlogPreAddTS, lastAddComitTS)
} else if jobID == 0 {
preWriteValue := binlog.GetPrewriteValue()
preWrite := &pb.PrewriteValue{}
err = preWrite.Unmarshal(preWriteValue)
if err != nil {
err = errors.Annotatef(err, "prewrite %s Unmarshal failed", preWriteValue)
break ForLoop
}
} else {
if s.loopbackSync.SupportPlugin {
schema, table, err := s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
break ForLoop
}

err = s.rewriteForOldVersion(preWrite)
if err != nil {
err = errors.Annotate(err, "rewrite for old version fail")
break ForLoop
preWrite := &pb.PrewriteValue{}
err = preWrite.Unmarshal(binlog.GetPrewriteValue())
if err != nil {
err = errors.Annotatef(err, "prewrite %s Unmarshal failed", binlog.GetPrewriteValue())
break ForLoop
}
var txn *loader.Txn
txn, err = translator.TiBinlogToTxn(s.schema, schema, table, binlog, preWrite)
var isFilterTransaction = false
txn, err = translator.TiBinlogToTxn(s.schema, "", "", binlog, preWrite)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
err = errors.Annotate(err, "analyze transaction failed")
break ForLoop
}
hook := s.loopbackSync.Hooks[plugin.SyncerPlugin]
hook.Range(func(k, val interface{}) bool {
c, ok := val.(LoopBack)
if !ok {
return true
}
isFilterTransaction = c.FilterTxn(txn, s.loopbackSync)
if isFilterTransaction {
return false
}
return true
})
if isFilterTransaction {
continue
}
}

log.Debug("get DML", zap.Int64("SchemaVersion", preWrite.SchemaVersion))
if preWrite.SchemaVersion < lastDDLSchemaVersion {
log.Debug("encounter older schema dml")
}
if jobID == 0 {
preWriteValue := binlog.GetPrewriteValue()
preWrite := &pb.PrewriteValue{}
err = preWrite.Unmarshal(preWriteValue)
if err != nil {
err = errors.Annotatef(err, "prewrite %s Unmarshal failed", preWriteValue)
break ForLoop
}

err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
break ForLoop
}
var isFilterTransaction = false
var err1 error
if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync)
if err1 != nil {
err = errors.Annotate(err1, "analyze transaction failed")
err = s.rewriteForOldVersion(preWrite)
if err != nil {
err = errors.Annotate(err, "rewrite for old version fail")
break ForLoop
}
}

var ignore bool
ignore, err = filterTable(preWrite, s.filter, s.schema)
if err != nil {
err = errors.Annotate(err, "filterTable failed")
break ForLoop
}
log.Debug("get DML", zap.Int64("SchemaVersion", preWrite.SchemaVersion))
if preWrite.SchemaVersion < lastDDLSchemaVersion {
log.Debug("encounter older schema dml")
}

if !ignore && !isFilterTransaction {
s.addDMLEventMetrics(preWrite.GetMutations())
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite})
err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
}
} else if jobID > 0 {
log.Debug("get ddl binlog job", zap.Stringer("job", b.job))

// Notice: the version of DDL Binlog we receive are Monotonically increasing
// DDL (with version 10, commit ts 100) -> DDL (with version 9, commit ts 101) would never happen
s.schema.addJob(b.job)
var isFilterTransaction = false
var err1 error
if s.loopbackSync != nil && s.loopbackSync.LoopbackControl && !s.loopbackSync.SupportPlugin {
isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync)
if err1 != nil {
err = errors.Annotate(err1, "analyze transaction failed")
break ForLoop
}
}

if !s.cfg.SyncDDL {
log.Info("Syncer skips DDL", zap.String("sql", b.job.Query), zap.Int64("ts", b.GetCommitTs()), zap.Bool("SyncDDL", s.cfg.SyncDDL))
continue
}
var ignore bool
ignore, err = filterTable(preWrite, s.filter, s.schema)
if err != nil {
err = errors.Annotate(err, "filterTable failed")
break ForLoop
}

log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion))
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion
if !ignore && !isFilterTransaction {
s.addDMLEventMetrics(preWrite.GetMutations())
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite})
if err != nil {
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
}
} else if jobID > 0 {
log.Debug("get ddl binlog job", zap.Stringer("job", b.job))

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
break ForLoop
}
// Notice: the version of DDL Binlog we receive are Monotonically increasing
// DDL (with version 10, commit ts 100) -> DDL (with version 9, commit ts 101) would never happen
s.schema.addJob(b.job)

if b.job.SchemaState == model.StateDeleteOnly && b.job.Type == model.ActionDropColumn {
log.Info("Syncer skips DeleteOnly DDL", zap.Stringer("job", b.job), zap.Int64("ts", b.GetCommitTs()))
continue
}
if !s.cfg.SyncDDL {
log.Info("Syncer skips DDL", zap.String("sql", b.job.Query), zap.Int64("ts", b.GetCommitTs()), zap.Bool("SyncDDL", s.cfg.SyncDDL))
continue
}

sql := b.job.Query
var schema, table string
schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
break ForLoop
}
log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion))
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

if s.filter.SkipSchemaAndTable(schema, table) {
log.Info("skip ddl", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
} else if sql != "" {
s.addDDLCount()
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()
err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Trace(err)
break ForLoop
}

log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))
if b.job.SchemaState == model.StateDeleteOnly && b.job.Type == model.ActionDropColumn {
log.Info("Syncer skips DeleteOnly DDL", zap.Stringer("job", b.job), zap.Int64("ts", b.GetCommitTs()))
continue
}

err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table})
sql := b.job.Query
var schema, table string
schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion)
if err != nil {
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
err = errors.Trace(err)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())

if s.filter.SkipSchemaAndTable(schema, table) {
log.Info("skip ddl", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
} else if sql != "" {
s.addDDLCount()
beginTime := time.Now()
lastAddComitTS = binlog.GetCommitTs()

log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))

err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table})
if err != nil {
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
}
}
}
}
Expand Down
Loading