Skip to content

Commit

Permalink
Fix resuming binlog streaming Shopify#156
Browse files Browse the repository at this point in the history
A MySQL replication event that changes data always starts with a
TableMapEvent (describing the table to be changed), followed by one
or more RowsEvents (containing the data to be changed). If multiple
consecutive RowsEvents are sent for the same table, the TableMapEvent
is typically sent only once and omitted afterwards, meaning that
resuming from the position of such a RowsEvent is not possible.

This commit tracks the position of the most recently processed
TableMapEvent and uses it for resuming (skipping any events between
the resume position and the last written position that were already
processed).

Change-Id: I2bef401ba4f1c0d5f50f177f48c2e866bb411f5d
  • Loading branch information
kolbitsch-lastline committed Mar 12, 2020
1 parent 76555c1 commit 9a7999c
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 70 deletions.
107 changes: 82 additions & 25 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,41 @@ import (

const caughtUpThreshold = 10 * time.Second

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter
// BinlogPosition pairs the position of an event emitted by the streamer with
// the position from which streaming has to restart in order to resume after
// that event.
type BinlogPosition struct {
// EventPosition is the actual binlog position of an event emitted by the
// streamer.
EventPosition mysql.Position
// ResumePosition is the position at which the streamer needs to be pointed
// if we want to resume from after this event.
ResumePosition mysql.Position
}

TableSchema TableSchemaCache
// String renders the position as "Position(event <event>, resume at <resume>)",
// formatting both the event and the resume position with the %s verb.
func (b BinlogPosition) String() string {
	const layout = "Position(event %s, resume at %s)"

	event, resume := b.EventPosition, b.ResumePosition
	return fmt.Sprintf(layout, event, resume)
}

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
lastStreamedBinlogPosition mysql.Position
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time
type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter

TableSchema TableSchemaCache

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
// what is the last event that we ever received from the streamer
lastStreamedBinlogPosition mysql.Position
// what is the last event that we received and from which it is possible to resume
lastResumeBinlogPosition mysql.Position
// if we have resumed from an earlier position than where we last streamed to (that is, if
// lastResumeBinlogPosition is before lastStreamedBinlogPosition when resuming), up to what
// event should we suppress emitting events
suppressEmitUpToBinlogPosition mysql.Position
// up to what position do we want to continue streaming (if a stop was requested)
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time

stopRequested bool

Expand Down Expand Up @@ -77,40 +97,48 @@ func (s *BinlogStreamer) createBinlogSyncer() error {
return nil
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) {
func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) {
s.ensureLogger()

currentPosition, err := ShowMasterStatusBinlogPosition(s.DB)
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return mysql.Position{}, err
return BinlogPosition{}, err
}

return s.ConnectBinlogStreamerToMysqlFrom(currentPosition)
startPosition := BinlogPosition{
EventPosition: currentPosition,
ResumePosition: currentPosition,
}
return s.ConnectBinlogStreamerToMysqlFrom(startPosition)
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) {
func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error) {
s.ensureLogger()

err := s.createBinlogSyncer()
if err != nil {
return mysql.Position{}, err
return BinlogPosition{}, err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition
s.lastStreamedBinlogPosition = startFromBinlogPosition.EventPosition
s.suppressEmitUpToBinlogPosition = startFromBinlogPosition.EventPosition
s.lastResumeBinlogPosition = startFromBinlogPosition.ResumePosition

s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
"pos": s.lastStreamedBinlogPosition.Pos,
"stream.file": s.lastStreamedBinlogPosition.Name,
"stream.pos": s.lastStreamedBinlogPosition.Pos,
"resume.file": s.lastResumeBinlogPosition.Name,
"resume.pos": s.lastResumeBinlogPosition.Pos,
}).Info("starting binlog streaming")

s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition)
s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastResumeBinlogPosition)
if err != nil {
s.logger.WithError(err).Error("unable to start binlog streamer")
return mysql.Position{}, err
return BinlogPosition{}, err
}

return s.lastStreamedBinlogPosition, err
return startFromBinlogPosition, err
}

func (s *BinlogStreamer) Run() {
Expand Down Expand Up @@ -229,6 +257,18 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven
s.logger.Panicf("logpos: %d %d %T", ev.Header.LogPos, ev.Header.Timestamp, ev.Event)
}

// resuming from a rows-event is not possible, as it may be followed by another rows-event
// without a subsequent TableMapEvent. Thus, if we have a rows-event, we need to keep resuming
// from whatever last non-rows-event
if _, ok := ev.Event.(*replication.RowsEvent); !ok {
// NOTE: We must set this value *before* we set it to the event that we have just
// received. The lastResumeBinlogPosition value indicates which is the last event
// *after* which we can resume from - that's just how replication works.
// Thus, to resume from the event that we were just given, we need to tell the
// replication streamer to resume from after whatever we have seen last
s.lastResumeBinlogPosition = s.lastStreamedBinlogPosition
}

s.lastStreamedBinlogPosition.Pos = ev.Header.LogPos
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
s.lastProcessedEventTime = eventTime
Expand Down Expand Up @@ -256,12 +296,29 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
Pos: ev.Header.LogPos,
}

// we may still be searching for the first event to stream to the caller (because we
// resumed reading upstream events from an earlier table-map event)
if pos.Compare(s.suppressEmitUpToBinlogPosition) <= 0 {
return nil
}

table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table))
if table == nil {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, pos)
var resumePos mysql.Position
if _, ok := ev.Event.(*replication.RowsEvent); ok {
resumePos = s.lastResumeBinlogPosition
} else {
resumePos = pos
}
binlogPosition := BinlogPosition{
EventPosition: pos,
ResumePosition: resumePos,
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, binlogPosition)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
}

if b.StateTracker != nil {
b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition())
b.StateTracker.UpdateLastBinlogPosition(events[len(events)-1].BinlogPosition())
}

return nil
Expand Down
15 changes: 7 additions & 8 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/shopspring/decimal"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
)
Expand Down Expand Up @@ -43,13 +42,13 @@ type DMLEvent interface {
OldValues() RowData
NewValues() RowData
PaginationKey() (uint64, error)
BinlogPosition() mysql.Position
BinlogPosition() BinlogPosition
}

// The base of DMLEvent to provide the necessary methods.
type DMLEventBase struct {
table *TableSchema
pos mysql.Position
pos BinlogPosition
}

func (e *DMLEventBase) Database() string {
Expand All @@ -64,7 +63,7 @@ func (e *DMLEventBase) TableSchema() *TableSchema {
return e.table
}

func (e *DMLEventBase) BinlogPosition() mysql.Position {
func (e *DMLEventBase) BinlogPosition() BinlogPosition {
return e.pos
}

Expand All @@ -73,7 +72,7 @@ type BinlogInsertEvent struct {
*DMLEventBase
}

func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
insertEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -117,7 +116,7 @@ type BinlogUpdateEvent struct {
*DMLEventBase
}

func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
// UPDATE events have two rows in the RowsEvent. The first row is the
// entries of the old record (for WHERE) and the second row is the
// entries of the new record (for SET).
Expand Down Expand Up @@ -177,7 +176,7 @@ func (e *BinlogDeleteEvent) NewValues() RowData {
return nil
}

func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
deleteEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -205,7 +204,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) {
return paginationKeyFromEventData(e.table, e.oldValues)
}

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
Expand Down
6 changes: 3 additions & 3 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
var pos siddontangmysql.Position
var pos BinlogPosition
var err error
if f.StateToResumeFrom != nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition())
Expand All @@ -507,9 +507,9 @@ func (f *Ferry) Start() error {
// If we don't set this now, there is a race condition where Ghostferry
// is terminated with some rows copied but no binlog events are written.
// This guarantees that we are able to restart from a valid location.
f.StateTracker.UpdateLastWrittenBinlogPosition(pos)
f.StateTracker.UpdateLastBinlogPosition(pos)
if f.inlineVerifier != nil {
f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos)
f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos.EventPosition)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error {

if v.StateTracker != nil {
ev := evs[len(evs)-1]
v.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(ev.BinlogPosition())
v.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(ev.BinlogPosition().EventPosition)
}

return nil
Expand Down
5 changes: 2 additions & 3 deletions sharding/test/copy_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/sharding"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -142,15 +141,15 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() {
}

for _, tenantId := range tenantIds {
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{})
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), ghostferry.BinlogPosition{})
applicable, err := t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Nil(err)
t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId))
}
}

func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() {
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{})
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), ghostferry.BinlogPosition{})
_, err = t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error())
}
Expand Down
33 changes: 22 additions & 11 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,32 @@ type SerializableState struct {
CompletedTables map[string]bool
LastWrittenBinlogPosition mysql.Position
LastStoredBinlogPositionForInlineVerifier mysql.Position
LastResumeBinlogPosition mysql.Position
BinlogVerifyStore BinlogVerifySerializedStore
}

func (s *SerializableState) MinBinlogPosition() mysql.Position {
func (s *SerializableState) MinBinlogPosition() BinlogPosition {
nilPosition := mysql.Position{}
var eventBinlogPosition mysql.Position
if s.LastWrittenBinlogPosition == nilPosition {
return s.LastStoredBinlogPositionForInlineVerifier
eventBinlogPosition = s.LastStoredBinlogPositionForInlineVerifier
} else if s.LastStoredBinlogPositionForInlineVerifier == nilPosition {
eventBinlogPosition = s.LastWrittenBinlogPosition
} else if s.LastWrittenBinlogPosition.Compare(s.LastStoredBinlogPositionForInlineVerifier) >= 0 {
eventBinlogPosition = s.LastStoredBinlogPositionForInlineVerifier
} else {
eventBinlogPosition = s.LastWrittenBinlogPosition
}

if s.LastStoredBinlogPositionForInlineVerifier == nilPosition {
return s.LastWrittenBinlogPosition
if s.LastResumeBinlogPosition == nilPosition {
return BinlogPosition{
EventPosition: eventBinlogPosition,
ResumePosition: eventBinlogPosition,
}
}

if s.LastWrittenBinlogPosition.Compare(s.LastStoredBinlogPositionForInlineVerifier) >= 0 {
return s.LastStoredBinlogPositionForInlineVerifier
} else {
return s.LastWrittenBinlogPosition
return BinlogPosition{
EventPosition: eventBinlogPosition,
ResumePosition: s.LastResumeBinlogPosition,
}
}

Expand Down Expand Up @@ -84,6 +93,7 @@ type StateTracker struct {

lastWrittenBinlogPosition mysql.Position
lastStoredBinlogPositionForInlineVerifier mysql.Position
lastResumeBinlogPosition mysql.Position

lastSuccessfulPaginationKeys map[string]uint64
completedTables map[string]bool
Expand Down Expand Up @@ -113,11 +123,12 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri
return s
}

func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) {
func (s *StateTracker) UpdateLastBinlogPosition(pos BinlogPosition) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

s.lastWrittenBinlogPosition = pos
s.lastWrittenBinlogPosition = pos.EventPosition
s.lastResumeBinlogPosition = pos.ResumePosition
}

func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) {
Expand Down
4 changes: 2 additions & 2 deletions test/go/binlog_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv
this.binlogStreamer.AddEventListener(func(evs []ghostferry.DMLEvent) error {
eventAsserted = true
this.Require().Equal(1, len(evs))
this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().Name, "mysql-bin."))
this.Require().True(evs[0].BinlogPosition().Pos > 0)
this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().EventPosition.Name, "mysql-bin."))
this.Require().True(evs[0].BinlogPosition().EventPosition.Pos > 0)
this.binlogStreamer.FlushAndStop()
return nil
})
Expand Down
Loading

0 comments on commit 9a7999c

Please sign in to comment.