Skip to content

Commit

Permalink
maintainer, scheduler: support dynamic merge and split (#698)
Browse files Browse the repository at this point in the history
* fix go mod

* add comments

* temp

* fix

* add merge split operator

* support merge

* try use merge split

* add GroupChecker interface

* finish checker in pkg/scheduler/replica

* update

* implement checker

* update

* add checker tests

* add merge test

* check hole

* complete TestDynamicMergeTableBasic

* fix panic

* decrease split threshold

* fix

* update

* fix

* fix

* fix

* fix ut
  • Loading branch information
CharlesCheung96 authored Dec 25, 2024
1 parent 21e1b13 commit 955b924
Show file tree
Hide file tree
Showing 29 changed files with 1,578 additions and 489 deletions.
6 changes: 5 additions & 1 deletion coordinator/changefeed/changefeed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ type ChangefeedDB struct {

func NewChangefeedDB(version int64) *ChangefeedDB {
db := &ChangefeedDB{
// id is the unique id of the changefeed db. The prefix `coordinator` distinguishes
// it from other ReplicationDB. The suffix is the version of the coordinator, which
// is useful to track the scheduling history.
id: fmt.Sprintf("coordinator-%d", version),
changefeeds: make(map[common.ChangeFeedID]*Changefeed),
changefeedDisplayNames: make(map[common.ChangeFeedDisplayName]common.ChangeFeedID),
stopped: make(map[common.ChangeFeedID]*Changefeed),
}
db.ReplicationDB = replica.NewReplicationDB[common.ChangeFeedID, *Changefeed](db.id, db.withRLock)
db.ReplicationDB = replica.NewReplicationDB[common.ChangeFeedID, *Changefeed](db.id,
db.withRLock, replica.NewEmptyChecker)
return db
}

Expand Down
4 changes: 2 additions & 2 deletions coordinator/changefeed/changefeed_db_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type Backend interface {
SetChangefeedProgress(ctx context.Context, id common.ChangeFeedID, progress config.Progress) error
// ResumeChangefeed persists the resumed status to db for a changefeed
ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64) error
// UpdateChangefeedCheckpointTs persists the checkpoints for changefeeds
UpdateChangefeedCheckpointTs(ctx context.Context, cps map[common.ChangeFeedID]uint64) error
// UpdateChangefeedCheckpointTs persists the checkpointTs for changefeeds
UpdateChangefeedCheckpointTs(ctx context.Context, checkpointTs map[common.ChangeFeedID]uint64) error
}

// ChangefeedMetaWrapper is a wrapper for the changefeed loaded from the DB
Expand Down
29 changes: 15 additions & 14 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,29 @@ import (
"go.uber.org/zap"
)

// Controller schedules and balance changefeeds
// there are 3 main components in the controller, scheduler, ChangefeedDB and operator controller
// Controller schedules and balances changefeeds. It has 4 main components:
// 1. scheduler: generate operators for handling different scheduling tasks.
// 2. operatorController: manage all operators and execute them periodically.
// 3. changefeedDB: store all changefeeds info and their status in memory.
// 4. backend: the durable storage for storing changefeed metadata.
type Controller struct {
bootstrapped *atomic.Bool
version int64

nodeChanged *atomic.Bool
version int64

cfScheduler *scheduler.Controller
scheduler *scheduler.Controller
operatorController *operator.Controller
changefeedDB *changefeed.ChangefeedDB
messageCenter messaging.MessageCenter
nodeManager *watcher.NodeManager
batchSize int
backend changefeed.Backend

bootstrapped *atomic.Bool
bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse]

nodeChanged *atomic.Bool
nodeManager *watcher.NodeManager

stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool
taskHandlers []*threadpool.TaskHandle
backend changefeed.Backend
messageCenter messaging.MessageCenter

updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed
stateChangedCh chan *ChangefeedStateChangeEvent
Expand Down Expand Up @@ -92,9 +94,8 @@ func NewController(
oc := operator.NewOperatorController(mc, selfNode, changefeedDB, backend, nodeManager, batchSize)
c := &Controller{
version: version,
batchSize: batchSize,
bootstrapped: atomic.NewBool(false),
cfScheduler: scheduler.NewController(map[string]scheduler.Scheduler{
scheduler: scheduler.NewController(map[string]scheduler.Scheduler{
scheduler.BasicScheduler: scheduler.NewBasicScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, oc.NewAddMaintainerOperator),
scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, balanceInterval, oc.NewMoveMaintainerOperator),
}),
Expand Down Expand Up @@ -358,7 +359,7 @@ func (c *Controller) FinishBootstrap(workingMap map[common.ChangeFeedID]remoteMa
}

// start operator and scheduler
c.taskHandlers = append(c.taskHandlers, c.cfScheduler.Start(c.taskScheduler)...)
c.taskHandlers = append(c.taskHandlers, c.scheduler.Start(c.taskScheduler)...)
operatorControllerHandle := c.taskScheduler.Submit(c.operatorController, time.Now())
c.taskHandlers = append(c.taskHandlers, operatorControllerHandle)
c.bootstrapped.Store(true)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/pingcap/tidb v1.1.0-beta.0.20241014034929-94b2ac04a0c4
github.com/pingcap/tidb/pkg/parser v0.0.0-20241014034929-94b2ac04a0c4
github.com/pingcap/tiflow v0.0.0-20241023094956-dd2d54ad4c19
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.4
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand Down Expand Up @@ -255,7 +256,6 @@ require (
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 // indirect
github.com/pingcap/tipb v0.0.0-20241008083645-0bcddae67837 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
47 changes: 24 additions & 23 deletions maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Controller struct {
tsoClient replica.TSOClient

splitter *split.Splitter
spanReplicationEnabled bool
enableTableAcrossNodes bool
startCheckpointTs uint64
ddlDispatcherID common.DispatcherID

Expand All @@ -74,25 +74,29 @@ func NewController(changefeedID common.ChangeFeedID,
ddlSpan *replica.SpanReplication,
batchSize int, balanceInterval time.Duration) *Controller {
mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
replicaSetDB := replica.NewReplicaSetDB(changefeedID, ddlSpan)
enableTableAcrossNodes := false
var splitter *split.Splitter
if cfConfig != nil && cfConfig.Scheduler.EnableTableAcrossNodes {
enableTableAcrossNodes = true
splitter = split.NewSplitter(changefeedID, pdapi, regionCache, cfConfig.Scheduler)
}
replicaSetDB := replica.NewReplicaSetDB(changefeedID, ddlSpan, enableTableAcrossNodes)
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
oc := operator.NewOperatorController(changefeedID, mc, replicaSetDB, nodeManager, batchSize)
s := &Controller{
startCheckpointTs: checkpointTs,
changefeedID: changefeedID,
bootstrapped: false,
ddlDispatcherID: ddlSpan.ID,
operatorController: oc,
messageCenter: mc,
replicationDB: replicaSetDB,
nodeManager: nodeManager,
taskScheduler: taskScheduler,
cfConfig: cfConfig,
tsoClient: tsoClient,
}
if cfConfig != nil && cfConfig.Scheduler.EnableTableAcrossNodes {
s.splitter = split.NewSplitter(changefeedID, pdapi, regionCache, cfConfig.Scheduler)
s.spanReplicationEnabled = true
startCheckpointTs: checkpointTs,
changefeedID: changefeedID,
bootstrapped: false,
ddlDispatcherID: ddlSpan.ID,
operatorController: oc,
messageCenter: mc,
replicationDB: replicaSetDB,
nodeManager: nodeManager,
taskScheduler: taskScheduler,
cfConfig: cfConfig,
tsoClient: tsoClient,
splitter: splitter,
enableTableAcrossNodes: enableTableAcrossNodes,
}
s.schedulerController = NewScheduleController(changefeedID, batchSize, oc, replicaSetDB, nodeManager, balanceInterval, s.splitter)
return s
Expand Down Expand Up @@ -130,10 +134,7 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS
zap.Stringer("node", nodeID))
continue
}
stm.UpdateStatus(status)
if c.spanReplicationEnabled {
c.replicationDB.UpdateHotSpan(stm, status)
}
c.replicationDB.UpdateStatus(stm, status)
}
}

Expand Down Expand Up @@ -169,7 +170,7 @@ func (c *Controller) AddNewTable(table commonEvent.Table, startTs uint64) {
EndKey: span.EndKey,
}
tableSpans := []*heartbeatpb.TableSpan{tableSpan}
if c.spanReplicationEnabled {
if c.enableTableAcrossNodes {
// split the whole table span based on the configuration, todo: background split table
tableSpans = c.splitter.SplitSpans(context.Background(), tableSpan, len(c.nodeManager.GetAliveNodes()), 0)
}
Expand Down Expand Up @@ -272,7 +273,7 @@ func (c *Controller) FinishBootstrap(
zap.String("changefeed", c.changefeedID.Name()),
zap.Int64("tableID", table.TableID))
c.addWorkingSpans(tableMap)
if c.spanReplicationEnabled {
if c.enableTableAcrossNodes {
holes := split.FindHoles(tableMap, tableSpan)
// todo: split the hole
c.addNewSpans(table.SchemaID, holes, c.startCheckpointTs)
Expand Down
Loading

0 comments on commit 955b924

Please sign in to comment.