From cde20db45389cbb364026cc514fa16be46bc234e Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Mon, 11 May 2020 20:29:45 -0700 Subject: [PATCH 1/2] Fix resuming state without inline verifier If the inline-verifier is not enabled (as is the case for various production uses of ghostferry), its binlog position can grow stale. In some cases it points to such an old position that resuming from it fails (if the source has already deleted such old replication logs). This commit fixes this by relying solely on the binlog writer resume position if the inline verifier is not enabled. We still fail if the inline verifier *is* enabled and the position is stale, but there is nothing one can do about that. If verification is enabled, one must ensure that it's able to keep up with migration. This fixes #184 Change-Id: Iec689d7651e533772642f366a0ff80c891d28157 --- binlog_streamer.go | 1 + ferry.go | 15 +++++++-- state_tracker.go | 7 +++- test/go/state_tracker_test.go | 61 +++++++++++++++++++++++++++++++++-- 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/binlog_streamer.go b/binlog_streamer.go index b04c3366..494915a7 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -135,6 +135,7 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) return BinlogPosition{}, err } + s.logger.Debugf("connecting to binlog streamer using master state %s", currentPosition) return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition)) } diff --git a/ferry.go b/ferry.go index e5b11daf..717537b1 100644 --- a/ferry.go +++ b/ferry.go @@ -503,12 +503,23 @@ func (f *Ferry) Start() error { // miss some records that are inserted between the time the // DataIterator determines the range of IDs to copy and the time that // the starting binlog coordinates are determined. + // + // NOTE: If we don't use the inline verifier, we don't consider its last + // position for the resume position. We could be migrating for a long time, + // and the inline verifier position may grow outdated to the point that it + // is no-longer a valid position (the logs could have been deleted). Since + // the inline verifier position is not updated if it's not enabled and we + // use the oldest position in `MinBinlogPosition()`, resume may fail for + // the minimum position. + // In this case, using the last written position is the better state to use var pos BinlogPosition var err error - if f.StateToResumeFrom != nil { + if f.StateToResumeFrom == nil { + pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql() + } else if f.inlineVerifier != nil { pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition()) } else { - pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysql() + pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.LastWrittenBinlogPosition) } if err != nil { return err diff --git a/state_tracker.go b/state_tracker.go index b5c65f4b..2a1bf6c8 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -147,13 +147,16 @@ func NewStateTrackerFromTargetDB(f *Ferry) (s *StateTracker, state *Serializable if err == nil && state == nil { err = s.initializeDBStateSchema(f.TargetDB, f.Config.ResumeStateFromDB) + s.logger.Debug("initializing resume state from binlog position on source DB") masterPos, posErr := ShowMasterStatusBinlogPosition(f.SourceDB) if posErr != nil { s.logger.WithError(posErr).Error("failed to read current binlog position") err = posErr return } + pos := NewResumableBinlogPosition(masterPos) + s.logger.Debugf("using resume state from binlog position on source DB: %s", pos) s.UpdateLastWrittenBinlogPosition(pos) s.UpdateLastStoredBinlogPositionForInlineVerifier(pos) // we absolutely need to initialize the DB with a proper state of the source @@ -615,6 +618,7 @@ func (s *StateTracker) readStateFromDB(f *Ferry) (*SerializableState, error) { return nil, err } f.logger.Infof("found binlog writer resume position data on target DB: %s", state.LastWrittenBinlogPosition) + s.UpdateLastWrittenBinlogPosition(state.LastWrittenBinlogPosition) } inlineVerifierTableName := s.getInlineVerifierStateTable() @@ -643,7 +647,8 @@ func (s *StateTracker) readStateFromDB(f *Ferry) (*SerializableState, error) { }).Errorf("parsing inline-verifier resume position data row from target DB failed") return nil, err } - f.logger.Infof("found inline-verifier resume position data on target DB: %s", state.LastWrittenBinlogPosition) + f.logger.Infof("found inline-verifier resume position data on target DB: %s", state.LastStoredBinlogPositionForInlineVerifier) + s.UpdateLastStoredBinlogPositionForInlineVerifier(state.LastStoredBinlogPositionForInlineVerifier) } return state, nil diff --git a/test/go/state_tracker_test.go b/test/go/state_tracker_test.go index c67e6baf..e065f4b5 100644 --- a/test/go/state_tracker_test.go +++ b/test/go/state_tracker_test.go @@ -1,15 +1,36 @@ package test import ( + "fmt" "testing" "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/testhelpers" "github.com/siddontang/go-mysql/mysql" "github.com/stretchr/testify/suite" ) +const ( + StateSchemaName = "gftest_state" +) + type StateTrackerTestSuite struct { - suite.Suite + *testhelpers.GhostferryUnitTestSuite +} + +func (this *StateTrackerTestSuite) SetupTest() { + this.GhostferryUnitTestSuite.SetupTest() + this.resetDbs() +} + +func (this *StateTrackerTestSuite) TearDownTest() { + this.resetDbs() + this.GhostferryUnitTestSuite.TearDownTest() +} + +func (this *StateTrackerTestSuite) resetDbs() { + _, err := this.Ferry.TargetDB.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", StateSchemaName)) + this.Require().Nil(err) } func (s *StateTrackerTestSuite) TestMinBinlogPosition() { @@ -66,6 +87,42 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() { s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) } +func (s *StateTrackerTestSuite) TestSerializeStateInTargetDB() { + testFerry := s.TestFerry.Ferry + testFerry.ResumeStateFromDB = StateSchemaName + + stateTracker1, _, err := ghostferry.NewStateTrackerFromTargetDB(testFerry) + s.Require().Nil(err) + + // make sure the state table was created + rows, err := testFerry.TargetDB.Query(fmt.Sprintf("SELECT * FROM `%s`.`_ghostferry_91919__last_binlog_writer_state`", StateSchemaName)) + s.Require().Nil(err) + defer rows.Close() + s.Require().True(rows.Next()) + + state1 := stateTracker1.Serialize(nil, nil) + + // now advance the master position and make sure such changes are + // reflected in the state + s.SeedSourceDB(1) + // this is just to make sure that the below code tests what it should: it + // did not read the position from the source DB, but from the state tables + // on the target DB + masterPos, err := ghostferry.ShowMasterStatusBinlogPosition(testFerry.SourceDB) + s.Require().Nil(err) + s.Require().True(masterPos.Compare(state1.LastWrittenBinlogPosition.ResumePosition) > 0) + + stateTracker2, state2, err := ghostferry.NewStateTrackerFromTargetDB(testFerry) + s.Require().Nil(err) + s.Require().Equal(state1.LastWrittenBinlogPosition, state2.LastWrittenBinlogPosition) + + // make sure that the state is not only reflected in the returned state + // object, but also in the state-tracker itself + state3 := stateTracker2.Serialize(nil, nil) + s.Require().Equal(state2.LastWrittenBinlogPosition, state3.LastWrittenBinlogPosition) +} + func TestStateTrackerTestSuite(t *testing.T) { - suite.Run(t, new(StateTrackerTestSuite)) + testhelpers.SetupTest() + suite.Run(t, &StateTrackerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}}) } From 8659a9bf2ae40315e0963e9c29b6ac5861c9c98d Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Tue, 12 May 2020 18:39:02 -0700 Subject: [PATCH 2/2] Update README Change-Id: I474a0e7b5dbb10f5018edec21a7eec6f9bc94a98 --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bcf795c1..963c12eb 100644 --- a/README.md +++ b/README.md @@ -110,9 +110,11 @@ Features/fixes added in this fork include Enabling inline/iterative verifiers causes an error at runtime if data is requested to be verified for an incompatible table. - because we support signed integer primary keys now, the maximum key value - supported is now 2**63 (previously 2**64). In practice it is unlikely to - have DB key values of this size, and it is thus not configurable to - provide the legacy behavior. + supported is now 263 (previously 264). In practice + it is unlikely to have DB key values of this size, and it is thus not + configurable to provide the legacy behavior. +- more robust [disabling of inline-verifier](https://github.com/Shopify/ghostferry/issues/184): + this fix has not made it into upstream master yet. Overview of How it Works ------------------------