diff --git a/api/client.go b/api/client.go index babd8e6..0633a15 100644 --- a/api/client.go +++ b/api/client.go @@ -7,6 +7,8 @@ import ( "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/msp" + + "github.com/s7techlab/hlf-sdk-go/block" ) type CurrentIdentity interface { @@ -43,6 +45,16 @@ type BlocksDeliverer interface { ) (blockChan <-chan *common.Block, closer func() error, err error) } +type ParsedBlocksDeliverer interface { + // ParsedBlocks the same as BlocksDeliverer.Blocks, but returns a channel with parsed blocks + ParsedBlocks( + ctx context.Context, + channel string, + identity msp.SigningIdentity, + blockRange ...int64, + ) (parsedBlockChan <-chan *block.Block, parsedCloser func() error, err error) +} + type Querier interface { CurrentIdentity // Query - shortcut for querying chaincodes diff --git a/client/peer.go b/client/peer.go index 6dce33a..60fee12 100644 --- a/client/peer.go +++ b/client/peer.go @@ -16,6 +16,7 @@ import ( "github.com/s7techlab/hlf-sdk-go/api" "github.com/s7techlab/hlf-sdk-go/api/config" + "github.com/s7techlab/hlf-sdk-go/block" "github.com/s7techlab/hlf-sdk-go/client/channel" "github.com/s7techlab/hlf-sdk-go/client/deliver" grpcclient "github.com/s7techlab/hlf-sdk-go/client/grpc" @@ -140,7 +141,7 @@ func (p *peer) Query( return response.Response, nil } -func (p *peer) Blocks(ctx context.Context, channel string, identity msp.SigningIdentity, blockRange ...int64) (blockChan <-chan *common.Block, closer func() error, err error) { +func (p *peer) Blocks(ctx context.Context, channel string, identity msp.SigningIdentity, blockRange ...int64) (<-chan *common.Block, func() error, error) { p.logger.Debug(`peer blocks request`, zap.String(`uri`, p.Uri()), zap.String(`channel`, channel), @@ -169,6 +170,44 @@ func (p *peer) Blocks(ctx context.Context, channel string, identity msp.SigningI return bs.Blocks(), bs.Close, nil } +func (p *peer) ParsedBlocks(ctx context.Context, channel string, identity msp.SigningIdentity, blockRange ...int64) (<-chan *block.Block, func() error, error) { + commonBlocks, commonCloser, err := p.Blocks(ctx, channel, identity, blockRange...) + if err != nil { + return nil, nil, err + } + + parsedBlockChan := make(chan *block.Block) + go func() { + for { + select { + case b, ok := <-commonBlocks: + if !ok { + return + } + + parsedBlock, err := block.ParseBlock(b) + if err != nil { + p.logger.Error("parse block", zap.String("channel", channel), zap.Uint64("number", b.Header.Number)) + continue + } + + parsedBlockChan <- parsedBlock + } + } + }() + + parsedCloser := func() error { + if closerErr := commonCloser(); closerErr != nil { + return closerErr + } + + close(parsedBlockChan) + return nil + } + + return parsedBlockChan, parsedCloser, nil +} + func (p *peer) Events(ctx context.Context, channel string, chaincode string, identity msp.SigningIdentity, blockRange ...int64) (events chan interface { Event() *fabricPeer.ChaincodeEvent Block() uint64