From fcf9adf92c51da47d8a6839822842481e8955d2d Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 10 May 2021 17:16:44 +0800
Subject: [PATCH] syncer: save empty SQL when online DDL is filtered (#1659)
 (#1668)

---
 _utils/terror_gen/errors_release.txt      |  2 +-
 dm/master/server.go                       |  2 +-
 dm/worker/worker.go                       |  4 ++--
 errors.toml                               |  2 +-
 pkg/terror/error_list.go                  |  2 +-
 pkg/upgrade/upgrade.go                    |  7 ++++++
 pkg/upgrade/upgrade_test.go               |  2 +-
 pkg/upgrade/version.go                    |  2 +-
 pkg/utils/common.go                       |  2 +-
 syncer/ddl.go                             | 10 +++++++++
 syncer/online_ddl.go                      | 26 ++++++++++-------------
 syncer/syncer.go                          | 22 +++++++++++++++++++
 tests/dmctl_basic/run.sh                  |  3 ++-
 tests/online_ddl/conf/dm-task.yaml        | 10 +++++++++
 tests/online_ddl/data/db1.increment2.sql  |  2 ++
 tests/online_ddl/data/db2.increment2.sql  |  2 ++
 tests/online_ddl/run.sh                   |  5 +++++
 tests/sequence_sharding_removemeta/run.sh |  1 +
 18 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index 10cbd9ff74..1f737df4fc 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -301,7 +301,7 @@ ErrSyncerUnitPTApplyEmptyTable,[code=36047:class=sync-unit:scope=internal:level=
 ErrSyncerUnitPTRenameTableNotValid,[code=36048:class=sync-unit:scope=internal:level=high], "Message: tables should contain old and new table name"
 ErrSyncerUnitPTRenameToPTTable,[code=36049:class=sync-unit:scope=internal:level=high], "Message: rename table to pt temporary table %s not supported"
 ErrSyncerUnitPTRenamePTTblToOther,[code=36050:class=sync-unit:scope=internal:level=high], "Message: rename pt temporary table to other temporary table %s not supported"
-ErrSyncerUnitPTOnlineDDLOnPTTbl,[code=36051:class=sync-unit:scope=internal:level=high], "Message: online ddl metadata for pt temporary table `%s`.`%s` not found"
+ErrSyncerUnitPTOnlineDDLOnPTTbl,[code=36051:class=sync-unit:scope=internal:level=high], "Message: online ddl metadata for pt temporary table `%s`.`%s` not found, Workaround: This error may be caused when the online DDL is filtered by the binlog event filter. If so, please use `handle-error skip` to skip the related DDLs."
 ErrSyncerUnitRemoteSteamerWithGTID,[code=36052:class=sync-unit:scope=internal:level=high], "Message: open remote streamer with GTID mode not supported"
 ErrSyncerUnitRemoteSteamerStartSync,[code=36053:class=sync-unit:scope=internal:level=high], "Message: start syncing binlog from remote streamer"
 ErrSyncerUnitGetTableFromDB,[code=36054:class=sync-unit:scope=internal:level=high], "Message: invalid table `%s`.`%s`"
diff --git a/dm/master/server.go b/dm/master/server.go
index 1170b508be..f0d1f6630a 100644
--- a/dm/master/server.go
+++ b/dm/master/server.go
@@ -711,7 +711,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest
 }
 
 // adjust unsynced field in sync status by looking at DDL locks.
-// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself
+// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself.
 func (s *Server) fillUnsyncedStatus(resps []*pb.QueryStatusResponse) {
 	for _, resp := range resps {
 		for _, subtaskStatus := range resp.SubTaskStatus {
diff --git a/dm/worker/worker.go b/dm/worker/worker.go
index f5a828beeb..5ab131404f 100644
--- a/dm/worker/worker.go
+++ b/dm/worker/worker.go
@@ -545,12 +545,12 @@ func (w *Worker) postProcessStatus(
 			syncStatus.Synced = true
 		}
 	} else {
-		syncPos, err := binlog.PositionFromStr(syncStatus.SyncerBinlog)
+		syncPos, err := binlog.PositionFromPosStr(syncStatus.SyncerBinlog)
 		if err != nil {
 			w.l.Debug("fail to parse mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err))
 			continue
 		}
-		masterPos, err := binlog.PositionFromStr(masterBinlogPos)
+		masterPos, err := binlog.PositionFromPosStr(masterBinlogPos)
 		if err != nil {
 			w.l.Debug("fail to parse mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err))
 			continue
diff --git a/errors.toml b/errors.toml
index 61129c088e..f326756664 100644
--- a/errors.toml
+++ b/errors.toml
@@ -1819,7 +1819,7 @@ tags = ["internal", "high"]
 [error.DM-sync-unit-36051]
 message = "online ddl metadata for pt temporary table `%s`.`%s` not found"
 description = ""
-workaround = ""
+workaround = "This error may be caused when the online DDL is filtered by the binlog event filter. If so, please use `handle-error skip` to skip the related DDLs."
 tags = ["internal", "high"]
 
 [error.DM-sync-unit-36052]
diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go
index 0488247695..fb83de36a9 100644
--- a/pkg/terror/error_list.go
+++ b/pkg/terror/error_list.go
@@ -987,7 +987,7 @@ var (
 	ErrSyncerUnitPTRenameTableNotValid  = New(codeSyncerUnitPTRenameTableNotValid, ClassSyncUnit, ScopeInternal, LevelHigh, "tables should contain old and new table name", "")
 	ErrSyncerUnitPTRenameToPTTable      = New(codeSyncerUnitPTRenameToPTTable, ClassSyncUnit, ScopeInternal, LevelHigh, "rename table to pt temporary table %s not supported", "")
 	ErrSyncerUnitPTRenamePTTblToOther   = New(codeSyncerUnitPTRenamePTTblToOther, ClassSyncUnit, ScopeInternal, LevelHigh, "rename pt temporary table to other temporary table %s not supported", "")
-	ErrSyncerUnitPTOnlineDDLOnPTTbl     = New(codeSyncerUnitPTOnlineDDLOnPTTbl, ClassSyncUnit, ScopeInternal, LevelHigh, "online ddl metadata for pt temporary table `%s`.`%s` not found", "")
+	ErrSyncerUnitPTOnlineDDLOnPTTbl     = New(codeSyncerUnitPTOnlineDDLOnPTTbl, ClassSyncUnit, ScopeInternal, LevelHigh, "online ddl metadata for pt temporary table `%s`.`%s` not found", "This error may be caused when the online DDL is filtered by the binlog event filter. If so, please use `handle-error skip` to skip the related DDLs.")
 	ErrSyncerUnitRemoteSteamerWithGTID  = New(codeSyncerUnitRemoteSteamerWithGTID, ClassSyncUnit, ScopeInternal, LevelHigh, "open remote streamer with GTID mode not supported", "")
 	ErrSyncerUnitRemoteSteamerStartSync = New(codeSyncerUnitRemoteSteamerStartSync, ClassSyncUnit, ScopeInternal, LevelHigh, "start syncing binlog from remote streamer", "")
 	ErrSyncerUnitGetTableFromDB         = New(codeSyncerUnitGetTableFromDB, ClassSyncUnit, ScopeInternal, LevelHigh, "invalid table `%s`.`%s`", "")
diff --git a/pkg/upgrade/upgrade.go b/pkg/upgrade/upgrade.go
index 06898b7f55..fd34ee476c 100644
--- a/pkg/upgrade/upgrade.go
+++ b/pkg/upgrade/upgrade.go
@@ -36,6 +36,7 @@ import (
 var upgrades = []func(cli *clientv3.Client, uctx Context) error{
 	upgradeToVer1,
 	upgradeToVer2,
+	upgradeToVer4,
 }
 
 // upgradesBeforeScheduler records all upgrade functions before scheduler start. e.g. etcd key changed.
@@ -267,3 +268,9 @@ func upgradeToVer3(ctx context.Context, cli *clientv3.Client) error {
 	_, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
 	return err
 }
+
+// upgradeToVer4 does nothing. Version 4 only makes sure that a cluster coming from version 3 re-runs bootstrap,
+// because version 3 (v2.0.2) has some bugs and users may downgrade.
+func upgradeToVer4(cli *clientv3.Client, uctx Context) error {
+	return nil
+}
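Note on the upgrade change above: registering a no-op upgradeToVer4 still advances the cluster's internal version number, so clusters stuck on version 3 re-run bootstrap. A minimal sketch of that pattern, not DM's actual TryUpgrade; putVersion is a hypothetical persistence helper, while upgrades, Context, and currentInternalNo come from the diff:

	// Sketch only: if the stored version is behind, run the registered upgrade
	// functions and persist the new version. A no-op entry still bumps the version.
	func tryUpgradeSketch(cli *clientv3.Client, uctx Context, storedVer uint64) error {
		if storedVer >= currentInternalNo {
			return nil // already up to date, nothing to run
		}
		for _, up := range upgrades {
			if err := up(cli, uctx); err != nil {
				return err
			}
		}
		return putVersion(cli, currentInternalNo) // hypothetical helper that stores the version in etcd
	}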
diff --git a/pkg/upgrade/upgrade_test.go b/pkg/upgrade/upgrade_test.go
index be01c40897..499df11551 100644
--- a/pkg/upgrade/upgrade_test.go
+++ b/pkg/upgrade/upgrade_test.go
@@ -73,7 +73,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(rev2, Greater, rev1)
 	c.Assert(ver, DeepEquals, CurrentVersion)
-	c.Assert(mockVerNo, Equals, uint64(4))
+	c.Assert(mockVerNo, Equals, uint64(5))
 
 	// try to upgrade again, do nothing because the version is the same.
 	mockVerNo = 0
diff --git a/pkg/upgrade/version.go b/pkg/upgrade/version.go
index 5bf0b0c5ff..089f19da6d 100644
--- a/pkg/upgrade/version.go
+++ b/pkg/upgrade/version.go
@@ -28,7 +28,7 @@ const (
 	// The current internal version number of the DM cluster used when upgrading from an older version.
 	// NOTE: +1 when a new incompatible version is introduced, so it's different from the release version.
 	// NOTE: it's the version of the cluster (= the version of DM-master leader now), other component versions are not recorded yet.
-	currentInternalNo uint64 = 3
+	currentInternalNo uint64 = 4
 	// The minimum internal version number of the DM cluster used when importing from v1.0.x.
 	minInternalNo uint64 = 0
 )
diff --git a/pkg/utils/common.go b/pkg/utils/common.go
index ef3cbe4b59..922fc40005 100644
--- a/pkg/utils/common.go
+++ b/pkg/utils/common.go
@@ -183,7 +183,7 @@ func ExtractTaskFromLockID(lockID string) string {
 	return strs[1]
 }
 
-// ExtractDBAndTableFromLockID extract schema and table from lockID
+// ExtractDBAndTableFromLockID extract schema and table from lockID.
 func ExtractDBAndTableFromLockID(lockID string) (string, string) {
 	strs := lockIDPattern.FindStringSubmatch(lockID)
 	// strs should be [full-lock-ID, task, db, table] if successful matched
diff --git a/syncer/ddl.go b/syncer/ddl.go
index 6378ecadce..f63adc70fa 100644
--- a/syncer/ddl.go
+++ b/syncer/ddl.go
@@ -204,6 +204,16 @@ func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schem
 		return sqls, nil, nil
 	}
 
+	// remove the empty sqls which were inserted because the online DDL was filtered
+	end := 0
+	for _, sql2 := range sqls {
+		if sql2 != "" {
+			sqls[end] = sql2
+			end++
+		}
+	}
+	sqls = sqls[:end]
+
 	// replace ghost table name by real table name
 	targetTables := []*filter.Table{
 		{Schema: realSchema, Name: realTable},
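The new loop in handleOnlineDDL is the standard in-place slice compaction idiom; a self-contained sketch of the same technique:

	// compactNonEmpty removes empty strings from sqls in place, keeping order and
	// reusing the backing array -- the same idiom used in handleOnlineDDL above.
	func compactNonEmpty(sqls []string) []string {
		end := 0
		for _, sql := range sqls {
			if sql != "" {
				sqls[end] = sql
				end++
			}
		}
		return sqls[:end]
	}

	// e.g. compactNonEmpty([]string{"", "ALTER TABLE t ADD COLUMN c INT", ""})
	// returns []string{"ALTER TABLE t ADD COLUMN c INT"}.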
diff --git a/syncer/online_ddl.go b/syncer/online_ddl.go
index dbbcdaf2bb..36a727304c 100644
--- a/syncer/online_ddl.go
+++ b/syncer/online_ddl.go
@@ -16,7 +16,6 @@ package syncer
 import (
 	"encoding/json"
 	"fmt"
-	"strings"
 	"sync"
 
 	"github.com/pingcap/dm/dm/config"
@@ -39,11 +38,11 @@ var OnlineDDLSchemes = map[string]func(*tcontext.Context, *config.SubTaskConfig)
 // OnlinePlugin handles online ddl solutions like pt, gh-ost.
 type OnlinePlugin interface {
-	// Applys does:
+	// Apply does:
 	// * detect online ddl
 	// * record changes
 	// * apply online ddl on real table
-	// returns sqls, replaced/self schema, repliaced/slef table, error
+	// returns sqls, replaced/self schema, replaced/self table, error
 	Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
 	// Finish would delete online ddl from memory and storage
 	Finish(tctx *tcontext.Context, schema, table string) error
@@ -54,6 +53,7 @@ type OnlinePlugin interface {
 	// ResetConn reset db connection
 	ResetConn(tctx *tcontext.Context) error
 	// Clear clears all online information
+	// TODO: not used now, check if we could remove it later
 	Clear(tctx *tcontext.Context) error
 	// Close closes online ddl plugin
 	Close()
@@ -132,8 +132,8 @@ func (s *OnlineDDLStorage) Load(tctx *tcontext.Context) error {
 	s.Lock()
 	defer s.Unlock()
 
-	query := fmt.Sprintf("SELECT `ghost_schema`, `ghost_table`, `ddls` FROM %s WHERE `id`='%s'", s.tableName, s.id)
-	rows, err := s.dbConn.querySQL(tctx, query)
+	query := fmt.Sprintf("SELECT `ghost_schema`, `ghost_table`, `ddls` FROM %s WHERE `id`= ?", s.tableName)
+	rows, err := s.dbConn.querySQL(tctx, query, s.id)
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
@@ -218,8 +218,8 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable,
 		return terror.ErrSyncerUnitOnlineDDLInvalidMeta.Delegate(err)
 	}
 
-	query := fmt.Sprintf("REPLACE INTO %s(`id`,`ghost_schema`, `ghost_table`, `ddls`) VALUES ('%s', '%s', '%s', '%s')", s.tableName, s.id, ghostSchema, ghostTable, escapeSingleQuote(string(ddlsBytes)))
-	_, err = s.dbConn.executeSQL(tctx, []string{query})
+	query := fmt.Sprintf("REPLACE INTO %s(`id`,`ghost_schema`, `ghost_table`, `ddls`) VALUES (?, ?, ?, ?)", s.tableName)
+	_, err = s.dbConn.executeSQL(tctx, []string{query}, []interface{}{s.id, ghostSchema, ghostTable, string(ddlsBytes)})
 	return terror.WithScope(err, terror.ScopeDownstream)
 }
 
@@ -234,8 +234,8 @@ func (s *OnlineDDLStorage) Delete(tctx *tcontext.Context, ghostSchema, ghostTabl
 	}
 
 	// delete all checkpoints
-	sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s' and `ghost_schema` = '%s' and `ghost_table` = '%s'", s.tableName, s.id, ghostSchema, ghostTable)
-	_, err := s.dbConn.executeSQL(tctx, []string{sql})
+	sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = ? and `ghost_schema` = ? and `ghost_table` = ?", s.tableName)
+	_, err := s.dbConn.executeSQL(tctx, []string{sql}, []interface{}{s.id, ghostSchema, ghostTable})
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
@@ -250,8 +250,8 @@ func (s *OnlineDDLStorage) Clear(tctx *tcontext.Context) error {
 	defer s.Unlock()
 
 	// delete all checkpoints
-	sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s'", s.tableName, s.id)
-	_, err := s.dbConn.executeSQL(tctx, []string{sql})
+	sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = ?", s.tableName)
+	_, err := s.dbConn.executeSQL(tctx, []string{sql}, []interface{}{s.id})
 	if err != nil {
 		return terror.WithScope(err, terror.ScopeDownstream)
 	}
@@ -299,7 +299,3 @@ func (s *OnlineDDLStorage) createTable(tctx *tcontext.Context) error {
 	_, err := s.dbConn.executeSQL(tctx, []string{sql})
 	return terror.WithScope(err, terror.ScopeDownstream)
 }
-
-func escapeSingleQuote(str string) string {
-	return strings.ReplaceAll(str, "'", "''")
-}
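The online_ddl.go changes above replace fmt.Sprintf-interpolated values (and the hand-rolled escapeSingleQuote helper) with `?` placeholders passed as query arguments, so the driver handles quoting. A minimal database/sql sketch of the same pattern; the table name and the deleteMeta function are illustrative, not DM's internals:

	import (
		"context"
		"database/sql"
	)

	// deleteMeta uses placeholder arguments instead of string interpolation:
	// values never need manual single-quote escaping and cannot break the SQL text.
	func deleteMeta(ctx context.Context, db *sql.DB, id, ghostSchema, ghostTable string) error {
		_, err := db.ExecContext(ctx,
			"DELETE FROM dm_meta.online_ddl_meta WHERE `id` = ? AND `ghost_schema` = ? AND `ghost_table` = ?",
			id, ghostSchema, ghostTable)
		return err
	}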
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 843c2ec1b1..1e2168baa0 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -54,6 +54,7 @@ import (
 	fr "github.com/pingcap/dm/pkg/func-rollback"
 	"github.com/pingcap/dm/pkg/gtid"
 	"github.com/pingcap/dm/pkg/log"
+	parserpkg "github.com/pingcap/dm/pkg/parser"
 	"github.com/pingcap/dm/pkg/schema"
 	"github.com/pingcap/dm/pkg/shardddl/pessimism"
 	"github.com/pingcap/dm/pkg/streamer"
@@ -1751,6 +1752,27 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
 		skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
 		ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", originSQL), zap.String("schema", usedSchema))
 		*ec.lastLocation = *ec.currentLocation // before record skip location, update lastLocation
+
+		// try to record an empty SQL in s.onlineDDL, because ignoring the filtered statement entirely would cause a "not found" error later
+		if s.onlineDDL == nil {
+			return s.recordSkipSQLsLocation(*ec.lastLocation)
+		}
+
+		stmts, err2 := parserpkg.Parse(parser2, originSQL, "", "")
+		if err2 != nil {
+			ec.tctx.L().Info("failed to parse a filtered SQL for online DDL", zap.String("SQL", originSQL))
+		}
+		// if err2 != nil, stmts is nil, so the for-loop below is skipped
+		for _, stmt := range stmts {
+			if _, ok := stmt.(ast.DDLNode); ok {
+				tableNames, err3 := parserpkg.FetchDDLTableNames(usedSchema, stmt)
+				if err3 != nil {
+					continue
+				}
+				// nolint:errcheck
+				s.onlineDDL.Apply(ec.tctx, tableNames, "", stmt)
+			}
+		}
 		return s.recordSkipSQLsLocation(*ec.lastLocation)
 	}
 	if !parseResult.isDDL {
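The new block only needs to know whether the filtered statement is a DDL and which tables it touches before recording an empty SQL for the online DDL plugin. A standalone sketch of the DDL check via the ast.DDLNode type assertion, assuming the standalone pingcap/parser module and its test_driver (DM's own parserpkg wrapper is used in the diff itself):

	import (
		"github.com/pingcap/parser"
		"github.com/pingcap/parser/ast"
		_ "github.com/pingcap/parser/test_driver"
	)

	// isDDL reports whether sql parses to at least one DDL statement -- the same
	// ast.DDLNode assertion the new handleQueryEvent block relies on.
	func isDDL(sql string) (bool, error) {
		stmts, _, err := parser.New().Parse(sql, "", "")
		if err != nil {
			return false, err
		}
		for _, stmt := range stmts {
			if _, ok := stmt.(ast.DDLNode); ok {
				return true, nil
			}
		}
		return false, nil
	}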
"global" loader-config-name: "global" syncer-config-name: "global" + filter-rules: ["filter-rule-index"] block-allow-list: instance: @@ -68,6 +70,14 @@ column-mappings: target-column: "id" arguments: ["2", "", "t"] +filters: + filter-rule-index: + schema-pattern: "*" + table-pattern: "*" + events: ["create index","drop index"] + sql-pattern: ["ALTER\\s.*TABLE[\\s\\S]*ADD\\s+KEY\\s+NAME", "ALTER\\s.*TABLE[\\s\\S]*DROP\\s+KEY"] + action: Ignore + mydumpers: global: threads: 4 diff --git a/tests/online_ddl/data/db1.increment2.sql b/tests/online_ddl/data/db1.increment2.sql index 4fc3d041cd..059507ec35 100644 --- a/tests/online_ddl/data/db1.increment2.sql +++ b/tests/online_ddl/data/db1.increment2.sql @@ -3,6 +3,8 @@ insert into t1 (uid, name, info) values (10006, 'name of 10006', '{"age": 10006} insert into t2 (uid, name, info) values (20008, 'name of 20008', '{"age": 20008}'); alter table t1 add column address varchar(255); alter table t2 add column address varchar(255); +alter table t1 add key address (address); +alter table t2 add key address (address); insert into t2 (uid, name, info, address) values (20009, 'name of 20009', '{"age": 20009}', 'address of 20009'); insert into t1 (uid, name, info, address) values (10007, 'name of 10007', '{"age": 10007}', 'address of 10007'); alter table t2 drop column age; diff --git a/tests/online_ddl/data/db2.increment2.sql b/tests/online_ddl/data/db2.increment2.sql index 21aeb85a5c..60d93d27af 100644 --- a/tests/online_ddl/data/db2.increment2.sql +++ b/tests/online_ddl/data/db2.increment2.sql @@ -3,6 +3,8 @@ insert into t3 (uid, name, info) values (30004, 'name of 30004', '{"age": 30004} insert into t2 (uid, name, info) values (50002, 'name of 50002', '{"age": 50002}'); alter table t3 add column address varchar(255); alter table t2 add column address varchar(255); +alter table t2 add key address (address); +alter table t3 add key address (address); insert into t2 (uid, name, info, address) values (50003, 'name of 50003', '{"age": 50003}', 'address of 50003'); insert into t3 (uid, name, info, address) values (30005, 'name of 30005', '{"age": 30005}', 'address of 30005'); alter table t2 drop column age; diff --git a/tests/online_ddl/run.sh b/tests/online_ddl/run.sh index 6436bc62b8..43c9fe255d 100755 --- a/tests/online_ddl/run.sh +++ b/tests/online_ddl/run.sh @@ -45,6 +45,11 @@ function real_run() { run_sql_file_online_ddl $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 online_ddl $online_ddl_scheme run_sql_file_online_ddl $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 online_ddl $online_ddl_scheme + # manually create index to pass check_sync_diff + run_sql_tidb "show create table online_ddl.t_target" + check_not_contains "KEY \`name\`" + run_sql_tidb "alter table online_ddl.t_target add key name (name)" + echo "use sync_diff_inspector to check increment data" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml diff --git a/tests/sequence_sharding_removemeta/run.sh b/tests/sequence_sharding_removemeta/run.sh index 40c6862757..13dd321bd3 100755 --- a/tests/sequence_sharding_removemeta/run.sh +++ b/tests/sequence_sharding_removemeta/run.sh @@ -44,6 +44,7 @@ function run() { "show-ddl-locks" \ "\"ID\": \"$lock_id\"" 1 \ "$ddl" 1 + # check after we changed the behaviour that DM-worker queries master status only once, masterBinlog is not empty run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $TEST_NAME" \ "this DM-worker doesn't receive any shard DDL of this group" 0 \