Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/separate-observer-parsing #132

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions observer/block_channel.go → observer/block_channel_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewBlockChannel(channel string, blocksDeliver api.BlocksDeliverer, seekFrom
}

func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

if c.isWork {
return c.blocks, nil
Expand Down Expand Up @@ -145,15 +145,18 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) {
}

func (c *BlockChannel) Stop() error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

err := c.Channel.stop()

// If primary context is done then cancel ctxObserver
if c.cancelObserve != nil {
c.cancelObserve()
}

close(c.blocks)

c.isWork = false
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type (
blocks chan *ParsedBlock
isWork bool
cancelObserve context.CancelFunc
mutex sync.Mutex
mu sync.Mutex
}

ParsedBlockChannelOpt func(*ParsedBlockChannel)
Expand Down Expand Up @@ -52,8 +52,8 @@ func NewParsedBlockChannel(blockChannel *BlockChannel, opts ...ParsedBlockChanne
}

func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.mu.Lock()
defer p.mu.Unlock()

if p.isWork {
return p.blocks, nil
Expand Down Expand Up @@ -98,7 +98,9 @@ func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock,
p.blocks <- block

case <-ctxObserve.Done():
p.Stop()
if err = p.Stop(); err != nil {
p.blockChannel.lastError = err
}
return
}
}
Expand All @@ -107,14 +109,19 @@ func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock,
return p.blocks, nil
}

func (p *ParsedBlockChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
func (p *ParsedBlockChannel) Stop() error {
p.mu.Lock()
defer p.mu.Unlock()

err := p.blockChannel.Stop()

// If primary context is done then cancel ctxObserver
if p.cancelObserve != nil {
p.cancelObserve()
}

close(p.blocks)

p.isWork = false
return err
}
12 changes: 9 additions & 3 deletions observer/block_peer.go → observer/block_peer_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func (bp *BlockPeer) ChannelObservers() map[string]*blockPeerChannel {
return copyChannelObservers
}

func (bp *BlockPeer) Observe(ctx context.Context) (<-chan *Block, error) {
func (bp *BlockPeer) Observe(ctx context.Context) <-chan *Block {
if bp.isWork {
return bp.blocks, nil
return bp.blocks
}

// ctxObserve using for nested control process without stopped primary context
Expand All @@ -143,7 +143,7 @@ func (bp *BlockPeer) Observe(ctx context.Context) (<-chan *Block, error) {
}
}()

return bp.blocks, nil
return bp.blocks
}

func (bp *BlockPeer) Stop() {
Expand All @@ -161,6 +161,12 @@ func (bp *BlockPeer) Stop() {
if bp.cancelObserve != nil {
bp.cancelObserve()
}

close(bp.blocks)
for _, blocksByChannel := range bp.blocksByChannels {
close(blocksByChannel)
}

bp.isWork = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ func (b *BlocksByChannels) Observe() chan *ChannelCommonBlocks {
return b.channels
}

func (bp *BlockPeer) ObserveByChannels(ctx context.Context) (*BlocksByChannels, error) {
bp.mu.Lock()
defer bp.mu.Unlock()

func (bp *BlockPeer) ObserveByChannels(ctx context.Context) *BlocksByChannels {
blocksByChannels := &BlocksByChannels{
channels: make(chan *ChannelCommonBlocks),
}
Expand All @@ -50,10 +47,13 @@ func (bp *BlockPeer) ObserveByChannels(ctx context.Context) (*BlocksByChannels,
bp.Stop()
}()

return blocksByChannels, nil
return blocksByChannels
}

func (bp *BlockPeer) initChannelsConcurrently(ctx context.Context, blocksByChannels *BlocksByChannels) {
bp.mu.Lock()
defer bp.mu.Unlock()

for channel := range bp.peerChannels.Channels() {
if _, ok := bp.channelObservers[channel]; !ok {
bp.logger.Info(`add channel observer concurrently`, zap.String(`channel`, channel))
Expand Down
174 changes: 174 additions & 0 deletions observer/block_peer_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package observer_test

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

sdkmocks "github.com/s7techlab/hlf-sdk-go/client/testing"
"github.com/s7techlab/hlf-sdk-go/observer"
testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks"
)

var (
ctx = context.Background()

channelPeerMockForCommon *observer.ChannelPeerMock
commonBlockPeer *observer.BlockPeer
commonBlocks <-chan *observer.Block

channelPeerMockConcurrentlyForCommon *observer.ChannelPeerMock
commonBlockPeerConcurrently *observer.BlockPeer
commonBlocksByChannels *observer.BlocksByChannels
)

func blockPeerCommonTestBeforeSuit() {
const closeChannelWhenAllRead = true
blockDelivererMock, err := sdkmocks.NewBlocksDelivererMock(fmt.Sprintf("../%s", testdata.Path), closeChannelWhenAllRead)
Expect(err).ShouldNot(HaveOccurred())

channelPeerMockForCommon = observer.NewChannelPeerMock()
for _, channel := range testdata.Channels {
channelPeerMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

commonBlockPeer = observer.NewBlockPeer(channelPeerMockForCommon, blockDelivererMock,
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond))

commonBlocks = commonBlockPeer.Observe(ctx)

channelPeerMockConcurrentlyForCommon = observer.NewChannelPeerMock()
for _, channel := range testdata.Channels {
channelPeerMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

commonBlockPeerConcurrently = observer.NewBlockPeer(channelPeerMockConcurrentlyForCommon, blockDelivererMock,
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond))

commonBlocksByChannels = commonBlockPeerConcurrently.ObserveByChannels(ctx)
}

var _ = Describe("Block Peer", func() {
Context("Block peer", func() {
It("should return current number of channels", func() {
channelObservers := commonBlockPeer.ChannelObservers()
Expect(channelObservers).To(HaveLen(len(testdata.Channels)))
})

It("should add channels to channelPeerMock", func() {
newChannels := map[string]struct{}{"channel1": {}, "channel2": {}, "channel3": {}}

for channel := range newChannels {
channelPeerMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to commonBlockPeer observer
time.Sleep(time.Millisecond * 10)

channelObservers := commonBlockPeer.ChannelObservers()
Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels)))
})

It("should return correct channels heights", func() {
channelsBlocksHeights := map[string]uint64{testdata.SampleChannel: 0, testdata.FabcarChannel: 0}
for b := range commonBlocks {
curBlockChannel := ""
// it must only these channels, new ones do not have any blocks
if b.Channel == testdata.SampleChannel {
curBlockChannel = testdata.SampleChannel
} else if b.Channel == testdata.FabcarChannel {
curBlockChannel = testdata.FabcarChannel
}

Expect(b.Channel).To(Equal(curBlockChannel))

blockNum := channelsBlocksHeights[curBlockChannel]
Expect(b.Block.Header.Number).To(Equal(blockNum))

channelsBlocksHeights[curBlockChannel]++

if channelsBlocksHeights[testdata.SampleChannel] == testdata.SampleChannelHeight && channelsBlocksHeights[testdata.FabcarChannel] == testdata.FabcarChannelHeight {
break
}
}

Expect(channelsBlocksHeights[testdata.SampleChannel]).To(Equal(testdata.SampleChannelHeight))
Expect(channelsBlocksHeights[testdata.FabcarChannel]).To(Equal(testdata.FabcarChannelHeight))
})
})

Context("Block peer concurrently", func() {
It("should return current number of channels", func() {
channelObservers := commonBlockPeerConcurrently.ChannelObservers()
Expect(channelObservers).To(HaveLen(len(testdata.Channels)))

channelsWithBlocks := commonBlocksByChannels.Observe()

for i := 0; i < len(testdata.Channels); i++ {
sampleOrFabcarChannelBlocks := <-channelsWithBlocks

curBlockChannel := ""
curChannelHeight := uint64(0)
// it must only these channels, new ones do not have any blocks
if sampleOrFabcarChannelBlocks.Name == testdata.SampleChannel {
curBlockChannel = testdata.SampleChannel
curChannelHeight = testdata.SampleChannelHeight
} else if sampleOrFabcarChannelBlocks.Name == testdata.FabcarChannel {
curBlockChannel = testdata.FabcarChannel
curChannelHeight = testdata.FabcarChannelHeight
}

Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(curBlockChannel))
Expect(sampleOrFabcarChannelBlocks.Blocks).NotTo(BeNil())

channelBlocksHeight := uint64(0)
for block := range sampleOrFabcarChannelBlocks.Blocks {
Expect(block.Channel).To(Equal(curBlockChannel))
Expect(block.Block.Header.Number).To(Equal(channelBlocksHeight))

channelBlocksHeight++

if channelBlocksHeight == curChannelHeight {
break
}
}

Expect(channelBlocksHeight).To(Equal(curChannelHeight))
}
})

It("should add channels to channelPeerMock", func() {
channel4, channel5, channel6 := "channel4", "channel5", "channel6"
newChannels := []string{channel4, channel5, channel6}
for _, channel := range newChannels {
channelPeerMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to commonBlockPeer observer
time.Sleep(time.Millisecond * 200)

channelObservers := commonBlockPeerConcurrently.ChannelObservers()
Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels)))

channelsWithBlocks := commonBlocksByChannels.Observe()

for i := 0; i < len(newChannels); i++ {
channel4Or5Or6Blocks := <-channelsWithBlocks

if channel4Or5Or6Blocks.Name == channel4 {
Expect(channel4Or5Or6Blocks.Name).To(Equal(channel4))
Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil())
} else if channel4Or5Or6Blocks.Name == channel5 {
Expect(channel4Or5Or6Blocks.Name).To(Equal(channel5))
Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil())
} else {
Expect(channel4Or5Or6Blocks.Name).To(Equal(channel6))
Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil())
}
}
})
})
})
Loading
Loading