syncer: save empty SQL when online DDL is filtered (#1659) (#1668)
ti-chi-bot authored May 10, 2021
1 parent 2707ad5 commit fcf9adf
Showing 18 changed files with 81 additions and 25 deletions.
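The gist of the change: gh-ost and pt-osc apply their DDLs to a ghost/temporary table and finish with a RENAME back to the real table, and DM records those intermediate DDLs as online-DDL metadata keyed by the ghost table. Before this commit, a binlog event filter that dropped such a DDL left no metadata entry, so the final RENAME failed with ErrSyncerUnitPTOnlineDDLOnPTTbl (code 36051). The commit records an empty SQL for the filtered DDL instead, then strips the empty entries before replaying. A condensed sketch of the new flow (helper names here are illustrative, not the real API; the literal code is in the syncer/syncer.go and syncer/ddl.go hunks below):

```go
// Sketch, not the committed code: when a query event is skipped by the binlog
// event filter, still feed any DDL in it to the online DDL plugin with an
// empty SQL string, so that metadata for the ghost table exists.
func onFilteredQuery(s *Syncer, ec eventContext, originSQL, usedSchema string) error {
	if s.onlineDDL != nil {
		for _, stmt := range parseStatements(originSQL) { // parseStatements: hypothetical helper
			if _, ok := stmt.(ast.DDLNode); ok {
				if tables, err := parserpkg.FetchDDLTableNames(usedSchema, stmt); err == nil {
					s.onlineDDL.Apply(ec.tctx, tables, "", stmt) // "" marks a filtered DDL
				}
			}
		}
	}
	return s.recordSkipSQLsLocation(*ec.lastLocation)
}
```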
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
@@ -301,7 +301,7 @@ ErrSyncerUnitPTApplyEmptyTable,[code=36047:class=sync-unit:scope=internal:level=
ErrSyncerUnitPTRenameTableNotValid,[code=36048:class=sync-unit:scope=internal:level=high], "Message: tables should contain old and new table name"
ErrSyncerUnitPTRenameToPTTable,[code=36049:class=sync-unit:scope=internal:level=high], "Message: rename table to pt temporary table %s not supported"
ErrSyncerUnitPTRenamePTTblToOther,[code=36050:class=sync-unit:scope=internal:level=high], "Message: rename pt temporary table to other temporary table %s not supported"
-ErrSyncerUnitPTOnlineDDLOnPTTbl,[code=36051:class=sync-unit:scope=internal:level=high], "Message: online ddl metadata for pt temporary table `%s`.`%s` not found"
+ErrSyncerUnitPTOnlineDDLOnPTTbl,[code=36051:class=sync-unit:scope=internal:level=high], "Message: online ddl metadata for pt temporary table `%s`.`%s` not found, Workaround: This error may be caused when the online DDL is filtered by the binlog event filter; if so, please use `handle-error skip` to skip related DDLs."
ErrSyncerUnitRemoteSteamerWithGTID,[code=36052:class=sync-unit:scope=internal:level=high], "Message: open remote streamer with GTID mode not supported"
ErrSyncerUnitRemoteSteamerStartSync,[code=36053:class=sync-unit:scope=internal:level=high], "Message: start syncing binlog from remote streamer"
ErrSyncerUnitGetTableFromDB,[code=36054:class=sync-unit:scope=internal:level=high], "Message: invalid table `%s`.`%s`"
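The new workaround text points at dmctl's `handle-error` command. For a task stopped on error 36051, the suggested recovery looks roughly like this (a sketch; the task name and master address are placeholders, and exact flags may differ across DM versions):

```bash
# Tell DM to step over the DDL statement the task is currently blocked on.
dmctl --master-addr 127.0.0.1:8261 handle-error my-task skip
```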
2 changes: 1 addition & 1 deletion dm/master/server.go
@@ -711,7 +711,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest
}

// adjust unsynced field in sync status by looking at DDL locks.
-// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself
+// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself.
func (s *Server) fillUnsyncedStatus(resps []*pb.QueryStatusResponse) {
for _, resp := range resps {
for _, subtaskStatus := range resp.SubTaskStatus {
4 changes: 2 additions & 2 deletions dm/worker/worker.go
@@ -545,12 +545,12 @@ func (w *Worker) postProcessStatus(
syncStatus.Synced = true
}
} else {
-syncPos, err := binlog.PositionFromStr(syncStatus.SyncerBinlog)
+syncPos, err := binlog.PositionFromPosStr(syncStatus.SyncerBinlog)
if err != nil {
w.l.Debug("fail to parse mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err))
continue
}
-masterPos, err := binlog.PositionFromStr(masterBinlogPos)
+masterPos, err := binlog.PositionFromPosStr(masterBinlogPos)
if err != nil {
w.l.Debug("fail to parse mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err))
continue
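The rename from PositionFromStr to PositionFromPosStr makes explicit that these call sites parse plain `binlog-name:pos` strings rather than locations that may also carry a GTID set. A parser of the plain form presumably looks like this (an assumed shape for illustration only; the real implementation lives in pkg/binlog, with gmysql being the go-mysql driver's mysql package):

```go
// Hypothetical parser for "mysql-bin.000001:4"-style position strings.
func positionFromPosStr(str string) (gmysql.Position, error) {
	parts := strings.Split(str, ":")
	if len(parts) != 2 {
		return gmysql.Position{}, fmt.Errorf("invalid mysql position %q", str)
	}
	pos, err := strconv.ParseUint(parts[1], 10, 32)
	if err != nil {
		return gmysql.Position{}, err
	}
	return gmysql.Position{Name: parts[0], Pos: uint32(pos)}, nil
}
```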
2 changes: 1 addition & 1 deletion errors.toml
@@ -1819,7 +1819,7 @@ tags = ["internal", "high"]
[error.DM-sync-unit-36051]
message = "online ddl metadata for pt temporary table `%s`.`%s` not found"
description = ""
workaround = ""
workaround = "This error may caused when the online DDL is filtered by binlog event filter, if so, please use `handle-error skip` sometimes to skip related DDLs."
tags = ["internal", "high"]

[error.DM-sync-unit-36052]
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
@@ -987,7 +987,7 @@ var (
ErrSyncerUnitPTRenameTableNotValid = New(codeSyncerUnitPTRenameTableNotValid, ClassSyncUnit, ScopeInternal, LevelHigh, "tables should contain old and new table name", "")
ErrSyncerUnitPTRenameToPTTable = New(codeSyncerUnitPTRenameToPTTable, ClassSyncUnit, ScopeInternal, LevelHigh, "rename table to pt temporary table %s not supported", "")
ErrSyncerUnitPTRenamePTTblToOther = New(codeSyncerUnitPTRenamePTTblToOther, ClassSyncUnit, ScopeInternal, LevelHigh, "rename pt temporary table to other temporary table %s not supported", "")
-ErrSyncerUnitPTOnlineDDLOnPTTbl = New(codeSyncerUnitPTOnlineDDLOnPTTbl, ClassSyncUnit, ScopeInternal, LevelHigh, "online ddl metadata for pt temporary table `%s`.`%s` not found", "")
+ErrSyncerUnitPTOnlineDDLOnPTTbl = New(codeSyncerUnitPTOnlineDDLOnPTTbl, ClassSyncUnit, ScopeInternal, LevelHigh, "online ddl metadata for pt temporary table `%s`.`%s` not found", "This error may be caused when the online DDL is filtered by the binlog event filter; if so, please use `handle-error skip` to skip related DDLs.")
ErrSyncerUnitRemoteSteamerWithGTID = New(codeSyncerUnitRemoteSteamerWithGTID, ClassSyncUnit, ScopeInternal, LevelHigh, "open remote streamer with GTID mode not supported", "")
ErrSyncerUnitRemoteSteamerStartSync = New(codeSyncerUnitRemoteSteamerStartSync, ClassSyncUnit, ScopeInternal, LevelHigh, "start syncing binlog from remote streamer", "")
ErrSyncerUnitGetTableFromDB = New(codeSyncerUnitGetTableFromDB, ClassSyncUnit, ScopeInternal, LevelHigh, "invalid table `%s`.`%s`", "")
7 changes: 7 additions & 0 deletions pkg/upgrade/upgrade.go
@@ -36,6 +36,7 @@ import (
var upgrades = []func(cli *clientv3.Client, uctx Context) error{
upgradeToVer1,
upgradeToVer2,
+upgradeToVer4,
}

// upgradesBeforeScheduler records all upgrade functions before scheduler start. e.g. etcd key changed.
@@ -267,3 +268,9 @@ func upgradeToVer3(ctx context.Context, cli *clientv3.Client) error {
_, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return err
}

+// upgradeToVer4 does nothing; version 4 just makes sure a cluster coming from version 3 can re-run
+// bootstrap, because version 3 (v2.0.2) has some bugs and users may downgrade.
+func upgradeToVer4(cli *clientv3.Client, uctx Context) error {
+return nil
+}
2 changes: 1 addition & 1 deletion pkg/upgrade/upgrade_test.go
@@ -73,7 +73,7 @@ func (t *testForEtcd) TestTryUpgrade(c *C) {
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)
c.Assert(ver, DeepEquals, CurrentVersion)
-c.Assert(mockVerNo, Equals, uint64(4))
+c.Assert(mockVerNo, Equals, uint64(5))

// try to upgrade again, do nothing because the version is the same.
mockVerNo = 0
2 changes: 1 addition & 1 deletion pkg/upgrade/version.go
@@ -28,7 +28,7 @@ const (
// The current internal version number of the DM cluster used when upgrading from an older version.
// NOTE: +1 when a new incompatible version is introduced, so it's different from the release version.
// NOTE: it's the version of the cluster (= the version of DM-master leader now), other component versions are not recorded yet.
-currentInternalNo uint64 = 3
+currentInternalNo uint64 = 4
// The minimum internal version number of the DM cluster used when importing from v1.0.x.
minInternalNo uint64 = 0
)
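Taken together, the upgrade hunks bump the internal cluster version from 3 to 4 and register a no-op upgradeToVer4, which is enough to push clusters already at internal version 3 (v2.0.2) through bootstrap again. The driver presumably runs the registered upgrade functions whenever the version stored in etcd is older than the binary's, along these lines (an assumed shape, not the actual pkg/upgrade logic):

```go
// Assumed driver shape: a stale stored version triggers the registered
// upgrade functions, so even a no-op entry forces a bootstrap re-run.
if storedVer.InternalNo < currentInternalNo {
	for _, upgrade := range upgrades {
		if err := upgrade(cli, uctx); err != nil {
			return err
		}
	}
	return saveVersion(cli, CurrentVersion) // saveVersion: hypothetical persist step
}
```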
2 changes: 1 addition & 1 deletion pkg/utils/common.go
@@ -183,7 +183,7 @@ func ExtractTaskFromLockID(lockID string) string {
return strs[1]
}

-// ExtractDBAndTableFromLockID extract schema and table from lockID
+// ExtractDBAndTableFromLockID extract schema and table from lockID.
func ExtractDBAndTableFromLockID(lockID string) (string, string) {
strs := lockIDPattern.FindStringSubmatch(lockID)
// strs should be [full-lock-ID, task, db, table] if successful matched
10 changes: 10 additions & 0 deletions syncer/ddl.go
@@ -204,6 +204,16 @@ func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schem
return sqls, nil, nil
}

+// remove empty sqls which were inserted because the online DDL was filtered
+end := 0
+for _, sql2 := range sqls {
+if sql2 != "" {
+sqls[end] = sql2
+end++
+}
+}
+sqls = sqls[:end]

// replace ghost table name by real table name
targetTables := []*filter.Table{
{Schema: realSchema, Name: realTable},
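The compaction above is Go's standard in-place filter idiom; the empty strings it drops are the placeholders saved for filtered online DDLs. As a standalone illustration:

```go
package main

import "fmt"

func main() {
	// "" entries stand in for online DDLs dropped by the binlog event filter.
	sqls := []string{"", "ALTER TABLE `t` ADD COLUMN `c` INT", ""}
	end := 0
	for _, sql := range sqls {
		if sql != "" {
			sqls[end] = sql // keep non-empty entries, compacted to the front
			end++
		}
	}
	sqls = sqls[:end]
	fmt.Println(sqls) // [ALTER TABLE `t` ADD COLUMN `c` INT]
}
```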
26 changes: 11 additions & 15 deletions syncer/online_ddl.go
@@ -16,7 +16,6 @@ package syncer
import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/pingcap/dm/dm/config"
@@ -39,11 +38,11 @@ var OnlineDDLSchemes = map[string]func(*tcontext.Context, *config.SubTaskConfig)

// OnlinePlugin handles online ddl solutions like pt, gh-ost.
type OnlinePlugin interface {
-// Applys does:
+// Apply does:
// * detect online ddl
// * record changes
// * apply online ddl on real table
-// returns sqls, replaced/self schema, repliaced/slef table, error
+// returns sqls, replaced/self schema, replaced/self table, error
Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
// Finish would delete online ddl from memory and storage
Finish(tctx *tcontext.Context, schema, table string) error
@@ -54,6 +53,7 @@ type OnlinePlugin interface {
// ResetConn reset db connection
ResetConn(tctx *tcontext.Context) error
// Clear clears all online information
+// TODO: not used now, check if we could remove it later
Clear(tctx *tcontext.Context) error
// Close closes online ddl plugin
Close()
@@ -132,8 +132,8 @@ func (s *OnlineDDLStorage) Load(tctx *tcontext.Context) error {
s.Lock()
defer s.Unlock()

query := fmt.Sprintf("SELECT `ghost_schema`, `ghost_table`, `ddls` FROM %s WHERE `id`='%s'", s.tableName, s.id)
rows, err := s.dbConn.querySQL(tctx, query)
query := fmt.Sprintf("SELECT `ghost_schema`, `ghost_table`, `ddls` FROM %s WHERE `id`= ?", s.tableName)
rows, err := s.dbConn.querySQL(tctx, query, s.id)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
@@ -218,8 +218,8 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable,
return terror.ErrSyncerUnitOnlineDDLInvalidMeta.Delegate(err)
}

query := fmt.Sprintf("REPLACE INTO %s(`id`,`ghost_schema`, `ghost_table`, `ddls`) VALUES ('%s', '%s', '%s', '%s')", s.tableName, s.id, ghostSchema, ghostTable, escapeSingleQuote(string(ddlsBytes)))
_, err = s.dbConn.executeSQL(tctx, []string{query})
query := fmt.Sprintf("REPLACE INTO %s(`id`,`ghost_schema`, `ghost_table`, `ddls`) VALUES (?, ?, ?, ?)", s.tableName)
_, err = s.dbConn.executeSQL(tctx, []string{query}, []interface{}{s.id, ghostSchema, ghostTable, string(ddlsBytes)})
return terror.WithScope(err, terror.ScopeDownstream)
}

@@ -234,8 +234,8 @@ func (s *OnlineDDLStorage) Delete(tctx *tcontext.Context, ghostSchema, ghostTabl
}

// delete all checkpoints
sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s' and `ghost_schema` = '%s' and `ghost_table` = '%s'", s.tableName, s.id, ghostSchema, ghostTable)
_, err := s.dbConn.executeSQL(tctx, []string{sql})
sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = ? and `ghost_schema` = ? and `ghost_table` = ?", s.tableName)
_, err := s.dbConn.executeSQL(tctx, []string{sql}, []interface{}{s.id, ghostSchema, ghostTable})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
@@ -250,8 +250,8 @@ func (s *OnlineDDLStorage) Clear(tctx *tcontext.Context) error {
defer s.Unlock()

// delete all checkpoints
sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s'", s.tableName, s.id)
_, err := s.dbConn.executeSQL(tctx, []string{sql})
sql := fmt.Sprintf("DELETE FROM %s WHERE `id` = ?", s.tableName)
_, err := s.dbConn.executeSQL(tctx, []string{sql}, []interface{}{s.id})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
@@ -299,7 +299,3 @@ func (s *OnlineDDLStorage) createTable(tctx *tcontext.Context) error {
_, err := s.dbConn.executeSQL(tctx, []string{sql})
return terror.WithScope(err, terror.ScopeDownstream)
}

-func escapeSingleQuote(str string) string {
-return strings.ReplaceAll(str, "'", "''")
-}
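A side cleanup in this file swaps string-interpolated SQL (which needed the now-deleted escapeSingleQuote helper) for driver placeholders, leaving quoting entirely to the driver. The table name still goes through fmt.Sprintf because placeholders can only bind values, not identifiers. The pattern, sketched against database/sql (the *sql.DB handle and the metadata table name are assumptions):

```go
// Sketch of the placeholder pattern the commit adopts; db is an assumed *sql.DB.
func saveOnlineDDL(ctx context.Context, db *sql.DB, id, ghostSchema, ghostTable, ddls string) error {
	// Identifiers cannot be bound as parameters, so the table name is still interpolated.
	query := fmt.Sprintf("REPLACE INTO %s(`id`,`ghost_schema`,`ghost_table`,`ddls`) VALUES (?, ?, ?, ?)",
		"`dm_meta`.`onlineddl`") // assumed metadata table name
	_, err := db.ExecContext(ctx, query, id, ghostSchema, ghostTable, ddls)
	return err
}
```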
22 changes: 22 additions & 0 deletions syncer/syncer.go
@@ -54,6 +54,7 @@ import (
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
+parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/schema"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/streamer"
@@ -1751,6 +1752,27 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", originSQL), zap.String("schema", usedSchema))
*ec.lastLocation = *ec.currentLocation // before record skip location, update lastLocation

+// try to insert an empty SQL into s.onlineDDL, because ignoring it would cause a "not found" error
+if s.onlineDDL == nil {
+return s.recordSkipSQLsLocation(*ec.lastLocation)
+}
+
+stmts, err2 := parserpkg.Parse(parser2, originSQL, "", "")
+if err2 != nil {
+ec.tctx.L().Info("failed to parse a filtered SQL for online DDL", zap.String("SQL", originSQL))
+}
+// if err2 != nil, stmts should be nil, so the for-loop below is skipped
+for _, stmt := range stmts {
+if _, ok := stmt.(ast.DDLNode); ok {
+tableNames, err3 := parserpkg.FetchDDLTableNames(usedSchema, stmt)
+if err3 != nil {
+continue
+}
+// nolint:errcheck
+s.onlineDDL.Apply(ec.tctx, tableNames, "", stmt)
+}
+}
return s.recordSkipSQLsLocation(*ec.lastLocation)
}
if !parseResult.isDDL {
3 changes: 2 additions & 1 deletion tests/dmctl_basic/run.sh
@@ -206,7 +206,8 @@ function run() {
"\"result\": true" 3 \
"\"source\": \"$SOURCE_ID1\"" 1 \
"\"source\": \"$SOURCE_ID2\"" 1 \
"\"stage\": \"Running\"" 4
"\"stage\": \"Running\"" 4 \
"\"synced\": true" 2
# update_task_not_paused $TASK_CONF

# stop relay because get_config_to_file will stop source
10 changes: 10 additions & 0 deletions tests/online_ddl/conf/dm-task.yaml
@@ -21,6 +21,7 @@ mysql-instances:
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
+    filter-rules: ["filter-rule-index"]

- source-id: "mysql-replica-02"
meta:
@@ -32,6 +33,7 @@ mysql-instances:
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
+    filter-rules: ["filter-rule-index"]

block-allow-list:
instance:
@@ -68,6 +70,14 @@ column-mappings:
target-column: "id"
arguments: ["2", "", "t"]

+filters:
+  filter-rule-index:
+    schema-pattern: "*"
+    table-pattern: "*"
+    events: ["create index","drop index"]
+    sql-pattern: ["ALTER\\s.*TABLE[\\s\\S]*ADD\\s+KEY\\s+NAME", "ALTER\\s.*TABLE[\\s\\S]*DROP\\s+KEY"]
+    action: Ignore

mydumpers:
global:
threads: 4
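The sql-pattern entries above are regular expressions matched against the statement text; together with the create index/drop index events, the rule is intended to filter the index DDLs added in the test data below, which exercises exactly the code path this commit fixes. A quick standalone check of the first pattern (DM's precise matching rules, such as case handling, are assumptions here, approximated by the (?i) flag):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	re := regexp.MustCompile(`(?i)ALTER\s.*TABLE[\s\S]*ADD\s+KEY\s+NAME`)
	fmt.Println(re.MatchString("ALTER TABLE `t1` ADD KEY name (name)")) // true
}
```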
2 changes: 2 additions & 0 deletions tests/online_ddl/data/db1.increment2.sql
@@ -3,6 +3,8 @@ insert into t1 (uid, name, info) values (10006, 'name of 10006', '{"age": 10006}
insert into t2 (uid, name, info) values (20008, 'name of 20008', '{"age": 20008}');
alter table t1 add column address varchar(255);
alter table t2 add column address varchar(255);
+alter table t1 add key address (address);
+alter table t2 add key address (address);
insert into t2 (uid, name, info, address) values (20009, 'name of 20009', '{"age": 20009}', 'address of 20009');
insert into t1 (uid, name, info, address) values (10007, 'name of 10007', '{"age": 10007}', 'address of 10007');
alter table t2 drop column age;
2 changes: 2 additions & 0 deletions tests/online_ddl/data/db2.increment2.sql
@@ -3,6 +3,8 @@ insert into t3 (uid, name, info) values (30004, 'name of 30004', '{"age": 30004}
insert into t2 (uid, name, info) values (50002, 'name of 50002', '{"age": 50002}');
alter table t3 add column address varchar(255);
alter table t2 add column address varchar(255);
+alter table t2 add key address (address);
+alter table t3 add key address (address);
insert into t2 (uid, name, info, address) values (50003, 'name of 50003', '{"age": 50003}', 'address of 50003');
insert into t3 (uid, name, info, address) values (30005, 'name of 30005', '{"age": 30005}', 'address of 30005');
alter table t2 drop column age;
5 changes: 5 additions & 0 deletions tests/online_ddl/run.sh
@@ -45,6 +45,11 @@ function real_run() {
run_sql_file_online_ddl $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 online_ddl $online_ddl_scheme
run_sql_file_online_ddl $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 online_ddl $online_ddl_scheme

+# manually create index to pass check_sync_diff
+run_sql_tidb "show create table online_ddl.t_target"
+check_not_contains "KEY \`name\`"
+run_sql_tidb "alter table online_ddl.t_target add key name (name)"

echo "use sync_diff_inspector to check increment data"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

1 change: 1 addition & 0 deletions tests/sequence_sharding_removemeta/run.sh
@@ -44,6 +44,7 @@ function run() {
"show-ddl-locks" \
"\"ID\": \"$lock_id\"" 1 \
"$ddl" 1
+# after the behaviour change that DM-worker queries master status only once, check that masterBinlog is not empty
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $TEST_NAME" \
"this DM-worker doesn't receive any shard DDL of this group" 0 \
