diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index f3745bf..b85ba20 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -20,7 +20,7 @@ jobs: uses: golangci/golangci-lint-action@v3 with: # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. - version: v1.55.2 + version: v1.57 # Optional: working directory, useful for monorepos # working-directory: somedir diff --git a/api/client.go b/api/client.go index babd8e6..4425131 100644 --- a/api/client.go +++ b/api/client.go @@ -7,6 +7,8 @@ import ( "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/msp" + + hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) type CurrentIdentity interface { @@ -43,6 +45,16 @@ type BlocksDeliverer interface { ) (blockChan <-chan *common.Block, closer func() error, err error) } +type ParsedBlocksDeliverer interface { + // ParsedBlocks the same as BlocksDeliverer.Blocks, but returns a channel with parsed blocks + ParsedBlocks( + ctx context.Context, + channel string, + identity msp.SigningIdentity, + blockRange ...int64, + ) (parsedBlockChan <-chan *hlfproto.Block, parsedCloser func() error, err error) +} + type Querier interface { CurrentIdentity // Query - shortcut for querying chaincodes diff --git a/api/config/config_yaml.go b/api/config/config_yaml.go index aef7290..661616c 100644 --- a/api/config/config_yaml.go +++ b/api/config/config_yaml.go @@ -1,14 +1,14 @@ package config import ( - "io/ioutil" + "os" "github.com/pkg/errors" "gopkg.in/yaml.v2" ) func NewYamlConfig(configPath string) (*Config, error) { - if configBytes, err := ioutil.ReadFile(configPath); err != nil { + if configBytes, err := os.ReadFile(configPath); err != nil { return nil, errors.Wrap(err, `failed to read config file`) } else { var c Config diff --git a/api/peer.go b/api/peer.go index 9ed43a9..017ffa4 100644 --- a/api/peer.go +++ b/api/peer.go @@ -35,12 +35,14 @@ type Peer interface { BlocksDeliverer + ParsedBlocksDeliverer + EventsDeliverer // DeliverClient returns DeliverClient DeliverClient(identity msp.SigningIdentity) (DeliverClient, error) - // Uri returns url used for grpc connection - Uri() string + // URI returns url used for grpc connection + URI() string // Conn returns instance of grpc connection Conn() *grpc.ClientConn // Close terminates peer connection diff --git a/block/block.go b/block/block.go index 1e093ee..cb2a431 100644 --- a/block/block.go +++ b/block/block.go @@ -1,6 +1,7 @@ package block import ( + "errors" "fmt" "github.com/golang/protobuf/proto" @@ -14,6 +15,11 @@ import ( "github.com/s7techlab/hlf-sdk-go/block/txflags" ) +var ( + ErrNilBlock = errors.New("nil block") + ErrNilConfigBlock = errors.New("nil config block") +) + type ( parseBlockOpts struct { configBlock *common.Block @@ -71,6 +77,10 @@ func ParseBlock(block *common.Block, opts ...ParseBlockOpt) (*Block, error) { } func ParseOrdererIdentity(cb *common.Block) (*msp.SerializedIdentity, error) { + if cb == nil { + return nil, ErrNilBlock + } + meta, err := protoutil.GetMetadataFromBlock(cb, common.BlockMetadataIndex_SIGNATURES) if err != nil { return nil, fmt.Errorf("get metadata from block: %w", err) @@ -96,6 +106,14 @@ func ParseOrdererIdentity(cb *common.Block) (*msp.SerializedIdentity, error) { } func ParseBTFOrderersIdentities(block *common.Block, configBlock *common.Block) ([]*OrdererSignature, error) { + if block == nil { + return nil, ErrNilBlock + } + + if configBlock == nil { + return nil, ErrNilConfigBlock + } + bftMeta := &bftcommon.BFTMetadata{} if err := proto.Unmarshal(block.Metadata.Metadata[common.BlockMetadataIndex_SIGNATURES], bftMeta); err != nil { return nil, fmt.Errorf("unmarshaling bft block metadata from metadata: %w", err) diff --git a/block/block.pb.go b/block/block.pb.go index 15f58bd..f2b186c 100644 --- a/block/block.pb.go +++ b/block/block.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: block.proto diff --git a/block/chan_config.pb.go b/block/chan_config.pb.go index 0213c23..d8abd21 100644 --- a/block/chan_config.pb.go +++ b/block/chan_config.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: chan_config.proto diff --git a/block/smartbft/common/common.pb.go b/block/smartbft/common/common.pb.go index edee15a..0bdaa5a 100644 --- a/block/smartbft/common/common.pb.go +++ b/block/smartbft/common/common.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: smartbft/common/common.proto diff --git a/block/smartbft/configuration.pb.go b/block/smartbft/configuration.pb.go index 432cc2c..09a4a4d 100644 --- a/block/smartbft/configuration.pb.go +++ b/block/smartbft/configuration.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: smartbft/configuration.proto diff --git a/observer/transform/action.go b/block/transform/action.go similarity index 76% rename from observer/transform/action.go rename to block/transform/action.go index 66657fb..072ff29 100644 --- a/observer/transform/action.go +++ b/block/transform/action.go @@ -4,10 +4,7 @@ import ( "fmt" "regexp" - "github.com/mohae/deepcopy" - hlfproto "github.com/s7techlab/hlf-sdk-go/block" - "github.com/s7techlab/hlf-sdk-go/observer" ) type ( @@ -68,77 +65,60 @@ func NewAction(actionMach TxActionMatch, opts ...ActionOpt) *Action { return a } -func (s *Action) Transform(block *observer.ParsedBlock) error { - if block.Block == nil { - return nil +func (s *Action) Transform(block *hlfproto.Block) (*hlfproto.Block, error) { + if block == nil { + return nil, hlfproto.ErrNilBlock } - // if block is transformed, copy of block will be saved to block.BlockOriginal - blockCopy := deepcopy.Copy(block.Block).(*hlfproto.Block) - blockIsTransformed := false - - for _, envelope := range block.Block.Data.Envelopes { - if envelope.Payload.Transaction == nil { + for _, envelope := range block.GetData().GetEnvelopes() { + if envelope.GetPayload().GetTransaction() == nil { continue } - for _, txAction := range envelope.Payload.Transaction.Actions { + for _, txAction := range envelope.GetPayload().GetTransaction().GetActions() { if !s.match(txAction) { continue } for _, argsTransformer := range s.inputArgsTransformers { - if err := argsTransformer.Transform(txAction.ChaincodeSpec().Input.Args); err != nil { - return fmt.Errorf(`transform input args: %w`, err) + if err := argsTransformer.Transform(txAction.ChaincodeSpec().GetInput().GetArgs()); err != nil { + return nil, fmt.Errorf(`transform input args: %w`, err) } } for _, eventTransformer := range s.eventTransformers { if err := eventTransformer.Transform(txAction.Event()); err != nil { - return fmt.Errorf(`transform event: %w`, err) + return nil, fmt.Errorf(`transform event: %w`, err) } } for _, rwSet := range txAction.NsReadWriteSet() { - for _, write := range rwSet.Rwset.Writes { + for _, write := range rwSet.GetRwset().GetWrites() { for _, kvWriteTransformer := range s.kvWriteTransformers { - origKey := write.Key if err := kvWriteTransformer.Transform(write); err != nil { - return fmt.Errorf(`transform KV write with key: %s: %w`, write.Key, err) - } - - if origKey != write.Key { - blockIsTransformed = true + return nil, fmt.Errorf(`transform KV write with key: %s: %w`, write.Key, err) } } } - for _, read := range rwSet.Rwset.Reads { + for _, read := range rwSet.GetRwset().GetReads() { for _, kvReadTransform := range s.kvReadTransformers { - origKey := read.Key if err := kvReadTransform.Transform(read); err != nil { - return fmt.Errorf(`transform KV read with key: %s: %w`, read.Key, err) - } - if origKey != read.Key { - blockIsTransformed = true + return nil, fmt.Errorf(`transform KV read with key: %s: %w`, read.Key, err) } } } for _, actionPayloadTransform := range s.actionPayloadTransformers { if err := actionPayloadTransform.Transform(txAction); err != nil { - return fmt.Errorf(`transform action payload: %w`, err) + return nil, fmt.Errorf(`transform action payload: %w`, err) } } } } } - if blockIsTransformed { - block.BlockOriginal = blockCopy - } - - return nil + return block, nil } func TxChaincodeIDMatch(chaincode string) TxActionMatch { diff --git a/observer/transform/action_payload.go b/block/transform/action_payload.go similarity index 100% rename from observer/transform/action_payload.go rename to block/transform/action_payload.go diff --git a/observer/transform/args.go b/block/transform/args.go similarity index 100% rename from observer/transform/args.go rename to block/transform/args.go diff --git a/observer/transform/event.go b/block/transform/event.go similarity index 100% rename from observer/transform/event.go rename to block/transform/event.go diff --git a/observer/transform/kvread.go b/block/transform/kvread.go similarity index 100% rename from observer/transform/kvread.go rename to block/transform/kvread.go diff --git a/observer/transform/kvwrite.go b/block/transform/kvwrite.go similarity index 100% rename from observer/transform/kvwrite.go rename to block/transform/kvwrite.go diff --git a/observer/transform/lifecycle.go b/block/transform/lifecycle.go similarity index 94% rename from observer/transform/lifecycle.go rename to block/transform/lifecycle.go index 8685829..632b2c2 100644 --- a/observer/transform/lifecycle.go +++ b/block/transform/lifecycle.go @@ -6,7 +6,7 @@ import ( "github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset" - "github.com/s7techlab/hlf-sdk-go/observer" + hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) const ( @@ -53,7 +53,7 @@ func keyReplace(key string) string { return key } -var LifecycleTransformers = []observer.BlockTransformer{ +var LifecycleTransformers = []hlfproto.Transformer{ NewAction( TxChaincodeIDMatch(LifecycleChaincodeName), WithKVWriteTransformer( @@ -62,6 +62,9 @@ var LifecycleTransformers = []observer.BlockTransformer{ return nil }), ), + ), + NewAction( + TxChaincodeAnyMatch(), WithKVReadTransformer( KVReadKeyReplace(LifecycleStateKeyStrMapping(), func(read *kvrwset.KVRead) error { read.Key = keyReplace(read.Key) diff --git a/observer/transform/map_prefix.go b/block/transform/map_prefix.go similarity index 100% rename from observer/transform/map_prefix.go rename to block/transform/map_prefix.go diff --git a/observer/transform/proto.go b/block/transform/proto.go similarity index 100% rename from observer/transform/proto.go rename to block/transform/proto.go diff --git a/observer/transform/replace_bytes.go b/block/transform/replace_bytes.go similarity index 100% rename from observer/transform/replace_bytes.go rename to block/transform/replace_bytes.go diff --git a/block/transformer.go b/block/transformer.go new file mode 100644 index 0000000..3823630 --- /dev/null +++ b/block/transformer.go @@ -0,0 +1,6 @@ +package block + +// Transformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json +type Transformer interface { + Transform(*Block) (*Block, error) +} diff --git a/client/ca/http/client.go b/client/ca/http/client.go index 9d2bf45..c41744c 100644 --- a/client/ca/http/client.go +++ b/client/ca/http/client.go @@ -4,7 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "github.com/golang/protobuf/proto" @@ -92,7 +92,7 @@ func (c *Client) setAuthToken(req *http.Request, body []byte) error { func (c *Client) processResponse(resp *http.Response, out interface{}, expectedHTTPStatuses ...int) error { defer func() { _ = resp.Body.Close() }() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return errors.Wrap(err, `failed to read response body`) } diff --git a/client/chaincode/invoke_test.go b/client/chaincode/invoke_test.go index ceb7cc8..6d1ab49 100644 --- a/client/chaincode/invoke_test.go +++ b/client/chaincode/invoke_test.go @@ -160,8 +160,8 @@ package chaincode_test // return p.deliver, nil //} // -//// Uri returns url used for grpc connection -//func (p *mockPeer) Uri() string { +//// URI returns url used for grpc connection +//func (p *mockPeer) URI() string { // return "localhost:7051" //} // diff --git a/client/core_opts.go b/client/core_opts.go index 1607c9f..d4d08a4 100644 --- a/client/core_opts.go +++ b/client/core_opts.go @@ -2,7 +2,7 @@ package client import ( "fmt" - "io/ioutil" + "os" "github.com/hyperledger/fabric/msp" "github.com/pkg/errors" @@ -51,7 +51,7 @@ func WithOrderer(orderer api.Orderer) Opt { // WithConfigYaml allows passing path to YAML configuration file func WithConfigYaml(configPath string) Opt { return func(c *Client) error { - configBytes, err := ioutil.ReadFile(configPath) + configBytes, err := os.ReadFile(configPath) if err != nil { return errors.Wrap(err, `failed to read config file`) } diff --git a/client/core_public.go b/client/core_public.go index 934ac40..a63c8dc 100644 --- a/client/core_public.go +++ b/client/core_public.go @@ -10,6 +10,7 @@ import ( "github.com/hyperledger/fabric/msp" "github.com/s7techlab/hlf-sdk-go/api" + "github.com/s7techlab/hlf-sdk-go/block" "github.com/s7techlab/hlf-sdk-go/client/chaincode" "github.com/s7techlab/hlf-sdk-go/client/chaincode/txwaiter" "github.com/s7techlab/hlf-sdk-go/client/tx" @@ -108,7 +109,7 @@ func (c *Client) Blocks( channel string, identity msp.SigningIdentity, blockRange ...int64, -) (blocks <-chan *common.Block, closer func() error, _ error) { +) (<-chan *common.Block, func() error, error) { if identity == nil { identity = c.CurrentIdentity() } @@ -120,3 +121,21 @@ func (c *Client) Blocks( return peer.Blocks(ctx, channel, identity, blockRange...) } + +func (c *Client) ParsedBlocks( + ctx context.Context, + channel string, + identity msp.SigningIdentity, + blockRange ...int64, +) (<-chan *block.Block, func() error, error) { + if identity == nil { + identity = c.CurrentIdentity() + } + + peer, err := c.PeerPool().FirstReadyPeer(identity.GetMSPIdentifier()) + if err != nil { + return nil, nil, err + } + + return peer.ParsedBlocks(ctx, channel, identity, blockRange...) +} diff --git a/client/deliver/testing/block_deliverer_mock.go b/client/deliver/testing/block_deliverer_mock.go index babc03e..e8c8526 100644 --- a/client/deliver/testing/block_deliverer_mock.go +++ b/client/deliver/testing/block_deliverer_mock.go @@ -3,7 +3,6 @@ package testing import ( "context" "fmt" - "io/ioutil" "math" "os" "path/filepath" @@ -14,11 +13,14 @@ import ( "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric/msp" "github.com/pkg/errors" + + hlfproto "github.com/s7techlab/hlf-sdk-go/block" ) type BlocksDelivererMock struct { // => [,...] data map[string][]*common.Block + parsedData map[string][]*hlfproto.Block closeWhenAllRead bool } @@ -31,6 +33,7 @@ func NewBlocksDelivererMock(rootPath string, closeWhenAllRead bool) (*BlocksDeli dc := &BlocksDelivererMock{ data: make(map[string][]*common.Block), + parsedData: make(map[string][]*hlfproto.Block), closeWhenAllRead: closeWhenAllRead, } @@ -78,7 +81,7 @@ func NewBlocksDelivererMock(rootPath string, closeWhenAllRead bool) (*BlocksDeli return err } - block, err := ioutil.ReadFile(path) + block, err := os.ReadFile(path) if err != nil { return err } @@ -95,15 +98,23 @@ func NewBlocksDelivererMock(rootPath string, closeWhenAllRead bool) (*BlocksDeli for channelID, data := range channels { channelBlocks := make([]*common.Block, len(data)) + parsedChannelBlocks := make([]*hlfproto.Block, len(data)) for blockID, blockData := range data { block := &common.Block{} - err := proto.Unmarshal(blockData, block) + err = proto.Unmarshal(blockData, block) if err != nil { return nil, err } channelBlocks[blockID] = block + + parsedBlock, err := hlfproto.ParseBlock(block) + if err != nil { + return nil, err + } + parsedChannelBlocks[blockID] = parsedBlock } dc.data[channelID] = channelBlocks + dc.parsedData[channelID] = parsedChannelBlocks println("fill channel '"+channelID+"' blocks from", 0, "...", len(channelBlocks)-1) } @@ -111,15 +122,30 @@ func NewBlocksDelivererMock(rootPath string, closeWhenAllRead bool) (*BlocksDeli } func (m *BlocksDelivererMock) Blocks( - ctx context.Context, + _ context.Context, channelName string, - identity msp.SigningIdentity, + _ msp.SigningIdentity, blockRange ...int64, -) (blockChan <-chan *common.Block, closer func() error, err error) { - if _, ok := m.data[channelName]; !ok { +) (<-chan *common.Block, func() error, error) { + + return blocks[*common.Block](m.data, channelName, m.closeWhenAllRead, blockRange...) +} + +func (m *BlocksDelivererMock) ParsedBlocks( + _ context.Context, + channelName string, + _ msp.SigningIdentity, + blockRange ...int64, +) (<-chan *hlfproto.Block, func() error, error) { + + return blocks[*hlfproto.Block](m.parsedData, channelName, m.closeWhenAllRead, blockRange...) +} + +func blocks[T any](data map[string][]T, channelName string, closeWhenAllRead bool, blockRange ...int64) (<-chan T, func() error, error) { + if _, ok := data[channelName]; !ok { return nil, nil, fmt.Errorf("have no mocked data for this channel") } - closer = func() error { return nil } + closer := func() error { return nil } var ( blockRangeFrom int64 = 0 @@ -134,27 +160,27 @@ func (m *BlocksDelivererMock) Blocks( } if blockRangeFrom < 0 { - blockRangeFrom = int64(len(m.data[channelName])) + blockRangeFrom + blockRangeFrom = int64(len(data[channelName])) + blockRangeFrom } if blockRangeTo < 0 { - blockRangeTo = int64(len(m.data[channelName])) + blockRangeTo + blockRangeTo = int64(len(data[channelName])) + blockRangeTo } - if blockRangeFrom > int64(len(m.data[channelName])) { - blockRangeFrom = int64(len(m.data[channelName])) - 1 + if blockRangeFrom > int64(len(data[channelName])) { + blockRangeFrom = int64(len(data[channelName])) - 1 } - if blockRangeTo > int64(len(m.data[channelName])) { - blockRangeTo = int64(len(m.data[channelName])) - 1 + if blockRangeTo > int64(len(data[channelName])) { + blockRangeTo = int64(len(data[channelName])) - 1 } - ch := make(chan *common.Block, (blockRangeTo-blockRangeFrom)+1) + ch := make(chan T, (blockRangeTo-blockRangeFrom)+1) for i := blockRangeFrom; i <= blockRangeTo; i++ { - ch <- m.data[channelName][i] + ch <- data[channelName][i] } - if m.closeWhenAllRead { + if closeWhenAllRead { close(ch) } diff --git a/client/deliver/testing/deliver.go b/client/deliver/testing/deliver.go index 9071ae4..bb7e372 100644 --- a/client/deliver/testing/deliver.go +++ b/client/deliver/testing/deliver.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "os" "path/filepath" "strconv" @@ -75,7 +74,7 @@ func NewDeliverClient(rootPath string, closeWhenAllRead bool) (peer.DeliverClien return err } - block, err := ioutil.ReadFile(path) + block, err := os.ReadFile(path) if err != nil { return err } diff --git a/client/peer.go b/client/peer.go index 6dce33a..8c361b6 100644 --- a/client/peer.go +++ b/client/peer.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "sync" "time" "github.com/golang/protobuf/ptypes/timestamp" @@ -16,6 +17,7 @@ import ( "github.com/s7techlab/hlf-sdk-go/api" "github.com/s7techlab/hlf-sdk-go/api/config" + "github.com/s7techlab/hlf-sdk-go/block" "github.com/s7techlab/hlf-sdk-go/client/channel" "github.com/s7techlab/hlf-sdk-go/client/deliver" grpcclient "github.com/s7techlab/hlf-sdk-go/client/grpc" @@ -39,11 +41,14 @@ type peer struct { endorseDefaultTimeout time.Duration + configBlocks map[string]*common.Block + mu sync.RWMutex + logger *zap.Logger } // NewPeer returns new peer instance based on peer config -func NewPeer(dialCtx context.Context, c config.ConnectionConfig, identity msp.SigningIdentity, logger *zap.Logger) (api.Peer, error) { +func NewPeer(ctx context.Context, c config.ConnectionConfig, identity msp.SigningIdentity, logger *zap.Logger) (api.Peer, error) { opts, err := grpcclient.OptionsFromConfig(c, logger) if err != nil { return nil, fmt.Errorf(`peer grpc options from config: %w`, err) @@ -55,10 +60,11 @@ func NewPeer(dialCtx context.Context, c config.ConnectionConfig, identity msp.Si } // Dial should always have timeout - ctxDeadline, exists := dialCtx.Deadline() + var dialCtx context.Context + ctxDeadline, exists := ctx.Deadline() if !exists { var cancel context.CancelFunc - dialCtx, cancel = context.WithTimeout(dialCtx, dialTimeout) + dialCtx, cancel = context.WithTimeout(ctx, dialTimeout) defer cancel() ctxDeadline, _ = dialCtx.Deadline() @@ -83,16 +89,15 @@ func NewFromGRPC(conn *grpc.ClientConn, identity msp.SigningIdentity, tlsCertHas endorseDefaultTimeout = PeerDefaultEndorseTimeout } - p := &peer{ + return &peer{ conn: conn, client: fabricPeer.NewEndorserClient(conn), identity: identity, tlsCertHash: tlsCertHash, endorseDefaultTimeout: endorseDefaultTimeout, + configBlocks: make(map[string]*common.Block), logger: logger.Named(`peer`), - } - - return p, nil + }, nil } func (p *peer) Query( @@ -140,9 +145,9 @@ func (p *peer) Query( return response.Response, nil } -func (p *peer) Blocks(ctx context.Context, channel string, identity msp.SigningIdentity, blockRange ...int64) (blockChan <-chan *common.Block, closer func() error, err error) { +func (p *peer) Blocks(ctx context.Context, channel string, identity msp.SigningIdentity, blockRange ...int64) (<-chan *common.Block, func() error, error) { p.logger.Debug(`peer blocks request`, - zap.String(`uri`, p.Uri()), + zap.String(`uri`, p.URI()), zap.String(`channel`, channel), zap.Reflect(`range`, blockRange)) @@ -169,6 +174,82 @@ func (p *peer) Blocks(ctx context.Context, channel string, identity msp.SigningI return bs.Blocks(), bs.Close, nil } +func (p *peer) addConfigBlock(ctx context.Context, channel string) error { + p.mu.RLock() + _, exist := p.configBlocks[channel] + p.mu.RUnlock() + if exist { + return nil + } + + configBlock, err := qscc.NewQSCC(p).GetBlockByNumber(ctx, &qscc.GetBlockByNumberRequest{ChannelName: channel, BlockNumber: 0}) + if err != nil { + return fmt.Errorf("get block by number from channel %s: %w", channel, err) + } + + if configBlock != nil { + p.mu.Lock() + p.configBlocks[channel] = configBlock + p.mu.Unlock() + } + + return nil +} + +func (p *peer) ParsedBlocks(ctx context.Context, channel string, identity msp.SigningIdentity, blockRange ...int64) (<-chan *block.Block, func() error, error) { + commonBlocks, commonCloser, err := p.Blocks(ctx, channel, identity, blockRange...) + if err != nil { + return nil, nil, err + } + + if err = p.addConfigBlock(ctx, channel); err != nil { + return nil, nil, err + } + + parsedBlockChan := make(chan *block.Block) + go func() { + defer func() { + close(parsedBlockChan) + }() + + p.mu.RLock() + configBlock := p.configBlocks[channel] + p.mu.RUnlock() + + for { + select { + case <-ctx.Done(): + return + + case b, ok := <-commonBlocks: + if !ok { + return + } + if b == nil { + return + } + + parsedBlock, err := block.ParseBlock(b, block.WithConfigBlock(configBlock)) + if err != nil { + p.logger.Error("parse block", zap.String("channel", channel), zap.Uint64("number", b.Header.Number)) + continue + } + + parsedBlockChan <- parsedBlock + } + } + }() + + parsedCloser := func() error { + if closerErr := commonCloser(); closerErr != nil { + return closerErr + } + return nil + } + + return parsedBlockChan, parsedCloser, nil +} + func (p *peer) Events(ctx context.Context, channel string, chaincode string, identity msp.SigningIdentity, blockRange ...int64) (events chan interface { Event() *fabricPeer.ChaincodeEvent Block() uint64 @@ -176,7 +257,7 @@ func (p *peer) Events(ctx context.Context, channel string, chaincode string, ide }, closer func() error, err error) { p.logger.Debug(`peer events request`, - zap.String(`uri`, p.Uri()), + zap.String(`uri`, p.URI()), zap.String(`channel`, channel), zap.Reflect(`range`, blockRange)) @@ -217,7 +298,7 @@ func (p *peer) Endorse(ctx context.Context, proposal *fabricPeer.SignedProposal) defer cancel() } - p.logger.Debug(`endorse`, zap.String(`uri`, p.Uri())) + p.logger.Debug(`endorse`, zap.String(`uri`, p.URI())) resp, err := p.client.ProcessProposal(ctx, proposal) if err != nil { @@ -247,7 +328,7 @@ func (p *peer) Conn() *grpc.ClientConn { return p.conn } -func (p *peer) Uri() string { +func (p *peer) URI() string { return p.conn.Target() } diff --git a/client/peer_pool.go b/client/peer_pool.go index f0fd3bb..5ad580f 100644 --- a/client/peer_pool.go +++ b/client/peer_pool.go @@ -76,7 +76,7 @@ func (p *PeerPool) GetMSPPeers(mspID string) []api.Peer { func (p *PeerPool) Add(mspId string, peer api.Peer, peerChecker api.PeerPoolCheckStrategy) error { p.logger.Debug(`add peer`, zap.String(`msp_id`, mspId), - zap.String(`peerUri`, peer.Uri())) + zap.String(`peer_URI`, peer.URI())) p.storeMx.Lock() defer p.storeMx.Unlock() @@ -101,7 +101,7 @@ func (p *PeerPool) addPeer(peer api.Peer, peerSet []*peerPoolPeer, peerChecker a func (p *PeerPool) isPeerInPool(peer api.Peer, peerSet []*peerPoolPeer) bool { for _, pp := range peerSet { - if peer.Uri() == pp.peer.Uri() { + if peer.URI() == pp.peer.URI() { return true } } @@ -119,13 +119,13 @@ func (p *PeerPool) poolChecker(ctx context.Context, aliveChan chan bool, peer *p return case alive, ok := <-aliveChan: - //log.Debug(`Got alive data about peer`, zap.String(`peerUri`, peer.peer.Uri()), zap.Bool(`alive`, alive)) + //log.Debug(`Got alive data about peer`, zap.String(`peerUri`, peer.peer.URI()), zap.Bool(`alive`, alive)) if !ok { return } if !alive { - p.logger.Warn(`peer connection is dead`, zap.String(`peerUri`, peer.peer.Uri())) + p.logger.Warn(`peer connection is dead`, zap.String(`peerUri`, peer.peer.URI())) } p.storeMx.Lock() @@ -157,13 +157,13 @@ func (p *PeerPool) EndorseOnMSP(ctx context.Context, mspID string, proposal *pee for pos, poolPeer := range peers { if !poolPeer.ready { - p.logger.Debug(ErrPeerNotReady.Error(), zap.String(`uri`, poolPeer.peer.Uri())) + p.logger.Debug(ErrPeerNotReady.Error(), zap.String(`uri`, poolPeer.peer.URI())) continue } log.Debug(`Sending endorse to peer...`, zap.String(`mspId`, mspID), - zap.String(`uri`, poolPeer.peer.Uri()), + zap.String(`uri`, poolPeer.peer.URI()), zap.Int(`peerPos`, pos), zap.Int(`peers in msp pool`, len(peers))) @@ -172,29 +172,29 @@ func (p *PeerPool) EndorseOnMSP(ctx context.Context, mspID string, proposal *pee // GRPC error if s, ok := status.FromError(err); ok { if s.Code() == codes.Unavailable { - log.Debug(`peer GRPC unavailable`, zap.String(`mspId`, mspID), zap.String(`peer_uri`, poolPeer.peer.Uri())) + log.Debug(`peer GRPC unavailable`, zap.String(`mspId`, mspID), zap.String(`peer_uri`, poolPeer.peer.URI())) //poolPeer.ready = false } else { log.Debug(`unexpected GRPC error code from peer`, - zap.String(`peer_uri`, poolPeer.peer.Uri()), zap.Uint32(`code`, uint32(s.Code())), + zap.String(`peer_uri`, poolPeer.peer.URI()), zap.Uint32(`code`, uint32(s.Code())), zap.String(`code_str`, s.Code().String()), zap.Error(s.Err())) // not mark as not ready } // next mspId peer - lastError = fmt.Errorf("peer %s: %w", poolPeer.peer.Uri(), err) + lastError = fmt.Errorf("peer %s: %w", poolPeer.peer.URI(), err) continue } log.Debug(`peer endorsement failed`, zap.String(`mspId`, mspID), - zap.String(`peer_uri`, poolPeer.peer.Uri()), + zap.String(`peer_uri`, poolPeer.peer.URI()), zap.String(`error`, err.Error())) - return propResp, errors.Wrap(err, poolPeer.peer.Uri()) + return propResp, errors.Wrap(err, poolPeer.peer.URI()) } - log.Debug(`endorse complete on peer`, zap.String(`mspId`, mspID), zap.String(`uri`, poolPeer.peer.Uri())) + log.Debug(`endorse complete on peer`, zap.String(`mspId`, mspID), zap.String(`uri`, poolPeer.peer.URI())) return propResp, nil } @@ -282,6 +282,8 @@ func (p *PeerPool) Close() error { func StrategyGRPC(d time.Duration) api.PeerPoolCheckStrategy { return func(ctx context.Context, peer api.Peer, alive chan bool) { t := time.NewTicker(d) + defer t.Stop() + for { select { case <-ctx.Done(): diff --git a/examples/caclient/main.go b/examples/caclient/main.go index cb9f05e..4436b7a 100644 --- a/examples/caclient/main.go +++ b/examples/caclient/main.go @@ -66,7 +66,7 @@ func main() { SerialNumber, CommonName string Names []pkix.AttributeTypeAndValue ExtraNames []pkix.AttributeTypeAndValue - }{Country: []string{`RU`}, Organization: []string{`S7`}, OrganizationalUnit: []string{`ORG`}, Locality: []string{`Moscow`}, Province: []string{`Moscow`}, StreetAddress: []string{`Пушкина 7`}, PostalCode: []string{`100001`}, CommonName: name}, + }{Country: []string{`RU`}, Organization: []string{`S7`}, OrganizationalUnit: []string{`ORG`}, Locality: []string{`Moscow`}, Province: []string{`Moscow`}, StreetAddress: []string{`Pushkin 7`}, PostalCode: []string{`100001`}, CommonName: name}, SignatureAlgorithm: x509.ECDSAWithSHA512}, )) } diff --git a/go.mod b/go.mod index c4fd625..621baa9 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/hyperledger/fabric-chaincode-go v0.0.0-20201119163726-f8ef75b17719 github.com/hyperledger/fabric-protos-go v0.0.0-20201028172056-a3136dde2354 github.com/mitchellh/mapstructure v1.5.0 - github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/onsi/ginkgo v1.14.0 github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index de89239..6250a38 100644 --- a/go.sum +++ b/go.sum @@ -317,8 +317,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= -github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= github.com/mreiferson/go-httpclient v0.0.0-20160630210159-31f0106b4474/go.mod h1:OQA4XLvDbMgS8P0CevmM4m9Q3Jq4phKUzcocxuGJ5m8= github.com/mreiferson/go-httpclient v0.0.0-20201222173833-5e475fde3a4d/go.mod h1:OQA4XLvDbMgS8P0CevmM4m9Q3Jq4phKUzcocxuGJ5m8= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= diff --git a/identity/msp.go b/identity/msp.go index dcd457e..8fddca6 100644 --- a/identity/msp.go +++ b/identity/msp.go @@ -182,9 +182,9 @@ func MSPFromPath(mspID, mspPath string, opts ...MSPOpt) (*MSP, error) { } } - if mspOpts.validateCertChain { - // todo: validate - } + // todo: validate + //if mspOpts.validateCertChain { + //} return mspInstance, nil } diff --git a/observer/README.md b/observer/README.md index ddf9f5b..5da5eb7 100644 --- a/observer/README.md +++ b/observer/README.md @@ -2,7 +2,8 @@ Main features: -* Block parsing to components (transactions, events, states etc) +* Stream of channel blocks from peer +* Stream of all channels blocks from peer * Auto reconnection when block or event stream interrupted -* Block and event transformation if needed +Every feature can be used for common block, also for parsed block from [block](../block/block.proto) diff --git a/observer/block.go b/observer/block.go new file mode 100644 index 0000000..39394f6 --- /dev/null +++ b/observer/block.go @@ -0,0 +1,6 @@ +package observer + +type Block[T any] struct { + Channel string + Block T +} diff --git a/observer/block_channel_common.go b/observer/block_channel_common.go deleted file mode 100644 index c5f579c..0000000 --- a/observer/block_channel_common.go +++ /dev/null @@ -1,201 +0,0 @@ -package observer - -import ( - "context" - "fmt" - - "github.com/hyperledger/fabric-protos-go/common" - "go.uber.org/zap" - - "github.com/s7techlab/hlf-sdk-go/api" -) - -type ( - BlockChannel struct { - *Channel - blocksDeliverer api.BlocksDeliverer - createStreamWithRetry CreateBlockStreamWithRetry - stopRecreateStream bool - - blocks chan *Block - - isWork bool - cancelObserve context.CancelFunc - } - - BlockChannelOpts struct { - *Opts - - createStreamWithRetry CreateBlockStreamWithRetry - - // don't recreate stream if it has not any blocks - stopRecreateStream bool - } - - BlockChannelOpt func(*BlockChannelOpts) -) - -func WithChannelBlockLogger(logger *zap.Logger) BlockChannelOpt { - return func(opts *BlockChannelOpts) { - opts.Opts.logger = logger - } -} - -func WithChannelStopRecreateStream(stop bool) BlockChannelOpt { - return func(opts *BlockChannelOpts) { - opts.stopRecreateStream = stop - } -} - -var DefaultBlockChannelOpts = &BlockChannelOpts{ - createStreamWithRetry: CreateBlockStreamWithRetryDelay(DefaultConnectRetryDelay), - Opts: DefaultOpts, -} - -func NewBlockChannel(channel string, blocksDeliver api.BlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...BlockChannelOpt) *BlockChannel { - blockChannelOpts := DefaultBlockChannelOpts - for _, opt := range opts { - opt(blockChannelOpts) - } - - observer := &BlockChannel{ - Channel: &Channel{ - channel: channel, - seekFromFetcher: seekFromFetcher, - identity: blockChannelOpts.identity, - logger: blockChannelOpts.logger.With(zap.String(`channel`, channel)), - }, - - blocksDeliverer: blocksDeliver, - createStreamWithRetry: blockChannelOpts.createStreamWithRetry, - stopRecreateStream: blockChannelOpts.stopRecreateStream, - } - - return observer -} - -func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.isWork { - return c.blocks, nil - } - - // ctxObserve using for nested control process without stopped primary context - ctxObserve, cancel := context.WithCancel(ctx) - c.cancelObserve = cancel - - if err := c.allowToObserve(); err != nil { - return nil, err - } - - // Double check - if err := c.allowToObserve(); err != nil { - return nil, err - } - - c.blocks = make(chan *Block) - - go func() { - c.isWork = true - - c.logger.Debug(`creating block stream`) - incomingBlocks, errCreateStream := c.createStreamWithRetry(ctxObserve, c.createStream) - if errCreateStream != nil { - return - } - - c.logger.Info(`block stream created`) - for { - select { - case incomingBlock, hasMore := <-incomingBlocks: - - var err error - if !hasMore && !c.stopRecreateStream { - c.logger.Debug(`block stream interrupted, recreate`) - incomingBlocks, err = c.createStreamWithRetry(ctx, c.createStream) - if err != nil { - return - } - - c.logger.Debug(`block stream recreated`) - continue - } - - if incomingBlock == nil { - continue - } - - c.blocks <- &Block{ - Block: incomingBlock, - Channel: c.channel, - } - - case <-ctxObserve.Done(): - if err := c.Stop(); err != nil { - c.lastError = err - } - return - } - } - }() - - return c.blocks, nil -} - -func (c *BlockChannel) Stop() error { - c.mu.Lock() - defer c.mu.Unlock() - - // c.blocks mustn't be closed here, because it is closed elsewhere - - err := c.Channel.stop() - - // If primary context is done then cancel ctxObserver - if c.cancelObserve != nil { - c.cancelObserve() - } - - c.isWork = false - return err -} - -func (c *BlockChannel) createStream(ctx context.Context) (<-chan *common.Block, error) { - c.preCreateStream() - - c.logger.Debug(`connecting to blocks stream, receiving seek offset`, - zap.Uint64(`attempt`, c.connectAttempt)) - - seekFrom, err := c.processSeekFrom(ctx) - if err != nil { - c.logger.Warn(`seek from failed`, zap.Error(err)) - return nil, err - } - c.logger.Info(`block seek offset received`, zap.Uint64(`seek from`, seekFrom)) - - var ( - blocks <-chan *common.Block - closer func() error - ) - c.logger.Debug(`subscribing to blocks stream`) - blocks, closer, err = c.blocksDeliverer.Blocks(ctx, c.channel, c.identity, int64(seekFrom)) - if err != nil { - c.logger.Warn(`subscribing to blocks stream failed`, zap.Error(err)) - c.setError(err) - return nil, fmt.Errorf(`blocks deliverer: %w`, err) - } - c.logger.Info(`subscribed to blocks stream`) - - c.afterCreateStream(closer) - - // Check close context - select { - case <-ctx.Done(): - err = closer() - return nil, err - default: - } - - return blocks, nil -} diff --git a/observer/block_channel_parsed.go b/observer/block_channel_parsed.go deleted file mode 100644 index f75a659..0000000 --- a/observer/block_channel_parsed.go +++ /dev/null @@ -1,127 +0,0 @@ -package observer - -import ( - "context" - "fmt" - "sync" - - "github.com/hyperledger/fabric-protos-go/common" - "go.uber.org/zap" - - hlfproto "github.com/s7techlab/hlf-sdk-go/block" -) - -type ( - ParsedBlockChannel struct { - BlockChannel *BlockChannel - - transformers []BlockTransformer - configBlock *common.Block - - blocks chan *ParsedBlock - isWork bool - cancelObserve context.CancelFunc - mu sync.Mutex - } - - ParsedBlockChannelOpt func(*ParsedBlockChannel) -) - -func WithParsedChannelBlockTransformers(transformers []BlockTransformer) ParsedBlockChannelOpt { - return func(pbc *ParsedBlockChannel) { - pbc.transformers = transformers - } -} - -func WithParsedChannelConfigBlock(configBlock *common.Block) ParsedBlockChannelOpt { - return func(pbc *ParsedBlockChannel) { - pbc.configBlock = configBlock - } -} - -func NewParsedBlockChannel(blockChannel *BlockChannel, opts ...ParsedBlockChannelOpt) *ParsedBlockChannel { - parsedBlockChannel := &ParsedBlockChannel{ - BlockChannel: blockChannel, - } - - for _, opt := range opts { - opt(parsedBlockChannel) - } - - return parsedBlockChannel -} - -func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock, error) { - p.mu.Lock() - defer p.mu.Unlock() - - if p.isWork { - return p.blocks, nil - } - - // ctxObserve using for nested control process without stopped primary context - ctxObserve, cancel := context.WithCancel(ctx) - p.cancelObserve = cancel - - incomingBlocks, err := p.BlockChannel.Observe(ctxObserve) - if err != nil { - return nil, fmt.Errorf("observe common blocks: %w", err) - } - - p.blocks = make(chan *ParsedBlock) - - go func() { - p.isWork = true - - for { - select { - case incomingBlock, hasMore := <-incomingBlocks: - if !hasMore { - continue - } - - if incomingBlock == nil { - continue - } - - block := &ParsedBlock{ - Channel: p.BlockChannel.channel, - } - block.Block, block.Error = hlfproto.ParseBlock(incomingBlock.Block, hlfproto.WithConfigBlock(p.configBlock)) - - for pos, transformer := range p.transformers { - if err = transformer.Transform(block); err != nil { - p.BlockChannel.logger.Warn(`transformer`, zap.Int(`pos`, pos), zap.Error(err)) - } - } - - p.blocks <- block - - case <-ctxObserve.Done(): - if err = p.Stop(); err != nil { - p.BlockChannel.lastError = err - } - return - } - } - }() - - return p.blocks, nil -} - -func (p *ParsedBlockChannel) Stop() error { - p.mu.Lock() - defer p.mu.Unlock() - - // p.blocks mustn't be closed here, because it is closed elsewhere - - err := p.BlockChannel.Stop() - - // If primary context is done then cancel ctxObserver - if p.cancelObserve != nil { - p.cancelObserve() - } - - p.isWork = false - return err -} diff --git a/observer/block_peer_common.go b/observer/block_peer_common.go deleted file mode 100644 index f46de51..0000000 --- a/observer/block_peer_common.go +++ /dev/null @@ -1,246 +0,0 @@ -package observer - -import ( - "context" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/s7techlab/hlf-sdk-go/api" -) - -const DefaultBlockPeerObservePeriod = 10 * time.Second - -type ( - BlockPeer struct { - mu sync.RWMutex - - peerChannels PeerChannels - blockDeliverer api.BlocksDeliverer - channelObservers map[string]*BlockPeerChannel - // seekFrom has a higher priority than seekFromFetcher (look getSeekFrom method) - seekFrom map[string]uint64 - seekFromFetcher SeekFromFetcher - observePeriod time.Duration - stopRecreateStream bool - logger *zap.Logger - - blocks chan *Block - blocksByChannels map[string]chan *Block - - isWork bool - cancelObserve context.CancelFunc - } - - BlockPeerChannel struct { - Observer *BlockChannel - err error - } - - BlockPeerOpts struct { - seekFrom map[string]uint64 - seekFromFetcher SeekFromFetcher - observePeriod time.Duration - stopRecreateStream bool - logger *zap.Logger - } - - BlockPeerOpt func(*BlockPeerOpts) - - ChannelStatus struct { - Status ChannelObserverStatus - Err error - } -) - -var DefaultBlockPeerOpts = &BlockPeerOpts{ - observePeriod: DefaultBlockPeerObservePeriod, - logger: zap.NewNop(), -} - -func WithBlockPeerLogger(logger *zap.Logger) BlockPeerOpt { - return func(opts *BlockPeerOpts) { - opts.logger = logger - } -} - -func WithSeekFrom(seekFrom map[string]uint64) BlockPeerOpt { - return func(opts *BlockPeerOpts) { - opts.seekFrom = seekFrom - } -} - -func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) BlockPeerOpt { - return func(opts *BlockPeerOpts) { - opts.seekFromFetcher = seekFromFetcher - } -} - -func WithBlockPeerObservePeriod(observePeriod time.Duration) BlockPeerOpt { - return func(opts *BlockPeerOpts) { - if observePeriod != 0 { - opts.observePeriod = observePeriod - } - } -} - -func WithBlockStopRecreateStream(stop bool) BlockPeerOpt { - return func(opts *BlockPeerOpts) { - opts.stopRecreateStream = stop - } -} - -func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer, opts ...BlockPeerOpt) *BlockPeer { - blockPeerOpts := DefaultBlockPeerOpts - for _, opt := range opts { - opt(blockPeerOpts) - } - - blockPeer := &BlockPeer{ - peerChannels: peerChannels, - blockDeliverer: blockDeliverer, - channelObservers: make(map[string]*BlockPeerChannel), - blocks: make(chan *Block), - blocksByChannels: make(map[string]chan *Block), - seekFrom: blockPeerOpts.seekFrom, - seekFromFetcher: blockPeerOpts.seekFromFetcher, - observePeriod: blockPeerOpts.observePeriod, - stopRecreateStream: blockPeerOpts.stopRecreateStream, - logger: blockPeerOpts.logger, - } - - return blockPeer -} - -func (bp *BlockPeer) ChannelObservers() map[string]*BlockPeerChannel { - bp.mu.RLock() - defer bp.mu.RUnlock() - - var copyChannelObservers = make(map[string]*BlockPeerChannel, len(bp.channelObservers)) - for key, value := range bp.channelObservers { - copyChannelObservers[key] = value - } - - return copyChannelObservers -} - -func (bp *BlockPeer) Observe(ctx context.Context) <-chan *Block { - if bp.isWork { - return bp.blocks - } - - // ctxObserve using for nested control process without stopped primary context - ctxObserve, cancel := context.WithCancel(ctx) - bp.cancelObserve = cancel - - bp.initChannels(ctxObserve) - - // init new channels if they are fetched - go func() { - bp.isWork = true - - ticker := time.NewTicker(bp.observePeriod) - for { - select { - case <-ctxObserve.Done(): - bp.Stop() - return - - case <-ticker.C: - bp.initChannels(ctxObserve) - } - } - }() - - return bp.blocks -} - -func (bp *BlockPeer) Stop() { - bp.mu.Lock() - defer bp.mu.Unlock() - - // bp.blocks and bp.blocksByChannels mustn't be closed here, because they are closed elsewhere - - for _, c := range bp.channelObservers { - if err := c.Observer.Stop(); err != nil { - zap.Error(err) - } - } - - bp.channelObservers = make(map[string]*BlockPeerChannel) - - if bp.cancelObserve != nil { - bp.cancelObserve() - } - - bp.isWork = false -} - -func (bp *BlockPeer) initChannels(ctx context.Context) { - for channel := range bp.peerChannels.Channels() { - bp.mu.RLock() - _, ok := bp.channelObservers[channel] - bp.mu.RUnlock() - - if !ok { - bp.logger.Info(`add channel observer`, zap.String(`channel`, channel)) - - blockPeerChannel := bp.peerChannel(ctx, channel) - - bp.mu.Lock() - bp.channelObservers[channel] = blockPeerChannel - bp.mu.Unlock() - } - } -} - -func (bp *BlockPeer) getSeekFrom(channel string) SeekFromFetcher { - seekFrom := ChannelSeekOldest() - // at first check seekFrom var, if it is empty, check seekFromFetcher - bp.mu.RLock() - seekFromNum, exist := bp.seekFrom[channel] - bp.mu.RUnlock() - if exist { - seekFrom = ChannelSeekFrom(seekFromNum - 1) - } else { - // if seekFromFetcher is also empty, use ChannelSeekOldest - if bp.seekFromFetcher != nil { - seekFrom = bp.seekFromFetcher - } - } - - return seekFrom -} - -func (bp *BlockPeer) peerChannel(ctx context.Context, channel string) *BlockPeerChannel { - seekFrom := bp.getSeekFrom(channel) - - peerChannel := &BlockPeerChannel{} - peerChannel.Observer = NewBlockChannel( - channel, - bp.blockDeliverer, - seekFrom, - WithChannelBlockLogger(bp.logger), - WithChannelStopRecreateStream(bp.stopRecreateStream)) - - _, peerChannel.err = peerChannel.Observer.Observe(ctx) - if peerChannel.err != nil { - bp.logger.Warn(`init channel observer`, zap.Error(peerChannel.err)) - } - - // channel merger - go func() { - for b := range peerChannel.Observer.blocks { - bp.blocks <- b - } - - // after all reads peerParsedChannel.observer.blocks close channels - close(bp.blocks) - for _, blocks := range bp.blocksByChannels { - close(blocks) - } - }() - - return peerChannel -} diff --git a/observer/block_peer_common_concurrently.go b/observer/block_peer_common_concurrently.go deleted file mode 100644 index 4cb1192..0000000 --- a/observer/block_peer_common_concurrently.go +++ /dev/null @@ -1,102 +0,0 @@ -package observer - -import ( - "context" - "time" - - "go.uber.org/zap" -) - -type ChannelCommonBlocks struct { - Name string - Blocks <-chan *Block -} - -type BlocksByChannels struct { - channels chan *ChannelCommonBlocks -} - -func (b *BlocksByChannels) Observe() chan *ChannelCommonBlocks { - return b.channels -} - -func (bp *BlockPeer) ObserveByChannels(ctx context.Context) *BlocksByChannels { - blocksByChannels := &BlocksByChannels{ - channels: make(chan *ChannelCommonBlocks), - } - - bp.initChannelsConcurrently(ctx, blocksByChannels) - - // init new channels if they are fetched - go func() { - ticker := time.NewTicker(bp.observePeriod) - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - bp.initChannelsConcurrently(ctx, blocksByChannels) - } - } - }() - - // closer - go func() { - <-ctx.Done() - bp.Stop() - }() - - return blocksByChannels -} - -func (bp *BlockPeer) initChannelsConcurrently(ctx context.Context, blocksByChannels *BlocksByChannels) { - for channel := range bp.peerChannels.Channels() { - bp.mu.RLock() - _, ok := bp.channelObservers[channel] - bp.mu.RUnlock() - - if !ok { - bp.logger.Info(`add channel observer concurrently`, zap.String(`channel`, channel)) - - blockPeerChannel := bp.peerChannelConcurrently(ctx, channel, blocksByChannels) - - bp.mu.Lock() - bp.channelObservers[channel] = blockPeerChannel - bp.mu.Unlock() - } - } -} - -func (bp *BlockPeer) peerChannelConcurrently(ctx context.Context, channel string, blocksByChannels *BlocksByChannels) *BlockPeerChannel { - seekFrom := bp.getSeekFrom(channel) - - peerChannel := &BlockPeerChannel{} - peerChannel.Observer = NewBlockChannel( - channel, - bp.blockDeliverer, - seekFrom, - WithChannelBlockLogger(bp.logger), - WithChannelStopRecreateStream(bp.stopRecreateStream)) - - _, peerChannel.err = peerChannel.Observer.Observe(ctx) - if peerChannel.err != nil { - bp.logger.Warn(`init channel observer`, zap.Error(peerChannel.err)) - } - - blocks := make(chan *Block) - bp.blocksByChannels[channel] = blocks - - go func() { - blocksByChannels.channels <- &ChannelCommonBlocks{Name: channel, Blocks: blocks} - }() - - // channel merger - go func() { - for b := range peerChannel.Observer.blocks { - blocks <- b - } - }() - - return peerChannel -} diff --git a/observer/block_peer_parsed.go b/observer/block_peer_parsed.go deleted file mode 100644 index 704b32a..0000000 --- a/observer/block_peer_parsed.go +++ /dev/null @@ -1,188 +0,0 @@ -package observer - -import ( - "context" - "sync" - "time" - - "github.com/hyperledger/fabric-protos-go/common" - "go.uber.org/zap" -) - -type ( - ParsedBlockPeer struct { - mu sync.RWMutex - - blockPeer *BlockPeer - transformers []BlockTransformer - configBlocks map[string]*common.Block - - blocks chan *ParsedBlock - blocksByChannels map[string]chan *ParsedBlock - - parsedChannelObservers map[string]*ParsedBlockPeerChannel - - isWork bool - cancelObserve context.CancelFunc - } - - ParsedBlockPeerChannel struct { - Observer *ParsedBlockChannel - err error - } - - ParsedBlockPeerOpt func(*ParsedBlockPeer) -) - -func WithBlockPeerTransformer(transformers ...BlockTransformer) ParsedBlockPeerOpt { - return func(pbp *ParsedBlockPeer) { - pbp.transformers = transformers - } -} - -// WithConfigBlocks just for correct parsing of BFT at hlfproto.ParseBlock -func WithConfigBlocks(configBlocks map[string]*common.Block) ParsedBlockPeerOpt { - return func(pbp *ParsedBlockPeer) { - pbp.configBlocks = configBlocks - } -} - -func NewParsedBlockPeer(blocksPeer *BlockPeer, opts ...ParsedBlockPeerOpt) *ParsedBlockPeer { - parsedBlockPeer := &ParsedBlockPeer{ - blockPeer: blocksPeer, - parsedChannelObservers: make(map[string]*ParsedBlockPeerChannel), - blocks: make(chan *ParsedBlock), - blocksByChannels: make(map[string]chan *ParsedBlock), - } - - for _, opt := range opts { - opt(parsedBlockPeer) - } - - return parsedBlockPeer -} - -func (pbp *ParsedBlockPeer) ChannelObservers() map[string]*ParsedBlockPeerChannel { - pbp.mu.RLock() - defer pbp.mu.RUnlock() - - var copyChannelObservers = make(map[string]*ParsedBlockPeerChannel, len(pbp.parsedChannelObservers)) - for key, value := range pbp.parsedChannelObservers { - copyChannelObservers[key] = value - } - - return copyChannelObservers -} - -func (pbp *ParsedBlockPeer) Observe(ctx context.Context) <-chan *ParsedBlock { - if pbp.isWork { - return pbp.blocks - } - - // ctxObserve using for nested control process without stopped primary context - ctxObserve, cancel := context.WithCancel(ctx) - pbp.cancelObserve = cancel - - pbp.initParsedChannels(ctxObserve) - - // init new channels if they are fetched - go func() { - pbp.isWork = true - - time.Sleep(time.Second) - - ticker := time.NewTicker(pbp.blockPeer.observePeriod) - for { - select { - case <-ctxObserve.Done(): - pbp.Stop() - return - - case <-ticker.C: - pbp.initParsedChannels(ctxObserve) - } - } - }() - - return pbp.blocks -} - -func (pbp *ParsedBlockPeer) Stop() { - pbp.mu.Lock() - defer pbp.mu.Unlock() - - // pbp.blocks and pbp.blocksByChannels mustn't be closed here, because they are closed elsewhere - - pbp.blockPeer.Stop() - - for _, c := range pbp.parsedChannelObservers { - if err := c.Observer.Stop(); err != nil { - zap.Error(err) - } - } - - pbp.parsedChannelObservers = make(map[string]*ParsedBlockPeerChannel) - - if pbp.cancelObserve != nil { - pbp.cancelObserve() - } - - pbp.isWork = false -} - -func (pbp *ParsedBlockPeer) initParsedChannels(ctx context.Context) { - for channel := range pbp.blockPeer.peerChannels.Channels() { - pbp.mu.RLock() - _, ok := pbp.parsedChannelObservers[channel] - pbp.mu.RUnlock() - - if !ok { - pbp.blockPeer.logger.Info(`add parsed channel observer`, zap.String(`channel`, channel)) - - parsedBlockPeerChannel := pbp.peerParsedChannel(ctx, channel) - - pbp.mu.Lock() - pbp.parsedChannelObservers[channel] = parsedBlockPeerChannel - pbp.mu.Unlock() - } - } -} - -func (pbp *ParsedBlockPeer) peerParsedChannel(ctx context.Context, channel string) *ParsedBlockPeerChannel { - seekFrom := pbp.blockPeer.getSeekFrom(channel) - - commonBlockChannel := NewBlockChannel( - channel, - pbp.blockPeer.blockDeliverer, - seekFrom, - WithChannelBlockLogger(pbp.blockPeer.logger), - WithChannelStopRecreateStream(pbp.blockPeer.stopRecreateStream)) - - configBlock := pbp.configBlocks[channel] - - peerParsedChannel := &ParsedBlockPeerChannel{} - peerParsedChannel.Observer = NewParsedBlockChannel( - commonBlockChannel, - WithParsedChannelBlockTransformers(pbp.transformers), - WithParsedChannelConfigBlock(configBlock)) - - _, peerParsedChannel.err = peerParsedChannel.Observer.Observe(ctx) - if peerParsedChannel.err != nil { - pbp.blockPeer.logger.Warn(`init parsed channel observer`, zap.Error(peerParsedChannel.err)) - } - - // channel merger - go func() { - for b := range peerParsedChannel.Observer.blocks { - pbp.blocks <- b - } - - // after all reads peerParsedChannel.observer.blocks close channels - close(pbp.blocks) - for _, blocks := range pbp.blocksByChannels { - close(blocks) - } - }() - - return peerParsedChannel -} diff --git a/observer/block_peer_parsed_concurrently.go b/observer/block_peer_parsed_concurrently.go deleted file mode 100644 index 4311f66..0000000 --- a/observer/block_peer_parsed_concurrently.go +++ /dev/null @@ -1,111 +0,0 @@ -package observer - -import ( - "context" - "time" - - "go.uber.org/zap" -) - -type ChannelParsedBlocks struct { - Name string - Blocks <-chan *ParsedBlock -} - -type ParsedBlocksByChannels struct { - channels chan *ChannelParsedBlocks -} - -func (p *ParsedBlocksByChannels) Observe() chan *ChannelParsedBlocks { - return p.channels -} - -func (pbp *ParsedBlockPeer) ObserveByChannels(ctx context.Context) *ParsedBlocksByChannels { - blocksByChannels := &ParsedBlocksByChannels{ - channels: make(chan *ChannelParsedBlocks), - } - - pbp.initParsedChannelsConcurrently(ctx, blocksByChannels) - - // init new channels if they are fetched - go func() { - pbp.isWork = true - - ticker := time.NewTicker(pbp.blockPeer.observePeriod) - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - pbp.initParsedChannelsConcurrently(ctx, blocksByChannels) - } - } - }() - - // closer - go func() { - <-ctx.Done() - pbp.Stop() - }() - - return blocksByChannels -} - -func (pbp *ParsedBlockPeer) initParsedChannelsConcurrently(ctx context.Context, blocksByChannels *ParsedBlocksByChannels) { - for channel := range pbp.blockPeer.peerChannels.Channels() { - pbp.mu.RLock() - _, ok := pbp.parsedChannelObservers[channel] - pbp.mu.RUnlock() - - if !ok { - pbp.blockPeer.logger.Info(`add parsed channel observer concurrently`, zap.String(`channel`, channel)) - - parsedBlockPeerChannel := pbp.peerParsedChannelConcurrently(ctx, channel, blocksByChannels) - - pbp.mu.Lock() - pbp.parsedChannelObservers[channel] = parsedBlockPeerChannel - pbp.mu.Unlock() - } - } -} - -func (pbp *ParsedBlockPeer) peerParsedChannelConcurrently(ctx context.Context, channel string, blocksByChannels *ParsedBlocksByChannels) *ParsedBlockPeerChannel { - seekFrom := pbp.blockPeer.getSeekFrom(channel) - - commonBlockChannel := NewBlockChannel( - channel, - pbp.blockPeer.blockDeliverer, - seekFrom, - WithChannelBlockLogger(pbp.blockPeer.logger), - WithChannelStopRecreateStream(pbp.blockPeer.stopRecreateStream)) - - configBlock := pbp.configBlocks[channel] - - peerParsedChannel := &ParsedBlockPeerChannel{} - peerParsedChannel.Observer = NewParsedBlockChannel( - commonBlockChannel, - WithParsedChannelBlockTransformers(pbp.transformers), - WithParsedChannelConfigBlock(configBlock)) - - _, peerParsedChannel.err = peerParsedChannel.Observer.Observe(ctx) - if peerParsedChannel.err != nil { - pbp.blockPeer.logger.Warn(`init parsed channel observer`, zap.Error(peerParsedChannel.err)) - } - - blocks := make(chan *ParsedBlock) - pbp.blocksByChannels[channel] = blocks - - go func() { - blocksByChannels.channels <- &ChannelParsedBlocks{Name: channel, Blocks: blocks} - }() - - // channel merger - go func() { - for b := range peerParsedChannel.Observer.blocks { - blocks <- b - } - }() - - return peerParsedChannel -} diff --git a/observer/block_stream_common.go b/observer/block_stream_common.go deleted file mode 100644 index 7e21388..0000000 --- a/observer/block_stream_common.go +++ /dev/null @@ -1,38 +0,0 @@ -package observer - -import ( - "context" - "time" - - "github.com/hyperledger/fabric-protos-go/common" -) - -type ( - Block struct { - Block *common.Block - Channel string - } - - CreateBlockStream func(context.Context) (<-chan *common.Block, error) - - CreateBlockStreamWithRetry func(context.Context, CreateBlockStream) (<-chan *common.Block, error) -) - -func CreateBlockStreamWithRetryDelay(delay time.Duration) CreateBlockStreamWithRetry { - return func(ctx context.Context, createBlockStream CreateBlockStream) (<-chan *common.Block, error) { - for { - select { - case <-ctx.Done(): - return nil, nil - default: - } - - blocks, err := createBlockStream(ctx) - if err == nil { - return blocks, nil - } - - time.Sleep(delay) - } - } -} diff --git a/observer/block_stream_parsed.go b/observer/block_stream_parsed.go deleted file mode 100644 index 884f80b..0000000 --- a/observer/block_stream_parsed.go +++ /dev/null @@ -1,14 +0,0 @@ -package observer - -import ( - hlfproto "github.com/s7techlab/hlf-sdk-go/block" -) - -type ( - ParsedBlock struct { - Block *hlfproto.Block // parsed block - BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil - Channel string - Error error - } -) diff --git a/observer/channel_blocks.go b/observer/channel_blocks.go new file mode 100644 index 0000000..644a068 --- /dev/null +++ b/observer/channel_blocks.go @@ -0,0 +1,220 @@ +package observer + +import ( + "context" + "fmt" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/msp" + "go.uber.org/zap" + + hlfproto "github.com/s7techlab/hlf-sdk-go/block" +) + +type ( + ChannelBlocks[T any] struct { + // pointer is to use Channel's data, which can be changed + *Channel + + channelWithBlocks chan *Block[T] + blocksDeliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error) + createStreamWithRetry CreateBlockStreamWithRetry[T] + + stopRecreateStream bool + + isWork bool + cancelObserve context.CancelFunc + } + + ChannelBlocksOpts struct { + *Opts + + // don't recreate stream if it has not any blocks + stopRecreateStream bool + } + + ChannelBlocksOpt func(*ChannelBlocksOpts) +) + +func WithChannelBlockLogger(logger *zap.Logger) ChannelBlocksOpt { + return func(opts *ChannelBlocksOpts) { + opts.Opts.logger = logger + } +} + +func WithChannelStopRecreateStream(stop bool) ChannelBlocksOpt { + return func(opts *ChannelBlocksOpts) { + opts.stopRecreateStream = stop + } +} + +var DefaultChannelBlocksOpts = &ChannelBlocksOpts{ + Opts: DefaultOpts, + stopRecreateStream: false, +} + +func NewChannelBlocks[T any]( + channel string, + deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error), + createStreamWithRetry CreateBlockStreamWithRetry[T], + seekFromFetcher SeekFromFetcher, + opts ...ChannelBlocksOpt, +) *ChannelBlocks[T] { + + channelBlocksOpts := DefaultChannelBlocksOpts + for _, opt := range opts { + opt(channelBlocksOpts) + } + + return &ChannelBlocks[T]{ + Channel: &Channel{ + channel: channel, + seekFromFetcher: seekFromFetcher, + identity: channelBlocksOpts.identity, + logger: channelBlocksOpts.logger.With(zap.String(`channel`, channel)), + }, + + blocksDeliverer: deliverer, + createStreamWithRetry: createStreamWithRetry, + stopRecreateStream: channelBlocksOpts.stopRecreateStream, + } +} + +func (cb *ChannelBlocks[T]) Stop() error { + cb.mu.Lock() + defer cb.mu.Unlock() + + // cb.channelWithBlocks mustn't be closed here, because it is closed elsewhere + + err := cb.Channel.stop() + + // If primary context is done then cancel ctxObserver + if cb.cancelObserve != nil { + cb.cancelObserve() + } + + cb.isWork = false + return err +} + +func (cb *ChannelBlocks[T]) Observe(ctx context.Context) (<-chan *Block[T], error) { + cb.mu.Lock() + defer cb.mu.Unlock() + + if cb.isWork { + return cb.channelWithBlocks, nil + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + cb.cancelObserve = cancel + + if err := cb.allowToObserve(); err != nil { + return nil, err + } + + // Double check + if err := cb.allowToObserve(); err != nil { + return nil, err + } + + cb.channelWithBlocks = make(chan *Block[T]) + + go func() { + cb.isWork = true + + defer close(cb.channelWithBlocks) + + cb.logger.Debug(`creating block stream`) + incomingBlocks, errCreateStream := cb.createStreamWithRetry(ctxObserve, cb.createStream) + if errCreateStream != nil { + return + } + + cb.logger.Info(`block stream created`) + for { + select { + case incomingBlock, hasMore := <-incomingBlocks: + + var err error + if !hasMore && !cb.stopRecreateStream { + cb.logger.Debug(`block stream interrupted, recreate`) + incomingBlocks, err = cb.createStreamWithRetry(ctx, cb.createStream) + if err != nil { + return + } + + cb.logger.Debug(`block stream recreated`) + continue + } + + switch t := any(incomingBlock).(type) { + case *common.Block: + if t == nil { + continue + } + + case *hlfproto.Block: + if t == nil { + continue + } + + default: + continue + } + + cb.channelWithBlocks <- &Block[T]{ + Channel: cb.channel, + Block: incomingBlock, + } + + case <-ctxObserve.Done(): + if err := cb.Stop(); err != nil { + cb.lastError = err + } + return + } + } + }() + + return cb.channelWithBlocks, nil +} + +func (cb *ChannelBlocks[T]) createStream(ctx context.Context) (<-chan T, error) { + cb.preCreateStream() + + cb.logger.Debug(`connecting to blocks stream, receiving seek offset`, + zap.Uint64(`attempt`, cb.connectAttempt)) + + seekFrom, err := cb.processSeekFrom(ctx) + if err != nil { + cb.logger.Warn(`seek from failed`, zap.Error(err)) + return nil, err + } + cb.logger.Info(`block seek offset received`, zap.Uint64(`seek from`, seekFrom)) + + var ( + blocks <-chan T + closer func() error + ) + cb.logger.Debug(`subscribing to blocks stream`) + blocks, closer, err = cb.blocksDeliverer(ctx, cb.channel, cb.identity, int64(seekFrom)) + if err != nil { + cb.logger.Warn(`subscribing to blocks stream failed`, zap.Error(err)) + cb.setError(err) + return nil, fmt.Errorf(`blocks deliverer: %w`, err) + } + cb.logger.Info(`subscribed to blocks stream`) + + cb.afterCreateStream(closer) + + // Check close context + select { + case <-ctx.Done(): + err = closer() + return nil, err + default: + } + + return blocks, nil +} diff --git a/observer/channel_blocks_common.go b/observer/channel_blocks_common.go new file mode 100644 index 0000000..f421885 --- /dev/null +++ b/observer/channel_blocks_common.go @@ -0,0 +1,21 @@ +package observer + +import ( + "github.com/hyperledger/fabric-protos-go/common" + + "github.com/s7techlab/hlf-sdk-go/api" +) + +type ( + ChannelBlocksCommon struct { + *ChannelBlocks[*common.Block] + } +) + +func NewChannelBlocksCommon(channel string, blocksDeliver api.BlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...ChannelBlocksOpt) *ChannelBlocksCommon { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*common.Block](DefaultConnectRetryDelay) + + chBlocks := NewChannelBlocks[*common.Block](channel, blocksDeliver.Blocks, createStreamWithRetry, seekFromFetcher, opts...) + + return &ChannelBlocksCommon{ChannelBlocks: chBlocks} +} diff --git a/observer/channel_blocks_parsed.go b/observer/channel_blocks_parsed.go new file mode 100644 index 0000000..eb3dc3c --- /dev/null +++ b/observer/channel_blocks_parsed.go @@ -0,0 +1,20 @@ +package observer + +import ( + "github.com/s7techlab/hlf-sdk-go/api" + hlfproto "github.com/s7techlab/hlf-sdk-go/block" +) + +type ( + ChannelBlocksParsed struct { + *ChannelBlocks[*hlfproto.Block] + } +) + +func NewChannelBlocksParsed(channel string, blocksDeliver api.ParsedBlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...ChannelBlocksOpt) *ChannelBlocksParsed { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*hlfproto.Block](DefaultConnectRetryDelay) + + chBlocks := NewChannelBlocks[*hlfproto.Block](channel, blocksDeliver.ParsedBlocks, createStreamWithRetry, seekFromFetcher, opts...) + + return &ChannelBlocksParsed{ChannelBlocks: chBlocks} +} diff --git a/observer/channel_blocks_stream.go b/observer/channel_blocks_stream.go new file mode 100644 index 0000000..90040e8 --- /dev/null +++ b/observer/channel_blocks_stream.go @@ -0,0 +1,31 @@ +package observer + +import ( + "context" + "time" +) + +type ( + CreateBlockStream[T any] func(context.Context) (<-chan T, error) + + CreateBlockStreamWithRetry[T any] func(context.Context, CreateBlockStream[T]) (<-chan T, error) +) + +func CreateBlockStreamWithRetryDelay[T any](delay time.Duration) CreateBlockStreamWithRetry[T] { + return func(ctx context.Context, createBlockStream CreateBlockStream[T]) (<-chan T, error) { + for { + select { + case <-ctx.Done(): + return nil, nil + default: + } + + blocks, err := createBlockStream(ctx) + if err == nil { + return blocks, nil + } + + time.Sleep(delay) + } + } +} diff --git a/observer/channel_peer.go b/observer/channel_peer.go deleted file mode 100644 index 0fdb553..0000000 --- a/observer/channel_peer.go +++ /dev/null @@ -1,198 +0,0 @@ -package observer - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/timestamp" - "github.com/hyperledger/fabric-protos-go/common" - "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/s7techlab/hlf-sdk-go/api" -) - -const DefaultChannelPeerObservePeriod = 30 * time.Second - -type ( - ChannelInfo struct { - Channel string - Height uint64 - UpdatedAt *timestamppb.Timestamp - } - - // ChannelPeer observes for peer channels - ChannelPeer struct { - channelFetcher PeerChannelsFetcher - - channelsMatcher *ChannelsMatcher - - channels map[string]*ChannelInfo - observePeriod time.Duration - - lastError error - mu sync.Mutex - logger *zap.Logger - - isWork bool - cancelObserve context.CancelFunc - } - - PeerChannelsFetcher interface { - api.ChannelListGetter - api.ChainInfoGetter - } - - PeerChannels interface { - Channels() map[string]*ChannelInfo - } - - ChannelPeerOpts struct { - channels []ChannelToMatch - observePeriod time.Duration - logger *zap.Logger - } - - ChannelPeerOpt func(*ChannelPeerOpts) -) - -var DefaultChannelPeerOpts = &ChannelPeerOpts{ - channels: MatchAllChannels, - observePeriod: DefaultChannelPeerObservePeriod, - logger: zap.NewNop(), -} - -func WithChannels(channels []ChannelToMatch) ChannelPeerOpt { - return func(opts *ChannelPeerOpts) { - opts.channels = channels - } -} - -func WithChannelPeerLogger(logger *zap.Logger) ChannelPeerOpt { - return func(opts *ChannelPeerOpts) { - opts.logger = logger - } -} - -func NewChannelPeer(peerChannelsFetcher PeerChannelsFetcher, opts ...ChannelPeerOpt) (*ChannelPeer, error) { - channelPeerOpts := DefaultChannelPeerOpts - for _, opt := range opts { - opt(channelPeerOpts) - } - - channelsMatcher, err := NewChannelsMatcher(channelPeerOpts.channels) - if err != nil { - return nil, fmt.Errorf(`channels matcher: %w`, err) - } - - channelPeer := &ChannelPeer{ - channelFetcher: peerChannelsFetcher, - channelsMatcher: channelsMatcher, - channels: make(map[string]*ChannelInfo), - observePeriod: channelPeerOpts.observePeriod, - logger: channelPeerOpts.logger, - } - - return channelPeer, nil -} - -func (cp *ChannelPeer) Stop() { - cp.cancelObserve() - cp.isWork = false -} - -func (cp *ChannelPeer) Observe(ctx context.Context) { - if cp.isWork { - return - } - - // ctxObserve using for nested control process without stopped primary context - ctxObserve, cancel := context.WithCancel(context.Background()) - cp.cancelObserve = cancel - - go func() { - cp.isWork = true - cp.updateChannels(ctxObserve) - - ticker := time.NewTicker(cp.observePeriod) - for { - select { - case <-ctx.Done(): - // If primary context is done then cancel ctxObserver - cp.cancelObserve() - return - - case <-ctxObserve.Done(): - return - - case <-ticker.C: - cp.updateChannels(ctxObserve) - } - } - }() -} - -func (cp *ChannelPeer) Channels() map[string]*ChannelInfo { - cp.mu.Lock() - defer cp.mu.Unlock() - - var copyChannelInfo = make(map[string]*ChannelInfo, len(cp.channels)) - for key, value := range cp.channels { - copyChannelInfo[key] = value - } - - return copyChannelInfo -} - -func (cp *ChannelPeer) updateChannels(ctx context.Context) { - cp.logger.Debug(`fetching channels`) - channelsInfo, err := cp.channelFetcher.GetChannels(ctx) - if err != nil { - cp.logger.Warn(`error while fetching channels`, zap.Error(err)) - cp.lastError = err - return - } - - channels := ChannelsInfoToStrings(channelsInfo.Channels) - cp.logger.Debug(`channels fetched`, zap.Strings(`channels`, channels)) - - channelsMatched, err := cp.channelsMatcher.Match(channels) - if err != nil { - cp.logger.Warn(`channel matching error`, zap.Error(err)) - cp.lastError = err - return - } - cp.logger.Debug(`channels matched`, zap.Reflect(`channels`, channelsMatched)) - - channelHeights := make(map[string]uint64) - - for _, channel := range channelsMatched { - var channelInfo *common.BlockchainInfo - channelInfo, err = cp.channelFetcher.GetChainInfo(ctx, channel.Name) - if err != nil { - cp.lastError = err - continue - } - channelHeights[channel.Name] = channelInfo.Height - } - - cp.mu.Lock() - defer cp.mu.Unlock() - - for channel, height := range channelHeights { - var updatedAt *timestamp.Timestamp - updatedAt, err = ptypes.TimestampProto(time.Now()) - if err != nil { - cp.lastError = err - } - - cp.channels[channel] = &ChannelInfo{ - Channel: channel, - Height: height, - UpdatedAt: updatedAt, - } - } -} diff --git a/observer/channel_peer_fetcher_mock.go b/observer/channel_peer_fetcher_mock.go deleted file mode 100644 index 2dc28a6..0000000 --- a/observer/channel_peer_fetcher_mock.go +++ /dev/null @@ -1,41 +0,0 @@ -package observer - -import ( - "context" - "fmt" - - "github.com/hyperledger/fabric-protos-go/common" - "github.com/hyperledger/fabric-protos-go/peer" -) - -type ChannelPeerFetcherMock struct { - channels map[string]uint64 -} - -func NewChannelPeerFetcherMock(channels map[string]uint64) *ChannelPeerFetcherMock { - return &ChannelPeerFetcherMock{ - channels: channels, - } -} - -func (c *ChannelPeerFetcherMock) GetChannels(context.Context) (*peer.ChannelQueryResponse, error) { - var channels []*peer.ChannelInfo - for channelName := range c.channels { - channels = append(channels, &peer.ChannelInfo{ChannelId: channelName}) - } - - return &peer.ChannelQueryResponse{ - Channels: channels, - }, nil -} - -func (c *ChannelPeerFetcherMock) GetChainInfo(_ context.Context, channel string) (*common.BlockchainInfo, error) { - chHeight, exists := c.channels[channel] - if !exists { - return nil, fmt.Errorf("channel '%s' does not exist", channel) - } - - return &common.BlockchainInfo{ - Height: chHeight, - }, nil -} diff --git a/observer/channel_peer_mock.go b/observer/channel_peer_mock.go deleted file mode 100644 index a641fa5..0000000 --- a/observer/channel_peer_mock.go +++ /dev/null @@ -1,40 +0,0 @@ -package observer - -import ( - "sync" -) - -type ChannelPeerMock struct { - mu sync.Mutex - channelsInfo map[string]*ChannelInfo -} - -func NewChannelPeerMock(channelsInfo ...*ChannelInfo) *ChannelPeerMock { - channels := make(map[string]*ChannelInfo, len(channelsInfo)) - for _, channelInfo := range channelsInfo { - channels[channelInfo.Channel] = channelInfo - } - - return &ChannelPeerMock{ - channelsInfo: channels, - } -} - -func (m *ChannelPeerMock) Channels() map[string]*ChannelInfo { - m.mu.Lock() - defer m.mu.Unlock() - - var copyChannelInfo = make(map[string]*ChannelInfo, len(m.channelsInfo)) - for key, value := range m.channelsInfo { - copyChannelInfo[key] = value - } - - return copyChannelInfo -} - -func (m *ChannelPeerMock) UpdateChannelInfo(channelInfo *ChannelInfo) { - m.mu.Lock() - defer m.mu.Unlock() - - m.channelsInfo[channelInfo.Channel] = channelInfo -} diff --git a/observer/channels_blocks_peer.go b/observer/channels_blocks_peer.go new file mode 100644 index 0000000..686d573 --- /dev/null +++ b/observer/channels_blocks_peer.go @@ -0,0 +1,251 @@ +package observer + +import ( + "context" + "sync" + "time" + + "github.com/hyperledger/fabric/msp" + "go.uber.org/zap" +) + +const DefaultChannelsBLocksPeerRefreshPeriod = 10 * time.Second + +type ( + ChannelsBlocksPeer[T any] struct { + channelObservers map[string]*ChannelBlocks[T] + + blocks chan *Block[T] + + peerChannelsGetter PeerChannelsGetter + deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error) + createStreamWithRetry CreateBlockStreamWithRetry[T] + + refreshPeriod time.Duration + + // seekFrom has a higher priority than seekFromFetcher (look getSeekFrom method) + seekFrom map[string]uint64 + seekFromFetcher SeekFromFetcher + stopRecreateStream bool + + isWork bool + cancelObserve context.CancelFunc + + mu sync.RWMutex + logger *zap.Logger + } + + ChannelsBlocksPeerOpts struct { + seekFrom map[string]uint64 + seekFromFetcher SeekFromFetcher + refreshPeriod time.Duration + stopRecreateStream bool + logger *zap.Logger + } + + ChannelsBlocksPeerOpt func(*ChannelsBlocksPeerOpts) +) + +var DefaultChannelsBlocksPeerOpts = &ChannelsBlocksPeerOpts{ + refreshPeriod: DefaultChannelsBLocksPeerRefreshPeriod, + logger: zap.NewNop(), +} + +func WithChannelsBlocksPeerLogger(logger *zap.Logger) ChannelsBlocksPeerOpt { + return func(opts *ChannelsBlocksPeerOpts) { + opts.logger = logger + } +} + +func WithSeekFrom(seekFrom map[string]uint64) ChannelsBlocksPeerOpt { + return func(opts *ChannelsBlocksPeerOpts) { + opts.seekFrom = seekFrom + } +} + +func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) ChannelsBlocksPeerOpt { + return func(opts *ChannelsBlocksPeerOpts) { + opts.seekFromFetcher = seekFromFetcher + } +} + +func WithChannelsBlocksPeerRefreshPeriod(refreshPeriod time.Duration) ChannelsBlocksPeerOpt { + return func(opts *ChannelsBlocksPeerOpts) { + if refreshPeriod != 0 { + opts.refreshPeriod = refreshPeriod + } + } +} + +func WithBlockStopRecreateStream(stop bool) ChannelsBlocksPeerOpt { + return func(opts *ChannelsBlocksPeerOpts) { + opts.stopRecreateStream = stop + } +} + +func NewChannelsBlocksPeer[T any]( + peerChannelsGetter PeerChannelsGetter, + deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error), + createStreamWithRetry CreateBlockStreamWithRetry[T], + opts ...ChannelsBlocksPeerOpt, +) *ChannelsBlocksPeer[T] { + + channelsBlocksPeerOpts := DefaultChannelsBlocksPeerOpts + for _, opt := range opts { + opt(channelsBlocksPeerOpts) + } + + return &ChannelsBlocksPeer[T]{ + channelObservers: make(map[string]*ChannelBlocks[T]), + blocks: make(chan *Block[T]), + + peerChannelsGetter: peerChannelsGetter, + deliverer: deliverer, + createStreamWithRetry: createStreamWithRetry, + refreshPeriod: channelsBlocksPeerOpts.refreshPeriod, + + seekFrom: channelsBlocksPeerOpts.seekFrom, + seekFromFetcher: channelsBlocksPeerOpts.seekFromFetcher, + stopRecreateStream: channelsBlocksPeerOpts.stopRecreateStream, + logger: channelsBlocksPeerOpts.logger, + } +} + +func (acb *ChannelsBlocksPeer[T]) Channels() map[string]*Channel { + acb.mu.RLock() + defer acb.mu.RUnlock() + + var copyChannels = make(map[string]*Channel, len(acb.channelObservers)) + for key, value := range acb.channelObservers { + copyChannels[key] = value.Channel + } + + return copyChannels +} + +func (acb *ChannelsBlocksPeer[T]) Stop() { + // acb.blocks and acb.blocksByChannels mustn't be closed here, because they are closed elsewhere + + acb.mu.RLock() + for _, c := range acb.channelObservers { + if err := c.Stop(); err != nil { + zap.Error(err) + } + } + acb.mu.RUnlock() + + acb.mu.Lock() + acb.channelObservers = make(map[string]*ChannelBlocks[T]) + acb.mu.Unlock() + + if acb.cancelObserve != nil { + acb.cancelObserve() + } + + acb.isWork = false +} + +func (acb *ChannelsBlocksPeer[T]) Observe(ctx context.Context) <-chan *Block[T] { + if acb.isWork { + return acb.blocks + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + acb.cancelObserve = cancel + + acb.startNotObservedChannels(ctxObserve, acb.initChannelsObservers()) + + acb.blocks = make(chan *Block[T]) + + // init new channels if they are fetched + go func() { + acb.isWork = true + + ticker := time.NewTicker(acb.refreshPeriod) + defer func() { + ticker.Stop() + close(acb.blocks) + }() + + for { + select { + case <-ctxObserve.Done(): + acb.Stop() + return + + case <-ticker.C: + acb.startNotObservedChannels(ctxObserve, acb.initChannelsObservers()) + } + } + }() + + return acb.blocks +} + +func (acb *ChannelsBlocksPeer[T]) startNotObservedChannels(ctx context.Context, notObservedChannels []*ChannelBlocks[T]) { + for _, notObservedChannel := range notObservedChannels { + chBlocks := notObservedChannel + + if _, err := chBlocks.Observe(ctx); err != nil { + acb.logger.Warn(`init channel observer`, zap.String("channel", notObservedChannel.channel), zap.Error(err)) + } + + // channel merger + go func() { + for b := range chBlocks.channelWithBlocks { + acb.blocks <- b + } + }() + } +} + +func (acb *ChannelsBlocksPeer[T]) initChannelsObservers() []*ChannelBlocks[T] { + var notObservedChannels []*ChannelBlocks[T] + + for channel := range acb.peerChannelsGetter.Channels() { + acb.mu.RLock() + _, ok := acb.channelObservers[channel] + acb.mu.RUnlock() + + if !ok { + acb.logger.Info(`add channel observer`, zap.String(`channel`, channel)) + + seekFrom := acb.getSeekFrom(channel) + + chBlocks := NewChannelBlocks[T]( + channel, + acb.deliverer, + acb.createStreamWithRetry, + seekFrom, + WithChannelBlockLogger(acb.logger), + WithChannelStopRecreateStream(acb.stopRecreateStream)) + + acb.mu.Lock() + acb.channelObservers[channel] = chBlocks + acb.mu.Unlock() + + notObservedChannels = append(notObservedChannels, chBlocks) + } + } + + return notObservedChannels +} + +func (acb *ChannelsBlocksPeer[T]) getSeekFrom(channel string) SeekFromFetcher { + seekFrom := ChannelSeekOldest() + // at first check seekFrom var, if it is empty, check seekFromFetcher + acb.mu.RLock() + seekFromNum, exist := acb.seekFrom[channel] + acb.mu.RUnlock() + if exist { + seekFrom = ChannelSeekFrom(seekFromNum - 1) + } else { + // if seekFromFetcher is also empty, use ChannelSeekOldest + if acb.seekFromFetcher != nil { + seekFrom = acb.seekFromFetcher + } + } + + return seekFrom +} diff --git a/observer/channels_blocks_peer_common.go b/observer/channels_blocks_peer_common.go new file mode 100644 index 0000000..98aeae5 --- /dev/null +++ b/observer/channels_blocks_peer_common.go @@ -0,0 +1,19 @@ +package observer + +import ( + "github.com/hyperledger/fabric-protos-go/common" + + "github.com/s7techlab/hlf-sdk-go/api" +) + +type ChannelsBlocksPeerCommon struct { + *ChannelsBlocksPeer[*common.Block] +} + +func NewChannelsBlocksPeerCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...ChannelsBlocksPeerOpt) *ChannelsBlocksPeerCommon { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*common.Block](DefaultConnectRetryDelay) + + channelsBlocksPeerCommon := NewChannelsBlocksPeer[*common.Block](peerChannels, blocksDeliver.Blocks, createStreamWithRetry, opts...) + + return &ChannelsBlocksPeerCommon{ChannelsBlocksPeer: channelsBlocksPeerCommon} +} diff --git a/observer/block_peer_common_test.go b/observer/channels_blocks_peer_common_test.go similarity index 60% rename from observer/block_peer_common_test.go rename to observer/channels_blocks_peer_common_test.go index 363b76c..e2bbcec 100644 --- a/observer/block_peer_common_test.go +++ b/observer/channels_blocks_peer_common_test.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/hyperledger/fabric-protos-go/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -16,60 +17,60 @@ import ( var ( ctx = context.Background() - channelPeerMockForCommon *observer.ChannelPeerMock - commonBlockPeer *observer.BlockPeer - commonBlocks <-chan *observer.Block + peerChannelsMockForCommon *observer.PeerChannelsMock + channelsBlocksPeerCommon *observer.ChannelsBlocksPeerCommon + commonBlocks <-chan *observer.Block[*common.Block] - channelPeerMockConcurrentlyForCommon *observer.ChannelPeerMock - commonBlockPeerConcurrently *observer.BlockPeer - commonBlocksByChannels *observer.BlocksByChannels + peerChannelsMockConcurrentlyForCommon *observer.PeerChannelsMock + channelsBlocksPeerConcurrentlyCommon *observer.ChannelsBlocksPeerCommon + channelWithChannelsCommon *observer.ChannelWithChannels[*common.Block] ) -func blockPeerCommonTestBeforeSuit() { +func channelsBlocksPeerCommonTestBeforeSuit() { const closeChannelWhenAllRead = true blockDelivererMock, err := sdkmocks.NewBlocksDelivererMock(fmt.Sprintf("../%s", testdata.Path), closeChannelWhenAllRead) Expect(err).ShouldNot(HaveOccurred()) - channelPeerMockForCommon = observer.NewChannelPeerMock() + peerChannelsMockForCommon = observer.NewPeerChannelsMock() for _, channel := range testdata.Channels { - channelPeerMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - commonBlockPeer = observer.NewBlockPeer(channelPeerMockForCommon, blockDelivererMock, - observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + channelsBlocksPeerCommon = observer.NewChannelsBlocksPeerCommon(peerChannelsMockForCommon, blockDelivererMock, + observer.WithBlockStopRecreateStream(true), observer.WithChannelsBlocksPeerRefreshPeriod(time.Nanosecond)) - commonBlocks = commonBlockPeer.Observe(ctx) + commonBlocks = channelsBlocksPeerCommon.Observe(ctx) - channelPeerMockConcurrentlyForCommon = observer.NewChannelPeerMock() + peerChannelsMockConcurrentlyForCommon = observer.NewPeerChannelsMock() for _, channel := range testdata.Channels { - channelPeerMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - commonBlockPeerConcurrently = observer.NewBlockPeer(channelPeerMockConcurrentlyForCommon, blockDelivererMock, - observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + channelsBlocksPeerConcurrentlyCommon = observer.NewChannelsBlocksPeerCommon(peerChannelsMockConcurrentlyForCommon, blockDelivererMock, + observer.WithBlockStopRecreateStream(true), observer.WithChannelsBlocksPeerRefreshPeriod(time.Nanosecond)) - commonBlocksByChannels = commonBlockPeerConcurrently.ObserveByChannels(ctx) + channelWithChannelsCommon = channelsBlocksPeerConcurrentlyCommon.ObserveByChannels(ctx) } -var _ = Describe("Block Peer", func() { - Context("Block peer", func() { +var _ = Describe("All channels blocks common", func() { + Context("Sequentially", func() { It("should return current number of channels", func() { - channelObservers := commonBlockPeer.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels))) + channels := channelsBlocksPeerCommon.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels))) }) - It("should add channels to channelPeerMock", func() { + It("should add channels to peerChannelsMock", func() { newChannels := map[string]struct{}{"channel1": {}, "channel2": {}, "channel3": {}} for channel := range newChannels { - channelPeerMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to commonBlockPeer observer + // wait to channelsBlocksPeerCommon observer time.Sleep(time.Millisecond * 10) - channelObservers := commonBlockPeer.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) + channels := channelsBlocksPeerCommon.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels) + len(newChannels))) }) It("should return correct channels heights", func() { @@ -100,12 +101,12 @@ var _ = Describe("Block Peer", func() { }) }) - Context("Block peer concurrently", func() { + Context("Concurrently", func() { It("should return current number of channels", func() { - channelObservers := commonBlockPeerConcurrently.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels))) + channels := channelsBlocksPeerConcurrentlyCommon.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels))) - channelsWithBlocks := commonBlocksByChannels.Observe() + channelsWithBlocks := channelWithChannelsCommon.Observe() for i := 0; i < len(testdata.Channels); i++ { sampleOrFabcarChannelBlocks := <-channelsWithBlocks @@ -140,20 +141,20 @@ var _ = Describe("Block Peer", func() { } }) - It("should add channels to channelPeerMock", func() { + It("should add channels to peerChannelsMock", func() { channel4, channel5, channel6 := "channel4", "channel5", "channel6" newChannels := []string{channel4, channel5, channel6} for _, channel := range newChannels { - channelPeerMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to commonBlockPeer observer + // wait to channelsBlocksPeerCommon observer time.Sleep(time.Millisecond * 200) - channelObservers := commonBlockPeerConcurrently.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) + channels := channelsBlocksPeerConcurrentlyCommon.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels) + len(newChannels))) - channelsWithBlocks := commonBlocksByChannels.Observe() + channelsWithBlocks := channelWithChannelsCommon.Observe() for i := 0; i < len(newChannels); i++ { channel4Or5Or6Blocks := <-channelsWithBlocks diff --git a/observer/channels_blocks_peer_concurrently.go b/observer/channels_blocks_peer_concurrently.go new file mode 100644 index 0000000..172e217 --- /dev/null +++ b/observer/channels_blocks_peer_concurrently.go @@ -0,0 +1,79 @@ +package observer + +import ( + "context" + "time" + + "go.uber.org/zap" +) + +type ChannelBlocksWithName[T any] struct { + Name string + Blocks <-chan *Block[T] +} + +type ChannelWithChannels[T any] struct { + channels chan *ChannelBlocksWithName[T] +} + +func (cwc *ChannelWithChannels[T]) Observe() <-chan *ChannelBlocksWithName[T] { + return cwc.channels +} + +func (acb *ChannelsBlocksPeer[T]) ObserveByChannels(ctx context.Context) *ChannelWithChannels[T] { + channelWithChannels := &ChannelWithChannels[T]{ + channels: make(chan *ChannelBlocksWithName[T]), + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(ctx) + acb.cancelObserve = cancel + + acb.startNotObservedChannelsConcurrently(ctxObserve, acb.initChannelsObservers(), channelWithChannels) + + // init new channels if they are fetched + go func() { + ticker := time.NewTicker(acb.refreshPeriod) + defer func() { + ticker.Stop() + close(channelWithChannels.channels) + }() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + acb.startNotObservedChannelsConcurrently(ctxObserve, acb.initChannelsObservers(), channelWithChannels) + } + } + }() + + // closer + go func() { + <-ctx.Done() + acb.Stop() + }() + + return channelWithChannels +} + +func (acb *ChannelsBlocksPeer[T]) startNotObservedChannelsConcurrently( + ctx context.Context, + notObservedChannels []*ChannelBlocks[T], + channelWithChannels *ChannelWithChannels[T], +) { + + for _, notObservedChannel := range notObservedChannels { + chBlocks := notObservedChannel + + if _, err := chBlocks.Observe(ctx); err != nil { + acb.logger.Warn(`init channel observer concurrently`, zap.String("channel", notObservedChannel.channel), zap.Error(err)) + } + + go func() { + channelWithChannels.channels <- &ChannelBlocksWithName[T]{Name: chBlocks.channel, Blocks: chBlocks.channelWithBlocks} + }() + } +} diff --git a/observer/channels_blocks_peer_parsed.go b/observer/channels_blocks_peer_parsed.go new file mode 100644 index 0000000..5fa7dbe --- /dev/null +++ b/observer/channels_blocks_peer_parsed.go @@ -0,0 +1,18 @@ +package observer + +import ( + "github.com/s7techlab/hlf-sdk-go/api" + hlfproto "github.com/s7techlab/hlf-sdk-go/block" +) + +type ChannelsBlocksPeerParsed struct { + *ChannelsBlocksPeer[*hlfproto.Block] +} + +func NewChannelsBlocksPeerParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...ChannelsBlocksPeerOpt) *ChannelsBlocksPeerParsed { + createStreamWithRetry := CreateBlockStreamWithRetryDelay[*hlfproto.Block](DefaultConnectRetryDelay) + + channelsBlocksPeerParsed := NewChannelsBlocksPeer[*hlfproto.Block](peerChannels, blocksDeliver.ParsedBlocks, createStreamWithRetry, opts...) + + return &ChannelsBlocksPeerParsed{ChannelsBlocksPeer: channelsBlocksPeerParsed} +} diff --git a/observer/block_peer_parsed_test.go b/observer/channels_blocks_peer_parsed_test.go similarity index 60% rename from observer/block_peer_parsed_test.go rename to observer/channels_blocks_peer_parsed_test.go index 3f6df79..e434a42 100644 --- a/observer/block_peer_parsed_test.go +++ b/observer/channels_blocks_peer_parsed_test.go @@ -7,66 +7,67 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + hlfproto "github.com/s7techlab/hlf-sdk-go/block" sdkmocks "github.com/s7techlab/hlf-sdk-go/client/deliver/testing" "github.com/s7techlab/hlf-sdk-go/observer" testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks" ) var ( - channelPeerMockForParsed *observer.ChannelPeerMock - parsedBlockPeer *observer.ParsedBlockPeer - parsedBlocks <-chan *observer.ParsedBlock + peerChannelsMockForParsed *observer.PeerChannelsMock + channelsBlocksPeerParsed *observer.ChannelsBlocksPeerParsed + parsedBlocks <-chan *observer.Block[*hlfproto.Block] - channelPeerMockConcurrentlyForParsed *observer.ChannelPeerMock - parsedBlockPeerConcurrently *observer.ParsedBlockPeer - parsedBlocksByChannels *observer.ParsedBlocksByChannels + peerChannelsMockConcurrentlyForParsed *observer.PeerChannelsMock + channelsBlocksPeerConcurrentlyParsed *observer.ChannelsBlocksPeerParsed + channelWithChannelsParsed *observer.ChannelWithChannels[*hlfproto.Block] ) -func blockPeerParsedTestBeforeSuit() { +func channelsBlocksPeerParsedTestBeforeSuit() { const closeChannelWhenAllRead = true blockDelivererMock, err := sdkmocks.NewBlocksDelivererMock(fmt.Sprintf("../%s", testdata.Path), closeChannelWhenAllRead) Expect(err).ShouldNot(HaveOccurred()) - channelPeerMockForParsed = observer.NewChannelPeerMock() + peerChannelsMockForParsed = observer.NewPeerChannelsMock() for _, channel := range testdata.Channels { - channelPeerMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - commonBlockPeerForParsed := observer.NewBlockPeer(channelPeerMockForParsed, blockDelivererMock, observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) - parsedBlockPeer = observer.NewParsedBlockPeer(commonBlockPeerForParsed) + channelsBlocksPeerParsed = observer.NewChannelsBlocksPeerParsed(peerChannelsMockForParsed, blockDelivererMock, + observer.WithBlockStopRecreateStream(true), observer.WithChannelsBlocksPeerRefreshPeriod(time.Nanosecond)) - parsedBlocks = parsedBlockPeer.Observe(ctx) + parsedBlocks = channelsBlocksPeerParsed.Observe(ctx) - channelPeerMockConcurrentlyForParsed = observer.NewChannelPeerMock() + peerChannelsMockConcurrentlyForParsed = observer.NewPeerChannelsMock() for _, channel := range testdata.Channels { - channelPeerMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - commonBlockPeerConcurrentlyForParsed := observer.NewBlockPeer(channelPeerMockConcurrentlyForParsed, blockDelivererMock, observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) - parsedBlockPeerConcurrently = observer.NewParsedBlockPeer(commonBlockPeerConcurrentlyForParsed) + channelsBlocksPeerConcurrentlyParsed = observer.NewChannelsBlocksPeerParsed(peerChannelsMockConcurrentlyForParsed, blockDelivererMock, + observer.WithBlockStopRecreateStream(true), observer.WithChannelsBlocksPeerRefreshPeriod(time.Nanosecond)) - parsedBlocksByChannels = parsedBlockPeerConcurrently.ObserveByChannels(ctx) + channelWithChannelsParsed = channelsBlocksPeerConcurrentlyParsed.ObserveByChannels(ctx) } -var _ = Describe("Block Peer", func() { - Context("Block peer", func() { +var _ = Describe("All channels blocks parsed", func() { + Context("Sequentially", func() { It("should return current number of channels", func() { - channelObservers := parsedBlockPeer.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels))) + channels := channelsBlocksPeerParsed.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels))) }) - It("should add channels to channelPeerMock", func() { + It("should add channels to peerChannelsMock", func() { newChannels := map[string]struct{}{"channel1": {}, "channel2": {}, "channel3": {}} for channel := range newChannels { - channelPeerMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to parsedBlockPeer observer + // wait to channelsBlocksPeerParsed observer time.Sleep(time.Second + time.Millisecond*10) - channelObservers := parsedBlockPeer.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) + channels := channelsBlocksPeerParsed.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels) + len(newChannels))) }) It("should return correct channels heights", func() { @@ -97,12 +98,12 @@ var _ = Describe("Block Peer", func() { }) }) - Context("Block peer concurrently", func() { + Context("Concurrently", func() { It("should return current number of channels", func() { - channelObservers := parsedBlockPeerConcurrently.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels))) + channels := channelsBlocksPeerConcurrentlyParsed.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels))) - channelsWithBlocks := parsedBlocksByChannels.Observe() + channelsWithBlocks := channelWithChannelsParsed.Observe() for i := 0; i < len(testdata.Channels); i++ { sampleOrFabcarChannelBlocks := <-channelsWithBlocks @@ -137,20 +138,20 @@ var _ = Describe("Block Peer", func() { } }) - It("should add channels to channelPeerMock", func() { + It("should add channels to peerChannelsMock", func() { channel4, channel5, channel6 := "channel4", "channel5", "channel6" newChannels := []string{channel4, channel5, channel6} for _, channel := range newChannels { - channelPeerMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + peerChannelsMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) } - // wait to blockPeer observer + // wait to channelsBlocksPeerParsed observer time.Sleep(time.Millisecond * 200) - channelObservers := parsedBlockPeerConcurrently.ChannelObservers() - Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) + channels := channelsBlocksPeerConcurrentlyParsed.Channels() + Expect(channels).To(HaveLen(len(testdata.Channels) + len(newChannels))) - channelsWithBlocks := parsedBlocksByChannels.Observe() + channelsWithBlocks := channelWithChannelsParsed.Observe() for i := 0; i < len(newChannels); i++ { channel4Or5Or6Blocks := <-channelsWithBlocks diff --git a/observer/channel_matcher.go b/observer/channels_matcher.go similarity index 100% rename from observer/channel_matcher.go rename to observer/channels_matcher.go diff --git a/observer/event.go b/observer/event.go deleted file mode 100644 index 870f3ef..0000000 --- a/observer/event.go +++ /dev/null @@ -1,23 +0,0 @@ -package observer - -import ( - "context" - - "github.com/hyperledger/fabric-protos-go/peer" -) - -type ( - Event struct { - Block *peer.ChaincodeEvent - Channel string - Error error - } - - CreateEventStream func(context.Context) (<-chan *peer.ChaincodeEvent, error) - - CreateEventStreamWithRetry func(context.Context, CreateEventStream) (<-chan *peer.ChaincodeEvent, error) - - EventTransformer interface { - Transform(*Event) - } -) diff --git a/observer/old_files/event.go b/observer/old_files/event.go new file mode 100644 index 0000000..da0a6b1 --- /dev/null +++ b/observer/old_files/event.go @@ -0,0 +1,23 @@ +package old_files + +//import ( +// "context" +// +// "github.com/hyperledger/fabric-protos-go/peer" +//) +// +//type ( +// Event struct { +// Block *peer.ChaincodeEvent +// Channel string +// Error error +// } +// +// CreateEventStream func(context.Context) (<-chan *peer.ChaincodeEvent, error) +// +// CreateEventStreamWithRetry func(context.Context, CreateEventStream) (<-chan *peer.ChaincodeEvent, error) +// +// EventTransformer interface { +// Transform(*Event) +// } +//) diff --git a/observer/event_channel.go b/observer/old_files/event_channel.go similarity index 94% rename from observer/event_channel.go rename to observer/old_files/event_channel.go index 4eee327..0333d39 100644 --- a/observer/event_channel.go +++ b/observer/old_files/event_channel.go @@ -1,4 +1,4 @@ -package observer +package old_files // type ( // EventChannel struct { diff --git a/observer/event_peer.go b/observer/old_files/event_peer.go similarity index 86% rename from observer/event_peer.go rename to observer/old_files/event_peer.go index b0edc85..a1fe666 100644 --- a/observer/event_peer.go +++ b/observer/old_files/event_peer.go @@ -1,4 +1,4 @@ -package observer +package old_files // type ( // EventPeer struct { diff --git a/observer/opts.go b/observer/old_files/opts.go similarity index 99% rename from observer/opts.go rename to observer/old_files/opts.go index b380d44..3808181 100644 --- a/observer/opts.go +++ b/observer/old_files/opts.go @@ -1,4 +1,4 @@ -package observer +package old_files // import ( // "context" diff --git a/observer/peer_channels.go b/observer/peer_channels.go new file mode 100644 index 0000000..af3623d --- /dev/null +++ b/observer/peer_channels.go @@ -0,0 +1,213 @@ +package observer + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric-protos-go/common" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/s7techlab/hlf-sdk-go/api" +) + +const DefaultPeerChannelsRefreshPeriod = 30 * time.Second + +type ( + // PeerReader implement it to create your service as peer (for example, message broker) + PeerReader interface { + PeerChannelsFetcher + api.BlocksDeliverer + api.ParsedBlocksDeliverer + } + + PeerChannelsGetter interface { + URI() string + Channels() map[string]*ChannelInfo + } + + PeerChannelsFetcher interface { + URI() string + api.ChannelListGetter + api.ChainInfoGetter + } + + ChannelInfo struct { + Channel string + Height uint64 + UpdatedAt *timestamppb.Timestamp + } + + // PeerChannels observes for peer channels + PeerChannels struct { + channels map[string]*ChannelInfo + + channelFetcher PeerChannelsFetcher + channelsMatcher *ChannelsMatcher + refreshPeriod time.Duration + + mu sync.RWMutex + logger *zap.Logger + + lastError error + + isWork bool + cancelObserve context.CancelFunc + } + + PeerChannelsOpts struct { + channels []ChannelToMatch + refreshPeriod time.Duration + logger *zap.Logger + } + + PeerChannelsOpt func(*PeerChannelsOpts) +) + +var DefaultPeerChannelsOpts = &PeerChannelsOpts{ + channels: MatchAllChannels, + refreshPeriod: DefaultPeerChannelsRefreshPeriod, + logger: zap.NewNop(), +} + +func WithChannels(channels []ChannelToMatch) PeerChannelsOpt { + return func(opts *PeerChannelsOpts) { + opts.channels = channels + } +} + +func WithPeerChannelsLogger(logger *zap.Logger) PeerChannelsOpt { + return func(opts *PeerChannelsOpts) { + opts.logger = logger + } +} + +func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...PeerChannelsOpt) (*PeerChannels, error) { + peerChannelsOpts := DefaultPeerChannelsOpts + for _, opt := range opts { + opt(peerChannelsOpts) + } + + channelsMatcher, err := NewChannelsMatcher(peerChannelsOpts.channels) + if err != nil { + return nil, fmt.Errorf(`channels matcher: %w`, err) + } + + peerChannels := &PeerChannels{ + channelFetcher: peerChannelsFetcher, + channelsMatcher: channelsMatcher, + channels: make(map[string]*ChannelInfo), + refreshPeriod: peerChannelsOpts.refreshPeriod, + logger: peerChannelsOpts.logger, + } + + return peerChannels, nil +} + +func (pc *PeerChannels) Stop() { + pc.cancelObserve() + pc.isWork = false +} + +func (pc *PeerChannels) Observe(ctx context.Context) { + if pc.isWork { + return + } + + // ctxObserve using for nested control process without stopped primary context + ctxObserve, cancel := context.WithCancel(context.Background()) + pc.cancelObserve = cancel + + go func() { + pc.isWork = true + pc.updateChannels(ctxObserve) + + ticker := time.NewTicker(pc.refreshPeriod) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // If primary context is done then cancel ctxObserver + pc.cancelObserve() + return + + case <-ctxObserve.Done(): + return + + case <-ticker.C: + pc.updateChannels(ctxObserve) + } + } + }() +} + +func (pc *PeerChannels) URI() string { + return pc.channelFetcher.URI() +} + +func (pc *PeerChannels) Channels() map[string]*ChannelInfo { + pc.mu.RLock() + defer pc.mu.RUnlock() + + var copyChannelInfo = make(map[string]*ChannelInfo, len(pc.channels)) + for key, value := range pc.channels { + copyChannelInfo[key] = value + } + + return copyChannelInfo +} + +func (pc *PeerChannels) updateChannels(ctx context.Context) { + pc.logger.Debug(`fetching channels`) + channelsInfo, err := pc.channelFetcher.GetChannels(ctx) + if err != nil { + pc.logger.Warn(`error while fetching channels`, zap.Error(err)) + pc.lastError = err + return + } + + channels := ChannelsInfoToStrings(channelsInfo.Channels) + pc.logger.Debug(`channels fetched`, zap.Strings(`channels`, channels)) + + channelsMatched, err := pc.channelsMatcher.Match(channels) + if err != nil { + pc.logger.Warn(`channel matching error`, zap.Error(err)) + pc.lastError = err + return + } + pc.logger.Debug(`channels matched`, zap.Reflect(`channels`, channelsMatched)) + + channelHeights := make(map[string]uint64) + + for _, channel := range channelsMatched { + var channelInfo *common.BlockchainInfo + channelInfo, err = pc.channelFetcher.GetChainInfo(ctx, channel.Name) + if err != nil { + pc.lastError = err + continue + } + channelHeights[channel.Name] = channelInfo.Height + } + + pc.mu.Lock() + defer pc.mu.Unlock() + + for channel, height := range channelHeights { + var updatedAt *timestamp.Timestamp + updatedAt, err = ptypes.TimestampProto(time.Now()) + if err != nil { + pc.lastError = err + } + + pc.channels[channel] = &ChannelInfo{ + Channel: channel, + Height: height, + UpdatedAt: updatedAt, + } + } +} diff --git a/observer/peer_channels_fetcher_mock.go b/observer/peer_channels_fetcher_mock.go new file mode 100644 index 0000000..758d484 --- /dev/null +++ b/observer/peer_channels_fetcher_mock.go @@ -0,0 +1,39 @@ +package observer + +import ( + "context" + "fmt" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type PeerChannelsFetcherMock struct { + channels map[string]uint64 +} + +func NewPeerChannelsFetcherMock(channels map[string]uint64) *PeerChannelsFetcherMock { + return &PeerChannelsFetcherMock{channels: channels} +} + +func (p *PeerChannelsFetcherMock) URI() string { + return "mock" +} + +func (p *PeerChannelsFetcherMock) GetChannels(context.Context) (*peer.ChannelQueryResponse, error) { + var channels []*peer.ChannelInfo + for channelName := range p.channels { + channels = append(channels, &peer.ChannelInfo{ChannelId: channelName}) + } + + return &peer.ChannelQueryResponse{Channels: channels}, nil +} + +func (p *PeerChannelsFetcherMock) GetChainInfo(_ context.Context, channel string) (*common.BlockchainInfo, error) { + chHeight, exists := p.channels[channel] + if !exists { + return nil, fmt.Errorf("channel '%s' does not exist", channel) + } + + return &common.BlockchainInfo{Height: chHeight}, nil +} diff --git a/observer/peer_channels_mock.go b/observer/peer_channels_mock.go new file mode 100644 index 0000000..4ad55ed --- /dev/null +++ b/observer/peer_channels_mock.go @@ -0,0 +1,42 @@ +package observer + +import ( + "sync" +) + +type PeerChannelsMock struct { + mu sync.Mutex + channelsInfo map[string]*ChannelInfo +} + +func NewPeerChannelsMock(channelsInfo ...*ChannelInfo) *PeerChannelsMock { + channels := make(map[string]*ChannelInfo, len(channelsInfo)) + for _, channelInfo := range channelsInfo { + channels[channelInfo.Channel] = channelInfo + } + + return &PeerChannelsMock{channelsInfo: channels} +} + +func (p *PeerChannelsMock) URI() string { + return "mock" +} + +func (p *PeerChannelsMock) Channels() map[string]*ChannelInfo { + p.mu.Lock() + defer p.mu.Unlock() + + var copyChannelInfo = make(map[string]*ChannelInfo, len(p.channelsInfo)) + for key, value := range p.channelsInfo { + copyChannelInfo[key] = value + } + + return copyChannelInfo +} + +func (p *PeerChannelsMock) UpdateChannelInfo(channelInfo *ChannelInfo) { + p.mu.Lock() + defer p.mu.Unlock() + + p.channelsInfo[channelInfo.Channel] = channelInfo +} diff --git a/observer/channel_peer_test.go b/observer/peer_channels_test.go similarity index 68% rename from observer/channel_peer_test.go rename to observer/peer_channels_test.go index 1e61264..07cfed6 100644 --- a/observer/channel_peer_test.go +++ b/observer/peer_channels_test.go @@ -10,22 +10,22 @@ import ( testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks" ) -var _ = Describe("Channel peer", func() { +var _ = Describe("Peer channels", func() { var ( - channelPeerFetcherMock observer.PeerChannelsFetcher + peerChannelsFetcherMock observer.PeerChannelsFetcher ) BeforeEach(func() { - channelPeerFetcherMock = observer.NewChannelPeerFetcherMock(testdata.ChannelsHeights) + peerChannelsFetcherMock = observer.NewPeerChannelsFetcherMock(testdata.ChannelsHeights) }) - It("default channel peer, no channel matcher", func() { - channelPeer, err := observer.NewChannelPeer(channelPeerFetcherMock) + It("default peer channels, no channel matcher", func() { + peerChannels, err := observer.NewPeerChannels(peerChannelsFetcherMock) Expect(err).To(BeNil()) - channelPeer.Observe(ctx) + peerChannels.Observe(ctx) time.Sleep(time.Millisecond * 100) - channelsMap := channelPeer.Channels() + channelsMap := peerChannels.Channels() sampleChannelInfo, exist := channelsMap[testdata.SampleChannel] Expect(exist).To(BeTrue()) @@ -38,15 +38,15 @@ var _ = Describe("Channel peer", func() { Expect(fabcarChannelInfo.Height).To(Equal(testdata.FabcarChannelHeight)) }) - It("default channel peer, with channel matcher, exclude Fabcar", func() { - channelPeer, err := observer.NewChannelPeer(channelPeerFetcherMock, + It("default peer channels, with channel matcher, exclude Fabcar", func() { + peerChannels, err := observer.NewPeerChannels(peerChannelsFetcherMock, observer.WithChannels([]observer.ChannelToMatch{{MatchPattern: testdata.SampleChannel}})) Expect(err).To(BeNil()) - channelPeer.Observe(ctx) + peerChannels.Observe(ctx) time.Sleep(time.Millisecond * 100) - channelsMap := channelPeer.Channels() + channelsMap := peerChannels.Channels() sampleChannelInfo, exist := channelsMap[testdata.SampleChannel] Expect(exist).To(BeTrue()) diff --git a/observer/stream.go b/observer/stream.go index 11b6c12..560cb3a 100644 --- a/observer/stream.go +++ b/observer/stream.go @@ -6,26 +6,26 @@ import ( "sync" ) -type Stream interface { - Subscribe() (ch chan *Block, closer func()) +type Stream[T any] interface { + Subscribe() (ch <-chan *Block[T], closer func()) } -type BlocksStream struct { - connections map[string]chan *Block +type BlocksStream[T any] struct { + connections map[string]chan *Block[T] mu *sync.RWMutex isWork bool cancelObserve context.CancelFunc } -func NewBlocksStream() *BlocksStream { - return &BlocksStream{ - connections: make(map[string]chan *Block), +func NewBlocksStream[T any]() *BlocksStream[T] { + return &BlocksStream[T]{ + connections: make(map[string]chan *Block[T]), mu: &sync.RWMutex{}, } } -func (b *BlocksStream) Observe(ctx context.Context, blocks <-chan *Block) { +func (b *BlocksStream[T]) Observe(ctx context.Context, blocks <-chan *Block[T]) { if b.isWork { return } @@ -36,9 +36,11 @@ func (b *BlocksStream) Observe(ctx context.Context, blocks <-chan *Block) { go func() { defer func() { + b.mu.Lock() for connName := range b.connections { b.closeChannel(connName) } + b.mu.Unlock() }() b.isWork = true @@ -65,9 +67,9 @@ func (b *BlocksStream) Observe(ctx context.Context, blocks <-chan *Block) { }() } -func (b *BlocksStream) Subscribe() (chan *Block, func()) { +func (b *BlocksStream[T]) Subscribe() (<-chan *Block[T], func()) { b.mu.Lock() - newConnection := make(chan *Block) + newConnection := make(chan *Block[T]) name := "channel-" + strconv.Itoa(len(b.connections)) b.connections[name] = newConnection b.mu.Unlock() @@ -77,14 +79,14 @@ func (b *BlocksStream) Subscribe() (chan *Block, func()) { return newConnection, closer } -func (b *BlocksStream) closeChannel(name string) { +func (b *BlocksStream[T]) closeChannel(name string) { b.mu.Lock() close(b.connections[name]) delete(b.connections, name) b.mu.Unlock() } -func (b *BlocksStream) Stop() { +func (b *BlocksStream[T]) Stop() { if b.cancelObserve != nil { b.cancelObserve() } diff --git a/observer/stream_parsed.go b/observer/stream_parsed.go deleted file mode 100644 index b128171..0000000 --- a/observer/stream_parsed.go +++ /dev/null @@ -1,104 +0,0 @@ -package observer - -import ( - "context" - "strconv" - "sync" -) - -type StreamParsed interface { - Subscribe() (ch chan *ParsedBlock, closer func()) -} - -type ParsedBlocksStream struct { - connectionsParsed map[string]chan *ParsedBlock - mu *sync.RWMutex - - isWork bool - cancelObserve context.CancelFunc -} - -func NewParsedBlocksStream() *ParsedBlocksStream { - return &ParsedBlocksStream{ - connectionsParsed: make(map[string]chan *ParsedBlock), - mu: &sync.RWMutex{}, - } -} - -func (b *ParsedBlocksStream) Observe(ctx context.Context, blocks <-chan *ParsedBlock) { - if b.isWork { - return - } - - // ctxObserve using for nested control process without stopped primary context - ctxObserve, cancel := context.WithCancel(ctx) - b.cancelObserve = cancel - - go func() { - defer func() { - for connName := range b.connectionsParsed { - b.closeChannel(connName) - } - }() - - b.isWork = true - - for { - select { - case <-ctxObserve.Done(): - // If primary context is done then cancel ctxObserver - b.cancelObserve() - return - - case block, ok := <-blocks: - if !ok { - return - } - - b.mu.RLock() - for _, connection := range b.connectionsParsed { - connection <- block - } - b.mu.RUnlock() - } - } - }() -} - -func (b *ParsedBlocksStream) Subscribe() (chan *ParsedBlock, func()) { - b.mu.Lock() - newConnection := make(chan *ParsedBlock) - name := "channel-" + strconv.Itoa(len(b.connectionsParsed)) - b.connectionsParsed[name] = newConnection - b.mu.Unlock() - - closer := func() { b.closeChannel(name) } - - return newConnection, closer -} - -func (b *ParsedBlocksStream) SubscribeParsed() (chan *ParsedBlock, func()) { - b.mu.Lock() - newConnection := make(chan *ParsedBlock) - name := "channel-" + strconv.Itoa(len(b.connectionsParsed)) - b.connectionsParsed[name] = newConnection - b.mu.Unlock() - - closer := func() { b.closeChannel(name) } - - return newConnection, closer -} - -func (b *ParsedBlocksStream) closeChannel(name string) { - b.mu.Lock() - close(b.connectionsParsed[name]) - delete(b.connectionsParsed, name) - b.mu.Unlock() -} - -func (b *ParsedBlocksStream) Stop() { - if b.cancelObserve != nil { - b.cancelObserve() - } - b.isWork = false -} diff --git a/observer/suite_test.go b/observer/suite_test.go index 0738dfb..5117f0a 100644 --- a/observer/suite_test.go +++ b/observer/suite_test.go @@ -13,6 +13,6 @@ func TestObservers(t *testing.T) { } var _ = BeforeSuite(func() { - blockPeerCommonTestBeforeSuit() - blockPeerParsedTestBeforeSuit() + channelsBlocksPeerCommonTestBeforeSuit() + channelsBlocksPeerParsedTestBeforeSuit() }) diff --git a/observer/transformer.go b/observer/transformer.go deleted file mode 100644 index dc9c87a..0000000 --- a/observer/transformer.go +++ /dev/null @@ -1,6 +0,0 @@ -package observer - -// BlockTransformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json -type BlockTransformer interface { - Transform(*ParsedBlock) error -} diff --git a/service/ccpackage/packages.pb.go b/service/ccpackage/packages.pb.go index 5b479eb..03f8d91 100644 --- a/service/ccpackage/packages.pb.go +++ b/service/ccpackage/packages.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: ccpackage/packages.proto diff --git a/service/systemcc/cscc/cscc.pb.go b/service/systemcc/cscc/cscc.pb.go index 01e2158..1f67193 100644 --- a/service/systemcc/cscc/cscc.pb.go +++ b/service/systemcc/cscc/cscc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: systemcc/cscc/cscc.proto diff --git a/service/systemcc/lifecycle/lifecycle.pb.go b/service/systemcc/lifecycle/lifecycle.pb.go index 08c4942..c1e509b 100644 --- a/service/systemcc/lifecycle/lifecycle.pb.go +++ b/service/systemcc/lifecycle/lifecycle.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: systemcc/lifecycle/lifecycle.proto diff --git a/service/systemcc/lscc/lscc.pb.go b/service/systemcc/lscc/lscc.pb.go index b414d49..da42d1e 100644 --- a/service/systemcc/lscc/lscc.pb.go +++ b/service/systemcc/lscc/lscc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: systemcc/lscc/lscc.proto diff --git a/service/systemcc/qscc/qscc.pb.go b/service/systemcc/qscc/qscc.pb.go index 936825c..ddd13fc 100644 --- a/service/systemcc/qscc/qscc.pb.go +++ b/service/systemcc/qscc/qscc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: systemcc/qscc/qscc.proto diff --git a/service/wallet/wallet.pb.go b/service/wallet/wallet.pb.go index 90cfe3d..fe160ab 100644 --- a/service/wallet/wallet.pb.go +++ b/service/wallet/wallet.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.32.0 // protoc (unknown) // source: wallet/wallet.proto @@ -828,19 +828,19 @@ const _ = grpc.SupportPackageIsVersion6 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type WalletServiceClient interface { - // Получить identity + // get identity IdentityGet(ctx context.Context, in *IdentityLabel, opts ...grpc.CallOption) (*IdentityInWallet, error) - // Получить identity в виде текста + // Get identity like text IdentityGetText(ctx context.Context, in *IdentityLabel, opts ...grpc.CallOption) (*IdentityInWalletText, error) - // Записать identity + // set identity IdentitySet(ctx context.Context, in *Identity, opts ...grpc.CallOption) (*IdentityInWallet, error) - // Записать identity в зашифрованном виде + // set identity in encrypted form IdentitySetWithPassword(ctx context.Context, in *IdentityWithPassword, opts ...grpc.CallOption) (*IdentityInWallet, error) - // Получить identity из зашифрованного вида + // get identity from encrypted view IdentityGetWithPassword(ctx context.Context, in *IdentityPassword, opts ...grpc.CallOption) (*IdentityInWallet, error) - // Список identity + // identity list IdentityList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*IdentityLabels, error) - // Удалить identity + // delete identity IdentityDelete(ctx context.Context, in *IdentityLabel, opts ...grpc.CallOption) (*IdentityInWallet, error) } @@ -917,19 +917,19 @@ func (c *walletServiceClient) IdentityDelete(ctx context.Context, in *IdentityLa // WalletServiceServer is the server API for WalletService service. type WalletServiceServer interface { - // Получить identity + // get identity IdentityGet(context.Context, *IdentityLabel) (*IdentityInWallet, error) - // Получить identity в виде текста + // Get identity like text IdentityGetText(context.Context, *IdentityLabel) (*IdentityInWalletText, error) - // Записать identity + // set identity IdentitySet(context.Context, *Identity) (*IdentityInWallet, error) - // Записать identity в зашифрованном виде + // set identity in encrypted form IdentitySetWithPassword(context.Context, *IdentityWithPassword) (*IdentityInWallet, error) - // Получить identity из зашифрованного вида + // get identity from encrypted view IdentityGetWithPassword(context.Context, *IdentityPassword) (*IdentityInWallet, error) - // Список identity + // identity list IdentityList(context.Context, *emptypb.Empty) (*IdentityLabels, error) - // Удалить identity + // delete identity IdentityDelete(context.Context, *IdentityLabel) (*IdentityInWallet, error) } diff --git a/service/wallet/wallet.proto b/service/wallet/wallet.proto index 584659f..d92b0df 100644 --- a/service/wallet/wallet.proto +++ b/service/wallet/wallet.proto @@ -10,21 +10,21 @@ import "validate/validate.proto"; service WalletService { - // Получить identity + // get identity rpc IdentityGet (IdentityLabel) returns (IdentityInWallet) { option (google.api.http) = { get: "/wallet/identities/{label}" }; } - // Получить identity в виде текста + // Get identity like text rpc IdentityGetText (IdentityLabel) returns (IdentityInWalletText) { option (google.api.http) = { get: "/wallet/identities/{label}/text" }; } - // Записать identity + // set identity rpc IdentitySet (Identity) returns (IdentityInWallet) { option (google.api.http) = { put: "/wallet/identities" @@ -32,7 +32,7 @@ service WalletService { }; } - // Записать identity в зашифрованном виде + // set identity in encrypted form rpc IdentitySetWithPassword (IdentityWithPassword) returns (IdentityInWallet) { option (google.api.http) = { put: "/wallet/identities/withpassword" @@ -47,7 +47,7 @@ service WalletService { // }; // } - // Получить identity из зашифрованного вида + // get identity from encrypted view rpc IdentityGetWithPassword (IdentityPassword) returns (IdentityInWallet) { option (google.api.http) = { post: "/wallet/identities/withpassword" @@ -55,14 +55,14 @@ service WalletService { }; } - // Список identity + // identity list rpc IdentityList (google.protobuf.Empty) returns (IdentityLabels) { option (google.api.http) = { get: "/wallet/identities" }; } - // Удалить identity + // delete identity rpc IdentityDelete (IdentityLabel) returns (IdentityInWallet) { option (google.api.http) = { delete: "/wallet/identities/{label}" diff --git a/service/wallet/wallet.swagger.json b/service/wallet/wallet.swagger.json index a7e686e..3809360 100644 --- a/service/wallet/wallet.swagger.json +++ b/service/wallet/wallet.swagger.json @@ -13,7 +13,7 @@ "paths": { "/wallet/identities": { "get": { - "summary": "Список identity", + "summary": "identity list", "operationId": "WalletService_IdentityList", "responses": { "200": { @@ -34,7 +34,7 @@ ] }, "put": { - "summary": "Записать identity", + "summary": "set identity", "operationId": "WalletService_IdentitySet", "responses": { "200": { @@ -67,7 +67,7 @@ }, "/wallet/identities/withpassword": { "post": { - "summary": "Получить identity из зашифрованного вида", + "summary": "get identity from encrypted view", "operationId": "WalletService_IdentityGetWithPassword", "responses": { "200": { @@ -98,7 +98,7 @@ ] }, "put": { - "summary": "Записать identity в зашифрованном виде", + "summary": "set identity in encrypted form", "operationId": "WalletService_IdentitySetWithPassword", "responses": { "200": { @@ -131,7 +131,7 @@ }, "/wallet/identities/{label}": { "get": { - "summary": "Получить identity", + "summary": "get identity", "operationId": "WalletService_IdentityGet", "responses": { "200": { @@ -160,7 +160,7 @@ ] }, "delete": { - "summary": "Удалить identity", + "summary": "delete identity", "operationId": "WalletService_IdentityDelete", "responses": { "200": { @@ -191,7 +191,7 @@ }, "/wallet/identities/{label}/text": { "get": { - "summary": "Получить identity в виде текста", + "summary": "Get identity like text", "operationId": "WalletService_IdentityGetText", "responses": { "200": {