From f1496f4262f3ab059039ad84d49f6e5c324aa5e5 Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Wed, 11 Mar 2020 20:21:09 -0700 Subject: [PATCH] Fix resuming binlog streaming #156 A mysql replication event for changing data is always started by sending a TableMapEvent (describing the table to be changed), followed by one or more RowsEvent (containing the data to be changed). If multiple consecutive RowsEvent events are sent for the same table, the TableMapEvent is typically skipped (after sending it once), meaning that resuming from such a binlog is not possible. Resuming at a binlog position between the TableMapEvent and a RowsEvent is not possible, as the binlog streamer would miss the table definition for following DML statements. This commit tracks the position of the most recently processed TableMapEvent and uses this for resuming (skipping any actual events between the last write and resume position that were already processed). Change-Id: I2bef401ba4f1c0d5f50f177f48c2e866bb411f5d --- binlog_streamer.go | 147 ++++++++++++++++++---- dml_events.go | 15 ++- ferry.go | 8 +- sharding/test/copy_filter_test.go | 5 +- state_tracker.go | 18 ++- status_deprecated.go | 2 +- test/go/binlog_streamer_test.go | 15 ++- test/go/dml_events_test.go | 23 ++-- test/go/state_tracker_test.go | 40 +++--- test/integration/interrupt_resume_test.rb | 58 ++++++++- 10 files changed, 244 insertions(+), 87 deletions(-) diff --git a/binlog_streamer.go b/binlog_streamer.go index 611188d7..5351e474 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -15,21 +15,66 @@ import ( const caughtUpThreshold = 10 * time.Second -type BinlogStreamer struct { - DB *sql.DB - DBConfig *DatabaseConfig - MyServerId uint32 - ErrorHandler ErrorHandler - Filter CopyFilter +type BinlogPosition struct { + // A binlog position emitted by the binlog-streamer consists of two parts: + // First, the last emitted event position, which refers to the event that + // we received from the MySQL master and that we hand to clients. Second, + // a position from which we can resume a binlog-streamer. + // Ideally, these two values would be the same, but in reality they are + // not, because some events are streamed in a series (e.g. DML events + // require a table-map events to be seen before). + // As a result, we always stream event positions as a pair - if a binlog + // streamer is resumed from an event that is not safe to resume from, we + // resume from the most recent (earlier) event from which we can safely + // resume and simply suppress emitting these events up to the point of the + // last event returned. + // + // the actual binlog position of an event emitted by the streamer + EventPosition mysql.Position + // the position from which one needs to point the streamer if we want to + // resume from after this event + ResumePosition mysql.Position +} + +func NewResumableBinlogPosition(pos mysql.Position) BinlogPosition { + return BinlogPosition{pos, pos} +} - TableSchema TableSchemaCache +func (p BinlogPosition) Compare(o BinlogPosition) int { + // comparison always happens on the actual event + return p.EventPosition.Compare(o.EventPosition) +} - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - lastStreamedBinlogPosition mysql.Position - targetBinlogPosition mysql.Position - lastProcessedEventTime time.Time - lastLagMetricEmittedTime time.Time +func (b BinlogPosition) String() string { + return fmt.Sprintf("Position(event %s, resume at %s)", b.EventPosition, b.ResumePosition) +} + +type BinlogStreamer struct { + DB *sql.DB + DBConfig *DatabaseConfig + MyServerId uint32 + ErrorHandler ErrorHandler + Filter CopyFilter + + TableSchema TableSchemaCache + + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + // what is the last event that we ever received from the streamer + lastStreamedBinlogPosition mysql.Position + // what is the last event that we received and from which it is possible + // to resume + lastResumeBinlogPosition mysql.Position + // if we have resumed from an earlier position than where we last streamed + // to (that is, if lastResumeBinlogPosition is before + // lastStreamedBinlogPosition when resuming), up to what event should we + // suppress emitting events + suppressEmitUpToBinlogPosition mysql.Position + // up to what position to we want to continue streaming (if a stop was + // requested) + targetBinlogPosition mysql.Position + lastProcessedEventTime time.Time + lastLagMetricEmittedTime time.Time stopRequested bool @@ -77,40 +122,49 @@ func (s *BinlogStreamer) createBinlogSyncer() error { return nil } -func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) { +func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) { s.ensureLogger() currentPosition, err := ShowMasterStatusBinlogPosition(s.DB) if err != nil { s.logger.WithError(err).Error("failed to read current binlog position") - return mysql.Position{}, err + return BinlogPosition{}, err } - return s.ConnectBinlogStreamerToMysqlFrom(currentPosition) + return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition)) } -func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) { +func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error) { s.ensureLogger() err := s.createBinlogSyncer() if err != nil { - return mysql.Position{}, err + return BinlogPosition{}, err } - s.lastStreamedBinlogPosition = startFromBinlogPosition + if startFromBinlogPosition.EventPosition.Compare(startFromBinlogPosition.ResumePosition) < 0 { + err = fmt.Errorf("invalid resume position %s: last event must not be before resume position", startFromBinlogPosition) + return BinlogPosition{}, err + } + + s.lastStreamedBinlogPosition = startFromBinlogPosition.EventPosition + s.suppressEmitUpToBinlogPosition = startFromBinlogPosition.EventPosition + s.lastResumeBinlogPosition = startFromBinlogPosition.ResumePosition s.logger.WithFields(logrus.Fields{ - "file": s.lastStreamedBinlogPosition.Name, - "pos": s.lastStreamedBinlogPosition.Pos, + "stream.file": s.lastStreamedBinlogPosition.Name, + "stream.pos": s.lastStreamedBinlogPosition.Pos, + "resume.file": s.lastResumeBinlogPosition.Name, + "resume.pos": s.lastResumeBinlogPosition.Pos, }).Info("starting binlog streaming") - s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition) + s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastResumeBinlogPosition) if err != nil { s.logger.WithError(err).Error("unable to start binlog streamer") - return mysql.Position{}, err + return BinlogPosition{}, err } - return s.lastStreamedBinlogPosition, err + return startFromBinlogPosition, err } func (s *BinlogStreamer) Run() { @@ -233,6 +287,10 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven eventTime := time.Unix(int64(ev.Header.Timestamp), 0) s.lastProcessedEventTime = eventTime + if resumablePosition, evIsResumable := s.getResumePositionForEvent(ev); evIsResumable { + s.lastResumeBinlogPosition = resumablePosition + } + if time.Since(s.lastLagMetricEmittedTime) >= time.Second { lag := time.Since(eventTime) metrics.Gauge("BinlogStreamer.Lag", lag.Seconds(), nil, 1.0) @@ -240,6 +298,33 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven } } +func (s *BinlogStreamer) getResumePositionForEvent(ev *replication.BinlogEvent) (resumablePosition mysql.Position, evIsResumable bool) { + // resuming from a RowsEvent is not possible, as it may be followed by + // another rows-event without a subsequent TableMapEvent. Thus, if we have + // a rows-event, we need to keep resuming from whatever last non-rows-event + // + // The same is true for TableMapEvents themselves, as we cannot resume right + // after the event: we need to re-stream the event itself to get ready for + // a RowsEvent + switch ev.Event.(type) { + case *replication.RowsEvent, *replication.TableMapEvent: + // it's not resumable - we need to return whatever was save to resume + // from before + resumablePosition = s.lastResumeBinlogPosition + default: + // it is safe to resume from here + evIsResumable = true + resumablePosition = mysql.Position{ + // The filename is only changed and visible during the RotateEvent, which + // is handled transparently in Run(). + Name: s.lastStreamedBinlogPosition.Name, + Pos: ev.Header.LogPos, + } + } + + return +} + func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error { eventTime := time.Unix(int64(ev.Header.Timestamp), 0) rowsEvent := ev.Event.(*replication.RowsEvent) @@ -256,12 +341,24 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error { Pos: ev.Header.LogPos, } + // we may still be searching for the first event to stream to listeners, if + // we resumed reading upstream events from an earlier event + if pos.Compare(s.suppressEmitUpToBinlogPosition) <= 0 { + return nil + } + + resumePosition, _ := s.getResumePositionForEvent(ev) + binlogPosition := BinlogPosition{ + EventPosition: pos, + ResumePosition: resumePosition, + } + table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table)) if table == nil { return nil } - dmlEvs, err := NewBinlogDMLEvents(table, ev, pos) + dmlEvs, err := NewBinlogDMLEvents(table, ev, binlogPosition) if err != nil { return err } diff --git a/dml_events.go b/dml_events.go index a260b378..763d5714 100644 --- a/dml_events.go +++ b/dml_events.go @@ -8,7 +8,6 @@ import ( "github.com/shopspring/decimal" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" ) @@ -43,13 +42,13 @@ type DMLEvent interface { OldValues() RowData NewValues() RowData PaginationKey() (uint64, error) - BinlogPosition() mysql.Position + BinlogPosition() BinlogPosition } // The base of DMLEvent to provide the necessary methods. type DMLEventBase struct { table *TableSchema - pos mysql.Position + pos BinlogPosition } func (e *DMLEventBase) Database() string { @@ -64,7 +63,7 @@ func (e *DMLEventBase) TableSchema() *TableSchema { return e.table } -func (e *DMLEventBase) BinlogPosition() mysql.Position { +func (e *DMLEventBase) BinlogPosition() BinlogPosition { return e.pos } @@ -73,7 +72,7 @@ type BinlogInsertEvent struct { *DMLEventBase } -func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { insertEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { @@ -117,7 +116,7 @@ type BinlogUpdateEvent struct { *DMLEventBase } -func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { // UPDATE events have two rows in the RowsEvent. The first row is the // entries of the old record (for WHERE) and the second row is the // entries of the new record (for SET). @@ -177,7 +176,7 @@ func (e *BinlogDeleteEvent) NewValues() RowData { return nil } -func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { deleteEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { @@ -205,7 +204,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) { return paginationKeyFromEventData(e.table, e.oldValues) } -func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition) ([]DMLEvent, error) { rowsEvent := ev.Event.(*replication.RowsEvent) for _, row := range rowsEvent.Rows { diff --git a/ferry.go b/ferry.go index 65750a74..4288fb5f 100644 --- a/ferry.go +++ b/ferry.go @@ -493,7 +493,7 @@ func (f *Ferry) Start() error { // miss some records that are inserted between the time the // DataIterator determines the range of IDs to copy and the time that // the starting binlog coordinates are determined. - var pos siddontangmysql.Position + var pos BinlogPosition var err error if f.StateToResumeFrom != nil { pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition()) @@ -504,9 +504,9 @@ func (f *Ferry) Start() error { return err } - // If we don't set this now, there is a race condition where Ghostferry + // If we don't set this now, there is a race condition where ghostferry // is terminated with some rows copied but no binlog events are written. - // This guarentees that we are able to restart from a valid location. + // This guarantees that we are able to restart from a valid location. f.StateTracker.UpdateLastWrittenBinlogPosition(pos) if f.inlineVerifier != nil { f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos) @@ -749,7 +749,7 @@ func (f *Ferry) Progress() *Progress { s.Throttled = f.Throttler.Throttled() // Binlog Progress - s.LastSuccessfulBinlogPos = f.BinlogStreamer.lastStreamedBinlogPosition + s.LastSuccessfulBinlogPos = f.BinlogStreamer.GetLastStreamedBinlogPosition() s.BinlogStreamerLag = time.Now().Sub(f.BinlogStreamer.lastProcessedEventTime).Seconds() s.FinalBinlogPos = f.BinlogStreamer.targetBinlogPosition diff --git a/sharding/test/copy_filter_test.go b/sharding/test/copy_filter_test.go index d1b93e9c..1585d845 100644 --- a/sharding/test/copy_filter_test.go +++ b/sharding/test/copy_filter_test.go @@ -7,7 +7,6 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/sharding" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" @@ -142,7 +141,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } for _, tenantId := range tenantIds { - dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}) + dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), ghostferry.BinlogPosition{}) applicable, err := t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Nil(err) t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId)) @@ -150,7 +149,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() { - dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), ghostferry.BinlogPosition{}) _, err = t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error()) } diff --git a/state_tracker.go b/state_tracker.go index c2eab740..9746af22 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -5,8 +5,6 @@ import ( "math" "sync" "time" - - "github.com/siddontang/go-mysql/mysql" ) // StateTracker design @@ -36,13 +34,13 @@ type SerializableState struct { LastSuccessfulPaginationKeys map[string]uint64 CompletedTables map[string]bool - LastWrittenBinlogPosition mysql.Position - LastStoredBinlogPositionForInlineVerifier mysql.Position + LastWrittenBinlogPosition BinlogPosition + LastStoredBinlogPositionForInlineVerifier BinlogPosition BinlogVerifyStore BinlogVerifySerializedStore } -func (s *SerializableState) MinBinlogPosition() mysql.Position { - nilPosition := mysql.Position{} +func (s *SerializableState) MinBinlogPosition() BinlogPosition { + nilPosition := BinlogPosition{} if s.LastWrittenBinlogPosition == nilPosition { return s.LastStoredBinlogPositionForInlineVerifier } @@ -82,8 +80,8 @@ type StateTracker struct { BinlogRWMutex *sync.RWMutex CopyRWMutex *sync.RWMutex - lastWrittenBinlogPosition mysql.Position - lastStoredBinlogPositionForInlineVerifier mysql.Position + lastWrittenBinlogPosition BinlogPosition + lastStoredBinlogPositionForInlineVerifier BinlogPosition lastSuccessfulPaginationKeys map[string]uint64 completedTables map[string]bool @@ -113,14 +111,14 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri return s } -func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) { +func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos BinlogPosition) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() s.lastWrittenBinlogPosition = pos } -func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) { +func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos BinlogPosition) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() diff --git a/status_deprecated.go b/status_deprecated.go index 7e9bab28..412193f2 100644 --- a/status_deprecated.go +++ b/status_deprecated.go @@ -75,7 +75,7 @@ func FetchStatusDeprecated(f *Ferry, v Verifier) *StatusDeprecated { status.AutomaticCutover = f.Config.AutomaticCutover status.BinlogStreamerStopRequested = f.BinlogStreamer.stopRequested - status.LastSuccessfulBinlogPos = f.BinlogStreamer.lastStreamedBinlogPosition + status.LastSuccessfulBinlogPos = f.BinlogStreamer.GetLastStreamedBinlogPosition() status.TargetBinlogPos = f.BinlogStreamer.targetBinlogPosition status.Throttled = f.Throttler.Throttled() diff --git a/test/go/binlog_streamer_test.go b/test/go/binlog_streamer_test.go index cb2891bd..c69e6872 100644 --- a/test/go/binlog_streamer_test.go +++ b/test/go/binlog_streamer_test.go @@ -8,6 +8,7 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" + "github.com/siddontang/go-mysql/mysql" "github.com/stretchr/testify/suite" ) @@ -95,8 +96,8 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv this.binlogStreamer.AddEventListener(func(evs []ghostferry.DMLEvent) error { eventAsserted = true this.Require().Equal(1, len(evs)) - this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().Name, "mysql-bin.")) - this.Require().True(evs[0].BinlogPosition().Pos > 0) + this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().EventPosition.Name, "mysql-bin.")) + this.Require().True(evs[0].BinlogPosition().EventPosition.Pos > 0) this.binlogStreamer.FlushAndStop() return nil }) @@ -115,6 +116,16 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv this.Require().True(eventAsserted) } +func (this *BinlogStreamerTestSuite) TestResumingFromInvalidResumePositionAfterEventPosition() { + pos := ghostferry.BinlogPosition{ + EventPosition: mysql.Position{"mysql-bin.00002", 10}, + ResumePosition: mysql.Position{"mysql-bin.00002", 11}, + } + _, err := this.binlogStreamer.ConnectBinlogStreamerToMysqlFrom(pos) + this.Require().NotNil(err) + this.Require().Contains(err.Error(), "last event must not be before resume position") +} + func TestBinlogStreamerTestSuite(t *testing.T) { testhelpers.SetupTest() suite.Run(t, &BinlogStreamerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}}) diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index b3d4e4bc..b64a60fd 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/Shopify/ghostferry" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" @@ -61,7 +60,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -80,7 +79,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -95,7 +94,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -115,7 +114,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -134,7 +133,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}, {1000}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -152,7 +151,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -167,7 +166,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventMetadata() { Rows: [][]interface{}{{1000}, {1001}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -185,7 +184,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventGeneratesDeleteQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -206,7 +205,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -221,7 +220,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -236,7 +235,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) diff --git a/test/go/state_tracker_test.go b/test/go/state_tracker_test.go index a39192a1..c67e6baf 100644 --- a/test/go/state_tracker_test.go +++ b/test/go/state_tracker_test.go @@ -14,56 +14,56 @@ type StateTrackerTestSuite struct { func (s *StateTrackerTestSuite) TestMinBinlogPosition() { serializedState := &ghostferry.SerializableState{ - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00003", Pos: 4, - }, + }), - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00003", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00003", 4}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00003", 4}) serializedState = &ghostferry.SerializableState{ - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00003", Pos: 4, - }, + }), - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00002", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) serializedState = &ghostferry.SerializableState{ - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "", Pos: 0, - }, + }), - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00002", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) serializedState = &ghostferry.SerializableState{ - LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + LastStoredBinlogPositionForInlineVerifier: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "", Pos: 0, - }, + }), - LastWrittenBinlogPosition: mysql.Position{ + LastWrittenBinlogPosition: ghostferry.NewResumableBinlogPosition(mysql.Position{ Name: "mysql-bin.00002", Pos: 10, - }, + }), } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) } func TestStateTrackerTestSuite(t *testing.T) { diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 6122d3cc..13ecb441 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -121,8 +121,10 @@ def test_interrupt_resume_will_not_emit_binlog_position_for_inline_verifier_if_n dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) - assert_equal "", dumped_state["LastStoredBinlogPositionForInlineVerifier"]["Name"] - assert_equal 0, dumped_state["LastStoredBinlogPositionForInlineVerifier"]["Pos"] + assert_equal "", dumped_state["LastStoredBinlogPositionForInlineVerifier"]["EventPosition"]["Name"] + assert_equal 0, dumped_state["LastStoredBinlogPositionForInlineVerifier"]["EventPosition"]["Pos"] + assert_equal "", dumped_state["LastStoredBinlogPositionForInlineVerifier"]["ResumePosition"]["Name"] + assert_equal 0, dumped_state["LastStoredBinlogPositionForInlineVerifier"]["ResumePosition"]["Pos"] end def test_interrupt_resume_inline_verifier_with_datawriter @@ -304,4 +306,56 @@ def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on error_line = ghostferry.error_lines.last assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"] end + + def test_interrupt_resume_between_consecutive_rows_events + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) + + # create a series of rows-events that do not have interleaved table-map + # events. This is the case when multiple rows are affected in a single + # DML event. + # Since we are racing between applying rows and sending the shutdown event, + # we emit a whole bunch of them + num_batches = 20 + num_values_per_batch = 1000 + row_id = 0 + ghostferry.on_status(Ghostferry::Status::BINLOG_STREAMING_STARTED) do + for _batch_id in 0..num_batches do + insert_sql = "INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES " + for value_in_batch in 0..num_values_per_batch do + row_id += 1 + insert_sql += ", " if value_in_batch > 0 + insert_sql += "('data#{row_id}')" + end + source_db.query(insert_sql) + end + end + + ghostferry.on_status(Ghostferry::Status::AFTER_BINLOG_APPLY) do + # while we are emitting events in the loop above, try to inject a shutdown + # signal, hoping to interrupt between applying an INSERT and receiving the + # next table-map event + if row_id > 20 + ghostferry.term_and_wait_for_exit + end + end + + dumped_state = ghostferry.run_expecting_interrupt + + # We can verify if the race occurred (and we successfully worked around it) + # by looking at the dumped state (the LastWrittenBinlogPosition field should + # have different EventPosition and ResumePosition values). + # + # If this starts to make the test unreliable, we may want to remove this or + # further tweak the batch values. + resume_state = dumped_state["LastWrittenBinlogPosition"] + refute_equal resume_state["EventPosition"], resume_state["ResumePosition"] + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + # if we did not resume at a proper state, this invocation of ghostferry + # will crash, complaining that a rows-event is referring to an unknown + # table + ghostferry.run(dumped_state) + + assert_test_table_is_identical + end end