Skip to content

Commit

Permalink
observer refactoring to generics without Parsed Common
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Apr 9, 2024
1 parent e5ee138 commit d49f7b4
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 112 deletions.
41 changes: 0 additions & 41 deletions observer/all_channels_blocks_common.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package observer

import (
"context"

"github.com/hyperledger/fabric-protos-go/common"

"github.com/s7techlab/hlf-sdk-go/api"
)

type AllChannelBlocksCommon struct {
*AllChannelsBlocks[*common.Block]

commonBlocks chan *CommonBlock
isWork bool
}

func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksCommon {
Expand All @@ -22,39 +17,3 @@ func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver ap

return &AllChannelBlocksCommon{AllChannelsBlocks: allChsBlocks}
}

func (a *AllChannelBlocksCommon) Observe(ctx context.Context) <-chan *CommonBlock {
if a.isWork {
return a.commonBlocks
}

a.commonBlocks = make(chan *CommonBlock)
go func() {
a.isWork = true
defer func() {
close(a.commonBlocks)
a.isWork = false
}()

blocks := a.AllChannelsBlocks.Observe(ctx)

for {
select {
case <-ctx.Done():
return

case cb, ok := <-blocks:
if !ok {
return
}
if cb == nil {
continue
}

a.commonBlocks <- &CommonBlock{Block: cb}
}
}
}()

return a.commonBlocks
}
4 changes: 2 additions & 2 deletions observer/all_channels_blocks_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

peerChannelsMockForCommon *observer.PeerChannelsMock
allChannelBlocksCommon *observer.AllChannelBlocksCommon
commonBlocks <-chan *observer.CommonBlock
commonBlocks <-chan *observer.Block[*common.Block]

peerChannelsMockConcurrentlyForCommon *observer.PeerChannelsMock
allChannelBlocksConcurrentlyCommon *observer.AllChannelBlocksCommon
Expand Down Expand Up @@ -87,7 +87,7 @@ var _ = Describe("All channels blocks common", func() {
Expect(b.Channel).To(Equal(curBlockChannel))

blockNum := channelsBlocksHeights[curBlockChannel]
Expect(b.Block.Block.Header.Number).To(Equal(blockNum))
Expect(b.Block.Header.Number).To(Equal(blockNum))

channelsBlocksHeights[curBlockChannel]++

Expand Down
41 changes: 0 additions & 41 deletions observer/all_channels_blocks_parsed.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package observer

import (
"context"

"github.com/s7techlab/hlf-sdk-go/api"
hlfproto "github.com/s7techlab/hlf-sdk-go/block"
)

type AllChannelBlocksParsed struct {
*AllChannelsBlocks[*hlfproto.Block]

parsedBlocks chan *ParsedBlock
isWork bool
}

func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksParsed {
Expand All @@ -21,39 +16,3 @@ func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver ap

return &AllChannelBlocksParsed{AllChannelsBlocks: allChsBlocks}
}

func (a *AllChannelBlocksParsed) Observe(ctx context.Context) <-chan *ParsedBlock {
if a.isWork {
return a.parsedBlocks
}

a.parsedBlocks = make(chan *ParsedBlock)
go func() {
a.isWork = true
defer func() {
close(a.parsedBlocks)
a.isWork = false
}()

blocks := a.AllChannelsBlocks.Observe(ctx)

for {
select {
case <-ctx.Done():
return

case cb, ok := <-blocks:
if !ok {
return
}
if cb == nil {
continue
}

a.parsedBlocks <- &ParsedBlock{Block: cb}
}
}
}()

return a.parsedBlocks
}
4 changes: 2 additions & 2 deletions observer/all_channels_blocks_parsed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
var (
peerChannelsMockForParsed *observer.PeerChannelsMock
allChannelBlocksParsed *observer.AllChannelBlocksParsed
parsedBlocks <-chan *observer.ParsedBlock
parsedBlocks <-chan *observer.Block[*hlfproto.Block]

peerChannelsMockConcurrentlyForParsed *observer.PeerChannelsMock
allChannelBlocksConcurrentlyParsed *observer.AllChannelBlocksParsed
Expand Down Expand Up @@ -84,7 +84,7 @@ var _ = Describe("All channels blocks parsed", func() {
Expect(b.Channel).To(Equal(curBlockChannel))

blockNum := channelsBlocksHeights[curBlockChannel]
Expect(b.Block.Block.Header.Number).To(Equal(blockNum))
Expect(b.Block.Header.Number).To(Equal(blockNum))

channelsBlocksHeights[curBlockChannel]++

Expand Down
14 changes: 0 additions & 14 deletions observer/block.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,6 @@
package observer

import (
"github.com/hyperledger/fabric-protos-go/common"

hlfproto "github.com/s7techlab/hlf-sdk-go/block"
)

type Block[T any] struct {
Channel string
Block T
}

type CommonBlock struct {
*Block[*common.Block]
}

type ParsedBlock struct {
*Block[*hlfproto.Block]
}
12 changes: 0 additions & 12 deletions observer/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,8 @@ import (
"context"
"strconv"
"sync"

"github.com/hyperledger/fabric-protos-go/common"

hlfproto "github.com/s7techlab/hlf-sdk-go/block"
)

type CommonBlocksStream struct {
*BlocksStream[*common.Block]
}

type ParsedBlocksStream struct {
*BlocksStream[*hlfproto.Block]
}

type Stream[T any] interface {
Subscribe() (ch chan *Block[T], closer func())
}
Expand Down

0 comments on commit d49f7b4

Please sign in to comment.