From 9a7999c30fae9e1cf6ae1c3406fa601420a112a4 Mon Sep 17 00:00:00 2001 From: Clemens Kolbitsch Date: Wed, 11 Mar 2020 20:21:09 -0700 Subject: [PATCH] Fix resuming binlog streaming #156 A mysql replication event for changing data is always started by sending a TableMapEvent (describing the table to be changed), followed by one or more RowsEvent (containing the data to be changed). If multiple consecutive RowsEvent events are sent for the same table, the TableMapEvent is typically skipped (after sending it once), meaning that resuming from such a binlog is not possible. This commit tracks the position of the most recently processed TableMapEvent and uses this for resuming (skipping any actual events between the last write and resume position that were already processed. Change-Id: I2bef401ba4f1c0d5f50f177f48c2e866bb411f5d --- binlog_streamer.go | 107 +++++++++++++++++++++++------- binlog_writer.go | 2 +- dml_events.go | 15 ++--- ferry.go | 6 +- inline_verifier.go | 2 +- sharding/test/copy_filter_test.go | 5 +- state_tracker.go | 33 ++++++--- test/go/binlog_streamer_test.go | 4 +- test/go/dml_events_test.go | 23 +++---- test/go/state_tracker_test.go | 27 ++++++-- 10 files changed, 154 insertions(+), 70 deletions(-) diff --git a/binlog_streamer.go b/binlog_streamer.go index 611188d7..d01016d0 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -15,21 +15,41 @@ import ( const caughtUpThreshold = 10 * time.Second -type BinlogStreamer struct { - DB *sql.DB - DBConfig *DatabaseConfig - MyServerId uint32 - ErrorHandler ErrorHandler - Filter CopyFilter +type BinlogPosition struct { + // the actual binlog position of an event emitted by the streamer + EventPosition mysql.Position + // the position from which one needs to point the streamer if we want to resume from after + // this event + ResumePosition mysql.Position +} - TableSchema TableSchemaCache +func (b BinlogPosition) String() string { + return fmt.Sprintf("Position(event %s, resume at %s)", b.EventPosition, b.ResumePosition) +} - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - lastStreamedBinlogPosition mysql.Position - targetBinlogPosition mysql.Position - lastProcessedEventTime time.Time - lastLagMetricEmittedTime time.Time +type BinlogStreamer struct { + DB *sql.DB + DBConfig *DatabaseConfig + MyServerId uint32 + ErrorHandler ErrorHandler + Filter CopyFilter + + TableSchema TableSchemaCache + + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + // what is the last event that we ever received from the streamer + lastStreamedBinlogPosition mysql.Position + // what is the last event that we received and from which it is possible to resume + lastResumeBinlogPosition mysql.Position + // if we have resumed from an earlier position than where we last streamed to (that is, if + // lastResumeBinlogPosition is before lastStreamedBinlogPosition when resuming), up to what + // event should we suppress emitting events + suppressEmitUpToBinlogPosition mysql.Position + // up to what position to we want to continue streaming (if a stop was requested) + targetBinlogPosition mysql.Position + lastProcessedEventTime time.Time + lastLagMetricEmittedTime time.Time stopRequested bool @@ -77,40 +97,48 @@ func (s *BinlogStreamer) createBinlogSyncer() error { return nil } -func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) { +func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) { s.ensureLogger() currentPosition, err := ShowMasterStatusBinlogPosition(s.DB) if err != nil { s.logger.WithError(err).Error("failed to read current binlog position") - return mysql.Position{}, err + return BinlogPosition{}, err } - return s.ConnectBinlogStreamerToMysqlFrom(currentPosition) + startPosition := BinlogPosition{ + EventPosition: currentPosition, + ResumePosition: currentPosition, + } + return s.ConnectBinlogStreamerToMysqlFrom(startPosition) } -func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) { +func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error) { s.ensureLogger() err := s.createBinlogSyncer() if err != nil { - return mysql.Position{}, err + return BinlogPosition{}, err } - s.lastStreamedBinlogPosition = startFromBinlogPosition + s.lastStreamedBinlogPosition = startFromBinlogPosition.EventPosition + s.suppressEmitUpToBinlogPosition = startFromBinlogPosition.EventPosition + s.lastResumeBinlogPosition = startFromBinlogPosition.ResumePosition s.logger.WithFields(logrus.Fields{ - "file": s.lastStreamedBinlogPosition.Name, - "pos": s.lastStreamedBinlogPosition.Pos, + "stream.file": s.lastStreamedBinlogPosition.Name, + "stream.pos": s.lastStreamedBinlogPosition.Pos, + "resume.file": s.lastResumeBinlogPosition.Name, + "resume.pos": s.lastResumeBinlogPosition.Pos, }).Info("starting binlog streaming") - s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition) + s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastResumeBinlogPosition) if err != nil { s.logger.WithError(err).Error("unable to start binlog streamer") - return mysql.Position{}, err + return BinlogPosition{}, err } - return s.lastStreamedBinlogPosition, err + return startFromBinlogPosition, err } func (s *BinlogStreamer) Run() { @@ -229,6 +257,18 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven s.logger.Panicf("logpos: %d %d %T", ev.Header.LogPos, ev.Header.Timestamp, ev.Event) } + // resuming from a rows-event is not possible, as it may be followed by another rows-event + // without a subsequent TableMapEvent. Thus, if we have a rows-event, we need to keep resuming + // from whatever last non-rows-event + if _, ok := ev.Event.(*replication.RowsEvent); !ok { + // NOTE: We must set this value *before* we set it to the event that we have just + // received. The lastResumeBinlogPosition value indicates which is the last event + // *after* which we can resume from - that's just how replication works. + // Thus, to resume from the event that we were just given, we need to tell the + // replication streamer to resume from after whatever we have seen last + s.lastResumeBinlogPosition = s.lastStreamedBinlogPosition + } + s.lastStreamedBinlogPosition.Pos = ev.Header.LogPos eventTime := time.Unix(int64(ev.Header.Timestamp), 0) s.lastProcessedEventTime = eventTime @@ -256,12 +296,29 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error { Pos: ev.Header.LogPos, } + // we may still be searching for the first event to stream to the caller (because we + // resumed reading upstream events from an earlier table-map event) + if pos.Compare(s.suppressEmitUpToBinlogPosition) <= 0 { + return nil + } + table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table)) if table == nil { return nil } - dmlEvs, err := NewBinlogDMLEvents(table, ev, pos) + var resumePos mysql.Position + if _, ok := ev.Event.(*replication.RowsEvent); ok { + resumePos = s.lastResumeBinlogPosition + } else { + resumePos = pos + } + binlogPosition := BinlogPosition{ + EventPosition: pos, + ResumePosition: resumePos, + } + + dmlEvs, err := NewBinlogDMLEvents(table, ev, binlogPosition) if err != nil { return err } diff --git a/binlog_writer.go b/binlog_writer.go index f005c987..02637643 100644 --- a/binlog_writer.go +++ b/binlog_writer.go @@ -110,7 +110,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error { } if b.StateTracker != nil { - b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition()) + b.StateTracker.UpdateLastBinlogPosition(events[len(events)-1].BinlogPosition()) } return nil diff --git a/dml_events.go b/dml_events.go index a260b378..763d5714 100644 --- a/dml_events.go +++ b/dml_events.go @@ -8,7 +8,6 @@ import ( "github.com/shopspring/decimal" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" ) @@ -43,13 +42,13 @@ type DMLEvent interface { OldValues() RowData NewValues() RowData PaginationKey() (uint64, error) - BinlogPosition() mysql.Position + BinlogPosition() BinlogPosition } // The base of DMLEvent to provide the necessary methods. type DMLEventBase struct { table *TableSchema - pos mysql.Position + pos BinlogPosition } func (e *DMLEventBase) Database() string { @@ -64,7 +63,7 @@ func (e *DMLEventBase) TableSchema() *TableSchema { return e.table } -func (e *DMLEventBase) BinlogPosition() mysql.Position { +func (e *DMLEventBase) BinlogPosition() BinlogPosition { return e.pos } @@ -73,7 +72,7 @@ type BinlogInsertEvent struct { *DMLEventBase } -func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { insertEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { @@ -117,7 +116,7 @@ type BinlogUpdateEvent struct { *DMLEventBase } -func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { // UPDATE events have two rows in the RowsEvent. The first row is the // entries of the old record (for WHERE) and the second row is the // entries of the new record (for SET). @@ -177,7 +176,7 @@ func (e *BinlogDeleteEvent) NewValues() RowData { return nil } -func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) { deleteEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { @@ -205,7 +204,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) { return paginationKeyFromEventData(e.table, e.oldValues) } -func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition) ([]DMLEvent, error) { rowsEvent := ev.Event.(*replication.RowsEvent) for _, row := range rowsEvent.Rows { diff --git a/ferry.go b/ferry.go index 65750a74..ad8d3c9e 100644 --- a/ferry.go +++ b/ferry.go @@ -493,7 +493,7 @@ func (f *Ferry) Start() error { // miss some records that are inserted between the time the // DataIterator determines the range of IDs to copy and the time that // the starting binlog coordinates are determined. - var pos siddontangmysql.Position + var pos BinlogPosition var err error if f.StateToResumeFrom != nil { pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition()) @@ -507,9 +507,9 @@ func (f *Ferry) Start() error { // If we don't set this now, there is a race condition where Ghostferry // is terminated with some rows copied but no binlog events are written. // This guarentees that we are able to restart from a valid location. - f.StateTracker.UpdateLastWrittenBinlogPosition(pos) + f.StateTracker.UpdateLastBinlogPosition(pos) if f.inlineVerifier != nil { - f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos) + f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos.EventPosition) } return nil diff --git a/inline_verifier.go b/inline_verifier.go index 957f6ddb..486c4a17 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -569,7 +569,7 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error { if v.StateTracker != nil { ev := evs[len(evs)-1] - v.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(ev.BinlogPosition()) + v.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(ev.BinlogPosition().EventPosition) } return nil diff --git a/sharding/test/copy_filter_test.go b/sharding/test/copy_filter_test.go index d1b93e9c..1585d845 100644 --- a/sharding/test/copy_filter_test.go +++ b/sharding/test/copy_filter_test.go @@ -7,7 +7,6 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/sharding" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" @@ -142,7 +141,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } for _, tenantId := range tenantIds { - dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}) + dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), ghostferry.BinlogPosition{}) applicable, err := t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Nil(err) t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId)) @@ -150,7 +149,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() { - dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), ghostferry.BinlogPosition{}) _, err = t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error()) } diff --git a/state_tracker.go b/state_tracker.go index c2eab740..04d83266 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -38,23 +38,32 @@ type SerializableState struct { CompletedTables map[string]bool LastWrittenBinlogPosition mysql.Position LastStoredBinlogPositionForInlineVerifier mysql.Position + LastResumeBinlogPosition mysql.Position BinlogVerifyStore BinlogVerifySerializedStore } -func (s *SerializableState) MinBinlogPosition() mysql.Position { +func (s *SerializableState) MinBinlogPosition() BinlogPosition { nilPosition := mysql.Position{} + var eventBinlogPosition mysql.Position if s.LastWrittenBinlogPosition == nilPosition { - return s.LastStoredBinlogPositionForInlineVerifier + eventBinlogPosition = s.LastStoredBinlogPositionForInlineVerifier + } else if s.LastStoredBinlogPositionForInlineVerifier == nilPosition { + eventBinlogPosition = s.LastWrittenBinlogPosition + } else if s.LastWrittenBinlogPosition.Compare(s.LastStoredBinlogPositionForInlineVerifier) >= 0 { + eventBinlogPosition = s.LastStoredBinlogPositionForInlineVerifier + } else { + eventBinlogPosition = s.LastWrittenBinlogPosition } - if s.LastStoredBinlogPositionForInlineVerifier == nilPosition { - return s.LastWrittenBinlogPosition + if s.LastResumeBinlogPosition == nilPosition { + return BinlogPosition{ + EventPosition: eventBinlogPosition, + ResumePosition: eventBinlogPosition, + } } - - if s.LastWrittenBinlogPosition.Compare(s.LastStoredBinlogPositionForInlineVerifier) >= 0 { - return s.LastStoredBinlogPositionForInlineVerifier - } else { - return s.LastWrittenBinlogPosition + return BinlogPosition{ + EventPosition: eventBinlogPosition, + ResumePosition: s.LastResumeBinlogPosition, } } @@ -84,6 +93,7 @@ type StateTracker struct { lastWrittenBinlogPosition mysql.Position lastStoredBinlogPositionForInlineVerifier mysql.Position + lastResumeBinlogPosition mysql.Position lastSuccessfulPaginationKeys map[string]uint64 completedTables map[string]bool @@ -113,11 +123,12 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri return s } -func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) { +func (s *StateTracker) UpdateLastBinlogPosition(pos BinlogPosition) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() - s.lastWrittenBinlogPosition = pos + s.lastWrittenBinlogPosition = pos.EventPosition + s.lastResumeBinlogPosition = pos.ResumePosition } func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) { diff --git a/test/go/binlog_streamer_test.go b/test/go/binlog_streamer_test.go index cb2891bd..80de1314 100644 --- a/test/go/binlog_streamer_test.go +++ b/test/go/binlog_streamer_test.go @@ -95,8 +95,8 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv this.binlogStreamer.AddEventListener(func(evs []ghostferry.DMLEvent) error { eventAsserted = true this.Require().Equal(1, len(evs)) - this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().Name, "mysql-bin.")) - this.Require().True(evs[0].BinlogPosition().Pos > 0) + this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().EventPosition.Name, "mysql-bin.")) + this.Require().True(evs[0].BinlogPosition().EventPosition.Pos > 0) this.binlogStreamer.FlushAndStop() return nil }) diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index b3d4e4bc..b64a60fd 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/Shopify/ghostferry" - "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" @@ -61,7 +60,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -80,7 +79,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -95,7 +94,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -115,7 +114,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -134,7 +133,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}, {1000}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -152,7 +151,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -167,7 +166,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventMetadata() { Rows: [][]interface{}{{1000}, {1001}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -185,7 +184,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventGeneratesDeleteQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -206,7 +205,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -221,7 +220,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -236,7 +235,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, ghostferry.BinlogPosition{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) diff --git a/test/go/state_tracker_test.go b/test/go/state_tracker_test.go index a39192a1..db80613b 100644 --- a/test/go/state_tracker_test.go +++ b/test/go/state_tracker_test.go @@ -24,7 +24,7 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() { Pos: 10, }, } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00003", 4}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00003", 4}) serializedState = &ghostferry.SerializableState{ LastWrittenBinlogPosition: mysql.Position{ @@ -37,7 +37,7 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() { Pos: 10, }, } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) serializedState = &ghostferry.SerializableState{ LastWrittenBinlogPosition: mysql.Position{ @@ -50,7 +50,7 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() { Pos: 10, }, } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) serializedState = &ghostferry.SerializableState{ LastStoredBinlogPositionForInlineVerifier: mysql.Position{ @@ -63,7 +63,26 @@ func (s *StateTrackerTestSuite) TestMinBinlogPosition() { Pos: 10, }, } - s.Require().Equal(serializedState.MinBinlogPosition(), mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) + + serializedState = &ghostferry.SerializableState{ + LastStoredBinlogPositionForInlineVerifier: mysql.Position{ + Name: "", + Pos: 0, + }, + + LastWrittenBinlogPosition: mysql.Position{ + Name: "mysql-bin.00002", + Pos: 10, + }, + + LastResumeBinlogPosition: mysql.Position{ + Name: "mysql-bin.00001", + Pos: 12, + }, + } + s.Require().Equal(serializedState.MinBinlogPosition().EventPosition, mysql.Position{"mysql-bin.00002", 10}) + s.Require().Equal(serializedState.MinBinlogPosition().ResumePosition, mysql.Position{"mysql-bin.00001", 12}) } func TestStateTrackerTestSuite(t *testing.T) {