Skip to content

Commit

Permalink
observer refactoring to generics without Parsed Common
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Apr 9, 2024
1 parent e5ee138 commit 818a5d6
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 146 deletions.
18 changes: 9 additions & 9 deletions observer/all_channels_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var DefaultAllChannelsBlocksOpts = &AllChannelsBlocksOpts{
logger: zap.NewNop(),
}

func WithBlockPeerLogger(logger *zap.Logger) AllChannelsBlocksOpt {
func WithAllChannelsBlocksLogger(logger *zap.Logger) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
opts.logger = logger
}
Expand All @@ -75,7 +75,7 @@ func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) AllChannelsBlocksOpt {
}
}

func WithBlockPeerObservePeriod(observePeriod time.Duration) AllChannelsBlocksOpt {
func WithAllChannelsBlocksObservePeriod(observePeriod time.Duration) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
if observePeriod != 0 {
opts.observePeriod = observePeriod
Expand All @@ -96,9 +96,9 @@ func NewAllChannelsBlocks[T any](
opts ...AllChannelsBlocksOpt,
) *AllChannelsBlocks[T] {

blockPeerOpts := DefaultAllChannelsBlocksOpts
allChannelsBlocksOpts := DefaultAllChannelsBlocksOpts
for _, opt := range opts {
opt(blockPeerOpts)
opt(allChannelsBlocksOpts)
}

return &AllChannelsBlocks[T]{
Expand All @@ -109,12 +109,12 @@ func NewAllChannelsBlocks[T any](
peerChannelsGetter: peerChannelsGetter,
deliverer: deliverer,
createStreamWithRetry: createStreamWithRetry,
observePeriod: blockPeerOpts.observePeriod,
observePeriod: allChannelsBlocksOpts.observePeriod,

seekFrom: blockPeerOpts.seekFrom,
seekFromFetcher: blockPeerOpts.seekFromFetcher,
stopRecreateStream: blockPeerOpts.stopRecreateStream,
logger: blockPeerOpts.logger,
seekFrom: allChannelsBlocksOpts.seekFrom,
seekFromFetcher: allChannelsBlocksOpts.seekFromFetcher,
stopRecreateStream: allChannelsBlocksOpts.stopRecreateStream,
logger: allChannelsBlocksOpts.logger,
}
}

Expand Down
41 changes: 0 additions & 41 deletions observer/all_channels_blocks_common.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package observer

import (
"context"

"github.com/hyperledger/fabric-protos-go/common"

"github.com/s7techlab/hlf-sdk-go/api"
)

type AllChannelBlocksCommon struct {
*AllChannelsBlocks[*common.Block]

commonBlocks chan *CommonBlock
isWork bool
}

func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksCommon {
Expand All @@ -22,39 +17,3 @@ func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver ap

return &AllChannelBlocksCommon{AllChannelsBlocks: allChsBlocks}
}

func (a *AllChannelBlocksCommon) Observe(ctx context.Context) <-chan *CommonBlock {
if a.isWork {
return a.commonBlocks
}

a.commonBlocks = make(chan *CommonBlock)
go func() {
a.isWork = true
defer func() {
close(a.commonBlocks)
a.isWork = false
}()

blocks := a.AllChannelsBlocks.Observe(ctx)

for {
select {
case <-ctx.Done():
return

case cb, ok := <-blocks:
if !ok {
return
}
if cb == nil {
continue
}

a.commonBlocks <- &CommonBlock{Block: cb}
}
}
}()

return a.commonBlocks
}
12 changes: 6 additions & 6 deletions observer/all_channels_blocks_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

peerChannelsMockForCommon *observer.PeerChannelsMock
allChannelBlocksCommon *observer.AllChannelBlocksCommon
commonBlocks <-chan *observer.CommonBlock
commonBlocks <-chan *observer.Block[*common.Block]

peerChannelsMockConcurrentlyForCommon *observer.PeerChannelsMock
allChannelBlocksConcurrentlyCommon *observer.AllChannelBlocksCommon
Expand All @@ -37,7 +37,7 @@ func allChannelsBlocksCommonTestBeforeSuit() {
}

allChannelBlocksCommon = observer.NewAllChannelBlocksCommon(peerChannelsMockForCommon, blockDelivererMock,
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond))
observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond))

commonBlocks = allChannelBlocksCommon.Observe(ctx)

Expand All @@ -47,7 +47,7 @@ func allChannelsBlocksCommonTestBeforeSuit() {
}

allChannelBlocksConcurrentlyCommon = observer.NewAllChannelBlocksCommon(peerChannelsMockConcurrentlyForCommon, blockDelivererMock,
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond))
observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond))

channelWithChannelsCommon = allChannelBlocksConcurrentlyCommon.ObserveByChannels(ctx)
}
Expand All @@ -66,7 +66,7 @@ var _ = Describe("All channels blocks common", func() {
peerChannelsMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to commonBlockPeer observer
// wait to allChannelsBlocksCommon observer
time.Sleep(time.Millisecond * 10)

channels := allChannelBlocksCommon.Channels()
Expand All @@ -87,7 +87,7 @@ var _ = Describe("All channels blocks common", func() {
Expect(b.Channel).To(Equal(curBlockChannel))

blockNum := channelsBlocksHeights[curBlockChannel]
Expect(b.Block.Block.Header.Number).To(Equal(blockNum))
Expect(b.Block.Header.Number).To(Equal(blockNum))

channelsBlocksHeights[curBlockChannel]++

Expand Down Expand Up @@ -148,7 +148,7 @@ var _ = Describe("All channels blocks common", func() {
peerChannelsMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to commonBlockPeer observer
// wait to allChannelsBlocksCommon observer
time.Sleep(time.Millisecond * 200)

channels := allChannelBlocksConcurrentlyCommon.Channels()
Expand Down
41 changes: 0 additions & 41 deletions observer/all_channels_blocks_parsed.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package observer

import (
"context"

"github.com/s7techlab/hlf-sdk-go/api"
hlfproto "github.com/s7techlab/hlf-sdk-go/block"
)

type AllChannelBlocksParsed struct {
*AllChannelsBlocks[*hlfproto.Block]

parsedBlocks chan *ParsedBlock
isWork bool
}

func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksParsed {
Expand All @@ -21,39 +16,3 @@ func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver ap

return &AllChannelBlocksParsed{AllChannelsBlocks: allChsBlocks}
}

func (a *AllChannelBlocksParsed) Observe(ctx context.Context) <-chan *ParsedBlock {
if a.isWork {
return a.parsedBlocks
}

a.parsedBlocks = make(chan *ParsedBlock)
go func() {
a.isWork = true
defer func() {
close(a.parsedBlocks)
a.isWork = false
}()

blocks := a.AllChannelsBlocks.Observe(ctx)

for {
select {
case <-ctx.Done():
return

case cb, ok := <-blocks:
if !ok {
return
}
if cb == nil {
continue
}

a.parsedBlocks <- &ParsedBlock{Block: cb}
}
}
}()

return a.parsedBlocks
}
12 changes: 6 additions & 6 deletions observer/all_channels_blocks_parsed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
var (
peerChannelsMockForParsed *observer.PeerChannelsMock
allChannelBlocksParsed *observer.AllChannelBlocksParsed
parsedBlocks <-chan *observer.ParsedBlock
parsedBlocks <-chan *observer.Block[*hlfproto.Block]

peerChannelsMockConcurrentlyForParsed *observer.PeerChannelsMock
allChannelBlocksConcurrentlyParsed *observer.AllChannelBlocksParsed
Expand All @@ -34,7 +34,7 @@ func allChannelsBlocksParsedTestBeforeSuit() {
}

allChannelBlocksParsed = observer.NewAllChannelBlocksParsed(peerChannelsMockForParsed, blockDelivererMock,
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond))
observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond))

parsedBlocks = allChannelBlocksParsed.Observe(ctx)

Expand All @@ -44,7 +44,7 @@ func allChannelsBlocksParsedTestBeforeSuit() {
}

allChannelBlocksConcurrentlyParsed = observer.NewAllChannelBlocksParsed(peerChannelsMockConcurrentlyForParsed, blockDelivererMock,
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond))
observer.WithBlockStopRecreateStream(true), observer.WithAllChannelsBlocksObservePeriod(time.Nanosecond))

channelWithChannelsParsed = allChannelBlocksConcurrentlyParsed.ObserveByChannels(ctx)
}
Expand All @@ -63,7 +63,7 @@ var _ = Describe("All channels blocks parsed", func() {
peerChannelsMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to parsedBlockPeer observer
// wait to allChannelsBlocksParsed observer
time.Sleep(time.Second + time.Millisecond*10)

channels := allChannelBlocksParsed.Channels()
Expand All @@ -84,7 +84,7 @@ var _ = Describe("All channels blocks parsed", func() {
Expect(b.Channel).To(Equal(curBlockChannel))

blockNum := channelsBlocksHeights[curBlockChannel]
Expect(b.Block.Block.Header.Number).To(Equal(blockNum))
Expect(b.Block.Header.Number).To(Equal(blockNum))

channelsBlocksHeights[curBlockChannel]++

Expand Down Expand Up @@ -145,7 +145,7 @@ var _ = Describe("All channels blocks parsed", func() {
peerChannelsMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to blockPeer observer
// wait to allChannelsBlocksParsed observer
time.Sleep(time.Millisecond * 200)

channels := allChannelBlocksConcurrentlyParsed.Channels()
Expand Down
14 changes: 0 additions & 14 deletions observer/block.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,6 @@
package observer

import (
"github.com/hyperledger/fabric-protos-go/common"

hlfproto "github.com/s7techlab/hlf-sdk-go/block"
)

type Block[T any] struct {
Channel string
Block T
}

type CommonBlock struct {
*Block[*common.Block]
}

type ParsedBlock struct {
*Block[*hlfproto.Block]
}
34 changes: 17 additions & 17 deletions observer/peer_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/s7techlab/hlf-sdk-go/api"
)

const DefaultChannelPeerObservePeriod = 30 * time.Second
const DefaultPeerChannelsObservePeriod = 30 * time.Second

type (
ChannelInfo struct {
Expand Down Expand Up @@ -47,53 +47,53 @@ type (
api.ChainInfoGetter
}

ChannelPeerOpts struct {
PeerChannelsOpts struct {
channels []ChannelToMatch
observePeriod time.Duration
logger *zap.Logger
}

ChannelPeerOpt func(*ChannelPeerOpts)
PeerChannelsOpt func(*PeerChannelsOpts)
)

var DefaultChannelPeerOpts = &ChannelPeerOpts{
var DefaultPeerChannelsOpts = &PeerChannelsOpts{
channels: MatchAllChannels,
observePeriod: DefaultChannelPeerObservePeriod,
observePeriod: DefaultPeerChannelsObservePeriod,
logger: zap.NewNop(),
}

func WithChannels(channels []ChannelToMatch) ChannelPeerOpt {
return func(opts *ChannelPeerOpts) {
func WithChannels(channels []ChannelToMatch) PeerChannelsOpt {
return func(opts *PeerChannelsOpts) {
opts.channels = channels
}
}

func WithChannelPeerLogger(logger *zap.Logger) ChannelPeerOpt {
return func(opts *ChannelPeerOpts) {
func WithPeerChannelsLogger(logger *zap.Logger) PeerChannelsOpt {
return func(opts *PeerChannelsOpts) {
opts.logger = logger
}
}

func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...ChannelPeerOpt) (*PeerChannels, error) {
channelPeerOpts := DefaultChannelPeerOpts
func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...PeerChannelsOpt) (*PeerChannels, error) {
peerChannelsOpts := DefaultPeerChannelsOpts
for _, opt := range opts {
opt(channelPeerOpts)
opt(peerChannelsOpts)
}

channelsMatcher, err := NewChannelsMatcher(channelPeerOpts.channels)
channelsMatcher, err := NewChannelsMatcher(peerChannelsOpts.channels)
if err != nil {
return nil, fmt.Errorf(`channels matcher: %w`, err)
}

channelPeer := &PeerChannels{
peerChannels := &PeerChannels{
channelFetcher: peerChannelsFetcher,
channelsMatcher: channelsMatcher,
channels: make(map[string]*ChannelInfo),
observePeriod: channelPeerOpts.observePeriod,
logger: channelPeerOpts.logger,
observePeriod: peerChannelsOpts.observePeriod,
logger: peerChannelsOpts.logger,
}

return channelPeer, nil
return peerChannels, nil
}

func (pc *PeerChannels) Stop() {
Expand Down
12 changes: 0 additions & 12 deletions observer/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,8 @@ import (
"context"
"strconv"
"sync"

"github.com/hyperledger/fabric-protos-go/common"

hlfproto "github.com/s7techlab/hlf-sdk-go/block"
)

type CommonBlocksStream struct {
*BlocksStream[*common.Block]
}

type ParsedBlocksStream struct {
*BlocksStream[*hlfproto.Block]
}

type Stream[T any] interface {
Subscribe() (ch chan *Block[T], closer func())
}
Expand Down

0 comments on commit 818a5d6

Please sign in to comment.