From 69527d2410db126dc0a4bf4a35fe09a910422dda Mon Sep 17 00:00:00 2001 From: Nikita Neznaemov Date: Fri, 5 Apr 2024 17:00:08 +0300 Subject: [PATCH] observer refactoring to generics --- block/block.go | 4 + block/block.pb.go | 365 +++++++++--------- block/block.pb.validate.go | 4 +- block/block.proto | 2 +- client/peer.go | 19 +- observer1/all_channels_blocks.go | 259 +++++++++++++ observer1/all_channels_blocks_common.go | 19 + observer1/all_channels_blocks_concurrently.go | 74 ++++ observer1/all_channels_parsed.go | 18 + observer1/block.go | 6 + observer1/channel.go | 148 +++++++ observer1/channel_blocks.go | 205 ++++++++++ observer1/channel_blocks_common.go | 62 +++ observer1/channel_blocks_parsed.go | 65 ++++ observer1/channel_blocks_stream.go | 31 ++ observer1/channels_matcher.go | 149 +++++++ observer1/peer_channels.go | 199 ++++++++++ observer1/transformer.go | 6 + 18 files changed, 1447 insertions(+), 188 deletions(-) create mode 100644 observer1/all_channels_blocks.go create mode 100644 observer1/all_channels_blocks_common.go create mode 100644 observer1/all_channels_blocks_concurrently.go create mode 100644 observer1/all_channels_parsed.go create mode 100644 observer1/block.go create mode 100644 observer1/channel.go create mode 100644 observer1/channel_blocks.go create mode 100644 observer1/channel_blocks_common.go create mode 100644 observer1/channel_blocks_parsed.go create mode 100644 observer1/channel_blocks_stream.go create mode 100644 observer1/channels_matcher.go create mode 100644 observer1/peer_channels.go create mode 100644 observer1/transformer.go diff --git a/block/block.go b/block/block.go index 45ddce7..4022cc2 100644 --- a/block/block.go +++ b/block/block.go @@ -75,6 +75,10 @@ func ParseBlock(block *common.Block, opts ...ParseBlockOpt) (*Block, error) { } func ParseOrdererIdentity(cb *common.Block) (*msp.SerializedIdentity, error) { + if cb == nil { + return nil, nil + } + meta, err := protoutil.GetMetadataFromBlock(cb, common.BlockMetadataIndex_SIGNATURES) if err != nil { return nil, fmt.Errorf("get metadata from block: %w", err) diff --git a/block/block.pb.go b/block/block.pb.go index 1dd14fe..fbf55ec 100644 --- a/block/block.pb.go +++ b/block/block.pb.go @@ -30,6 +30,7 @@ type Block struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + Channel string `protobuf:"bytes,4,opt,name=channel,proto3" json:"channel,omitempty"` Header *common.BlockHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` Data *BlockData `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` Metadata *BlockMetadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"` @@ -67,6 +68,13 @@ func (*Block) Descriptor() ([]byte, []int) { return file_block_proto_rawDescGZIP(), []int{0} } +func (x *Block) GetChannel() string { + if x != nil { + return x.Channel + } + return "" +} + func (x *Block) GetHeader() *common.BlockHeader { if x != nil { return x.Header @@ -1014,8 +1022,7 @@ type BlockMetadata struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - OrdererSignatures []*OrdererSignature `protobuf:"bytes,1,rep,name=ordererSignatures,proto3" json:"ordererSignatures,omitempty"` - RawUnparsedMetadataSignatures []byte `protobuf:"bytes,2,opt,name=raw_unparsed_metadata_signatures,json=rawUnparsedMetadataSignatures,proto3" json:"raw_unparsed_metadata_signatures,omitempty"` + OrdererSignatures []*OrdererSignature `protobuf:"bytes,1,rep,name=ordererSignatures,proto3" json:"ordererSignatures,omitempty"` } func (x *BlockMetadata) Reset() { @@ -1057,13 +1064,6 @@ func (x *BlockMetadata) GetOrdererSignatures() []*OrdererSignature { return nil } -func (x *BlockMetadata) GetRawUnparsedMetadataSignatures() []byte { - if x != nil { - return x.RawUnparsedMetadataSignatures - } - return nil -} - type OrdererSignature struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1149,183 +1149,180 @@ var file_block_proto_rawDesc = []byte{ 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x30, 0x68, 0x79, 0x70, 0x65, 0x72, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x72, 0x2f, 0x66, 0x61, 0x62, 0x72, 0x69, 0x63, 0x2d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x9e, 0x01, 0x0a, - 0x05, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x2b, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, - 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x06, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x12, 0x2d, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x19, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, - 0x63, 0x6b, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, - 0x74, 0x61, 0x12, 0x39, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x43, 0x0a, - 0x09, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x12, 0x36, 0x0a, 0x09, 0x65, 0x6e, - 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, - 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x45, - 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, - 0x65, 0x73, 0x22, 0x9e, 0x01, 0x0a, 0x08, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x12, - 0x31, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x17, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, - 0x61, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, - 0x12, 0x41, 0x0a, 0x0f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, - 0x6f, 0x64, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x73, 0x2e, 0x54, 0x78, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, - 0x6f, 0x64, 0x65, 0x52, 0x0e, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, - 0x6f, 0x64, 0x65, 0x22, 0xb2, 0x01, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, - 0x2e, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x16, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, - 0x3d, 0x0a, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x38, - 0x0a, 0x18, 0x72, 0x61, 0x77, 0x5f, 0x75, 0x6e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x64, 0x5f, 0x74, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x16, 0x72, 0x61, 0x77, 0x55, 0x6e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x64, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x92, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x12, 0x3c, 0x0a, 0x0e, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, - 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x48, 0x65, 0x61, 0x64, - 0x65, 0x72, 0x52, 0x0d, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x48, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x5f, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x6c, - 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x53, 0x69, 0x67, - 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x0f, 0x73, 0x69, - 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x5a, 0x0a, - 0x0f, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x12, 0x31, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x73, 0x70, 0x2e, 0x53, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, - 0x65, 0x64, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, - 0x74, 0x6f, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x22, 0x90, 0x01, 0x0a, 0x0b, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x44, 0x0a, 0x0e, 0x63, 0x68, 0x61, - 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x1d, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, - 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x0d, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, - 0x3b, 0x0a, 0x07, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x21, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x8e, 0x01, 0x0a, - 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x37, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, - 0x6f, 0x63, 0x6b, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x48, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x40, 0x0a, 0x07, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x68, - 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, - 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xc1, 0x01, - 0x0a, 0x16, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x66, 0x0a, 0x1a, 0x63, 0x68, 0x61, 0x69, - 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x5f, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x5f, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, - 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, - 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x50, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x18, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, - 0x65, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, - 0x12, 0x3f, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x27, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x45, 0x6e, 0x64, 0x6f, 0x72, - 0x73, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x22, 0xf2, 0x01, 0x0a, 0x18, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x50, - 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, - 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, - 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x52, 0x05, - 0x69, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x5e, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, - 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3a, 0x2e, 0x68, 0x6c, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb8, 0x01, 0x0a, + 0x05, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, + 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, + 0x12, 0x2b, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x13, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x2d, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x68, 0x6c, + 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x39, 0x0a, 0x08, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, + 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x43, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x44, 0x61, 0x74, 0x61, 0x12, 0x36, 0x0a, 0x09, 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, + 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, + 0x65, 0x52, 0x09, 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x73, 0x22, 0x9e, 0x01, 0x0a, + 0x08, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x12, 0x31, 0x0a, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x68, 0x6c, 0x66, + 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x50, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1c, 0x0a, 0x09, + 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x41, 0x0a, 0x0f, 0x76, 0x61, + 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x54, 0x78, 0x56, + 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x64, 0x65, 0x52, 0x0e, 0x76, + 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x64, 0x65, 0x22, 0xb2, 0x01, + 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x2e, 0x0a, 0x06, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x68, 0x6c, 0x66, 0x73, + 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x0b, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, + 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x18, 0x72, 0x61, 0x77, 0x5f, + 0x75, 0x6e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x64, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x16, 0x72, 0x61, 0x77, 0x55, + 0x6e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x64, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x22, 0x92, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x3c, 0x0a, + 0x0e, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x0d, 0x63, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x4a, 0x0a, 0x10, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, + 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x0f, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x5a, 0x0a, 0x0f, 0x53, 0x69, 0x67, 0x6e, 0x61, + 0x74, 0x75, 0x72, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x31, 0x0a, 0x07, 0x63, 0x72, + 0x65, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x73, + 0x70, 0x2e, 0x53, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x49, 0x64, 0x65, 0x6e, + 0x74, 0x69, 0x74, 0x79, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x14, 0x0a, + 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, + 0x6e, 0x63, 0x65, 0x22, 0x90, 0x01, 0x0a, 0x0b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x44, 0x0a, 0x0e, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x63, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, - 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x50, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, 0x6e, 0x74, 0x4d, - 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, - 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x1a, 0x3f, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, - 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xbd, 0x01, 0x0a, 0x17, 0x43, 0x68, 0x61, 0x69, 0x6e, - 0x63, 0x6f, 0x64, 0x65, 0x45, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x63, 0x0a, 0x19, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x5f, 0x72, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, - 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x17, - 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x3d, 0x0a, 0x0b, 0x65, 0x6e, 0x64, 0x6f, 0x72, - 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x68, - 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x45, 0x6e, - 0x64, 0x6f, 0x72, 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x0b, 0x65, 0x6e, 0x64, 0x6f, 0x72, - 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x7d, 0x0a, 0x17, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, + 0x6e, 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0d, 0x63, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, 0x07, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x68, 0x6c, 0x66, + 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x37, 0x0a, 0x06, + 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, + 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x53, 0x69, + 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x06, 0x68, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x40, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, + 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, + 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xc1, 0x01, 0x0a, 0x16, 0x43, 0x68, 0x61, 0x69, + 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x12, 0x66, 0x0a, 0x1a, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x5f, + 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, + 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, + 0x65, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x52, 0x18, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x6f, + 0x73, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x3f, 0x0a, 0x06, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x68, 0x6c, 0x66, + 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, + 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x45, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x64, 0x41, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xf2, 0x01, 0x0a, 0x18, + 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, + 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x52, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x12, + 0x5e, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3a, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, + 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, + 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x1a, + 0x3f, 0x0a, 0x11, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x61, 0x70, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x22, 0xbd, 0x01, 0x0a, 0x17, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x45, 0x6e, + 0x64, 0x6f, 0x72, 0x73, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x63, 0x0a, 0x19, + 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x27, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, + 0x2e, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x17, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, - 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x5f, 0x68, 0x61, - 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, - 0x61, 0x6c, 0x48, 0x61, 0x73, 0x68, 0x12, 0x3d, 0x0a, 0x09, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, - 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x6c, 0x66, 0x73, - 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, - 0x63, 0x6f, 0x64, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x65, 0x78, 0x74, 0x65, - 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0xe1, 0x01, 0x0a, 0x0f, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, - 0x6f, 0x64, 0x65, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x07, 0x72, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x68, 0x6c, 0x66, - 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x54, 0x78, 0x52, 0x65, - 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x73, 0x12, 0x2e, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x43, 0x68, 0x61, - 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, - 0x6e, 0x74, 0x73, 0x12, 0x2c, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x36, 0x0a, 0x0c, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x5f, 0x69, - 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x52, 0x0b, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x6a, 0x0a, 0x0e, 0x54, 0x78, 0x52, - 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x64, - 0x61, 0x74, 0x61, 0x5f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x09, 0x64, 0x61, 0x74, 0x61, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x12, 0x39, 0x0a, 0x08, 0x6e, 0x73, - 0x5f, 0x72, 0x77, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x68, - 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4e, 0x73, - 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x52, 0x07, 0x6e, 0x73, - 0x52, 0x77, 0x73, 0x65, 0x74, 0x22, 0xbc, 0x01, 0x0a, 0x0e, 0x4e, 0x73, 0x52, 0x65, 0x61, 0x64, - 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, - 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, - 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x26, 0x0a, 0x05, 0x72, 0x77, 0x73, 0x65, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6b, 0x76, 0x72, 0x77, 0x73, 0x65, 0x74, 0x2e, - 0x4b, 0x56, 0x52, 0x57, 0x53, 0x65, 0x74, 0x52, 0x05, 0x72, 0x77, 0x73, 0x65, 0x74, 0x12, 0x64, - 0x0a, 0x17, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x68, 0x61, 0x73, - 0x68, 0x65, 0x64, 0x5f, 0x72, 0x77, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x2c, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x61, 0x73, 0x68, 0x65, - 0x64, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x52, 0x15, 0x63, - 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, - 0x77, 0x73, 0x65, 0x74, 0x22, 0xa6, 0x01, 0x0a, 0x1c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x48, 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, - 0x74, 0x65, 0x53, 0x65, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, - 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x37, - 0x0a, 0x0c, 0x68, 0x61, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x72, 0x77, 0x73, 0x65, 0x74, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6b, 0x76, 0x72, 0x77, 0x73, 0x65, 0x74, 0x2e, 0x48, - 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, 0x57, 0x53, 0x65, 0x74, 0x52, 0x0b, 0x68, 0x61, 0x73, 0x68, - 0x65, 0x64, 0x52, 0x77, 0x73, 0x65, 0x74, 0x12, 0x24, 0x0a, 0x0e, 0x70, 0x76, 0x74, 0x5f, 0x72, - 0x77, 0x73, 0x65, 0x74, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0c, 0x70, 0x76, 0x74, 0x52, 0x77, 0x73, 0x65, 0x74, 0x48, 0x61, 0x73, 0x68, 0x22, 0x60, 0x0a, - 0x0b, 0x45, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x33, 0x0a, 0x08, - 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, - 0x2e, 0x6d, 0x73, 0x70, 0x2e, 0x53, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x49, - 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, - 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, - 0xa8, 0x01, 0x0a, 0x0d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x12, 0x4e, 0x0a, 0x11, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, - 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x68, - 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4f, 0x72, - 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x52, 0x11, - 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, - 0x73, 0x12, 0x47, 0x0a, 0x20, 0x72, 0x61, 0x77, 0x5f, 0x75, 0x6e, 0x70, 0x61, 0x72, 0x73, 0x65, - 0x64, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x1d, 0x72, 0x61, 0x77, - 0x55, 0x6e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x64, 0x12, 0x3d, 0x0a, 0x0b, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, + 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x45, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x6d, + 0x65, 0x6e, 0x74, 0x52, 0x0b, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, + 0x22, 0x7d, 0x0a, 0x17, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x70, + 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x0c, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x48, 0x61, 0x73, 0x68, + 0x12, 0x3d, 0x0a, 0x09, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, + 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x22, + 0xe1, 0x01, 0x0a, 0x0f, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x41, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, 0x2e, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x54, 0x78, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, + 0x65, 0x53, 0x65, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, 0x2e, 0x0a, + 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x2c, 0x0a, + 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x0c, 0x63, + 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, + 0x63, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x52, 0x0b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x63, 0x6f, 0x64, + 0x65, 0x49, 0x64, 0x22, 0x6a, 0x0a, 0x0e, 0x54, 0x78, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, + 0x74, 0x65, 0x53, 0x65, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x6d, 0x6f, + 0x64, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x61, 0x74, 0x61, 0x4d, + 0x6f, 0x64, 0x65, 0x6c, 0x12, 0x39, 0x0a, 0x08, 0x6e, 0x73, 0x5f, 0x72, 0x77, 0x73, 0x65, 0x74, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, + 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4e, 0x73, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x52, 0x07, 0x6e, 0x73, 0x52, 0x77, 0x73, 0x65, 0x74, 0x22, + 0xbc, 0x01, 0x0a, 0x0e, 0x4e, 0x73, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, + 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, + 0x12, 0x26, 0x0a, 0x05, 0x72, 0x77, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x10, 0x2e, 0x6b, 0x76, 0x72, 0x77, 0x73, 0x65, 0x74, 0x2e, 0x4b, 0x56, 0x52, 0x57, 0x53, 0x65, + 0x74, 0x52, 0x05, 0x72, 0x77, 0x73, 0x65, 0x74, 0x12, 0x64, 0x0a, 0x17, 0x63, 0x6f, 0x6c, 0x6c, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x72, 0x77, + 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x68, 0x6c, 0x66, 0x73, + 0x64, 0x6b, 0x67, 0x6f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, 0x65, 0x61, 0x64, 0x57, + 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x52, 0x15, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x48, 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, 0x77, 0x73, 0x65, 0x74, 0x22, 0xa6, + 0x01, 0x0a, 0x1c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x61, 0x73, + 0x68, 0x65, 0x64, 0x52, 0x65, 0x61, 0x64, 0x57, 0x72, 0x69, 0x74, 0x65, 0x53, 0x65, 0x74, 0x12, + 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x37, 0x0a, 0x0c, 0x68, 0x61, 0x73, 0x68, + 0x65, 0x64, 0x5f, 0x72, 0x77, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, + 0x2e, 0x6b, 0x76, 0x72, 0x77, 0x73, 0x65, 0x74, 0x2e, 0x48, 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, + 0x57, 0x53, 0x65, 0x74, 0x52, 0x0b, 0x68, 0x61, 0x73, 0x68, 0x65, 0x64, 0x52, 0x77, 0x73, 0x65, + 0x74, 0x12, 0x24, 0x0a, 0x0e, 0x70, 0x76, 0x74, 0x5f, 0x72, 0x77, 0x73, 0x65, 0x74, 0x5f, 0x68, + 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x70, 0x76, 0x74, 0x52, 0x77, + 0x73, 0x65, 0x74, 0x48, 0x61, 0x73, 0x68, 0x22, 0x60, 0x0a, 0x0b, 0x45, 0x6e, 0x64, 0x6f, 0x72, + 0x73, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x33, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x73, + 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x73, 0x70, 0x2e, 0x53, + 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, + 0x79, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x73, 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, + 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x5f, 0x0a, 0x0d, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x4e, 0x0a, 0x11, 0x6f, 0x72, + 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x68, 0x6c, 0x66, 0x73, 0x64, 0x6b, 0x67, 0x6f, + 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, + 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x52, 0x11, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x65, 0x0a, 0x10, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x33, 0x0a, 0x08, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, diff --git a/block/block.pb.validate.go b/block/block.pb.validate.go index 6ba315f..6bb121f 100644 --- a/block/block.pb.validate.go +++ b/block/block.pb.validate.go @@ -60,6 +60,8 @@ func (m *Block) validate(all bool) error { var errors []error + // no validation rules for Channel + if all { switch v := interface{}(m.GetHeader()).(type) { case interface{ ValidateAll() error }: @@ -2686,8 +2688,6 @@ func (m *BlockMetadata) validate(all bool) error { } - // no validation rules for RawUnparsedMetadataSignatures - if len(errors) > 0 { return BlockMetadataMultiError(errors) } diff --git a/block/block.proto b/block/block.proto index acd7949..2a5e366 100644 --- a/block/block.proto +++ b/block/block.proto @@ -17,6 +17,7 @@ import "hyperledger/fabric-protos/peer/proposal_response.proto"; import "hyperledger/fabric-protos/peer/transaction.proto"; message Block { + string channel = 4; common.BlockHeader header = 1; BlockData data = 2; BlockMetadata metadata = 3; @@ -110,7 +111,6 @@ message Endorsement { message BlockMetadata { repeated OrdererSignature ordererSignatures = 1; - bytes raw_unparsed_metadata_signatures = 2; } message OrdererSignature { diff --git a/client/peer.go b/client/peer.go index f7eeebe..65004ee 100644 --- a/client/peer.go +++ b/client/peer.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "sync" "time" "github.com/golang/protobuf/ptypes/timestamp" @@ -40,6 +41,9 @@ type peer struct { endorseDefaultTimeout time.Duration + configBlocks map[string]*common.Block + mu sync.Mutex + logger *zap.Logger } @@ -76,6 +80,7 @@ func NewPeer(dialCtx context.Context, c config.ConnectionConfig, identity msp.Si // NewFromGRPC allows initializing peer from existing GRPC connection func NewFromGRPC(conn *grpc.ClientConn, identity msp.SigningIdentity, tlsCertHash []byte, logger *zap.Logger, endorseDefaultTimeout time.Duration) (api.Peer, error) { + if conn == nil { return nil, errors.New(`empty connection`) } @@ -188,8 +193,20 @@ func (p *peer) ParsedBlocks(ctx context.Context, channel string, identity msp.Si if !ok { return } + if b == nil { + return + } + + p.mu.Lock() + var configBlock *common.Block + if b.Header.Number == 0 { + p.configBlocks[channel] = configBlock + } else { + configBlock = p.configBlocks[channel] + } + p.mu.Unlock() - parsedBlock, err := block.ParseBlock(b) + parsedBlock, err := block.ParseBlock(b, block.WithConfigBlock(configBlock)) if err != nil { p.logger.Error("parse block", zap.String("channel", channel), zap.Uint64("number", b.Header.Number)) continue diff --git a/observer1/all_channels_blocks.go b/observer1/all_channels_blocks.go new file mode 100644 index 0000000..f61add5 --- /dev/null +++ b/observer1/all_channels_blocks.go @@ -0,0 +1,259 @@ +package observer1 + +import ( + "context" + "sync" + "time" + + "github.com/hyperledger/fabric/msp" + "go.uber.org/zap" +) + +const DefaultAllChannelsBlocksObservePeriod = 10 * time.Second + +type ( + PeerChannelsGetter interface { + Uri() string + Channels() map[string]*ChannelInfo + } + + allChannelsBlocks[T any] struct { + channelObservers map[string]*channelBlocks[T] + + blocks chan *Block[T] + blocksByChannels map[string]chan *Block[T] + + peerChannelsGetter PeerChannelsGetter + deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error) + createStreamWithRetry CreateBlockStreamWithRetry[T] + + observePeriod time.Duration + + // seekFrom has a higher priority than seekFromFetcher (look getSeekFrom method) + seekFrom map[string]uint64 + seekFromFetcher SeekFromFetcher + stopRecreateStream bool + + isWork bool + cancelObserve context.CancelFunc + + mu sync.RWMutex + logger *zap.Logger + } + + AllChannelsBlocksOpts struct { + seekFrom map[string]uint64 + seekFromFetcher SeekFromFetcher + observePeriod time.Duration + stopRecreateStream bool + logger *zap.Logger + } + + AllChannelsBlocksOpt func(*AllChannelsBlocksOpts) +) + +var DefaultAllChannelsBlocksOpts = &AllChannelsBlocksOpts{ + observePeriod: DefaultAllChannelsBlocksObservePeriod, + logger: zap.NewNop(), +} + +func WithBlockPeerLogger(logger *zap.Logger) AllChannelsBlocksOpt { + return func(opts *AllChannelsBlocksOpts) { + opts.logger = logger + } +} + +func WithSeekFrom(seekFrom map[string]uint64) AllChannelsBlocksOpt { + return func(opts *AllChannelsBlocksOpts) { + opts.seekFrom = seekFrom + } +} + +func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) AllChannelsBlocksOpt { + return func(opts *AllChannelsBlocksOpts) { + opts.seekFromFetcher = seekFromFetcher + } +} + +func WithBlockPeerObservePeriod(observePeriod time.Duration) AllChannelsBlocksOpt { + return func(opts *AllChannelsBlocksOpts) { + if observePeriod != 0 { + opts.observePeriod = observePeriod + } + } +} + +func WithBlockStopRecreateStream(stop bool) AllChannelsBlocksOpt { + return func(opts *AllChannelsBlocksOpts) { + opts.stopRecreateStream = stop + } +} + +func newAllChannelsBlocks[T any]( + peerChannelsGetter PeerChannelsGetter, + deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error), + createStreamWithRetry CreateBlockStreamWithRetry[T], + opts ...AllChannelsBlocksOpt, +) *allChannelsBlocks[T] { + + blockPeerOpts := DefaultAllChannelsBlocksOpts + for _, opt := range opts { + opt(blockPeerOpts) + } + + return &allChannelsBlocks[T]{ + channelObservers: make(map[string]*channelBlocks[T]), + blocks: make(chan *Block[T]), + blocksByChannels: make(map[string]chan *Block[T]), + + peerChannelsGetter: peerChannelsGetter, + deliverer: deliverer, + createStreamWithRetry: createStreamWithRetry, + observePeriod: blockPeerOpts.observePeriod, + + seekFrom: blockPeerOpts.seekFrom, + seekFromFetcher: blockPeerOpts.seekFromFetcher, + stopRecreateStream: blockPeerOpts.stopRecreateStream, + logger: blockPeerOpts.logger, + } +} + +func (acb *allChannelsBlocks[T]) Channels() map[string]*Channel { + acb.mu.RLock() + defer acb.mu.RUnlock() + + var copyChannels = make(map[string]*Channel, len(acb.channelObservers)) + for key, value := range acb.channelObservers { + copyChannels[key] = value.Channel + } + + return copyChannels +} + +func (acb *allChannelsBlocks[T]) Stop() { + acb.mu.Lock() + defer acb.mu.Unlock() + + // acb.blocks and acb.blocksByChannels mustn't be closed here, because they are closed elsewhere + + for _, c := range acb.channelObservers { + if err := c.Stop(); err != nil { + zap.Error(err) + } + } + + acb.channelObservers = make(map[string]*channelBlocks[T]) + + if acb.cancelObserve != nil { + acb.cancelObserve() + } + + acb.isWork = false +} + +func (acb *allChannelsBlocks[T]) Observe(ctx context.Context) <-chan *Block[T] { + if acb.isWork { + return acb.blocks + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + acb.cancelObserve = cancel + + acb.startNotObservedChannels(ctxObserve, acb.initChannelsObservers()) + + acb.blocks = make(chan *Block[T]) + + // init new channels if they are fetched + go func() { + acb.isWork = true + defer close(acb.blocks) + + ticker := time.NewTicker(acb.observePeriod) + for { + select { + case <-ctxObserve.Done(): + acb.Stop() + return + + case <-ticker.C: + acb.startNotObservedChannels(ctxObserve, acb.initChannelsObservers()) + } + } + }() + + return acb.blocks +} + +func (acb *allChannelsBlocks[T]) startNotObservedChannels(ctx context.Context, notObservedChannels []*channelBlocks[T]) { + for _, notObservedChannel := range notObservedChannels { + chBlocks := notObservedChannel + + if _, err := chBlocks.observe(ctx); err != nil { + acb.logger.Warn(`init channel observer`, zap.String("channel", notObservedChannel.channel), zap.Error(err)) + } + + // channel merger + go func() { + for b := range chBlocks.channelWithBlocks { + acb.blocks <- b + } + + //// after all reads peerParsedChannel.observer.blocks close channels + //close(acb.blocks) + //for _, blocks := range acb.blocksByChannels { + // close(blocks) + //} + }() + } +} + +func (acb *allChannelsBlocks[T]) initChannelsObservers() []*channelBlocks[T] { + var notObservedChannels []*channelBlocks[T] + + for channel := range acb.peerChannelsGetter.Channels() { + acb.mu.RLock() + _, ok := acb.channelObservers[channel] + acb.mu.RUnlock() + + if !ok { + acb.logger.Info(`add channel observer`, zap.String(`channel`, channel)) + + seekFrom := acb.getSeekFrom(channel) + + chBlocks := newChannelBlocks[T]( + channel, + acb.deliverer, + acb.createStreamWithRetry, + seekFrom, + WithChannelBlockLogger(acb.logger), + WithChannelStopRecreateStream(acb.stopRecreateStream)) + + acb.mu.Lock() + acb.channelObservers[channel] = chBlocks + acb.mu.Unlock() + + notObservedChannels = append(notObservedChannels, chBlocks) + } + } + + return notObservedChannels +} + +func (acb *allChannelsBlocks[T]) getSeekFrom(channel string) SeekFromFetcher { + seekFrom := ChannelSeekOldest() + // at first check seekFrom var, if it is empty, check seekFromFetcher + acb.mu.RLock() + seekFromNum, exist := acb.seekFrom[channel] + acb.mu.RUnlock() + if exist { + seekFrom = ChannelSeekFrom(seekFromNum - 1) + } else { + // if seekFromFetcher is also empty, use ChannelSeekOldest + if acb.seekFromFetcher != nil { + seekFrom = acb.seekFromFetcher + } + } + + return seekFrom +} diff --git a/observer1/all_channels_blocks_common.go b/observer1/all_channels_blocks_common.go new file mode 100644 index 0000000..fc8a338 --- /dev/null +++ b/observer1/all_channels_blocks_common.go @@ -0,0 +1,19 @@ +package observer1 + +import ( + "github.com/hyperledger/fabric-protos-go/common" + + "github.com/s7techlab/hlf-sdk-go/api" +) + +type AllChannelBlocksCommon struct { + *allChannelsBlocks[*common.Block] +} + +func NewAllChannelBlocksCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksCommon { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*common.Block](DefaultConnectRetryDelay) + + allChsBlocks := newAllChannelsBlocks[*common.Block](peerChannels, blocksDeliver.Blocks, createStreamWithRetry, opts...) + + return &AllChannelBlocksCommon{allChannelsBlocks: allChsBlocks} +} diff --git a/observer1/all_channels_blocks_concurrently.go b/observer1/all_channels_blocks_concurrently.go new file mode 100644 index 0000000..88d4029 --- /dev/null +++ b/observer1/all_channels_blocks_concurrently.go @@ -0,0 +1,74 @@ +package observer1 + +import ( + "context" + "time" + + "go.uber.org/zap" +) + +type ChannelBlocksWithName[T any] struct { + Name string + Blocks <-chan *Block[T] +} + +type ChannelWithChannels[T any] struct { + channels chan *ChannelBlocksWithName[T] +} + +func (acb *allChannelsBlocks[T]) ObserveByChannels(ctx context.Context) *ChannelWithChannels[T] { + channelWithChannels := &ChannelWithChannels[T]{ + channels: make(chan *ChannelBlocksWithName[T]), + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + acb.cancelObserve = cancel + + acb.startNotObservedChannelsConcurrently(ctxObserve, acb.initChannelsObservers(), channelWithChannels) + + // init new channels if they are fetched + go func() { + defer func() { + close(channelWithChannels.channels) + }() + + ticker := time.NewTicker(acb.observePeriod) + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + acb.startNotObservedChannelsConcurrently(ctxObserve, acb.initChannelsObservers(), channelWithChannels) + } + } + }() + + // closer + go func() { + <-ctx.Done() + acb.Stop() + }() + + return channelWithChannels +} + +func (acb *allChannelsBlocks[T]) startNotObservedChannelsConcurrently( + ctx context.Context, + notObservedChannels []*channelBlocks[T], + channelWithChannels *ChannelWithChannels[T], +) { + + for _, notObservedChannel := range notObservedChannels { + chBlocks := notObservedChannel + + if _, err := chBlocks.observe(ctx); err != nil { + acb.logger.Warn(`init channel observer concurrently`, zap.String("channel", notObservedChannel.channel), zap.Error(err)) + } + + go func() { + channelWithChannels.channels <- &ChannelBlocksWithName[T]{Name: chBlocks.channel, Blocks: chBlocks.channelWithBlocks} + }() + } +} diff --git a/observer1/all_channels_parsed.go b/observer1/all_channels_parsed.go new file mode 100644 index 0000000..aee6976 --- /dev/null +++ b/observer1/all_channels_parsed.go @@ -0,0 +1,18 @@ +package observer1 + +import ( + "github.com/s7techlab/hlf-sdk-go/api" + hlfproto "github.com/s7techlab/hlf-sdk-go/block" +) + +type AllChannelBlocksParsed struct { + *allChannelsBlocks[*hlfproto.Block] +} + +func NewAllChannelBlocksParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...AllChannelsBlocksOpt) *AllChannelBlocksParsed { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*hlfproto.Block](DefaultConnectRetryDelay) + + allChsBlocks := newAllChannelsBlocks[*hlfproto.Block](peerChannels, blocksDeliver.ParsedBlocks, createStreamWithRetry, opts...) + + return &AllChannelBlocksParsed{allChannelsBlocks: allChsBlocks} +} diff --git a/observer1/block.go b/observer1/block.go new file mode 100644 index 0000000..475ba0a --- /dev/null +++ b/observer1/block.go @@ -0,0 +1,6 @@ +package observer1 + +type Block[T any] struct { + Channel string + Block T +} diff --git a/observer1/channel.go b/observer1/channel.go new file mode 100644 index 0000000..edd47b5 --- /dev/null +++ b/observer1/channel.go @@ -0,0 +1,148 @@ +package observer1 + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/hyperledger/fabric/msp" + "go.uber.org/zap" +) + +var ErrChannelObserverAlreadyStarted = errors.New(`channel observer already started`) + +type ( + SeekFromFetcher func(ctx context.Context, channel string) (uint64, error) + + Opts struct { + identity msp.SigningIdentity + logger *zap.Logger + } + + Channel struct { + // current name of channel + channel string + + seekFromFetcher SeekFromFetcher + + identity msp.SigningIdentity + + // graceful shutdown + closer func() error + + // value received from seekFromFetcher + lastSeekFrom uint64 + + // status current + status ChannelObserverStatus + // in case of error how many times we tried co reconnect + connectAttempt uint64 + connectAttemptAt time.Time + // when we subscribed to channel + connectedAt time.Time + + // last errors we got + lastError error + + logger *zap.Logger + + mu sync.Mutex + } + + ChannelObserverStatus int +) + +const ( + ChannelObserverCreated ChannelObserverStatus = iota + ChannelObserverConnecting + ChannelObserverConnected + ChannelObserverStopped + ChannelObserverErrored + + DefaultConnectRetryDelay = 5 * time.Second +) + +var DefaultOpts = &Opts{ + identity: nil, // use default identity in blocksDeliverer + logger: zap.NewNop(), // silent logger +} + +func (s ChannelObserverStatus) String() string { + return [...]string{`Created`, `Connecting`, `Connected`, `Stopped`, `Errored`}[s] +} + +func ChannelSeekFrom(seekFrom uint64) SeekFromFetcher { + return func(ctx context.Context, channel string) (uint64, error) { + return seekFrom, nil + } +} + +func ChannelSeekOldest() SeekFromFetcher { + return ChannelSeekFrom(0) +} + +func (c *Channel) GetStatus() ChannelObserverStatus { + return c.status +} + +func (c *Channel) GetLastError() error { + return c.lastError +} + +func (c *Channel) setStatus(status ChannelObserverStatus) { + c.status = status +} + +func (c *Channel) allowToObserve() error { + if c.status != ChannelObserverCreated && c.status != ChannelObserverStopped { + return ErrChannelObserverAlreadyStarted + } + return nil +} + +func (c *Channel) preCreateStream() { + c.closer = nil + c.connectAttempt++ + c.connectAttemptAt = time.Now() + c.connectedAt = time.Unix(0, 0) // zero + c.setStatus(ChannelObserverConnecting) +} + +func (c *Channel) afterCreateStream(closer func() error) { + c.closer = closer + c.connectedAt = time.Now() + c.setStatus(ChannelObserverConnected) +} + +func (c *Channel) setError(err error) { + c.lastError = err + c.setStatus(ChannelObserverErrored) +} + +func (c *Channel) processSeekFrom(ctx context.Context) (uint64, error) { + seekFrom, err := c.seekFromFetcher(ctx, c.channel) + if err != nil { + c.setError(err) + return 0, fmt.Errorf(`seek from: %w`, c.lastError) + } + + c.lastSeekFrom = seekFrom + return seekFrom, nil +} + +func (c *Channel) stop() error { + if c == nil { + return nil + } + + if c.closer != nil { + // close incoming stream + c.lastError = c.closer() + c.closer = nil + } + + c.setStatus(ChannelObserverStopped) + return c.lastError +} diff --git a/observer1/channel_blocks.go b/observer1/channel_blocks.go new file mode 100644 index 0000000..974bb4c --- /dev/null +++ b/observer1/channel_blocks.go @@ -0,0 +1,205 @@ +package observer1 + +import ( + "context" + "fmt" + + "github.com/hyperledger/fabric/msp" + "go.uber.org/zap" +) + +type ( + channelBlocks[T any] struct { + *Channel + + channelWithBlocks chan *Block[T] + blocksDeliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error) + createStreamWithRetry CreateBlockStreamWithRetry[T] + + stopRecreateStream bool + + isWork bool + cancelObserve context.CancelFunc + } + + ChannelBlocksOpts struct { + *Opts + + // don't recreate stream if it has not any blocks + stopRecreateStream bool + } + + ChannelBlocksOpt func(*ChannelBlocksOpts) +) + +func WithChannelBlockLogger(logger *zap.Logger) ChannelBlocksOpt { + return func(opts *ChannelBlocksOpts) { + opts.Opts.logger = logger + } +} + +func WithChannelStopRecreateStream(stop bool) ChannelBlocksOpt { + return func(opts *ChannelBlocksOpts) { + opts.stopRecreateStream = stop + } +} + +var DefaultChannelBlocksOpts = &ChannelBlocksOpts{ + Opts: DefaultOpts, + stopRecreateStream: false, +} + +func newChannelBlocks[T any]( + channel string, + deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error), + createStreamWithRetry CreateBlockStreamWithRetry[T], + seekFromFetcher SeekFromFetcher, + opts ...ChannelBlocksOpt, +) *channelBlocks[T] { + + channelBlocksOpts := DefaultChannelBlocksOpts + for _, opt := range opts { + opt(channelBlocksOpts) + } + + return &channelBlocks[T]{ + Channel: &Channel{ + channel: channel, + seekFromFetcher: seekFromFetcher, + identity: channelBlocksOpts.identity, + logger: channelBlocksOpts.logger.With(zap.String(`channel`, channel)), + }, + + blocksDeliverer: deliverer, + createStreamWithRetry: createStreamWithRetry, + stopRecreateStream: channelBlocksOpts.stopRecreateStream, + } +} + +func (c *channelBlocks[T]) Stop() error { + c.mu.Lock() + defer c.mu.Unlock() + + // c.channelWithBlocks mustn't be closed here, because it is closed elsewhere + + err := c.Channel.stop() + + // If primary context is done then cancel ctxObserver + if c.cancelObserve != nil { + c.cancelObserve() + } + + c.isWork = false + return err +} + +func (c *channelBlocks[T]) observe(ctx context.Context) (<-chan *Block[T], error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.isWork { + return c.channelWithBlocks, nil + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + c.cancelObserve = cancel + + if err := c.allowToObserve(); err != nil { + return nil, err + } + + // Double check + if err := c.allowToObserve(); err != nil { + return nil, err + } + + c.channelWithBlocks = make(chan *Block[T]) + + go func() { + c.isWork = true + + defer close(c.channelWithBlocks) + + c.logger.Debug(`creating block stream`) + incomingBlocks, errCreateStream := c.createStreamWithRetry(ctxObserve, c.createStream) + if errCreateStream != nil { + return + } + + c.logger.Info(`block stream created`) + for { + select { + case incomingBlock, hasMore := <-incomingBlocks: + + var err error + if !hasMore && !c.stopRecreateStream { + c.logger.Debug(`block stream interrupted, recreate`) + incomingBlocks, err = c.createStreamWithRetry(ctx, c.createStream) + if err != nil { + return + } + + c.logger.Debug(`block stream recreated`) + continue + } + + if incomingBlock == nil { + continue + } + + c.channelWithBlocks <- &Block[T]{ + Channel: c.channel, + Block: incomingBlock, + } + + case <-ctxObserve.Done(): + if err := c.Stop(); err != nil { + c.lastError = err + } + return + } + } + }() + + return c.channelWithBlocks, nil +} + +func (c *channelBlocks[T]) createStream(ctx context.Context) (<-chan T, error) { + c.preCreateStream() + + c.logger.Debug(`connecting to blocks stream, receiving seek offset`, + zap.Uint64(`attempt`, c.connectAttempt)) + + seekFrom, err := c.processSeekFrom(ctx) + if err != nil { + c.logger.Warn(`seek from failed`, zap.Error(err)) + return nil, err + } + c.logger.Info(`block seek offset received`, zap.Uint64(`seek from`, seekFrom)) + + var ( + blocks <-chan T + closer func() error + ) + c.logger.Debug(`subscribing to blocks stream`) + blocks, closer, err = c.blocksDeliverer(ctx, c.channel, c.identity, int64(seekFrom)) + if err != nil { + c.logger.Warn(`subscribing to blocks stream failed`, zap.Error(err)) + c.setError(err) + return nil, fmt.Errorf(`blocks deliverer: %w`, err) + } + c.logger.Info(`subscribed to blocks stream`) + + c.afterCreateStream(closer) + + // Check close context + select { + case <-ctx.Done(): + err = closer() + return nil, err + default: + } + + return blocks, nil +} diff --git a/observer1/channel_blocks_common.go b/observer1/channel_blocks_common.go new file mode 100644 index 0000000..fe66ba1 --- /dev/null +++ b/observer1/channel_blocks_common.go @@ -0,0 +1,62 @@ +package observer1 + +import ( + "github.com/hyperledger/fabric-protos-go/common" + + "github.com/s7techlab/hlf-sdk-go/api" +) + +type ( + ChannelBlocksCommon struct { + *channelBlocks[*common.Block] + + //blocks chan *Block[*common.Block] + //isWork bool + } +) + +func NewChannelBlocksCommon(channel string, blocksDeliver api.BlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...ChannelBlocksOpt) *ChannelBlocksCommon { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*common.Block](DefaultConnectRetryDelay) + + chBlocks := newChannelBlocks[*common.Block](channel, blocksDeliver.Blocks, createStreamWithRetry, seekFromFetcher, opts...) + + return &ChannelBlocksCommon{channelBlocks: chBlocks} +} + +//func (c *ChannelBlocksCommon) Observe(ctx context.Context) (chan<- *Block[*common.Block], error) { +// if c.isWork { +// return c.blocks, nil +// } +// +// commonBlocks, err := c.observe(ctx) +// if err != nil { +// return nil, err +// } +// +// c.blocks = make(chan *Block[*common.Block]) +// +// go func() { +// c.isWork = true +// +// defer func() { +// c.isWork = false +// close(c.blocks) +// }() +// +// for { +// select { +// case commonBlock, ok := <-commonBlocks: +// if !ok { +// return +// } +// if commonBlock == nil { +// continue +// } +// +// c.blocks <- commonBlock +// } +// } +// }() +// +// return c.blocks, nil +//} diff --git a/observer1/channel_blocks_parsed.go b/observer1/channel_blocks_parsed.go new file mode 100644 index 0000000..66996d4 --- /dev/null +++ b/observer1/channel_blocks_parsed.go @@ -0,0 +1,65 @@ +package observer1 + +import ( + "github.com/s7techlab/hlf-sdk-go/api" + hlfproto "github.com/s7techlab/hlf-sdk-go/block" +) + +type ( + ChannelBlocksParsed struct { + *channelBlocks[*hlfproto.Block] + + //parsedBlocks chan *Block[*hlfproto.Block] + //isWork bool + } +) + +func NewChannelBlocksParsed(channel string, blocksDeliver api.ParsedBlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...ChannelBlocksOpt) *ChannelBlocksParsed { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*hlfproto.Block](DefaultConnectRetryDelay) + + chBlocks := newChannelBlocks[*hlfproto.Block](channel, blocksDeliver.ParsedBlocks, createStreamWithRetry, seekFromFetcher, opts...) + + return &ChannelBlocksParsed{channelBlocks: chBlocks} +} + +//func (c *ChannelBlocksParsed) Observe(ctx context.Context) (chan<- *Block[*hlfproto.Block], error) { +// if c.isWork { +// return c.parsedBlocks, nil +// } +// +// hlfprotoBlocks, err := c.observe(ctx) +// if err != nil { +// return nil, err +// } +// +// c.parsedBlocks = make(chan *ParsedBlock) +// +// go func() { +// c.isWork = true +// +// defer func() { +// c.isWork = false +// close(c.parsedBlocks) +// }() +// +// for { +// select { +// case hlfprotoBlock, ok := <-hlfprotoBlocks: +// if !ok { +// return +// } +// if hlfprotoBlock == nil { +// continue +// } +// +// c.parsedBlocks <- &ParsedBlock{ +// Channel: c.channel, +// Block: hlfprotoBlock, +// BlockOriginal: hlfprotoBlock, +// } +// } +// } +// }() +// +// return c.parsedBlocks, nil +//} diff --git a/observer1/channel_blocks_stream.go b/observer1/channel_blocks_stream.go new file mode 100644 index 0000000..fd41257 --- /dev/null +++ b/observer1/channel_blocks_stream.go @@ -0,0 +1,31 @@ +package observer1 + +import ( + "context" + "time" +) + +type ( + CreateBlockStream[T any] func(context.Context) (<-chan T, error) + + CreateBlockStreamWithRetry[T any] func(context.Context, CreateBlockStream[T]) (<-chan T, error) +) + +func CreateBlockStreamWithRetryDelay[T any](delay time.Duration) CreateBlockStreamWithRetry[T] { + return func(ctx context.Context, createBlockStream CreateBlockStream[T]) (<-chan T, error) { + for { + select { + case <-ctx.Done(): + return nil, nil + default: + } + + blocks, err := createBlockStream(ctx) + if err == nil { + return blocks, nil + } + + time.Sleep(delay) + } + } +} diff --git a/observer1/channels_matcher.go b/observer1/channels_matcher.go new file mode 100644 index 0000000..917bace --- /dev/null +++ b/observer1/channels_matcher.go @@ -0,0 +1,149 @@ +package observer1 + +import ( + "fmt" + "regexp" + + "github.com/hyperledger/fabric-protos-go/peer" +) + +const MatchAnyPattern = `*` + +type ( + ChannelToMatch struct { + Name string `json:"name" yaml:"name"` + MatchPattern string `json:"match_pattern" yaml:"matchPattern"` + NotMatchPattern string `json:"not_match_pattern" yaml:"notMatchPattern"` + } + + ChannelMatched struct { + Name string + // name from settings that lead to this subscription + MatchPattern string + NotMatchPattern string + } + + ChannelsMatcher struct { + matchers []*channelMatcher + } + + channelMatcher struct { + name string + matchPattern string + notMatchPattern string + matchAny bool + regexp *regexp.Regexp + regexpNotMatch *regexp.Regexp + } +) + +var MatchAllChannels = []ChannelToMatch{{ + MatchPattern: MatchAnyPattern, +}} + +func newChannelMatcher(toMatch ChannelToMatch) (*channelMatcher, error) { + matcher := &channelMatcher{ + name: toMatch.Name, + matchPattern: toMatch.MatchPattern, + notMatchPattern: toMatch.NotMatchPattern, + } + + if toMatch.MatchPattern == MatchAnyPattern { + matcher.matchAny = true + return matcher, nil + } + + if toMatch.NotMatchPattern != `` { + var err error + matcher.regexpNotMatch, err = regexp.Compile(toMatch.NotMatchPattern) + if err != nil { + return nil, err + } + } + + if toMatch.MatchPattern != `` { + var err error + matcher.regexp, err = regexp.Compile(toMatch.MatchPattern) + if err != nil { + return nil, err + } + } + + return matcher, nil +} + +func (cm *channelMatcher) Match(channel string) *ChannelMatched { + switch { + case cm.matchAny: + return &ChannelMatched{ + Name: channel, + MatchPattern: MatchAnyPattern, + } + + case cm.name == channel: + return &ChannelMatched{ + Name: channel, + } + + default: + chMatched := &ChannelMatched{ + Name: channel, + MatchPattern: cm.matchPattern, + NotMatchPattern: cm.notMatchPattern, + } + if cm.regexpNotMatch != nil && cm.regexpNotMatch.MatchString(channel) { + return nil + } + + if cm.regexp != nil && !cm.regexp.MatchString(channel) { + return nil + } + + return chMatched + } +} + +func NewChannelsMatcher(channelsToMatch []ChannelToMatch) (*ChannelsMatcher, error) { + if len(channelsToMatch) == 0 { + channelsToMatch = MatchAllChannels + } + channelsMatcher := &ChannelsMatcher{} + for _, toMatch := range channelsToMatch { + matcher, err := newChannelMatcher(toMatch) + if err != nil { + pattern := toMatch.MatchPattern + if toMatch.NotMatchPattern != `` { + pattern = toMatch.NotMatchPattern + } + + return nil, fmt.Errorf(`channel match name=%s, pattern=%s: %w`, toMatch.Name, pattern, err) + } + + channelsMatcher.matchers = append(channelsMatcher.matchers, matcher) + } + + return channelsMatcher, nil +} + +func ChannelsInfoToStrings(channelsInfo []*peer.ChannelInfo) []string { + channels := make([]string, 0) + for _, channelInfo := range channelsInfo { + channels = append(channels, channelInfo.ChannelId) + } + + return channels +} + +func (cm *ChannelsMatcher) Match(channels []string) ([]*ChannelMatched, error) { + var matched []*ChannelMatched + + for _, channel := range channels { + for _, matcher := range cm.matchers { + if match := matcher.Match(channel); match != nil { + matched = append(matched, match) + break + } + } + } + return matched, nil +} diff --git a/observer1/peer_channels.go b/observer1/peer_channels.go new file mode 100644 index 0000000..5f101f1 --- /dev/null +++ b/observer1/peer_channels.go @@ -0,0 +1,199 @@ +package observer1 + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric-protos-go/common" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/s7techlab/hlf-sdk-go/api" +) + +const DefaultChannelPeerObservePeriod = 30 * time.Second + +type ( + ChannelInfo struct { + Channel string + Height uint64 + UpdatedAt *timestamppb.Timestamp + } + + // PeerChannels observes for peer channels + PeerChannels struct { + channels map[string]*ChannelInfo + + channelFetcher PeerChannelsFetcher + channelsMatcher *ChannelsMatcher + observePeriod time.Duration + + mu sync.Mutex + logger *zap.Logger + + lastError error + + isWork bool + cancelObserve context.CancelFunc + } + + PeerChannelsFetcher interface { + Uri() string + api.ChannelListGetter + api.ChainInfoGetter + } + + ChannelPeerOpts struct { + channels []ChannelToMatch + observePeriod time.Duration + logger *zap.Logger + } + + ChannelPeerOpt func(*ChannelPeerOpts) +) + +var DefaultChannelPeerOpts = &ChannelPeerOpts{ + channels: MatchAllChannels, + observePeriod: DefaultChannelPeerObservePeriod, + logger: zap.NewNop(), +} + +func WithChannels(channels []ChannelToMatch) ChannelPeerOpt { + return func(opts *ChannelPeerOpts) { + opts.channels = channels + } +} + +func WithChannelPeerLogger(logger *zap.Logger) ChannelPeerOpt { + return func(opts *ChannelPeerOpts) { + opts.logger = logger + } +} + +func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...ChannelPeerOpt) (*PeerChannels, error) { + channelPeerOpts := DefaultChannelPeerOpts + for _, opt := range opts { + opt(channelPeerOpts) + } + + channelsMatcher, err := NewChannelsMatcher(channelPeerOpts.channels) + if err != nil { + return nil, fmt.Errorf(`channels matcher: %w`, err) + } + + channelPeer := &PeerChannels{ + channelFetcher: peerChannelsFetcher, + channelsMatcher: channelsMatcher, + channels: make(map[string]*ChannelInfo), + observePeriod: channelPeerOpts.observePeriod, + logger: channelPeerOpts.logger, + } + + return channelPeer, nil +} + +func (cp *PeerChannels) Stop() { + cp.cancelObserve() + cp.isWork = false +} + +func (cp *PeerChannels) Observe(ctx context.Context) { + if cp.isWork { + return + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(context.Background()) + cp.cancelObserve = cancel + + go func() { + cp.isWork = true + cp.updateChannels(ctxObserve) + + ticker := time.NewTicker(cp.observePeriod) + for { + select { + case <-ctx.Done(): + // If primary context is done then cancel ctxObserver + cp.cancelObserve() + return + + case <-ctxObserve.Done(): + return + + case <-ticker.C: + cp.updateChannels(ctxObserve) + } + } + }() +} + +func (cp *PeerChannels) Uri() string { + return cp.channelFetcher.Uri() +} + +func (cp *PeerChannels) Channels() map[string]*ChannelInfo { + cp.mu.Lock() + defer cp.mu.Unlock() + + var copyChannelInfo = make(map[string]*ChannelInfo, len(cp.channels)) + for key, value := range cp.channels { + copyChannelInfo[key] = value + } + + return copyChannelInfo +} + +func (cp *PeerChannels) updateChannels(ctx context.Context) { + cp.logger.Debug(`fetching channels`) + channelsInfo, err := cp.channelFetcher.GetChannels(ctx) + if err != nil { + cp.logger.Warn(`error while fetching channels`, zap.Error(err)) + cp.lastError = err + return + } + + channels := ChannelsInfoToStrings(channelsInfo.Channels) + cp.logger.Debug(`channels fetched`, zap.Strings(`channels`, channels)) + + channelsMatched, err := cp.channelsMatcher.Match(channels) + if err != nil { + cp.logger.Warn(`channel matching error`, zap.Error(err)) + cp.lastError = err + return + } + cp.logger.Debug(`channels matched`, zap.Reflect(`channels`, channelsMatched)) + + channelHeights := make(map[string]uint64) + + for _, channel := range channelsMatched { + var channelInfo *common.BlockchainInfo + channelInfo, err = cp.channelFetcher.GetChainInfo(ctx, channel.Name) + if err != nil { + cp.lastError = err + continue + } + channelHeights[channel.Name] = channelInfo.Height + } + + cp.mu.Lock() + defer cp.mu.Unlock() + + for channel, height := range channelHeights { + var updatedAt *timestamp.Timestamp + updatedAt, err = ptypes.TimestampProto(time.Now()) + if err != nil { + cp.lastError = err + } + + cp.channels[channel] = &ChannelInfo{ + Channel: channel, + Height: height, + UpdatedAt: updatedAt, + } + } +} diff --git a/observer1/transformer.go b/observer1/transformer.go new file mode 100644 index 0000000..1f59305 --- /dev/null +++ b/observer1/transformer.go @@ -0,0 +1,6 @@ +package observer1 + +// BlockTransformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json +type BlockTransformer interface { + Transform(*ParsedBlock) error +}