From 9cfeb2e732fa69d97e8972150d349024ef87a1c7 Mon Sep 17 00:00:00 2001 From: Nikita Neznaemov Date: Mon, 20 Nov 2023 16:16:58 +0300 Subject: [PATCH] seek-from-callback --- observer/block_peer_common.go | 25 +++++++++++++++------- observer/block_peer_common_concurrently.go | 6 +++--- observer/block_peer_parsed.go | 6 +++--- observer/block_peer_parsed_concurrently.go | 6 +++--- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/observer/block_peer_common.go b/observer/block_peer_common.go index aab3f16..2386943 100644 --- a/observer/block_peer_common.go +++ b/observer/block_peer_common.go @@ -19,7 +19,8 @@ type ( peerChannels PeerChannels blockDeliverer api.BlocksDeliverer channelObservers map[string]*BlockPeerChannel - seekFrom map[string]SeekFromFetcher + seekFromFetcher SeekFromFetcher + seekFromMap map[string]uint64 observePeriod time.Duration stopRecreateStream bool logger *zap.Logger @@ -37,7 +38,8 @@ type ( } BlockPeerOpts struct { - seekFrom map[string]SeekFromFetcher + seekFromFetcher SeekFromFetcher + seekFromMap map[string]uint64 observePeriod time.Duration stopRecreateStream bool logger *zap.Logger @@ -62,9 +64,15 @@ func WithBlockPeerLogger(logger *zap.Logger) BlockPeerOpt { } } -func WithSeekFrom(seekFrom map[string]SeekFromFetcher) BlockPeerOpt { +func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) BlockPeerOpt { return func(opts *BlockPeerOpts) { - opts.seekFrom = seekFrom + opts.seekFromFetcher = seekFromFetcher + } +} + +func WithSeekFromMap(seekFromMap map[string]uint64) BlockPeerOpt { + return func(opts *BlockPeerOpts) { + opts.seekFromMap = seekFromMap } } @@ -94,7 +102,8 @@ func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer, channelObservers: make(map[string]*BlockPeerChannel), blocks: make(chan *Block), blocksByChannels: make(map[string]chan *Block), - seekFrom: blockPeerOpts.seekFrom, + seekFromFetcher: blockPeerOpts.seekFromFetcher, + seekFromMap: blockPeerOpts.seekFromMap, observePeriod: blockPeerOpts.observePeriod, stopRecreateStream: blockPeerOpts.stopRecreateStream, logger: blockPeerOpts.logger, @@ -181,9 +190,9 @@ func (bp *BlockPeer) initChannels(ctx context.Context) { } func (bp *BlockPeer) peerChannel(ctx context.Context, channel string) *BlockPeerChannel { - seekFrom, exist := bp.seekFrom[channel] - if !exist { - seekFrom = ChannelSeekOldest() + seekFrom := bp.seekFromFetcher + if seekFrom == nil { + seekFrom = ChannelSeekFrom(bp.seekFromMap[channel]) } peerChannel := &BlockPeerChannel{} diff --git a/observer/block_peer_common_concurrently.go b/observer/block_peer_common_concurrently.go index 75add18..0320bbb 100644 --- a/observer/block_peer_common_concurrently.go +++ b/observer/block_peer_common_concurrently.go @@ -64,9 +64,9 @@ func (bp *BlockPeer) initChannelsConcurrently(ctx context.Context, blocksByChann } func (bp *BlockPeer) peerChannelConcurrently(ctx context.Context, channel string, blocksByChannels *BlocksByChannels) *BlockPeerChannel { - seekFrom, exist := bp.seekFrom[channel] - if !exist { - seekFrom = ChannelSeekOldest() + seekFrom := bp.seekFromFetcher + if seekFrom == nil { + seekFrom = ChannelSeekFrom(bp.seekFromMap[channel]) } peerChannel := &BlockPeerChannel{} diff --git a/observer/block_peer_parsed.go b/observer/block_peer_parsed.go index a06af51..55deaba 100644 --- a/observer/block_peer_parsed.go +++ b/observer/block_peer_parsed.go @@ -144,9 +144,9 @@ func (pbp *ParsedBlockPeer) initParsedChannels(ctx context.Context) { } func (pbp *ParsedBlockPeer) peerParsedChannel(ctx context.Context, channel string) *ParsedBlockPeerChannel { - seekFrom, exist := pbp.blockPeer.seekFrom[channel] - if !exist { - seekFrom = ChannelSeekOldest() + seekFrom := pbp.blockPeer.seekFromFetcher + if seekFrom == nil { + seekFrom = ChannelSeekFrom(pbp.blockPeer.seekFromMap[channel]) } commonBlockChannel := NewBlockChannel( diff --git a/observer/block_peer_parsed_concurrently.go b/observer/block_peer_parsed_concurrently.go index 6c96b0e..351d041 100644 --- a/observer/block_peer_parsed_concurrently.go +++ b/observer/block_peer_parsed_concurrently.go @@ -66,9 +66,9 @@ func (pbp *ParsedBlockPeer) initParsedChannelsConcurrently(ctx context.Context, } func (pbp *ParsedBlockPeer) peerParsedChannelConcurrently(ctx context.Context, channel string, blocksByChannels *ParsedBlocksByChannels) *ParsedBlockPeerChannel { - seekFrom, exist := pbp.blockPeer.seekFrom[channel] - if !exist { - seekFrom = ChannelSeekOldest() + seekFrom := pbp.blockPeer.seekFromFetcher + if seekFrom == nil { + seekFrom = ChannelSeekFrom(pbp.blockPeer.seekFromMap[channel]) } commonBlockChannel := NewBlockChannel(