Skip to content

Commit

Permalink
schemastore: fix exchange partition and add more unit tests (#728)
Browse files Browse the repository at this point in the history
* wip

* wip

* wip

* wip

* add test for rename table

* add more unit test

* fix
  • Loading branch information
lidezhu authored Dec 25, 2024
1 parent 0e786ec commit 21e1b13
Show file tree
Hide file tree
Showing 3 changed files with 703 additions and 375 deletions.
21 changes: 15 additions & 6 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, storage kv.St
p.gcTs = gcTs
p.upperBound = UpperBoundMeta{
FinishedDDLTs: 0,
SchemaVersion: 0,
ResolvedTs: gcTs,
}
writeUpperBoundMeta(p.db, p.upperBound)
Expand Down Expand Up @@ -1142,14 +1141,23 @@ func updateDatabaseInfoAndTableInfo(
delete(databaseInfo.Tables, tableID)
}

createTable := func(schemaID int64, tableID int64) {
createTable := func(schemaID int64, tableID int64, tableName string) {
addTableToDB(schemaID, tableID)
tableMap[tableID] = &BasicTableInfo{
SchemaID: schemaID,
Name: event.TableInfo.Name.O,
Name: tableName,
}
}

getTableName := func(tableID int64) string {
tableInfo, ok := tableMap[tableID]
if !ok {
log.Panic("table not found",
zap.Int64("tableID", tableID))
}
return tableInfo.Name
}

dropTable := func(schemaID int64, tableID int64) {
removeTableFromDB(schemaID, tableID)
delete(tableMap, tableID)
Expand All @@ -1169,7 +1177,7 @@ func updateDatabaseInfoAndTableInfo(
}
delete(databaseMap, event.CurrentSchemaID)
case model.ActionCreateTable, model.ActionRecoverTable:
createTable(event.CurrentSchemaID, event.CurrentTableID)
createTable(event.CurrentSchemaID, event.CurrentTableID, event.TableInfo.Name.O)
if isPartitionTable(event.TableInfo) {
partitionInfo := make(BasicPartitionInfo)
for _, id := range getAllPartitionIDs(event.TableInfo) {
Expand All @@ -1191,7 +1199,7 @@ func updateDatabaseInfoAndTableInfo(
// ignore
case model.ActionTruncateTable:
dropTable(event.CurrentSchemaID, event.PrevTableID)
createTable(event.CurrentSchemaID, event.CurrentTableID)
createTable(event.CurrentSchemaID, event.CurrentTableID, event.TableInfo.Name.O)
if isPartitionTable(event.TableInfo) {
delete(partitionMap, event.PrevTableID)
partitionInfo := make(BasicPartitionInfo)
Expand Down Expand Up @@ -1245,8 +1253,9 @@ func updateDatabaseInfoAndTableInfo(
zap.Int64s("droppedIDs", droppedIDs))
}
targetPartitionID := droppedIDs[0]
normalTableName := getTableName(event.PrevTableID)
dropTable(event.PrevSchemaID, event.PrevTableID)
createTable(event.PrevSchemaID, targetPartitionID)
createTable(event.PrevSchemaID, targetPartitionID, normalTableName)
delete(partitionMap[event.CurrentTableID], targetPartitionID)
partitionMap[event.CurrentTableID][event.PrevTableID] = nil
case model.ActionCreateTables:
Expand Down
Loading

0 comments on commit 21e1b13

Please sign in to comment.