diff --git a/observer/all_channels_blocks_common.go b/observer/all_channels_blocks_common.go index 761649b..a6e3b9c 100644 --- a/observer/all_channels_blocks_common.go +++ b/observer/all_channels_blocks_common.go @@ -1,8 +1,6 @@ package observer import ( - "context" - "github.com/hyperledger/fabric-protos-go/common" "github.com/s7techlab/hlf-sdk-go/api" @@ -10,9 +8,6 @@ import ( type AllChannelBlocksCommon struct { *AllChannelsBlocks[*common.Block] - - commonBlocks chan *CommonBlock - isWork bool } func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksCommon { @@ -22,39 +17,3 @@ func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver ap return &AllChannelBlocksCommon{AllChannelsBlocks: allChsBlocks} } - -func (a *AllChannelBlocksCommon) Observe(ctx context.Context) <-chan *CommonBlock { - if a.isWork { - return a.commonBlocks - } - - a.commonBlocks = make(chan *CommonBlock) - go func() { - a.isWork = true - defer func() { - close(a.commonBlocks) - a.isWork = false - }() - - blocks := a.AllChannelsBlocks.Observe(ctx) - - for { - select { - case <-ctx.Done(): - return - - case cb, ok := <-blocks: - if !ok { - return - } - if cb == nil { - continue - } - - a.commonBlocks <- &CommonBlock{Block: cb} - } - } - }() - - return a.commonBlocks -} diff --git a/observer/all_channels_blocks_common_test.go b/observer/all_channels_blocks_common_test.go index df6b1e7..94277a4 100644 --- a/observer/all_channels_blocks_common_test.go +++ b/observer/all_channels_blocks_common_test.go @@ -19,7 +19,7 @@ var ( peerChannelsMockForCommon *observer.PeerChannelsMock allChannelBlocksCommon *observer.AllChannelBlocksCommon - commonBlocks <-chan *observer.CommonBlock + commonBlocks <-chan *observer.Block[*common.Block] peerChannelsMockConcurrentlyForCommon *observer.PeerChannelsMock allChannelBlocksConcurrentlyCommon *observer.AllChannelBlocksCommon @@ -87,7 +87,7 @@ var _ = Describe("All channels blocks common", func() { Expect(b.Channel).To(Equal(curBlockChannel)) blockNum := channelsBlocksHeights[curBlockChannel] - Expect(b.Block.Block.Header.Number).To(Equal(blockNum)) + Expect(b.Block.Header.Number).To(Equal(blockNum)) channelsBlocksHeights[curBlockChannel]++ diff --git a/observer/all_channels_blocks_parsed.go b/observer/all_channels_blocks_parsed.go index 0a489e9..a4b6e6e 100644 --- a/observer/all_channels_blocks_parsed.go +++ b/observer/all_channels_blocks_parsed.go @@ -1,17 +1,12 @@ package observer import ( - "context" - "github.com/s7techlab/hlf-sdk-go/api" hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) type AllChannelBlocksParsed struct { *AllChannelsBlocks[*hlfproto.Block] - - parsedBlocks chan *ParsedBlock - isWork bool } func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksParsed { @@ -21,39 +16,3 @@ func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver ap return &AllChannelBlocksParsed{AllChannelsBlocks: allChsBlocks} } - -func (a *AllChannelBlocksParsed) Observe(ctx context.Context) <-chan *ParsedBlock { - if a.isWork { - return a.parsedBlocks - } - - a.parsedBlocks = make(chan *ParsedBlock) - go func() { - a.isWork = true - defer func() { - close(a.parsedBlocks) - a.isWork = false - }() - - blocks := a.AllChannelsBlocks.Observe(ctx) - - for { - select { - case <-ctx.Done(): - return - - case cb, ok := <-blocks: - if !ok { - return - } - if cb == nil { - continue - } - - a.parsedBlocks <- &ParsedBlock{Block: cb} - } - } - }() - - return a.parsedBlocks -} diff --git a/observer/all_channels_blocks_parsed_test.go b/observer/all_channels_blocks_parsed_test.go index fa3c28c..80ca6f3 100644 --- a/observer/all_channels_blocks_parsed_test.go +++ b/observer/all_channels_blocks_parsed_test.go @@ -16,7 +16,7 @@ import ( var ( peerChannelsMockForParsed *observer.PeerChannelsMock allChannelBlocksParsed *observer.AllChannelBlocksParsed - parsedBlocks <-chan *observer.ParsedBlock + parsedBlocks <-chan *observer.Block[*hlfproto.Block] peerChannelsMockConcurrentlyForParsed *observer.PeerChannelsMock allChannelBlocksConcurrentlyParsed *observer.AllChannelBlocksParsed @@ -84,7 +84,7 @@ var _ = Describe("All channels blocks parsed", func() { Expect(b.Channel).To(Equal(curBlockChannel)) blockNum := channelsBlocksHeights[curBlockChannel] - Expect(b.Block.Block.Header.Number).To(Equal(blockNum)) + Expect(b.Block.Header.Number).To(Equal(blockNum)) channelsBlocksHeights[curBlockChannel]++ diff --git a/observer/block.go b/observer/block.go index 3d4d0be..39394f6 100644 --- a/observer/block.go +++ b/observer/block.go @@ -1,20 +1,6 @@ package observer -import ( - "github.com/hyperledger/fabric-protos-go/common" - - hlfproto "github.com/s7techlab/hlf-sdk-go/block" -) - type Block[T any] struct { Channel string Block T } - -type CommonBlock struct { - *Block[*common.Block] -} - -type ParsedBlock struct { - *Block[*hlfproto.Block] -} diff --git a/observer/stream.go b/observer/stream.go index bd764a1..90f508c 100644 --- a/observer/stream.go +++ b/observer/stream.go @@ -4,20 +4,8 @@ import ( "context" "strconv" "sync" - - "github.com/hyperledger/fabric-protos-go/common" - - hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) -type CommonBlocksStream struct { - *BlocksStream[*common.Block] -} - -type ParsedBlocksStream struct { - *BlocksStream[*hlfproto.Block] -} - type Stream[T any] interface { Subscribe() (ch chan *Block[T], closer func()) }