Skip to content

Commit

Permalink
Merge pull request #52 from blinklabs-io/feat/plugin-state
Browse files Browse the repository at this point in the history
feat: chainsync input plugin status update callback
  • Loading branch information
agaffney authored Aug 1, 2023
2 parents 5abd83a + f6caee8 commit e86c996
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
44 changes: 33 additions & 11 deletions input/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,36 @@ import (
)

type ChainSync struct {
oConn *ouroboros.Connection
network string
networkMagic uint32
address string
socketPath string
ntcTcp bool
intersectTip bool
intersectPoints []ocommon.Point
includeCbor bool
errorChan chan error
eventChan chan event.Event
oConn *ouroboros.Connection
network string
networkMagic uint32
address string
socketPath string
ntcTcp bool
intersectTip bool
intersectPoints []ocommon.Point
includeCbor bool
statusUpdateFunc StatusUpdateFunc
status *ChainSyncStatus
errorChan chan error
eventChan chan event.Event
}

type ChainSyncStatus struct {
SlotNumber uint64
BlockNumber uint64
BlockHash string
}

type StatusUpdateFunc func(ChainSyncStatus)

// New returns a new ChainSync object with the specified options applied
func New(options ...ChainSyncOptionFunc) *ChainSync {
c := &ChainSync{
errorChan: make(chan error),
eventChan: make(chan event.Event, 10),
intersectPoints: []ocommon.Point{},
status: &ChainSyncStatus{},
}
for _, option := range options {
option(c)
Expand Down Expand Up @@ -176,6 +187,7 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
case ledger.Block:
evt := event.New("chainsync.block", time.Now(), NewBlockEvent(v, c.includeCbor))
c.eventChan <- evt
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash())
case ledger.BlockHeader:
blockSlot := v.SlotNumber()
blockHash, _ := hex.DecodeString(v.Hash())
Expand All @@ -189,6 +201,16 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor))
c.eventChan <- txEvt
}
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash())
}
return nil
}

func (c *ChainSync) updateStatus(slotNumber uint64, blockNumber uint64, blockHash string) {
c.status.SlotNumber = slotNumber
c.status.BlockNumber = blockNumber
c.status.BlockHash = blockHash
if c.statusUpdateFunc != nil {
c.statusUpdateFunc(*(c.status))
}
}
8 changes: 8 additions & 0 deletions input/chainsync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,11 @@ func WithIncludeCbor(includeCbor bool) ChainSyncOptionFunc {
c.includeCbor = includeCbor
}
}

// WithStatusUpdateFunc specifies a callback function for status updates. This is useful for tracking the chain-sync status
// to be able to resume a sync at a later time, especially when any filtering could prevent you from getting all block update events
func WithStatusUpdateFunc(statusUpdateFunc StatusUpdateFunc) ChainSyncOptionFunc {
return func(c *ChainSync) {
c.statusUpdateFunc = statusUpdateFunc
}
}

0 comments on commit e86c996

Please sign in to comment.