Skip to content

Commit

Permalink
observer refactoring to generics
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Apr 5, 2024
1 parent d172d35 commit 69527d2
Show file tree
Hide file tree
Showing 18 changed files with 1,447 additions and 188 deletions.
4 changes: 4 additions & 0 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func ParseBlock(block *common.Block, opts ...ParseBlockOpt) (*Block, error) {
}

func ParseOrdererIdentity(cb *common.Block) (*msp.SerializedIdentity, error) {
if cb == nil {
return nil, nil
}

meta, err := protoutil.GetMetadataFromBlock(cb, common.BlockMetadataIndex_SIGNATURES)
if err != nil {
return nil, fmt.Errorf("get metadata from block: %w", err)
Expand Down
365 changes: 181 additions & 184 deletions block/block.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions block/block.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion block/block.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "hyperledger/fabric-protos/peer/proposal_response.proto";
import "hyperledger/fabric-protos/peer/transaction.proto";

message Block {
string channel = 4;
common.BlockHeader header = 1;
BlockData data = 2;
BlockMetadata metadata = 3;
Expand Down Expand Up @@ -110,7 +111,6 @@ message Endorsement {

message BlockMetadata {
repeated OrdererSignature ordererSignatures = 1;
bytes raw_unparsed_metadata_signatures = 2;
}

message OrdererSignature {
Expand Down
19 changes: 18 additions & 1 deletion client/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/ptypes/timestamp"
Expand Down Expand Up @@ -40,6 +41,9 @@ type peer struct {

endorseDefaultTimeout time.Duration

configBlocks map[string]*common.Block
mu sync.Mutex

logger *zap.Logger
}

Expand Down Expand Up @@ -76,6 +80,7 @@ func NewPeer(dialCtx context.Context, c config.ConnectionConfig, identity msp.Si

// NewFromGRPC allows initializing peer from existing GRPC connection
func NewFromGRPC(conn *grpc.ClientConn, identity msp.SigningIdentity, tlsCertHash []byte, logger *zap.Logger, endorseDefaultTimeout time.Duration) (api.Peer, error) {

if conn == nil {
return nil, errors.New(`empty connection`)
}
Expand Down Expand Up @@ -188,8 +193,20 @@ func (p *peer) ParsedBlocks(ctx context.Context, channel string, identity msp.Si
if !ok {
return
}
if b == nil {
return
}

p.mu.Lock()
var configBlock *common.Block
if b.Header.Number == 0 {
p.configBlocks[channel] = configBlock
} else {
configBlock = p.configBlocks[channel]
}
p.mu.Unlock()

parsedBlock, err := block.ParseBlock(b)
parsedBlock, err := block.ParseBlock(b, block.WithConfigBlock(configBlock))
if err != nil {
p.logger.Error("parse block", zap.String("channel", channel), zap.Uint64("number", b.Header.Number))
continue
Expand Down
259 changes: 259 additions & 0 deletions observer1/all_channels_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package observer1

import (
"context"
"sync"
"time"

"github.com/hyperledger/fabric/msp"
"go.uber.org/zap"
)

const DefaultAllChannelsBlocksObservePeriod = 10 * time.Second

type (
PeerChannelsGetter interface {
Uri() string
Channels() map[string]*ChannelInfo
}

allChannelsBlocks[T any] struct {
channelObservers map[string]*channelBlocks[T]

blocks chan *Block[T]
blocksByChannels map[string]chan *Block[T]

peerChannelsGetter PeerChannelsGetter
deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error)
createStreamWithRetry CreateBlockStreamWithRetry[T]

observePeriod time.Duration

// seekFrom has a higher priority than seekFromFetcher (look getSeekFrom method)
seekFrom map[string]uint64
seekFromFetcher SeekFromFetcher
stopRecreateStream bool

isWork bool
cancelObserve context.CancelFunc

mu sync.RWMutex
logger *zap.Logger
}

AllChannelsBlocksOpts struct {
seekFrom map[string]uint64
seekFromFetcher SeekFromFetcher
observePeriod time.Duration
stopRecreateStream bool
logger *zap.Logger
}

AllChannelsBlocksOpt func(*AllChannelsBlocksOpts)
)

var DefaultAllChannelsBlocksOpts = &AllChannelsBlocksOpts{
observePeriod: DefaultAllChannelsBlocksObservePeriod,
logger: zap.NewNop(),
}

func WithBlockPeerLogger(logger *zap.Logger) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
opts.logger = logger
}
}

func WithSeekFrom(seekFrom map[string]uint64) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
opts.seekFrom = seekFrom
}
}

func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
opts.seekFromFetcher = seekFromFetcher
}
}

func WithBlockPeerObservePeriod(observePeriod time.Duration) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
if observePeriod != 0 {
opts.observePeriod = observePeriod
}
}
}

func WithBlockStopRecreateStream(stop bool) AllChannelsBlocksOpt {
return func(opts *AllChannelsBlocksOpts) {
opts.stopRecreateStream = stop
}
}

func newAllChannelsBlocks[T any](
peerChannelsGetter PeerChannelsGetter,
deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error),
createStreamWithRetry CreateBlockStreamWithRetry[T],
opts ...AllChannelsBlocksOpt,
) *allChannelsBlocks[T] {

blockPeerOpts := DefaultAllChannelsBlocksOpts
for _, opt := range opts {
opt(blockPeerOpts)
}

return &allChannelsBlocks[T]{
channelObservers: make(map[string]*channelBlocks[T]),
blocks: make(chan *Block[T]),
blocksByChannels: make(map[string]chan *Block[T]),

peerChannelsGetter: peerChannelsGetter,
deliverer: deliverer,
createStreamWithRetry: createStreamWithRetry,
observePeriod: blockPeerOpts.observePeriod,

seekFrom: blockPeerOpts.seekFrom,
seekFromFetcher: blockPeerOpts.seekFromFetcher,
stopRecreateStream: blockPeerOpts.stopRecreateStream,
logger: blockPeerOpts.logger,
}
}

func (acb *allChannelsBlocks[T]) Channels() map[string]*Channel {
acb.mu.RLock()
defer acb.mu.RUnlock()

var copyChannels = make(map[string]*Channel, len(acb.channelObservers))
for key, value := range acb.channelObservers {
copyChannels[key] = value.Channel
}

return copyChannels
}

func (acb *allChannelsBlocks[T]) Stop() {
acb.mu.Lock()
defer acb.mu.Unlock()

// acb.blocks and acb.blocksByChannels mustn't be closed here, because they are closed elsewhere

for _, c := range acb.channelObservers {
if err := c.Stop(); err != nil {
zap.Error(err)
}
}

acb.channelObservers = make(map[string]*channelBlocks[T])

if acb.cancelObserve != nil {
acb.cancelObserve()
}

acb.isWork = false
}

func (acb *allChannelsBlocks[T]) Observe(ctx context.Context) <-chan *Block[T] {
if acb.isWork {
return acb.blocks
}

// ctxObserve using for nested control process without stopped primary context
ctxObserve, cancel := context.WithCancel(ctx)
acb.cancelObserve = cancel

acb.startNotObservedChannels(ctxObserve, acb.initChannelsObservers())

acb.blocks = make(chan *Block[T])

// init new channels if they are fetched
go func() {
acb.isWork = true
defer close(acb.blocks)

ticker := time.NewTicker(acb.observePeriod)
for {
select {
case <-ctxObserve.Done():
acb.Stop()
return

case <-ticker.C:
acb.startNotObservedChannels(ctxObserve, acb.initChannelsObservers())
}
}
}()

return acb.blocks
}

func (acb *allChannelsBlocks[T]) startNotObservedChannels(ctx context.Context, notObservedChannels []*channelBlocks[T]) {
for _, notObservedChannel := range notObservedChannels {
chBlocks := notObservedChannel

if _, err := chBlocks.observe(ctx); err != nil {
acb.logger.Warn(`init channel observer`, zap.String("channel", notObservedChannel.channel), zap.Error(err))
}

// channel merger
go func() {
for b := range chBlocks.channelWithBlocks {
acb.blocks <- b
}

//// after all reads peerParsedChannel.observer.blocks close channels
//close(acb.blocks)
//for _, blocks := range acb.blocksByChannels {
// close(blocks)
//}
}()
}
}

func (acb *allChannelsBlocks[T]) initChannelsObservers() []*channelBlocks[T] {
var notObservedChannels []*channelBlocks[T]

for channel := range acb.peerChannelsGetter.Channels() {
acb.mu.RLock()
_, ok := acb.channelObservers[channel]
acb.mu.RUnlock()

if !ok {
acb.logger.Info(`add channel observer`, zap.String(`channel`, channel))

seekFrom := acb.getSeekFrom(channel)

chBlocks := newChannelBlocks[T](
channel,
acb.deliverer,
acb.createStreamWithRetry,
seekFrom,
WithChannelBlockLogger(acb.logger),
WithChannelStopRecreateStream(acb.stopRecreateStream))

acb.mu.Lock()
acb.channelObservers[channel] = chBlocks
acb.mu.Unlock()

notObservedChannels = append(notObservedChannels, chBlocks)
}
}

return notObservedChannels
}

func (acb *allChannelsBlocks[T]) getSeekFrom(channel string) SeekFromFetcher {
seekFrom := ChannelSeekOldest()
// at first check seekFrom var, if it is empty, check seekFromFetcher
acb.mu.RLock()
seekFromNum, exist := acb.seekFrom[channel]
acb.mu.RUnlock()
if exist {
seekFrom = ChannelSeekFrom(seekFromNum - 1)
} else {
// if seekFromFetcher is also empty, use ChannelSeekOldest
if acb.seekFromFetcher != nil {
seekFrom = acb.seekFromFetcher
}
}

return seekFrom
}
19 changes: 19 additions & 0 deletions observer1/all_channels_blocks_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package observer1

import (
"github.com/hyperledger/fabric-protos-go/common"

"github.com/s7techlab/hlf-sdk-go/api"
)

type AllChannelBlocksCommon struct {
*allChannelsBlocks[*common.Block]
}

func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksCommon {
createStreamWithRetry := CreateBlockStreamWithRetryDelay[*common.Block](DefaultConnectRetryDelay)

allChsBlocks := newAllChannelsBlocks[*common.Block](peerChannels, blocksDeliver.Blocks, createStreamWithRetry, opts...)

return &AllChannelBlocksCommon{allChannelsBlocks: allChsBlocks}
}
Loading

0 comments on commit 69527d2

Please sign in to comment.