Skip to content

Commit

Permalink
seek-from-callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Nov 20, 2023
1 parent 28bf306 commit 9cfeb2e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
25 changes: 17 additions & 8 deletions observer/block_peer_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type (
peerChannels PeerChannels
blockDeliverer api.BlocksDeliverer
channelObservers map[string]*BlockPeerChannel
seekFrom map[string]SeekFromFetcher
seekFromFetcher SeekFromFetcher
seekFromMap map[string]uint64
observePeriod time.Duration
stopRecreateStream bool
logger *zap.Logger
Expand All @@ -37,7 +38,8 @@ type (
}

BlockPeerOpts struct {
seekFrom map[string]SeekFromFetcher
seekFromFetcher SeekFromFetcher
seekFromMap map[string]uint64
observePeriod time.Duration
stopRecreateStream bool
logger *zap.Logger
Expand All @@ -62,9 +64,15 @@ func WithBlockPeerLogger(logger *zap.Logger) BlockPeerOpt {
}
}

func WithSeekFrom(seekFrom map[string]SeekFromFetcher) BlockPeerOpt {
func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) BlockPeerOpt {
return func(opts *BlockPeerOpts) {
opts.seekFrom = seekFrom
opts.seekFromFetcher = seekFromFetcher
}
}

func WithSeekFromMap(seekFromMap map[string]uint64) BlockPeerOpt {
return func(opts *BlockPeerOpts) {
opts.seekFromMap = seekFromMap
}
}

Expand Down Expand Up @@ -94,7 +102,8 @@ func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer,
channelObservers: make(map[string]*BlockPeerChannel),
blocks: make(chan *Block),
blocksByChannels: make(map[string]chan *Block),
seekFrom: blockPeerOpts.seekFrom,
seekFromFetcher: blockPeerOpts.seekFromFetcher,
seekFromMap: blockPeerOpts.seekFromMap,
observePeriod: blockPeerOpts.observePeriod,
stopRecreateStream: blockPeerOpts.stopRecreateStream,
logger: blockPeerOpts.logger,
Expand Down Expand Up @@ -181,9 +190,9 @@ func (bp *BlockPeer) initChannels(ctx context.Context) {
}

func (bp *BlockPeer) peerChannel(ctx context.Context, channel string) *BlockPeerChannel {
seekFrom, exist := bp.seekFrom[channel]
if !exist {
seekFrom = ChannelSeekOldest()
seekFrom := bp.seekFromFetcher
if seekFrom == nil {
seekFrom = ChannelSeekFrom(bp.seekFromMap[channel])
}

peerChannel := &BlockPeerChannel{}
Expand Down
6 changes: 3 additions & 3 deletions observer/block_peer_common_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (bp *BlockPeer) initChannelsConcurrently(ctx context.Context, blocksByChann
}

func (bp *BlockPeer) peerChannelConcurrently(ctx context.Context, channel string, blocksByChannels *BlocksByChannels) *BlockPeerChannel {
seekFrom, exist := bp.seekFrom[channel]
if !exist {
seekFrom = ChannelSeekOldest()
seekFrom := bp.seekFromFetcher
if seekFrom == nil {
seekFrom = ChannelSeekFrom(bp.seekFromMap[channel])
}

peerChannel := &BlockPeerChannel{}
Expand Down
6 changes: 3 additions & 3 deletions observer/block_peer_parsed.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ func (pbp *ParsedBlockPeer) initParsedChannels(ctx context.Context) {
}

func (pbp *ParsedBlockPeer) peerParsedChannel(ctx context.Context, channel string) *ParsedBlockPeerChannel {
seekFrom, exist := pbp.blockPeer.seekFrom[channel]
if !exist {
seekFrom = ChannelSeekOldest()
seekFrom := pbp.blockPeer.seekFromFetcher
if seekFrom == nil {
seekFrom = ChannelSeekFrom(pbp.blockPeer.seekFromMap[channel])
}

commonBlockChannel := NewBlockChannel(
Expand Down
6 changes: 3 additions & 3 deletions observer/block_peer_parsed_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (pbp *ParsedBlockPeer) initParsedChannelsConcurrently(ctx context.Context,
}

func (pbp *ParsedBlockPeer) peerParsedChannelConcurrently(ctx context.Context, channel string, blocksByChannels *ParsedBlocksByChannels) *ParsedBlockPeerChannel {
seekFrom, exist := pbp.blockPeer.seekFrom[channel]
if !exist {
seekFrom = ChannelSeekOldest()
seekFrom := pbp.blockPeer.seekFromFetcher
if seekFrom == nil {
seekFrom = ChannelSeekFrom(pbp.blockPeer.seekFromMap[channel])
}

commonBlockChannel := NewBlockChannel(
Expand Down

0 comments on commit 9cfeb2e

Please sign in to comment.