diff --git a/observer/all_channels_blocks.go b/observer/all_channels_blocks.go index b43872a..8f17501 100644 --- a/observer/all_channels_blocks.go +++ b/observer/all_channels_blocks.go @@ -57,7 +57,7 @@ var DefaultAllChannelsBlocksOpts = &AllChannelsBlocksOpts{ logger: zap.NewNop(), } -func WithBlockPeerLogger(logger *zap.Logger) AllChannelsBlocksOpt { +func WithAllChannelsBlocksLogger(logger *zap.Logger) AllChannelsBlocksOpt { return func(opts *AllChannelsBlocksOpts) { opts.logger = logger } @@ -75,7 +75,7 @@ func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) AllChannelsBlocksOpt { } } -func WithBlockPeerObservePeriod(observePeriod time.Duration) AllChannelsBlocksOpt { +func WithAllChannelsBlocksObservePeriod(observePeriod time.Duration) AllChannelsBlocksOpt { return func(opts *AllChannelsBlocksOpts) { if observePeriod != 0 { opts.observePeriod = observePeriod @@ -96,9 +96,9 @@ func NewAllChannelsBlocks[T any]( opts ...AllChannelsBlocksOpt, ) *AllChannelsBlocks[T] { - blockPeerOpts := DefaultAllChannelsBlocksOpts + allChannelsBlocksOpts := DefaultAllChannelsBlocksOpts for _, opt := range opts { - opt(blockPeerOpts) + opt(allChannelsBlocksOpts) } return &AllChannelsBlocks[T]{ @@ -109,12 +109,12 @@ func NewAllChannelsBlocks[T any]( peerChannelsGetter: peerChannelsGetter, deliverer: deliverer, createStreamWithRetry: createStreamWithRetry, - observePeriod: blockPeerOpts.observePeriod, + observePeriod: allChannelsBlocksOpts.observePeriod, - seekFrom: blockPeerOpts.seekFrom, - seekFromFetcher: blockPeerOpts.seekFromFetcher, - stopRecreateStream: blockPeerOpts.stopRecreateStream, - logger: blockPeerOpts.logger, + seekFrom: allChannelsBlocksOpts.seekFrom, + seekFromFetcher: allChannelsBlocksOpts.seekFromFetcher, + stopRecreateStream: allChannelsBlocksOpts.stopRecreateStream, + logger: allChannelsBlocksOpts.logger, } } diff --git a/observer/all_channels_blocks_common.go b/observer/all_channels_blocks_common.go index 761649b..a6e3b9c 100644 --- a/observer/all_channels_blocks_common.go +++ b/observer/all_channels_blocks_common.go @@ -1,8 +1,6 @@ package observer import ( - "context" - "github.com/hyperledger/fabric-protos-go/common" "github.com/s7techlab/hlf-sdk-go/api" @@ -10,9 +8,6 @@ import ( type AllChannelBlocksCommon struct { *AllChannelsBlocks[*common.Block] - - commonBlocks chan *CommonBlock - isWork bool } func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksCommon { @@ -22,39 +17,3 @@ func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver ap return &AllChannelBlocksCommon{AllChannelsBlocks: allChsBlocks} } - -func (a *AllChannelBlocksCommon) Observe(ctx context.Context) <-chan *CommonBlock { - if a.isWork { - return a.commonBlocks - } - - a.commonBlocks = make(chan *CommonBlock) - go func() { - a.isWork = true - defer func() { - close(a.commonBlocks) - a.isWork = false - }() - - blocks := a.AllChannelsBlocks.Observe(ctx) - - for { - select { - case <-ctx.Done(): - return - - case cb, ok := <-blocks: - if !ok { - return - } - if cb == nil { - continue - } - - a.commonBlocks <- &CommonBlock{Block: cb} - } - } - }() - - return a.commonBlocks -} diff --git a/observer/all_channels_blocks_common_test.go b/observer/all_channels_blocks_common_test.go index df6b1e7..1d52afa 100644 --- a/observer/all_channels_blocks_common_test.go +++ b/observer/all_channels_blocks_common_test.go @@ -19,7 +19,7 @@ var ( peerChannelsMockForCommon *observer.PeerChannelsMock allChannelBlocksCommon *observer.AllChannelBlocksCommon - commonBlocks <-chan *observer.CommonBlock + commonBlocks <-chan *observer.Block[*common.Block] peerChannelsMockConcurrentlyForCommon *observer.PeerChannelsMock allChannelBlocksConcurrentlyCommon *observer.AllChannelBlocksCommon @@ -37,7 +37,7 @@ func allChannelsBlocksCommonTestBeforeSuit() { } allChannelBlocksCommon = observer.NewAllChannelBlocksCommon(peerChannelsMockForCommon, blockDelivererMock, - observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond)) commonBlocks = allChannelBlocksCommon.Observe(ctx) @@ -47,7 +47,7 @@ func allChannelsBlocksCommonTestBeforeSuit() { } allChannelBlocksConcurrentlyCommon = observer.NewAllChannelBlocksCommon(peerChannelsMockConcurrentlyForCommon, blockDelivererMock, - observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond)) channelWithChannelsCommon = allChannelBlocksConcurrentlyCommon.ObserveByChannels(ctx) } @@ -66,7 +66,7 @@ var _ = Describe("All channels blocks common", func() { peerChannelsMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to commonBlockPeer observer + // wait to allChannelsBlocksCommon observer time.Sleep(time.Millisecond * 10) channels := allChannelBlocksCommon.Channels() @@ -87,7 +87,7 @@ var _ = Describe("All channels blocks common", func() { Expect(b.Channel).To(Equal(curBlockChannel)) blockNum := channelsBlocksHeights[curBlockChannel] - Expect(b.Block.Block.Header.Number).To(Equal(blockNum)) + Expect(b.Block.Header.Number).To(Equal(blockNum)) channelsBlocksHeights[curBlockChannel]++ @@ -148,7 +148,7 @@ var _ = Describe("All channels blocks common", func() { peerChannelsMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to commonBlockPeer observer + // wait to allChannelsBlocksCommon observer time.Sleep(time.Millisecond * 200) channels := allChannelBlocksConcurrentlyCommon.Channels() diff --git a/observer/all_channels_blocks_parsed.go b/observer/all_channels_blocks_parsed.go index 0a489e9..a4b6e6e 100644 --- a/observer/all_channels_blocks_parsed.go +++ b/observer/all_channels_blocks_parsed.go @@ -1,17 +1,12 @@ package observer import ( - "context" - "github.com/s7techlab/hlf-sdk-go/api" hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) type AllChannelBlocksParsed struct { *AllChannelsBlocks[*hlfproto.Block] - - parsedBlocks chan *ParsedBlock - isWork bool } func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksParsed { @@ -21,39 +16,3 @@ func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver ap return &AllChannelBlocksParsed{AllChannelsBlocks: allChsBlocks} } - -func (a *AllChannelBlocksParsed) Observe(ctx context.Context) <-chan *ParsedBlock { - if a.isWork { - return a.parsedBlocks - } - - a.parsedBlocks = make(chan *ParsedBlock) - go func() { - a.isWork = true - defer func() { - close(a.parsedBlocks) - a.isWork = false - }() - - blocks := a.AllChannelsBlocks.Observe(ctx) - - for { - select { - case <-ctx.Done(): - return - - case cb, ok := <-blocks: - if !ok { - return - } - if cb == nil { - continue - } - - a.parsedBlocks <- &ParsedBlock{Block: cb} - } - } - }() - - return a.parsedBlocks -} diff --git a/observer/all_channels_blocks_parsed_test.go b/observer/all_channels_blocks_parsed_test.go index fa3c28c..cabd257 100644 --- a/observer/all_channels_blocks_parsed_test.go +++ b/observer/all_channels_blocks_parsed_test.go @@ -16,7 +16,7 @@ import ( var ( peerChannelsMockForParsed *observer.PeerChannelsMock allChannelBlocksParsed *observer.AllChannelBlocksParsed - parsedBlocks <-chan *observer.ParsedBlock + parsedBlocks <-chan *observer.Block[*hlfproto.Block] peerChannelsMockConcurrentlyForParsed *observer.PeerChannelsMock allChannelBlocksConcurrentlyParsed *observer.AllChannelBlocksParsed @@ -34,7 +34,7 @@ func allChannelsBlocksParsedTestBeforeSuit() { } allChannelBlocksParsed = observer.NewAllChannelBlocksParsed(peerChannelsMockForParsed, blockDelivererMock, - observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond)) parsedBlocks = allChannelBlocksParsed.Observe(ctx) @@ -44,7 +44,7 @@ func allChannelsBlocksParsedTestBeforeSuit() { } allChannelBlocksConcurrentlyParsed = observer.NewAllChannelBlocksParsed(peerChannelsMockConcurrentlyForParsed, blockDelivererMock, - observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond)) channelWithChannelsParsed = allChannelBlocksConcurrentlyParsed.ObserveByChannels(ctx) } @@ -63,7 +63,7 @@ var _ = Describe("All channels blocks parsed", func() { peerChannelsMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to parsedBlockPeer observer + // wait to allChannelsBlocksParsed observer time.Sleep(time.Second + time.Millisecond*10) channels := allChannelBlocksParsed.Channels() @@ -84,7 +84,7 @@ var _ = Describe("All channels blocks parsed", func() { Expect(b.Channel).To(Equal(curBlockChannel)) blockNum := channelsBlocksHeights[curBlockChannel] - Expect(b.Block.Block.Header.Number).To(Equal(blockNum)) + Expect(b.Block.Header.Number).To(Equal(blockNum)) channelsBlocksHeights[curBlockChannel]++ @@ -145,7 +145,7 @@ var _ = Describe("All channels blocks parsed", func() { peerChannelsMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to blockPeer observer + // wait to allChannelsBlocksParsed observer time.Sleep(time.Millisecond * 200) channels := allChannelBlocksConcurrentlyParsed.Channels() diff --git a/observer/block.go b/observer/block.go index 3d4d0be..39394f6 100644 --- a/observer/block.go +++ b/observer/block.go @@ -1,20 +1,6 @@ package observer -import ( - "github.com/hyperledger/fabric-protos-go/common" - - hlfproto "github.com/s7techlab/hlf-sdk-go/block" -) - type Block[T any] struct { Channel string Block T } - -type CommonBlock struct { - *Block[*common.Block] -} - -type ParsedBlock struct { - *Block[*hlfproto.Block] -} diff --git a/observer/peer_channels.go b/observer/peer_channels.go index 19b757b..a6ecd2f 100644 --- a/observer/peer_channels.go +++ b/observer/peer_channels.go @@ -15,7 +15,7 @@ import ( "github.com/s7techlab/hlf-sdk-go/api" ) -const DefaultChannelPeerObservePeriod = 30 * time.Second +const DefaultPeerChannelsObservePeriod = 30 * time.Second type ( ChannelInfo struct { @@ -47,53 +47,53 @@ type ( api.ChainInfoGetter } - ChannelPeerOpts struct { + PeerChannelsOpts struct { channels []ChannelToMatch observePeriod time.Duration logger *zap.Logger } - ChannelPeerOpt func(*ChannelPeerOpts) + PeerChannelsOpt func(*PeerChannelsOpts) ) -var DefaultChannelPeerOpts = &ChannelPeerOpts{ +var DefaultPeerChannelsOpts = &PeerChannelsOpts{ channels: MatchAllChannels, - observePeriod: DefaultChannelPeerObservePeriod, + observePeriod: DefaultPeerChannelsObservePeriod, logger: zap.NewNop(), } -func WithChannels(channels []ChannelToMatch) ChannelPeerOpt { - return func(opts *ChannelPeerOpts) { +func WithChannels(channels []ChannelToMatch) PeerChannelsOpt { + return func(opts *PeerChannelsOpts) { opts.channels = channels } } -func WithChannelPeerLogger(logger *zap.Logger) ChannelPeerOpt { - return func(opts *ChannelPeerOpts) { +func WithPeerChannelsLogger(logger *zap.Logger) PeerChannelsOpt { + return func(opts *PeerChannelsOpts) { opts.logger = logger } } -func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...ChannelPeerOpt) (*PeerChannels, error) { - channelPeerOpts := DefaultChannelPeerOpts +func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...PeerChannelsOpt) (*PeerChannels, error) { + peerChannelsOpts := DefaultPeerChannelsOpts for _, opt := range opts { - opt(channelPeerOpts) + opt(peerChannelsOpts) } - channelsMatcher, err := NewChannelsMatcher(channelPeerOpts.channels) + channelsMatcher, err := NewChannelsMatcher(peerChannelsOpts.channels) if err != nil { return nil, fmt.Errorf(`channels matcher: %w`, err) } - channelPeer := &PeerChannels{ + peerChannels := &PeerChannels{ channelFetcher: peerChannelsFetcher, channelsMatcher: channelsMatcher, channels: make(map[string]*ChannelInfo), - observePeriod: channelPeerOpts.observePeriod, - logger: channelPeerOpts.logger, + observePeriod: peerChannelsOpts.observePeriod, + logger: peerChannelsOpts.logger, } - return channelPeer, nil + return peerChannels, nil } func (pc *PeerChannels) Stop() { diff --git a/observer/stream.go b/observer/stream.go index bd764a1..90f508c 100644 --- a/observer/stream.go +++ b/observer/stream.go @@ -4,20 +4,8 @@ import ( "context" "strconv" "sync" - - "github.com/hyperledger/fabric-protos-go/common" - - hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) -type CommonBlocksStream struct { - *BlocksStream[*common.Block] -} - -type ParsedBlocksStream struct { - *BlocksStream[*hlfproto.Block] -} - type Stream[T any] interface { Subscribe() (ch chan *Block[T], closer func()) }