diff --git a/observer/block_peer_common.go b/observer/block_peer_common.go index 854d37c..1893808 100644 --- a/observer/block_peer_common.go +++ b/observer/block_peer_common.go @@ -193,9 +193,11 @@ func (bp *BlockPeer) initChannels(ctx context.Context) { func (bp *BlockPeer) getSeekFrom(channel string) SeekFromFetcher { seekFrom := ChannelSeekOldest() // at first check seekFrom var, if it is empty, check seekFromFetcher + bp.mu.Lock() seekFromNum, exist := bp.seekFrom[channel] + bp.mu.Unlock() if exist { - seekFrom = ChannelSeekFrom(seekFromNum) + seekFrom = ChannelSeekFrom(seekFromNum - 1) } else { // if seekFromFetcher is also empty, use ChannelSeekOldest if bp.seekFromFetcher != nil {