From abdd9389a79bb46cfc9980148451b0e4a66d8b2d Mon Sep 17 00:00:00 2001 From: HEXINGZE <2046084122@qq.com> Date: Tue, 3 Dec 2024 00:50:39 +0800 Subject: [PATCH 1/6] bugfix #704 --- pkg/datasource/sql/types/image.go | 11 +- ...sertonduplicate_update_undo_log_builder.go | 231 +++++++++++++----- 2 files changed, 177 insertions(+), 65 deletions(-) diff --git a/pkg/datasource/sql/types/image.go b/pkg/datasource/sql/types/image.go index f755d9c91..77108fae2 100644 --- a/pkg/datasource/sql/types/image.go +++ b/pkg/datasource/sql/types/image.go @@ -18,6 +18,7 @@ package types import ( + "database/sql/driver" "encoding/base64" "encoding/json" "reflect" @@ -117,14 +118,16 @@ type RecordImage struct { // Rows data row Rows []RowImage `json:"rows"` // TableMeta table information schema - TableMeta *TableMeta `json:"-"` + TableMeta *TableMeta `json:"-"` + PrimaryKeyMap map[string][]driver.Value `json:"primaryKeyMap,omitempty"` } func NewEmptyRecordImage(tableMeta *TableMeta, sqlType SQLType) *RecordImage { return &RecordImage{ - TableName: tableMeta.TableName, - TableMeta: tableMeta, - SQLType: sqlType, + TableName: tableMeta.TableName, + TableMeta: tableMeta, + SQLType: sqlType, + PrimaryKeyMap: make(map[string][]driver.Value), } } diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go index 4a23dedda..5da3d35e7 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go @@ -97,68 +97,136 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a if err := checkDuplicateKeyUpdate(insertStmt, metaData); err != nil { return "", nil, err } - var selectArgs []driver.Value + + // Reset primary keys map + u.BeforeImageSqlPrimaryKeys = make(map[string]bool) + pkIndexMap := u.getPkIndex(insertStmt, metaData) var pkIndexArray []int for _, val := range pkIndexMap { - tmpVal := val - pkIndexArray = append(pkIndexArray, tmpVal) + pkIndexArray = append(pkIndexArray, val) } insertRows, err := getInsertRows(insertStmt, pkIndexArray) if err != nil { return "", nil, err } - insertNum := len(insertRows) + paramMap, err := u.buildImageParameters(insertStmt, args, insertRows) if err != nil { return "", nil, err } - sql := strings.Builder{} - sql.WriteString("SELECT * FROM " + metaData.TableName + " ") + // 如果没有参数或没有主键索引,直接返回空 + if len(paramMap) == 0 || len(metaData.Indexs) == 0 { + return "", nil, nil + } + + // 检查是否有主键 + hasPK := false + for _, index := range metaData.Indexs { + if strings.EqualFold("PRIMARY", index.Name) { + hasPK = true + break + } + } + if !hasPK { + return "", nil, nil + } + + var sql strings.Builder + sql.WriteString("SELECT * FROM " + metaData.TableName + " ") + + var selectArgs []driver.Value isContainWhere := false - for i := 0; i < insertNum; i++ { - finalI := i - paramAppenderTempList := make([]driver.Value, 0) + hasConditions := false + + for i := 0; i < len(insertRows); i++ { + var rowConditions []string + var rowArgs []driver.Value + usedParams := make(map[string]bool) + + // First try unique indexes for _, index := range metaData.Indexs { - //unique index - if index.NonUnique || isIndexValueNotNull(index, paramMap, finalI) == false { + if index.NonUnique || strings.EqualFold("PRIMARY", index.Name) { continue } - columnIsNull := true - uniqueList := make([]string, 0) - for _, columnMeta := range index.Columns { - columnName := columnMeta.ColumnName - imageParameters, ok := paramMap[columnName] - if !ok && columnMeta.ColumnDef != nil { - if strings.EqualFold("PRIMARY", index.Name) { - u.BeforeImageSqlPrimaryKeys[columnName] = true - } - uniqueList = append(uniqueList, columnName+" = DEFAULT("+columnName+") ") - columnIsNull = false - continue + + if !isIndexValueNotNull(index, paramMap, i) { + continue + } + + var indexConditions []string + var indexArgs []driver.Value + allColumnsPresent := true + for _, colMeta := range index.Columns { + columnName := colMeta.ColumnName + if params, ok := paramMap[columnName]; ok && len(params) > i && params[i] != nil { + indexConditions = append(indexConditions, columnName+" = ? ") + indexArgs = append(indexArgs, params[i]) + usedParams[columnName] = true + } else if colMeta.ColumnDef != nil { + indexConditions = append(indexConditions, columnName+" = DEFAULT("+columnName+")") + } else { + allColumnsPresent = false + break } - if strings.EqualFold("PRIMARY", index.Name) { - u.BeforeImageSqlPrimaryKeys[columnName] = true + } + + if allColumnsPresent && len(indexConditions) > 0 { + rowConditions = append(rowConditions, "("+strings.Join(indexConditions, " and ")+")") + rowArgs = append(rowArgs, indexArgs...) + hasConditions = true + } + } + + // Then try primary key + for _, index := range metaData.Indexs { + if !strings.EqualFold("PRIMARY", index.Name) { + continue + } + + var pkConditions []string + var pkArgs []driver.Value + for _, colMeta := range index.Columns { + columnName := colMeta.ColumnName + u.BeforeImageSqlPrimaryKeys[columnName] = true + if params, ok := paramMap[columnName]; ok && len(params) > i && params[i] != nil && !usedParams[columnName] { + pkConditions = append(pkConditions, columnName+" = ? ") + pkArgs = append(pkArgs, params[i]) } - columnIsNull = false - uniqueList = append(uniqueList, columnName+" = ? ") - paramAppenderTempList = append(paramAppenderTempList, imageParameters[finalI]) } - if !columnIsNull { - if isContainWhere { - sql.WriteString(" OR (" + strings.Join(uniqueList, " and ") + ") ") - } else { - sql.WriteString(" WHERE (" + strings.Join(uniqueList, " and ") + ") ") - isContainWhere = true + if len(pkConditions) > 0 { + rowConditions = append(rowConditions, "("+strings.Join(pkConditions, " and ")+")") + rowArgs = append(rowArgs, pkArgs...) + hasConditions = true + } + } + + if len(rowConditions) > 0 { + if !isContainWhere { + sql.WriteString("WHERE ") + isContainWhere = true + } else { + sql.WriteString(" OR ") + } + for j, condition := range rowConditions { + if j > 0 { + sql.WriteString(" OR ") } + sql.WriteString(condition + " ") } + selectArgs = append(selectArgs, rowArgs...) } - selectArgs = append(selectArgs, paramAppenderTempList...) } - log.Infof("build select sql by insert on duplicate sourceQuery, sql {}", sql.String()) - return sql.String(), selectArgs, nil + + if !hasConditions { + return "", nil, nil + } + + sqlStr := sql.String() + log.Infof("build select sql by insert on duplicate sourceQuery, sql: %s", sqlStr) + return sqlStr, selectArgs, nil } func (u *MySQLInsertOnDuplicateUndoLogBuilder) AfterImage(ctx context.Context, execCtx *types.ExecContext, beforeImages []*types.RecordImage) ([]*types.RecordImage, error) { @@ -168,18 +236,22 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) AfterImage(ctx context.Context, e log.Errorf("build prepare stmt: %+v", err) return nil, err } + defer stmt.Close() + + tableName := execCtx.ParseContext.InsertStmt.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O + metaData := execCtx.MetaDataMap[tableName] rows, err := stmt.Query(selectArgs) if err != nil { - log.Errorf("stmt query: %+v", err) return nil, err } - tableName := execCtx.ParseContext.InsertStmt.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O - metaData := execCtx.MetaDataMap[tableName] + defer rows.Close() + image, err := u.buildRecordImages(rows, &metaData) if err != nil { return nil, err } + return []*types.RecordImage{image}, nil } @@ -190,6 +262,13 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Co if len(beforeImages) > 0 { beforeImage = beforeImages[0] } + + // 如果没有before image,直接返回原始SQL和参数 + if beforeImage == nil || len(beforeImage.Rows) == 0 { + return selectSQL, selectArgs + } + + // 收集主键值 primaryValueMap := make(map[string][]interface{}) for _, row := range beforeImage.Rows { for _, col := range row.Columns { @@ -200,23 +279,53 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Co } var afterImageSql strings.Builder - var primaryValues []driver.Value afterImageSql.WriteString(selectSQL) - for i := 0; i < len(beforeImage.Rows); i++ { - wherePrimaryList := make([]string, 0) - for name, value := range primaryValueMap { - if !u.BeforeImageSqlPrimaryKeys[name] { - wherePrimaryList = append(wherePrimaryList, name+" = ? ") - primaryValues = append(primaryValues, value[i]) + + // 如果原始SQL已经包含了所有需要的条件,直接返回 + if len(primaryValueMap) == 0 || len(selectArgs) == len(beforeImage.Rows)*len(primaryValueMap) { + return selectSQL, selectArgs + } + + // 添加主键条件 + var primaryValues []driver.Value + usedPrimaryKeys := make(map[string]bool) + + for name := range primaryValueMap { + if !u.BeforeImageSqlPrimaryKeys[name] { + usedPrimaryKeys[name] = true + for i := 0; i < len(beforeImage.Rows); i++ { + if value := primaryValueMap[name][i]; value != nil { + if dv, ok := value.(driver.Value); ok { + primaryValues = append(primaryValues, dv) + } else { + primaryValues = append(primaryValues, value) + } + } } } - if len(wherePrimaryList) != 0 { - afterImageSql.WriteString(" OR (" + strings.Join(wherePrimaryList, " and ") + ") ") + } + + if len(primaryValues) > 0 { + afterImageSql.WriteString(" OR (" + strings.Join(u.buildPrimaryKeyConditions(primaryValueMap, usedPrimaryKeys), " and ") + ") ") + } + + finalArgs := make([]driver.Value, len(selectArgs)+len(primaryValues)) + copy(finalArgs, selectArgs) + copy(finalArgs[len(selectArgs):], primaryValues) + + sqlStr := afterImageSql.String() + log.Infof("build after select sql by insert on duplicate sourceQuery, sql %s", sqlStr) + return sqlStr, finalArgs +} + +func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildPrimaryKeyConditions(primaryValueMap map[string][]interface{}, usedPrimaryKeys map[string]bool) []string { + var conditions []string + for name := range primaryValueMap { + if !usedPrimaryKeys[name] { + conditions = append(conditions, name+" = ? ") } } - selectArgs = append(selectArgs, primaryValues...) - log.Infof("build after select sql by insert on duplicate sourceQuery, sql {}", afterImageSql.String()) - return afterImageSql.String(), selectArgs + return conditions } func checkDuplicateKeyUpdate(insert *ast.InsertStmt, metaData types.TableMeta) error { @@ -243,11 +352,10 @@ func checkDuplicateKeyUpdate(insert *ast.InsertStmt, metaData types.TableMeta) e // build sql params func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildImageParameters(insert *ast.InsertStmt, args []driver.Value, insertRows [][]interface{}) (map[string][]driver.Value, error) { - var ( - parameterMap = make(map[string][]driver.Value) - ) + parameterMap := make(map[string][]driver.Value) insertColumns := getInsertColumns(insert) - var placeHolderIndex = 0 + placeHolderIndex := 0 + for _, row := range insertRows { if len(row) != len(insertColumns) { log.Errorf("insert row's column size not equal to insert column size") @@ -256,13 +364,14 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildImageParameters(insert *ast. for i, col := range insertColumns { columnName := executor.DelEscape(col, types.DBTypeMySQL) val := row[i] - rStr, ok := val.(string) - if ok && strings.EqualFold(rStr, SqlPlaceholder) { - objects := args[placeHolderIndex] - parameterMap[columnName] = append(parameterMap[col], objects) + if str, ok := val.(string); ok && strings.EqualFold(str, SqlPlaceholder) { + if placeHolderIndex >= len(args) { + return nil, fmt.Errorf("not enough parameters for placeholders") + } + parameterMap[columnName] = append(parameterMap[columnName], args[placeHolderIndex]) placeHolderIndex++ } else { - parameterMap[columnName] = append(parameterMap[col], val) + parameterMap[columnName] = append(parameterMap[columnName], val) } } } From 4debbde8bfff2baf2e522c16b1a5a98ec65104ab Mon Sep 17 00:00:00 2001 From: HEXINGZE <2046084122@qq.com> Date: Tue, 3 Dec 2024 01:00:28 +0800 Subject: [PATCH 2/6] bugfix #704 --- ...sertonduplicate_update_undo_log_builder.go | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go index 5da3d35e7..66b03750d 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go @@ -110,18 +110,13 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a if err != nil { return "", nil, err } - paramMap, err := u.buildImageParameters(insertStmt, args, insertRows) if err != nil { return "", nil, err } - - // 如果没有参数或没有主键索引,直接返回空 if len(paramMap) == 0 || len(metaData.Indexs) == 0 { return "", nil, nil } - - // 检查是否有主键 hasPK := false for _, index := range metaData.Indexs { if strings.EqualFold("PRIMARY", index.Name) { @@ -132,29 +127,24 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a if !hasPK { return "", nil, nil } - var sql strings.Builder sql.WriteString("SELECT * FROM " + metaData.TableName + " ") var selectArgs []driver.Value isContainWhere := false hasConditions := false - for i := 0; i < len(insertRows); i++ { var rowConditions []string var rowArgs []driver.Value usedParams := make(map[string]bool) - // First try unique indexes for _, index := range metaData.Indexs { if index.NonUnique || strings.EqualFold("PRIMARY", index.Name) { continue } - if !isIndexValueNotNull(index, paramMap, i) { continue } - var indexConditions []string var indexArgs []driver.Value allColumnsPresent := true @@ -171,20 +161,17 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a break } } - if allColumnsPresent && len(indexConditions) > 0 { rowConditions = append(rowConditions, "("+strings.Join(indexConditions, " and ")+")") rowArgs = append(rowArgs, indexArgs...) hasConditions = true } } - // Then try primary key for _, index := range metaData.Indexs { if !strings.EqualFold("PRIMARY", index.Name) { continue } - var pkConditions []string var pkArgs []driver.Value for _, colMeta := range index.Columns { @@ -195,14 +182,12 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a pkArgs = append(pkArgs, params[i]) } } - if len(pkConditions) > 0 { rowConditions = append(rowConditions, "("+strings.Join(pkConditions, " and ")+")") rowArgs = append(rowArgs, pkArgs...) hasConditions = true } } - if len(rowConditions) > 0 { if !isContainWhere { sql.WriteString("WHERE ") @@ -219,11 +204,9 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a selectArgs = append(selectArgs, rowArgs...) } } - if !hasConditions { return "", nil, nil } - sqlStr := sql.String() log.Infof("build select sql by insert on duplicate sourceQuery, sql: %s", sqlStr) return sqlStr, selectArgs, nil @@ -237,38 +220,29 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) AfterImage(ctx context.Context, e return nil, err } defer stmt.Close() - tableName := execCtx.ParseContext.InsertStmt.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O metaData := execCtx.MetaDataMap[tableName] - rows, err := stmt.Query(selectArgs) if err != nil { return nil, err } defer rows.Close() - image, err := u.buildRecordImages(rows, &metaData) if err != nil { return nil, err } - return []*types.RecordImage{image}, nil } func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Context, beforeImages []*types.RecordImage) (string, []driver.Value) { selectSQL, selectArgs := u.BeforeSelectSql, u.Args - var beforeImage *types.RecordImage if len(beforeImages) > 0 { beforeImage = beforeImages[0] } - - // 如果没有before image,直接返回原始SQL和参数 if beforeImage == nil || len(beforeImage.Rows) == 0 { return selectSQL, selectArgs } - - // 收集主键值 primaryValueMap := make(map[string][]interface{}) for _, row := range beforeImage.Rows { for _, col := range row.Columns { @@ -277,19 +251,13 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Co } } } - var afterImageSql strings.Builder afterImageSql.WriteString(selectSQL) - - // 如果原始SQL已经包含了所有需要的条件,直接返回 if len(primaryValueMap) == 0 || len(selectArgs) == len(beforeImage.Rows)*len(primaryValueMap) { return selectSQL, selectArgs } - - // 添加主键条件 var primaryValues []driver.Value usedPrimaryKeys := make(map[string]bool) - for name := range primaryValueMap { if !u.BeforeImageSqlPrimaryKeys[name] { usedPrimaryKeys[name] = true @@ -304,15 +272,12 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Co } } } - if len(primaryValues) > 0 { afterImageSql.WriteString(" OR (" + strings.Join(u.buildPrimaryKeyConditions(primaryValueMap, usedPrimaryKeys), " and ") + ") ") } - finalArgs := make([]driver.Value, len(selectArgs)+len(primaryValues)) copy(finalArgs, selectArgs) copy(finalArgs[len(selectArgs):], primaryValues) - sqlStr := afterImageSql.String() log.Infof("build after select sql by insert on duplicate sourceQuery, sql %s", sqlStr) return sqlStr, finalArgs From 56f26454fc3bc0cb1afeda4368166621f86bb5b1 Mon Sep 17 00:00:00 2001 From: HEXINGZE <2046084122@qq.com> Date: Sat, 7 Dec 2024 00:10:30 +0800 Subject: [PATCH 3/6] bugfix704-2 --- ...sertonduplicate_update_undo_log_builder.go | 9 +++++---- ...nduplicate_update_undo_log_builder_test.go | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go index 66b03750d..ff7bcde89 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go @@ -99,7 +99,7 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a } // Reset primary keys map - u.BeforeImageSqlPrimaryKeys = make(map[string]bool) + u.BeforeImageSqlPrimaryKeys = make(map[string]bool, len(metaData.Indexs)) pkIndexMap := u.getPkIndex(insertStmt, metaData) var pkIndexArray []int @@ -134,9 +134,9 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a isContainWhere := false hasConditions := false for i := 0; i < len(insertRows); i++ { - var rowConditions []string - var rowArgs []driver.Value - usedParams := make(map[string]bool) + var rowConditions = make([]string, 0, cap(insertRows[i])) + var rowArgs = make([]driver.Value, 0, cap(insertRows[i])) + usedParams := make(map[string]bool, len(paramMap)) // First try unique indexes for _, index := range metaData.Indexs { if index.NonUnique || strings.EqualFold("PRIMARY", index.Name) { @@ -210,6 +210,7 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a sqlStr := sql.String() log.Infof("build select sql by insert on duplicate sourceQuery, sql: %s", sqlStr) return sqlStr, selectArgs, nil + } func (u *MySQLInsertOnDuplicateUndoLogBuilder) AfterImage(ctx context.Context, execCtx *types.ExecContext, beforeImages []*types.RecordImage) ([]*types.RecordImage, error) { diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go index 03e028fe3..13aa47fdd 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go @@ -143,6 +143,26 @@ func TestInsertOnDuplicateBuildBeforeImageSQL(t *testing.T) { expectQuery1: "SELECT * FROM t_user WHERE (name = ? and age = ? ) OR (name = ? and age = ? ) ", expectQueryArgs1: []driver.Value{"Jack1", 81, "Michal", int64(35)}, }, + // Test case for null unique index + { + execCtx: &types.ExecContext{ + Query: "insert into t_unique(id, a, b) values(1, NULL, 2) on duplicate key update b = 5", + MetaDataMap: map[string]types.TableMeta{"t_unique": tableMeta1}, + }, + sourceQueryArgs: []driver.Value{1, nil, 2}, + expectQuery1: "SELECT * FROM t_unique WHERE (id = ? ) ", + expectQueryArgs1: []driver.Value{1}, + }, + // Test case for null primary key + { + execCtx: &types.ExecContext{ + Query: "insert into t_unique(id, b) values(NULL, 2) on duplicate key update b = 5", + MetaDataMap: map[string]types.TableMeta{"t_unique": tableMeta1}, + }, + sourceQueryArgs: []driver.Value{nil, 2}, + expectQuery1: "SELECT * FROM t_unique WHERE (b = ? ) ", + expectQueryArgs1: []driver.Value{2}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 321906e8324435c94b0f8c01a8dc5e84c68d42f9 Mon Sep 17 00:00:00 2001 From: HEXINGZE <2046084122@qq.com> Date: Sat, 7 Dec 2024 21:01:15 +0800 Subject: [PATCH 4/6] bugfix-test-2 --- ...nduplicate_update_undo_log_builder_test.go | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go index 13aa47fdd..b49ed912f 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go @@ -146,23 +146,33 @@ func TestInsertOnDuplicateBuildBeforeImageSQL(t *testing.T) { // Test case for null unique index { execCtx: &types.ExecContext{ - Query: "insert into t_unique(id, a, b) values(1, NULL, 2) on duplicate key update b = 5", - MetaDataMap: map[string]types.TableMeta{"t_unique": tableMeta1}, + Query: "insert into t_user(id, name, age) values(?, ?, ?) on duplicate key update age = ?", + MetaDataMap: map[string]types.TableMeta{"t_user": tableMeta1}, }, - sourceQueryArgs: []driver.Value{1, nil, 2}, - expectQuery1: "SELECT * FROM t_unique WHERE (id = ? ) ", + sourceQueryArgs: []driver.Value{1, nil, 2, 5}, + expectQuery1: "SELECT * FROM t_user WHERE (id = ? ) ", expectQueryArgs1: []driver.Value{1}, }, // Test case for null primary key { execCtx: &types.ExecContext{ - Query: "insert into t_unique(id, b) values(NULL, 2) on duplicate key update b = 5", - MetaDataMap: map[string]types.TableMeta{"t_unique": tableMeta1}, + Query: "insert into t_user(id, age) values(?, ?) on duplicate key update age = ?", + MetaDataMap: map[string]types.TableMeta{"t_user": tableMeta1}, }, - sourceQueryArgs: []driver.Value{nil, 2}, - expectQuery1: "SELECT * FROM t_unique WHERE (b = ? ) ", + sourceQueryArgs: []driver.Value{nil, 2, 5}, + expectQuery1: "SELECT * FROM t_user WHERE (age = ? )", expectQueryArgs1: []driver.Value{2}, }, + // Test case for null unique index with no primary key + { + execCtx: &types.ExecContext{ + Query: "insert into t_user(name, age) values(?, ?) on duplicate key update age = ?", + MetaDataMap: map[string]types.TableMeta{"t_user": tableMeta2}, + }, + sourceQueryArgs: []driver.Value{nil, 2, 5}, + expectQuery1: "", + expectQueryArgs1: nil, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -170,6 +180,7 @@ func TestInsertOnDuplicateBuildBeforeImageSQL(t *testing.T) { assert.Nil(t, err) tt.execCtx.ParseContext = c query, args, err := builder.buildBeforeImageSQL(tt.execCtx.ParseContext.InsertStmt, tt.execCtx.MetaDataMap["t_user"], tt.sourceQueryArgs) + t.Logf("Query: %s, Args: %v", query, args) // 添加调试日志 assert.Nil(t, err) if query == tt.expectQuery1 { assert.Equal(t, tt.expectQuery1, query) From b60e020cff5996a128f181a7632db29e4a8968a2 Mon Sep 17 00:00:00 2001 From: HEXINGZE <2046084122@qq.com> Date: Sat, 7 Dec 2024 21:02:41 +0800 Subject: [PATCH 5/6] bugfix-test-2 --- .../mysql_insertonduplicate_update_undo_log_builder_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go index b49ed912f..07f5e87bd 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go @@ -180,7 +180,6 @@ func TestInsertOnDuplicateBuildBeforeImageSQL(t *testing.T) { assert.Nil(t, err) tt.execCtx.ParseContext = c query, args, err := builder.buildBeforeImageSQL(tt.execCtx.ParseContext.InsertStmt, tt.execCtx.MetaDataMap["t_user"], tt.sourceQueryArgs) - t.Logf("Query: %s, Args: %v", query, args) // 添加调试日志 assert.Nil(t, err) if query == tt.expectQuery1 { assert.Equal(t, tt.expectQuery1, query) From 5675b29a52e066a1e86aa80d54274df81ff7cf08 Mon Sep 17 00:00:00 2001 From: HEXINGZE <2046084122@qq.com> Date: Sun, 22 Dec 2024 01:59:42 +0800 Subject: [PATCH 6/6] pr725 bugfix --- ...sertonduplicate_update_undo_log_builder.go | 115 ++++++++++-------- ...nduplicate_update_undo_log_builder_test.go | 33 +++++ 2 files changed, 97 insertions(+), 51 deletions(-) diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go index ff7bcde89..2758c9814 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder.go @@ -97,10 +97,7 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a if err := checkDuplicateKeyUpdate(insertStmt, metaData); err != nil { return "", nil, err } - - // Reset primary keys map u.BeforeImageSqlPrimaryKeys = make(map[string]bool, len(metaData.Indexs)) - pkIndexMap := u.getPkIndex(insertStmt, metaData) var pkIndexArray []int for _, val := range pkIndexMap { @@ -120,74 +117,71 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a hasPK := false for _, index := range metaData.Indexs { if strings.EqualFold("PRIMARY", index.Name) { - hasPK = true + allPKColumnsHaveValue := true + for _, col := range index.Columns { + if params, ok := paramMap[col.ColumnName]; !ok || len(params) == 0 || params[0] == nil { + allPKColumnsHaveValue = false + break + } + } + hasPK = allPKColumnsHaveValue break } } if !hasPK { - return "", nil, nil + hasValidUniqueIndex := false + for _, index := range metaData.Indexs { + if !index.NonUnique && !strings.EqualFold("PRIMARY", index.Name) { + if _, _, valid := validateIndexPrefix(index, paramMap, 0); valid { + hasValidUniqueIndex = true + break + } + } + } + if !hasValidUniqueIndex { + return "", nil, nil + } } var sql strings.Builder sql.WriteString("SELECT * FROM " + metaData.TableName + " ") - var selectArgs []driver.Value isContainWhere := false hasConditions := false for i := 0; i < len(insertRows); i++ { - var rowConditions = make([]string, 0, cap(insertRows[i])) - var rowArgs = make([]driver.Value, 0, cap(insertRows[i])) - usedParams := make(map[string]bool, len(paramMap)) + var rowConditions []string + var rowArgs []driver.Value + usedParams := make(map[string]bool) + // First try unique indexes for _, index := range metaData.Indexs { if index.NonUnique || strings.EqualFold("PRIMARY", index.Name) { continue } - if !isIndexValueNotNull(index, paramMap, i) { - continue - } - var indexConditions []string - var indexArgs []driver.Value - allColumnsPresent := true - for _, colMeta := range index.Columns { - columnName := colMeta.ColumnName - if params, ok := paramMap[columnName]; ok && len(params) > i && params[i] != nil { - indexConditions = append(indexConditions, columnName+" = ? ") - indexArgs = append(indexArgs, params[i]) - usedParams[columnName] = true - } else if colMeta.ColumnDef != nil { - indexConditions = append(indexConditions, columnName+" = DEFAULT("+columnName+")") - } else { - allColumnsPresent = false - break - } - } - if allColumnsPresent && len(indexConditions) > 0 { - rowConditions = append(rowConditions, "("+strings.Join(indexConditions, " and ")+")") - rowArgs = append(rowArgs, indexArgs...) + if conditions, args, valid := validateIndexPrefix(index, paramMap, i); valid { + rowConditions = append(rowConditions, "("+strings.Join(conditions, " and ")+")") + rowArgs = append(rowArgs, args...) hasConditions = true + for _, colMeta := range index.Columns { + usedParams[colMeta.ColumnName] = true + } } } + // Then try primary key for _, index := range metaData.Indexs { if !strings.EqualFold("PRIMARY", index.Name) { continue } - var pkConditions []string - var pkArgs []driver.Value - for _, colMeta := range index.Columns { - columnName := colMeta.ColumnName - u.BeforeImageSqlPrimaryKeys[columnName] = true - if params, ok := paramMap[columnName]; ok && len(params) > i && params[i] != nil && !usedParams[columnName] { - pkConditions = append(pkConditions, columnName+" = ? ") - pkArgs = append(pkArgs, params[i]) - } - } - if len(pkConditions) > 0 { - rowConditions = append(rowConditions, "("+strings.Join(pkConditions, " and ")+")") - rowArgs = append(rowArgs, pkArgs...) + if conditions, args, valid := validateIndexPrefix(index, paramMap, i); valid { + rowConditions = append(rowConditions, "("+strings.Join(conditions, " and ")+")") + rowArgs = append(rowArgs, args...) hasConditions = true + for _, colMeta := range index.Columns { + usedParams[colMeta.ColumnName] = true + } } } + if len(rowConditions) > 0 { if !isContainWhere { sql.WriteString("WHERE ") @@ -195,12 +189,7 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a } else { sql.WriteString(" OR ") } - for j, condition := range rowConditions { - if j > 0 { - sql.WriteString(" OR ") - } - sql.WriteString(condition + " ") - } + sql.WriteString(strings.Join(rowConditions, " OR ") + " ") selectArgs = append(selectArgs, rowArgs...) } } @@ -210,7 +199,6 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a sqlStr := sql.String() log.Infof("build select sql by insert on duplicate sourceQuery, sql: %s", sqlStr) return sqlStr, selectArgs, nil - } func (u *MySQLInsertOnDuplicateUndoLogBuilder) AfterImage(ctx context.Context, execCtx *types.ExecContext, beforeImages []*types.RecordImage) ([]*types.RecordImage, error) { @@ -371,3 +359,28 @@ func isIndexValueNotNull(indexMeta types.IndexMeta, imageParameterMap map[string } return true } + +func validateIndexPrefix(index types.IndexMeta, paramMap map[string][]driver.Value, rowIndex int) ([]string, []driver.Value, bool) { + var indexConditions []string + var indexArgs []driver.Value + if len(index.Columns) > 1 { + for _, colMeta := range index.Columns { + params, ok := paramMap[colMeta.ColumnName] + if !ok || len(params) <= rowIndex || params[rowIndex] == nil { + return nil, nil, false + } + } + } + for _, colMeta := range index.Columns { + columnName := colMeta.ColumnName + params, ok := paramMap[columnName] + if ok && len(params) > rowIndex && params[rowIndex] != nil { + indexConditions = append(indexConditions, columnName+" = ? ") + indexArgs = append(indexArgs, params[rowIndex]) + } + } + if len(indexConditions) != len(index.Columns) { + return nil, nil, false + } + return indexConditions, indexArgs, true +} diff --git a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go index 07f5e87bd..4e831f4cd 100644 --- a/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/mysql_insertonduplicate_update_undo_log_builder_test.go @@ -173,6 +173,39 @@ func TestInsertOnDuplicateBuildBeforeImageSQL(t *testing.T) { expectQuery1: "", expectQueryArgs1: nil, }, + // Test case for composite index with all columns + { + name: "composite_index_full", + execCtx: &types.ExecContext{ + Query: "insert into t_user(id, name, age) values(?,?,?) on duplicate key update other = ?", + MetaDataMap: map[string]types.TableMeta{"t_user": tableMeta1}, + }, + sourceQueryArgs: []driver.Value{1, "Jack", 25, "other"}, + expectQuery1: "SELECT * FROM t_user WHERE (name = ? and age = ? ) OR (id = ? ) ", + expectQueryArgs1: []driver.Value{"Jack", 25, 1}, + }, + // Test case for composite index with null value + { + name: "composite_index_with_null", + execCtx: &types.ExecContext{ + Query: "insert into t_user(id, name, age) values(?,?,?) on duplicate key update other = ?", + MetaDataMap: map[string]types.TableMeta{"t_user": tableMeta1}, + }, + sourceQueryArgs: []driver.Value{1, "Jack", nil, "other"}, + expectQuery1: "SELECT * FROM t_user WHERE (id = ? ) ", + expectQueryArgs1: []driver.Value{1}, + }, + // Test case for composite index with leftmost prefix only + { + name: "composite_index_leftmost_prefix", + execCtx: &types.ExecContext{ + Query: "insert into t_user(id, name) values(?,?) on duplicate key update other = ?", + MetaDataMap: map[string]types.TableMeta{"t_user": tableMeta1}, + }, + sourceQueryArgs: []driver.Value{1, "Jack", "other"}, + expectQuery1: "SELECT * FROM t_user WHERE (id = ? ) ", + expectQueryArgs1: []driver.Value{1}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {