diff --git a/observer/block_channel.go b/observer/block_channel.go index 6011f9b..e28a8d8 100644 --- a/observer/block_channel.go +++ b/observer/block_channel.go @@ -17,7 +17,7 @@ type ( createStreamWithRetry CreateBlockStreamWithRetry stopRecreateStream bool - blocks chan *common.Block + blocks chan *Block isWork bool cancelObserve context.CancelFunc @@ -74,7 +74,7 @@ func NewBlockChannel(channel string, blocksDeliver api.BlocksDeliverer, seekFrom return observer } -func (c *BlockChannel) Observe(ctx context.Context) (<-chan *common.Block, error) { +func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) { c.mutex.Lock() defer c.mutex.Unlock() @@ -95,7 +95,7 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *common.Block, error return nil, err } - c.blocks = make(chan *common.Block) + c.blocks = make(chan *Block) go func() { c.isWork = true @@ -127,7 +127,10 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *common.Block, error continue } - c.blocks <- incomingBlock + c.blocks <- &Block{ + Block: incomingBlock, + Channel: c.channel, + } case <-ctxObserve.Done(): if err := c.Stop(); err != nil { diff --git a/observer/block_channel_parse.go b/observer/block_channel_parse.go index e6334c7..2e20e76 100644 --- a/observer/block_channel_parse.go +++ b/observer/block_channel_parse.go @@ -18,7 +18,7 @@ type ( transformers []BlockTransformer configBlock *common.Block - blocks chan *Block + blocks chan *ParsedBlock isWork bool cancelObserve context.CancelFunc mutex sync.Mutex @@ -51,7 +51,7 @@ func NewParsedBlockChannel(blockChannel *BlockChannel, opts ...ParsedBlockChanne return parsedBlockChannel } -func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *Block, error) { +func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock, error) { p.mutex.Lock() defer p.mutex.Unlock() @@ -63,7 +63,7 @@ func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *Block, error) ctxObserve, cancel := context.WithCancel(ctx) p.cancelObserve = cancel - commonBlocks, err := p.blockChannel.Observe(ctxObserve) + incomingBlocks, err := p.blockChannel.Observe(ctxObserve) if err != nil { return nil, fmt.Errorf("observe common blocks: %w", err) } @@ -73,19 +73,19 @@ func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *Block, error) for { select { - case commonBlock, hasMore := <-commonBlocks: + case incomingBlock, hasMore := <-incomingBlocks: if !hasMore { continue } - if commonBlock == nil { + if incomingBlock == nil { continue } - block := &Block{ + block := &ParsedBlock{ Channel: p.blockChannel.channel, } - block.Block, block.Error = hlfproto.ParseBlock(commonBlock, hlfproto.WithConfigBlock(p.configBlock)) + block.Block, block.Error = hlfproto.ParseBlock(incomingBlock.Block, hlfproto.WithConfigBlock(p.configBlock)) for pos, transformer := range p.transformers { if err = transformer.Transform(block); err != nil { diff --git a/observer/block_peer.go b/observer/block_peer.go index 88bfa0c..b3083f7 100644 --- a/observer/block_peer.go +++ b/observer/block_peer.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/hyperledger/fabric-protos-go/common" "go.uber.org/zap" "github.com/s7techlab/hlf-sdk-go/api" @@ -25,8 +24,8 @@ type ( stopRecreateStream bool logger *zap.Logger - blocks chan *common.Block - blocksByChannels map[string]chan *common.Block + blocks chan *Block + blocksByChannels map[string]chan *Block isWork bool cancelObserve context.CancelFunc @@ -93,8 +92,8 @@ func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer, peerChannels: peerChannels, blockDeliverer: blockDeliverer, channelObservers: make(map[string]*blockPeerChannel), - blocks: make(chan *common.Block), - blocksByChannels: make(map[string]chan *common.Block), + blocks: make(chan *Block), + blocksByChannels: make(map[string]chan *Block), seekFrom: blockPeerOpts.seekFrom, observePeriod: blockPeerOpts.observePeriod, stopRecreateStream: blockPeerOpts.stopRecreateStream, @@ -116,7 +115,7 @@ func (bp *BlockPeer) ChannelObservers() map[string]*blockPeerChannel { return copyChannelObservers } -func (bp *BlockPeer) Observe(ctx context.Context) (<-chan *common.Block, error) { +func (bp *BlockPeer) Observe(ctx context.Context) (<-chan *Block, error) { if bp.isWork { return bp.blocks, nil } diff --git a/observer/block_peer_concurrently.go b/observer/block_peer_concurrently.go index 8eea6e2..00e3275 100644 --- a/observer/block_peer_concurrently.go +++ b/observer/block_peer_concurrently.go @@ -4,13 +4,12 @@ import ( "context" "time" - "github.com/hyperledger/fabric-protos-go/common" "go.uber.org/zap" ) type ChannelCommonBlocks struct { Name string - Blocks <-chan *common.Block + Blocks <-chan *Block } type BlocksByChannels struct { @@ -83,7 +82,7 @@ func (bp *BlockPeer) peerChannelConcurrently(ctx context.Context, channel string bp.logger.Warn(`init channel observer`, zap.Error(peerChannel.err)) } - blocks := make(chan *common.Block) + blocks := make(chan *Block) bp.blocksByChannels[channel] = blocks go func() { diff --git a/observer/block_peer_concurrently_parse.go b/observer/block_peer_concurrently_parse.go index 825e3c1..0870841 100644 --- a/observer/block_peer_concurrently_parse.go +++ b/observer/block_peer_concurrently_parse.go @@ -9,7 +9,7 @@ import ( type ChannelParsedBlocks struct { Name string - Blocks <-chan *Block + Blocks <-chan *ParsedBlock } type ParsedBlocksByChannels struct { @@ -78,7 +78,7 @@ func (pbp *ParsedBlockPeer) peerParsedChannelConcurrently(ctx context.Context, c pbp.blockPeer.logger.Warn(`init parsed channel observer`, zap.Error(peerParsedChannel.err)) } - blocks := make(chan *Block) + blocks := make(chan *ParsedBlock) pbp.blocksByChannels[channel] = blocks go func() { diff --git a/observer/block_peer_parse.go b/observer/block_peer_parse.go index c9789d4..b0f862b 100644 --- a/observer/block_peer_parse.go +++ b/observer/block_peer_parse.go @@ -17,8 +17,8 @@ type ( transformers []BlockTransformer configBlocks map[string]*common.Block - blocks chan *Block - blocksByChannels map[string]chan *Block + blocks chan *ParsedBlock + blocksByChannels map[string]chan *ParsedBlock parsedChannelObservers map[string]*parsedBlockPeerChannel @@ -59,7 +59,7 @@ func NewParsedBlockPeer(blocksPeer *BlockPeer, opts ...ParsedBlockPeerOpt) *Pars return parsedBlockPeer } -func (pbp *ParsedBlockPeer) Observe(ctx context.Context) (<-chan *Block, error) { +func (pbp *ParsedBlockPeer) Observe(ctx context.Context) (<-chan *ParsedBlock, error) { if pbp.isWork { return pbp.blocks, nil } diff --git a/observer/block_stream.go b/observer/block_stream.go index 00c3b25..7e21388 100644 --- a/observer/block_stream.go +++ b/observer/block_stream.go @@ -5,15 +5,12 @@ import ( "time" "github.com/hyperledger/fabric-protos-go/common" - hlfproto "github.com/s7techlab/hlf-sdk-go/proto" ) type ( Block struct { - Block *hlfproto.Block // parsed block - BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil - Channel string - Error error + Block *common.Block + Channel string } CreateBlockStream func(context.Context) (<-chan *common.Block, error) diff --git a/observer/block_stream_parsed.go b/observer/block_stream_parsed.go new file mode 100644 index 0000000..e995e57 --- /dev/null +++ b/observer/block_stream_parsed.go @@ -0,0 +1,14 @@ +package observer + +import ( + hlfproto "github.com/s7techlab/hlf-sdk-go/proto" +) + +type ( + ParsedBlock struct { + Block *hlfproto.Block // parsed block + BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil + Channel string + Error error + } +) diff --git a/observer/channel_peer_fetcher_mock.go b/observer/channel_peer_fetcher_mock.go index a1e5108..2dc28a6 100644 --- a/observer/channel_peer_fetcher_mock.go +++ b/observer/channel_peer_fetcher_mock.go @@ -18,7 +18,7 @@ func NewChannelPeerFetcherMock(channels map[string]uint64) *ChannelPeerFetcherMo } } -func (c *ChannelPeerFetcherMock) GetChannels(ctx context.Context) (*peer.ChannelQueryResponse, error) { +func (c *ChannelPeerFetcherMock) GetChannels(context.Context) (*peer.ChannelQueryResponse, error) { var channels []*peer.ChannelInfo for channelName := range c.channels { channels = append(channels, &peer.ChannelInfo{ChannelId: channelName}) @@ -29,7 +29,7 @@ func (c *ChannelPeerFetcherMock) GetChannels(ctx context.Context) (*peer.Channel }, nil } -func (c *ChannelPeerFetcherMock) GetChainInfo(ctx context.Context, channel string) (*common.BlockchainInfo, error) { +func (c *ChannelPeerFetcherMock) GetChainInfo(_ context.Context, channel string) (*common.BlockchainInfo, error) { chHeight, exists := c.channels[channel] if !exists { return nil, fmt.Errorf("channel '%s' does not exist", channel) diff --git a/observer/stream_parse.go b/observer/stream_parse.go new file mode 100644 index 0000000..b128171 --- /dev/null +++ b/observer/stream_parse.go @@ -0,0 +1,104 @@ +package observer + +import ( + "context" + "strconv" + "sync" +) + +type StreamParsed interface { + Subscribe() (ch chan *ParsedBlock, closer func()) +} + +type ParsedBlocksStream struct { + connectionsParsed map[string]chan *ParsedBlock + mu *sync.RWMutex + + isWork bool + cancelObserve context.CancelFunc +} + +func NewParsedBlocksStream() *ParsedBlocksStream { + return &ParsedBlocksStream{ + connectionsParsed: make(map[string]chan *ParsedBlock), + mu: &sync.RWMutex{}, + } +} + +func (b *ParsedBlocksStream) Observe(ctx context.Context, blocks <-chan *ParsedBlock) { + if b.isWork { + return + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + b.cancelObserve = cancel + + go func() { + defer func() { + for connName := range b.connectionsParsed { + b.closeChannel(connName) + } + }() + + b.isWork = true + + for { + select { + case <-ctxObserve.Done(): + // If primary context is done then cancel ctxObserver + b.cancelObserve() + return + + case block, ok := <-blocks: + if !ok { + return + } + + b.mu.RLock() + for _, connection := range b.connectionsParsed { + connection <- block + } + b.mu.RUnlock() + } + } + }() +} + +func (b *ParsedBlocksStream) Subscribe() (chan *ParsedBlock, func()) { + b.mu.Lock() + newConnection := make(chan *ParsedBlock) + name := "channel-" + strconv.Itoa(len(b.connectionsParsed)) + b.connectionsParsed[name] = newConnection + b.mu.Unlock() + + closer := func() { b.closeChannel(name) } + + return newConnection, closer +} + +func (b *ParsedBlocksStream) SubscribeParsed() (chan *ParsedBlock, func()) { + b.mu.Lock() + newConnection := make(chan *ParsedBlock) + name := "channel-" + strconv.Itoa(len(b.connectionsParsed)) + b.connectionsParsed[name] = newConnection + b.mu.Unlock() + + closer := func() { b.closeChannel(name) } + + return newConnection, closer +} + +func (b *ParsedBlocksStream) closeChannel(name string) { + b.mu.Lock() + close(b.connectionsParsed[name]) + delete(b.connectionsParsed, name) + b.mu.Unlock() +} + +func (b *ParsedBlocksStream) Stop() { + if b.cancelObserve != nil { + b.cancelObserve() + } + b.isWork = false +} diff --git a/observer/transform/action.go b/observer/transform/action.go index 7b83c8c..31d6879 100644 --- a/observer/transform/action.go +++ b/observer/transform/action.go @@ -68,7 +68,7 @@ func NewAction(actionMach TxActionMatch, opts ...ActionOpt) *Action { return a } -func (s *Action) Transform(block *observer.Block) error { +func (s *Action) Transform(block *observer.ParsedBlock) error { if block.Block == nil { return nil } diff --git a/observer/transformer.go b/observer/transformer.go index b7db610..dc9c87a 100644 --- a/observer/transformer.go +++ b/observer/transformer.go @@ -2,5 +2,5 @@ package observer // BlockTransformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json type BlockTransformer interface { - Transform(*Block) error + Transform(*ParsedBlock) error }