Skip to content

Commit

Permalink
observer refactoring to generics
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Apr 8, 2024
1 parent 953b0d4 commit f9b34b4
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 204 deletions.
372 changes: 181 additions & 191 deletions block/block.pb.go

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions block/block.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion block/block.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import "hyperledger/fabric-protos/peer/proposal_response.proto";
import "hyperledger/fabric-protos/peer/transaction.proto";

message Block {
string channel = 4;
common.BlockHeader header = 1;
BlockData data = 2;
BlockMetadata metadata = 3;
Expand Down
28 changes: 21 additions & 7 deletions client/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func NewPeer(dialCtx context.Context, c config.ConnectionConfig, identity msp.Si

// NewFromGRPC allows initializing peer from existing GRPC connection
func NewFromGRPC(conn *grpc.ClientConn, identity msp.SigningIdentity, tlsCertHash []byte, logger *zap.Logger, endorseDefaultTimeout time.Duration) (api.Peer, error) {

if conn == nil {
return nil, errors.New(`empty connection`)
}
Expand All @@ -95,9 +94,29 @@ func NewFromGRPC(conn *grpc.ClientConn, identity msp.SigningIdentity, tlsCertHas
identity: identity,
tlsCertHash: tlsCertHash,
endorseDefaultTimeout: endorseDefaultTimeout,
configBlocks: make(map[string]*common.Block),
logger: logger.Named(`peer`),
}

qsccService := qscc.NewQSCC(p)
ctx := context.Background()

channels, err := p.GetChannels(ctx)
if err != nil {
return nil, fmt.Errorf("get all channels: %w", err)
}

for _, ch := range channels.GetChannels() {
configBlock, err := qsccService.GetBlockByNumber(ctx, &qscc.GetBlockByNumberRequest{ChannelName: ch.ChannelId, BlockNumber: 0})
if err != nil {
return nil, fmt.Errorf("get block by number from channel %s: %w", ch.ChannelId, err)
}

if configBlock != nil {
p.configBlocks[ch.ChannelId] = configBlock
}
}

return p, nil
}

Expand Down Expand Up @@ -198,12 +217,7 @@ func (p *peer) ParsedBlocks(ctx context.Context, channel string, identity msp.Si
}

p.mu.Lock()
var configBlock *common.Block
if b.Header.Number == 0 {
p.configBlocks[channel] = configBlock
} else {
configBlock = p.configBlocks[channel]
}
configBlock := p.configBlocks[channel]
p.mu.Unlock()

parsedBlock, err := block.ParseBlock(b, block.WithConfigBlock(configBlock))
Expand Down
2 changes: 1 addition & 1 deletion observer/all_channels_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (acb *allChannelsBlocks[T]) startNotObservedChannels(ctx context.Context, n
for _, notObservedChannel := range notObservedChannels {
chBlocks := notObservedChannel

if _, err := chBlocks.observe(ctx); err != nil {
if _, err := chBlocks.Observe(ctx); err != nil {
acb.logger.Warn(`init channel observer`, zap.String("channel", notObservedChannel.channel), zap.Error(err))
}

Expand Down
2 changes: 1 addition & 1 deletion observer/all_channels_blocks_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (acb *allChannelsBlocks[T]) startNotObservedChannelsConcurrently(
for _, notObservedChannel := range notObservedChannels {
chBlocks := notObservedChannel

if _, err := chBlocks.observe(ctx); err != nil {
if _, err := chBlocks.Observe(ctx); err != nil {
acb.logger.Warn(`init channel observer concurrently`, zap.String("channel", notObservedChannel.channel), zap.Error(err))
}

Expand Down
2 changes: 1 addition & 1 deletion observer/channel_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *channelBlocks[T]) Stop() error {
return err
}

func (c *channelBlocks[T]) observe(ctx context.Context) (<-chan *Block[T], error) {
func (c *channelBlocks[T]) Observe(ctx context.Context) (<-chan *Block[T], error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down

0 comments on commit f9b34b4

Please sign in to comment.