Skip to content

Commit

Permalink
Fix resuming state without inline verifier
Browse files Browse the repository at this point in the history
If the inline-verifier is not enabled (as is the case for various
production uses of ghostferry), its binlog position can grow stale. In
some cases it points to such an old position that resuming from it
fails (if the source has already deleted such old replication logs).

This commit fixes this by relying solely on the binlog writer resume
position if the inline verifier is not enabled.

We still fail if the inline verifier *is* enabled and the position is
stale, but there is nothing one can do about that. If verification is
enabled, one must ensure that it's able to keep up with migration.

This fixes Shopify#184

Change-Id: Iec689d7651e533772642f366a0ff80c891d28157
  • Loading branch information
kolbitsch-lastline committed May 13, 2020
1 parent 544002c commit cde20db
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 5 deletions.
1 change: 1 addition & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error)
return BinlogPosition{}, err
}

s.logger.Debugf("connecting to binlog streamer using master state %s", currentPosition)
return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition))
}

Expand Down
15 changes: 13 additions & 2 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,23 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
//
// NOTE: If the inline verifier is not enabled, we do not consider its last
// position when choosing the resume position. We could be migrating for a
// long time, and the inline verifier position may grow outdated to the point
// that it is no longer a valid position (the logs could have been deleted).
// Since the inline verifier position is not updated when the verifier is
// disabled, and `MinBinlogPosition()` picks the oldest position, resuming
// from that minimum position may fail.
// In this case, the last written position is the better state to resume from.
var pos BinlogPosition
var err error
if f.StateToResumeFrom != nil {
if f.StateToResumeFrom == nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
} else if f.inlineVerifier != nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition())
} else {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.LastWrittenBinlogPosition)
}
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,16 @@ func NewStateTrackerFromTargetDB(f *Ferry) (s *StateTracker, state *Serializable
if err == nil && state == nil {
err = s.initializeDBStateSchema(f.TargetDB, f.Config.ResumeStateFromDB)

s.logger.Debug("initializing resume state from binlog position on source DB")
masterPos, posErr := ShowMasterStatusBinlogPosition(f.SourceDB)
if posErr != nil {
s.logger.WithError(posErr).Error("failed to read current binlog position")
err = posErr
return
}

pos := NewResumableBinlogPosition(masterPos)
s.logger.Debugf("using resume state from binlog position on source DB: %s", pos)
s.UpdateLastWrittenBinlogPosition(pos)
s.UpdateLastStoredBinlogPositionForInlineVerifier(pos)
// we absolutely need to initialize the DB with a proper state of the source
Expand Down Expand Up @@ -615,6 +618,7 @@ func (s *StateTracker) readStateFromDB(f *Ferry) (*SerializableState, error) {
return nil, err
}
f.logger.Infof("found binlog writer resume position data on target DB: %s", state.LastWrittenBinlogPosition)
s.UpdateLastWrittenBinlogPosition(state.LastWrittenBinlogPosition)
}

inlineVerifierTableName := s.getInlineVerifierStateTable()
Expand Down Expand Up @@ -643,7 +647,8 @@ func (s *StateTracker) readStateFromDB(f *Ferry) (*SerializableState, error) {
}).Errorf("parsing inline-verifier resume position data row from target DB failed")
return nil, err
}
f.logger.Infof("found inline-verifier resume position data on target DB: %s", state.LastWrittenBinlogPosition)
f.logger.Infof("found inline-verifier resume position data on target DB: %s", state.LastStoredBinlogPositionForInlineVerifier)
s.UpdateLastStoredBinlogPositionForInlineVerifier(state.LastStoredBinlogPositionForInlineVerifier)
}

return state, nil
Expand Down
61 changes: 59 additions & 2 deletions test/go/state_tracker_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
package test

import (
"fmt"
"testing"

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/siddontang/go-mysql/mysql"
"github.com/stretchr/testify/suite"
)

// StateSchemaName is the database name on the target DB that these tests
// assign to ResumeStateFromDB and that resetDbs drops around every test.
const StateSchemaName = "gftest_state"

// StateTrackerTestSuite groups the state-tracker tests in this file. Its
// SetupTest and TearDownTest hooks delegate to the embedded
// GhostferryUnitTestSuite before/after resetting the state schema.
//
// NOTE(review): suite.Suite is embedded directly in addition to the helper
// suite; presumably GhostferryUnitTestSuite already provides it, which would
// make this field redundant — confirm against testhelpers and remove if so.
type StateTrackerTestSuite struct {
suite.Suite
*testhelpers.GhostferryUnitTestSuite
}

// SetupTest runs the embedded GhostferryUnitTestSuite setup and then calls
// resetDbs, so every test starts without a leftover state schema on the
// target DB.
func (s *StateTrackerTestSuite) SetupTest() {
	s.GhostferryUnitTestSuite.SetupTest()
	s.resetDbs()
}

// TearDownTest calls resetDbs to drop the state schema created by the test
// and then hands over to the embedded GhostferryUnitTestSuite teardown.
func (s *StateTrackerTestSuite) TearDownTest() {
	s.resetDbs()
	s.GhostferryUnitTestSuite.TearDownTest()
}

// resetDbs drops the StateSchemaName database from the target DB if it
// exists, and fails the current test when the statement returns an error.
func (s *StateTrackerTestSuite) resetDbs() {
	_, err := s.Ferry.TargetDB.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", StateSchemaName))
	s.Require().Nil(err)
}

func (s *StateTrackerTestSuite) TestMinBinlogPosition() {
Expand Down Expand Up @@ -66,6 +87,42 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() {
s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10})
}

// TestSerializeStateInTargetDB checks that a state tracker created with
// ResumeStateFromDB set stores its binlog writer position in the target DB,
// and that a second tracker created afterwards reports that stored position
// rather than the (newer) current position of the source DB.
func (s *StateTrackerTestSuite) TestSerializeStateInTargetDB() {
	ferry := s.TestFerry.Ferry
	ferry.ResumeStateFromDB = StateSchemaName

	trackerBefore, _, err := ghostferry.NewStateTrackerFromTargetDB(ferry)
	s.Require().Nil(err)

	// The first tracker must have created and populated the writer state table.
	query := fmt.Sprintf("SELECT * FROM `%s`.`_ghostferry_91919__last_binlog_writer_state`", StateSchemaName)
	rows, err := ferry.TargetDB.Query(query)
	s.Require().Nil(err)
	defer rows.Close()
	s.Require().True(rows.Next())

	stateBefore := trackerBefore.Serialize(nil, nil)

	// Move the source's binlog position forward so that it differs from the
	// position recorded above.
	s.SeedSourceDB(1)

	// Guard for the test itself: the source position has to be strictly ahead
	// of the recorded one, otherwise the comparisons below could not tell the
	// state tables on the target DB apart from a fresh read of the source DB.
	sourcePos, err := ghostferry.ShowMasterStatusBinlogPosition(ferry.SourceDB)
	s.Require().Nil(err)
	sourceAdvanced := sourcePos.Compare(stateBefore.LastWrittenBinlogPosition.ResumePosition) > 0
	s.Require().True(sourceAdvanced)

	trackerAfter, stateLoaded, err := ghostferry.NewStateTrackerFromTargetDB(ferry)
	s.Require().Nil(err)
	s.Require().Equal(stateBefore.LastWrittenBinlogPosition, stateLoaded.LastWrittenBinlogPosition)

	// The loaded position must also be held by the tracker itself, not only
	// by the state object it returned.
	stateReserialized := trackerAfter.Serialize(nil, nil)
	s.Require().Equal(stateLoaded.LastWrittenBinlogPosition, stateReserialized.LastWrittenBinlogPosition)
}

func TestStateTrackerTestSuite(t *testing.T) {
suite.Run(t, new(StateTrackerTestSuite))
testhelpers.SetupTest()
suite.Run(t, &StateTrackerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
}

0 comments on commit cde20db

Please sign in to comment.