Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Oct 2, 2023
1 parent 9c512bc commit c7ea95d
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 34 deletions.
11 changes: 7 additions & 4 deletions observer/block_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type (
createStreamWithRetry CreateBlockStreamWithRetry
stopRecreateStream bool

blocks chan *common.Block
blocks chan *Block

isWork bool
cancelObserve context.CancelFunc
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewBlockChannel(channel string, blocksDeliver api.BlocksDeliverer, seekFrom
return observer
}

func (c *BlockChannel) Observe(ctx context.Context) (<-chan *common.Block, error) {
func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

Expand All @@ -95,7 +95,7 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *common.Block, error
return nil, err
}

c.blocks = make(chan *common.Block)
c.blocks = make(chan *Block)

go func() {
c.isWork = true
Expand Down Expand Up @@ -127,7 +127,10 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *common.Block, error
continue
}

c.blocks <- incomingBlock
c.blocks <- &Block{
Block: incomingBlock,
Channel: c.channel,
}

case <-ctxObserve.Done():
if err := c.Stop(); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions observer/block_channel_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type (
transformers []BlockTransformer
configBlock *common.Block

blocks chan *Block
blocks chan *ParsedBlock
isWork bool
cancelObserve context.CancelFunc
mutex sync.Mutex
Expand Down Expand Up @@ -51,7 +51,7 @@ func NewParsedBlockChannel(blockChannel *BlockChannel, opts ...ParsedBlockChanne
return parsedBlockChannel
}

func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *Block, error) {
func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock, error) {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -63,7 +63,7 @@ func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *Block, error)
ctxObserve, cancel := context.WithCancel(ctx)
p.cancelObserve = cancel

commonBlocks, err := p.blockChannel.Observe(ctxObserve)
incomingBlocks, err := p.blockChannel.Observe(ctxObserve)
if err != nil {
return nil, fmt.Errorf("observe common blocks: %w", err)
}
Expand All @@ -73,19 +73,19 @@ func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *Block, error)

for {
select {
case commonBlock, hasMore := <-commonBlocks:
case incomingBlock, hasMore := <-incomingBlocks:
if !hasMore {
continue
}

if commonBlock == nil {
if incomingBlock == nil {
continue
}

block := &Block{
block := &ParsedBlock{
Channel: p.blockChannel.channel,
}
block.Block, block.Error = hlfproto.ParseBlock(commonBlock, hlfproto.WithConfigBlock(p.configBlock))
block.Block, block.Error = hlfproto.ParseBlock(incomingBlock.Block, hlfproto.WithConfigBlock(p.configBlock))

for pos, transformer := range p.transformers {
if err = transformer.Transform(block); err != nil {
Expand Down
11 changes: 5 additions & 6 deletions observer/block_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"go.uber.org/zap"

"github.com/s7techlab/hlf-sdk-go/api"
Expand All @@ -25,8 +24,8 @@ type (
stopRecreateStream bool
logger *zap.Logger

blocks chan *common.Block
blocksByChannels map[string]chan *common.Block
blocks chan *Block
blocksByChannels map[string]chan *Block

isWork bool
cancelObserve context.CancelFunc
Expand Down Expand Up @@ -93,8 +92,8 @@ func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer,
peerChannels: peerChannels,
blockDeliverer: blockDeliverer,
channelObservers: make(map[string]*blockPeerChannel),
blocks: make(chan *common.Block),
blocksByChannels: make(map[string]chan *common.Block),
blocks: make(chan *Block),
blocksByChannels: make(map[string]chan *Block),
seekFrom: blockPeerOpts.seekFrom,
observePeriod: blockPeerOpts.observePeriod,
stopRecreateStream: blockPeerOpts.stopRecreateStream,
Expand All @@ -116,7 +115,7 @@ func (bp *BlockPeer) ChannelObservers() map[string]*blockPeerChannel {
return copyChannelObservers
}

func (bp *BlockPeer) Observe(ctx context.Context) (<-chan *common.Block, error) {
func (bp *BlockPeer) Observe(ctx context.Context) (<-chan *Block, error) {
if bp.isWork {
return bp.blocks, nil
}
Expand Down
5 changes: 2 additions & 3 deletions observer/block_peer_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"context"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"go.uber.org/zap"
)

type ChannelCommonBlocks struct {
Name string
Blocks <-chan *common.Block
Blocks <-chan *Block
}

type BlocksByChannels struct {
Expand Down Expand Up @@ -83,7 +82,7 @@ func (bp *BlockPeer) peerChannelConcurrently(ctx context.Context, channel string
bp.logger.Warn(`init channel observer`, zap.Error(peerChannel.err))
}

blocks := make(chan *common.Block)
blocks := make(chan *Block)
bp.blocksByChannels[channel] = blocks

go func() {
Expand Down
4 changes: 2 additions & 2 deletions observer/block_peer_concurrently_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type ChannelParsedBlocks struct {
Name string
Blocks <-chan *Block
Blocks <-chan *ParsedBlock
}

type ParsedBlocksByChannels struct {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (pbp *ParsedBlockPeer) peerParsedChannelConcurrently(ctx context.Context, c
pbp.blockPeer.logger.Warn(`init parsed channel observer`, zap.Error(peerParsedChannel.err))
}

blocks := make(chan *Block)
blocks := make(chan *ParsedBlock)
pbp.blocksByChannels[channel] = blocks

go func() {
Expand Down
6 changes: 3 additions & 3 deletions observer/block_peer_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type (
transformers []BlockTransformer
configBlocks map[string]*common.Block

blocks chan *Block
blocksByChannels map[string]chan *Block
blocks chan *ParsedBlock
blocksByChannels map[string]chan *ParsedBlock

parsedChannelObservers map[string]*parsedBlockPeerChannel

Expand Down Expand Up @@ -59,7 +59,7 @@ func NewParsedBlockPeer(blocksPeer *BlockPeer, opts ...ParsedBlockPeerOpt) *Pars
return parsedBlockPeer
}

func (pbp *ParsedBlockPeer) Observe(ctx context.Context) (<-chan *Block, error) {
func (pbp *ParsedBlockPeer) Observe(ctx context.Context) (<-chan *ParsedBlock, error) {
if pbp.isWork {
return pbp.blocks, nil
}
Expand Down
7 changes: 2 additions & 5 deletions observer/block_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ import (
"time"

"github.com/hyperledger/fabric-protos-go/common"
hlfproto "github.com/s7techlab/hlf-sdk-go/proto"
)

type (
Block struct {
Block *hlfproto.Block // parsed block
BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil
Channel string
Error error
Block *common.Block
Channel string
}

CreateBlockStream func(context.Context) (<-chan *common.Block, error)
Expand Down
14 changes: 14 additions & 0 deletions observer/block_stream_parsed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package observer

import (
hlfproto "github.com/s7techlab/hlf-sdk-go/proto"
)

type (
ParsedBlock struct {
Block *hlfproto.Block // parsed block
BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil
Channel string
Error error
}
)
4 changes: 2 additions & 2 deletions observer/channel_peer_fetcher_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewChannelPeerFetcherMock(channels map[string]uint64) *ChannelPeerFetcherMo
}
}

func (c *ChannelPeerFetcherMock) GetChannels(ctx context.Context) (*peer.ChannelQueryResponse, error) {
func (c *ChannelPeerFetcherMock) GetChannels(context.Context) (*peer.ChannelQueryResponse, error) {
var channels []*peer.ChannelInfo
for channelName := range c.channels {
channels = append(channels, &peer.ChannelInfo{ChannelId: channelName})
Expand All @@ -29,7 +29,7 @@ func (c *ChannelPeerFetcherMock) GetChannels(ctx context.Context) (*peer.Channel
}, nil
}

func (c *ChannelPeerFetcherMock) GetChainInfo(ctx context.Context, channel string) (*common.BlockchainInfo, error) {
func (c *ChannelPeerFetcherMock) GetChainInfo(_ context.Context, channel string) (*common.BlockchainInfo, error) {
chHeight, exists := c.channels[channel]
if !exists {
return nil, fmt.Errorf("channel '%s' does not exist", channel)
Expand Down
104 changes: 104 additions & 0 deletions observer/stream_parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package observer

import (
"context"
"strconv"
"sync"
)

type StreamParsed interface {
Subscribe() (ch chan *ParsedBlock, closer func())
}

type ParsedBlocksStream struct {
connectionsParsed map[string]chan *ParsedBlock
mu *sync.RWMutex

isWork bool
cancelObserve context.CancelFunc
}

func NewParsedBlocksStream() *ParsedBlocksStream {
return &ParsedBlocksStream{
connectionsParsed: make(map[string]chan *ParsedBlock),
mu: &sync.RWMutex{},
}
}

func (b *ParsedBlocksStream) Observe(ctx context.Context, blocks <-chan *ParsedBlock) {
if b.isWork {
return
}

// ctxObserve using for nested control process without stopped primary context
ctxObserve, cancel := context.WithCancel(ctx)
b.cancelObserve = cancel

go func() {
defer func() {
for connName := range b.connectionsParsed {
b.closeChannel(connName)
}
}()

b.isWork = true

for {
select {
case <-ctxObserve.Done():
// If primary context is done then cancel ctxObserver
b.cancelObserve()
return

case block, ok := <-blocks:
if !ok {
return
}

b.mu.RLock()
for _, connection := range b.connectionsParsed {
connection <- block
}
b.mu.RUnlock()
}
}
}()
}

func (b *ParsedBlocksStream) Subscribe() (chan *ParsedBlock, func()) {
b.mu.Lock()
newConnection := make(chan *ParsedBlock)
name := "channel-" + strconv.Itoa(len(b.connectionsParsed))
b.connectionsParsed[name] = newConnection
b.mu.Unlock()

closer := func() { b.closeChannel(name) }

return newConnection, closer
}

func (b *ParsedBlocksStream) SubscribeParsed() (chan *ParsedBlock, func()) {
b.mu.Lock()
newConnection := make(chan *ParsedBlock)
name := "channel-" + strconv.Itoa(len(b.connectionsParsed))
b.connectionsParsed[name] = newConnection
b.mu.Unlock()

closer := func() { b.closeChannel(name) }

return newConnection, closer
}

func (b *ParsedBlocksStream) closeChannel(name string) {
b.mu.Lock()
close(b.connectionsParsed[name])
delete(b.connectionsParsed, name)
b.mu.Unlock()
}

func (b *ParsedBlocksStream) Stop() {
if b.cancelObserve != nil {
b.cancelObserve()
}
b.isWork = false
}
2 changes: 1 addition & 1 deletion observer/transform/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewAction(actionMach TxActionMatch, opts ...ActionOpt) *Action {
return a
}

func (s *Action) Transform(block *observer.Block) error {
func (s *Action) Transform(block *observer.ParsedBlock) error {
if block.Block == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion observer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package observer

// BlockTransformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json
type BlockTransformer interface {
Transform(*Block) error
Transform(*ParsedBlock) error
}

0 comments on commit c7ea95d

Please sign in to comment.