Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resuming state without inline verifier #30

Merged
merged 2 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,11 @@ Features/fixes added in this fork include
Enabling inline/iterative verifiers causes an error at runtime if data
is requested to be verified for an incompatible table.
- because we support signed integer primary keys now, the maximum key value
supported is now 2**63 (previously 2**64). In practice it is unlikely to
have DB key values of this size, and it is thus not configurable to
provide the legacy behavior.
supported is now 2<sup>63</sup> (previously 2<sup>64</sup>). In practice
it is unlikely to have DB key values of this size, and it is thus not
configurable to provide the legacy behavior.
- more robust [disabling of inline-verifier](https://github.com/Shopify/ghostferry/issues/184):
this fix has not made it into upstream master yet.

Overview of How it Works
------------------------
Expand Down
1 change: 1 addition & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error)
return BinlogPosition{}, err
}

s.logger.Debugf("connecting to binlog streamer using master state %s", currentPosition)
return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition))
}

Expand Down
15 changes: 13 additions & 2 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,23 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
//
// NOTE: If we don't use the inline verifier, we don't consider its last
// position for the resume position. We could be migrating for a long time,
// and the inline verifier position may grow outdated to the point that it
// is no longer a valid position (the logs could have been deleted). Since
// the inline verifier position is not updated when the verifier is disabled,
// and `MinBinlogPosition()` returns the oldest position, resuming from that
// minimum position may fail.
// In this case, the last written position is the better state to resume from.
var pos BinlogPosition
var err error
if f.StateToResumeFrom != nil {
if f.StateToResumeFrom == nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
} else if f.inlineVerifier != nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition())
} else {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.LastWrittenBinlogPosition)
}
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,16 @@ func NewStateTrackerFromTargetDB(f *Ferry) (s *StateTracker, state *Serializable
if err == nil && state == nil {
err = s.initializeDBStateSchema(f.TargetDB, f.Config.ResumeStateFromDB)

s.logger.Debug("initializing resume state from binlog position on source DB")
masterPos, posErr := ShowMasterStatusBinlogPosition(f.SourceDB)
if posErr != nil {
s.logger.WithError(posErr).Error("failed to read current binlog position")
err = posErr
return
}

pos := NewResumableBinlogPosition(masterPos)
s.logger.Debugf("using resume state from binlog position on source DB: %s", pos)
s.UpdateLastWrittenBinlogPosition(pos)
s.UpdateLastStoredBinlogPositionForInlineVerifier(pos)
// we absolutely need to initialize the DB with a proper state of the source
Expand Down Expand Up @@ -615,6 +618,7 @@ func (s *StateTracker) readStateFromDB(f *Ferry) (*SerializableState, error) {
return nil, err
}
f.logger.Infof("found binlog writer resume position data on target DB: %s", state.LastWrittenBinlogPosition)
s.UpdateLastWrittenBinlogPosition(state.LastWrittenBinlogPosition)
}

inlineVerifierTableName := s.getInlineVerifierStateTable()
Expand Down Expand Up @@ -643,7 +647,8 @@ func (s *StateTracker) readStateFromDB(f *Ferry) (*SerializableState, error) {
}).Errorf("parsing inline-verifier resume position data row from target DB failed")
return nil, err
}
f.logger.Infof("found inline-verifier resume position data on target DB: %s", state.LastWrittenBinlogPosition)
f.logger.Infof("found inline-verifier resume position data on target DB: %s", state.LastStoredBinlogPositionForInlineVerifier)
s.UpdateLastStoredBinlogPositionForInlineVerifier(state.LastStoredBinlogPositionForInlineVerifier)
}

return state, nil
Expand Down
61 changes: 59 additions & 2 deletions test/go/state_tracker_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
package test

import (
"fmt"
"testing"

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/siddontang/go-mysql/mysql"
"github.com/stretchr/testify/suite"
)

const (
// StateSchemaName is the schema that these tests configure as the
// resume-state location (ResumeStateFromDB) and drop from the target DB
// before and after each test.
StateSchemaName = "gftest_state"
)

// StateTrackerTestSuite groups the state-tracker tests. The embedded
// GhostferryUnitTestSuite supplies the Ferry/TestFerry fixtures and the base
// SetupTest/TearDownTest hooks that this suite's own hooks delegate to.
//
// NOTE(review): both suite.Suite and *testhelpers.GhostferryUnitTestSuite are
// embedded here; this looks like it may be a leftover of an earlier revision
// — confirm which embedding is intended.
type StateTrackerTestSuite struct {
suite.Suite
*testhelpers.GhostferryUnitTestSuite
}

// SetupTest performs the shared Ghostferry suite setup and then drops the
// state schema, so a test never starts with resume state left behind by an
// earlier run.
func (s *StateTrackerTestSuite) SetupTest() {
	s.GhostferryUnitTestSuite.SetupTest()
	s.resetDbs()
}

// TearDownTest drops the state schema created by the test and then runs the
// shared Ghostferry suite teardown.
func (s *StateTrackerTestSuite) TearDownTest() {
	s.resetDbs()
	s.GhostferryUnitTestSuite.TearDownTest()
}

// resetDbs drops the StateSchemaName schema on the target DB if it exists and
// fails the current test when the statement returns an error.
func (s *StateTrackerTestSuite) resetDbs() {
	_, err := s.Ferry.TargetDB.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", StateSchemaName))
	s.Require().Nil(err)
}

func (s *StateTrackerTestSuite) TestMinBinlogPosition() {
Expand Down Expand Up @@ -66,6 +87,42 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() {
s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10})
}

// TestSerializeStateInTargetDB checks that a state tracker created a second
// time against the same target DB reports the binlog-writer position stored in
// the state tables, not the source DB's current master position.
func (s *StateTrackerTestSuite) TestSerializeStateInTargetDB() {
	ferry := s.TestFerry.Ferry
	ferry.ResumeStateFromDB = StateSchemaName

	firstTracker, _, err := ghostferry.NewStateTrackerFromTargetDB(ferry)
	s.Require().Nil(err)

	// the binlog-writer state table must exist and hold at least one row
	stateTableQuery := fmt.Sprintf("SELECT * FROM `%s`.`_ghostferry_91919__last_binlog_writer_state`", StateSchemaName)
	rows, err := ferry.TargetDB.Query(stateTableQuery)
	s.Require().Nil(err)
	defer rows.Close()
	s.Require().True(rows.Next())

	initialState := firstTracker.Serialize(nil, nil)

	// Seed the source DB so its master position moves forward.
	s.SeedSourceDB(1)
	// Guard for the assertions below: the master position must now be ahead
	// of the recorded one, otherwise a tracker that wrongly re-read the
	// source DB position would go undetected.
	currentMasterPos, err := ghostferry.ShowMasterStatusBinlogPosition(ferry.SourceDB)
	s.Require().Nil(err)
	masterAdvanced := currentMasterPos.Compare(initialState.LastWrittenBinlogPosition.ResumePosition) > 0
	s.Require().True(masterAdvanced)

	secondTracker, resumedState, err := ghostferry.NewStateTrackerFromTargetDB(ferry)
	s.Require().Nil(err)
	s.Require().Equal(initialState.LastWrittenBinlogPosition, resumedState.LastWrittenBinlogPosition)

	// the restored position must also be held by the tracker itself, not
	// only by the state object it returned
	reserializedState := secondTracker.Serialize(nil, nil)
	s.Require().Equal(resumedState.LastWrittenBinlogPosition, reserializedState.LastWrittenBinlogPosition)
}

func TestStateTrackerTestSuite(t *testing.T) {
suite.Run(t, new(StateTrackerTestSuite))
testhelpers.SetupTest()
suite.Run(t, &StateTrackerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
}