Skip to content

Commit

Permalink
Project import generated by Copybara.
Browse files Browse the repository at this point in the history
FolderOrigin-RevId: /tmp/fabric-sdk
  • Loading branch information
Deploy authored and Deploy committed Sep 6, 2019
1 parent 35a0aec commit 9009909
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
33 changes: 26 additions & 7 deletions peer/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (d *deliverImpl) SubscribeEvents(ctx context.Context, channelName string, c
return nil, err
}

return events.Serve(sub), nil
return events.Serve(sub, sub.readyForHandling), nil
}

func (d *deliverImpl) SubscribeCC(ctx context.Context, channelName string, ccName string, seekOpt ...api.EventCCSeekOption) (api.EventCCSubscription, error) {
Expand All @@ -125,7 +125,7 @@ func (d *deliverImpl) SubscribeCC(ctx context.Context, channelName string, ccNam
return nil, err
}

return events.Serve(sub), nil
return events.Serve(sub, sub.readyForHandling), nil
}

func (d *deliverImpl) SubscribeTx(ctx context.Context, channelName string, txId api.ChaincodeTx, seekOpt ...api.EventCCSeekOption) (api.TxSubscription, error) {
Expand All @@ -135,7 +135,7 @@ func (d *deliverImpl) SubscribeTx(ctx context.Context, channelName string, txId
return nil, err
}

return txSub.Serve(sub), nil
return txSub.Serve(sub, sub.readyForHandling), nil
}

func (d *deliverImpl) SubscribeBlock(ctx context.Context, channelName string, seekOpt ...api.EventCCSeekOption) (api.BlockSubscription, error) {
Expand All @@ -146,7 +146,7 @@ func (d *deliverImpl) SubscribeBlock(ctx context.Context, channelName string, se
return nil, err
}

return blocker.Serve(sub), nil
return blocker.Serve(sub, sub.readyForHandling), nil
}

func (d *deliverImpl) handleSubscription(ctx context.Context, channel string, blockHandler subs.BlockHandler, seekOpt ...api.EventCCSeekOption) (*subscriptionImpl, error) {
Expand All @@ -163,7 +163,9 @@ func (d *deliverImpl) handleSubscription(ctx context.Context, channel string, bl
return nil, errors.Wrap(err, `failed to get seek envelope`)
}

stream, err := d.cli.Deliver(ctx)
subCtx, stopSub := context.WithCancel(ctx)

stream, err := d.cli.Deliver(subCtx)
if err != nil {
return nil, errors.Wrap(err, `failed to open deliver stream`)
}
Expand All @@ -173,17 +175,20 @@ func (d *deliverImpl) handleSubscription(ctx context.Context, channel string, bl
return nil, errors.Wrap(err, `failed to send seek envelope to stream`)
}

return makeSubscription(stream, blockHandler), nil
return makeSubscription(subCtx, stopSub, stream, blockHandler), nil
}

func makeSubscription(stream peer.Deliver_DeliverClient, blockHandler subs.BlockHandler) *subscriptionImpl {
func makeSubscription(ctx context.Context, stop context.CancelFunc, stream peer.Deliver_DeliverClient, blockHandler subs.BlockHandler) *subscriptionImpl {
s := &subscriptionImpl{
ctx: ctx,
stop: stop,
stream: stream,
blockHandler: blockHandler,
once: new(sync.Once),
err: make(chan error, 1), // only one error
done: make(chan *struct{}), // done will be closed after finished sub.handle
up: make(chan *struct{}),
run: make(chan *struct{}),
}

go s.handle()
Expand All @@ -193,18 +198,23 @@ func makeSubscription(stream peer.Deliver_DeliverClient, blockHandler subs.Block
}

type subscriptionImpl struct {
ctx context.Context
stop context.CancelFunc
blockHandler subs.BlockHandler
stream peer.Deliver_DeliverClient
err chan error
once *sync.Once
done chan *struct{}
up chan *struct{}
run chan *struct{}
}

func (s *subscriptionImpl) handle() {
defer s.Close()
defer close(s.done)
close(s.up)
// wait of set to handler
<-s.run

ctx := s.stream.Context()
for {
Expand Down Expand Up @@ -236,6 +246,14 @@ func (s *subscriptionImpl) handle() {
}
}

func (s *subscriptionImpl) Done() <-chan struct{} {
return s.ctx.Done()
}

func (s *subscriptionImpl) readyForHandling() {
close(s.run)
}

func (s *subscriptionImpl) Err() <-chan error {
return s.err
}
Expand All @@ -249,6 +267,7 @@ func (s *subscriptionImpl) Close() error {

s.once.Do(func() {
err = s.stream.CloseSend()
s.stop()
//wait of stop handler
<-s.done
// close all channels
Expand Down
13 changes: 10 additions & 3 deletions peer/deliver/subs/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (

type (
// BlockHandler when block == nil is eq EOF and signal for terminate all sub channels
BlockHandler func(block *common.Block) bool
BlockHandler func(block *common.Block) bool
ReadyForHandling func()

ErrorCloser interface {
Done() <-chan struct{}
Err() <-chan error
Errors() chan error
Close() error
Expand All @@ -34,13 +36,18 @@ func (b *BlockSubscription) Handler(block *common.Block) bool {
if block == nil {
close(b.blocks)
} else {
b.blocks <- block
select {
case b.blocks <- block:
case <-b.ErrorCloser.Done():
return true
}
}

return false
}

func (b *BlockSubscription) Serve(base ErrorCloser) *BlockSubscription {
func (b *BlockSubscription) Serve(base ErrorCloser, readyForHandling ReadyForHandling) *BlockSubscription {
b.ErrorCloser = base
readyForHandling()
return b
}
9 changes: 7 additions & 2 deletions peer/deliver/subs/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,19 @@ func (e *EventSubscription) Handler(block *common.Block) bool {
}
}

e.events <- ev
select {
case e.events <- ev:
case <-e.ErrorCloser.Done():
return true
}
}
}
}
return false
}

func (e *EventSubscription) Serve(base ErrorCloser) *EventSubscription {
func (e *EventSubscription) Serve(base ErrorCloser, readyForHandling ReadyForHandling) *EventSubscription {
e.ErrorCloser = base
readyForHandling()
return e
}
3 changes: 2 additions & 1 deletion peer/deliver/subs/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type TxSubscription struct {
ErrorCloser
}

func (ts *TxSubscription) Serve(sub ErrorCloser) *TxSubscription {
func (ts *TxSubscription) Serve(sub ErrorCloser, readyForHandling ReadyForHandling) *TxSubscription {
ts.ErrorCloser = sub
readyForHandling()
return ts
}

Expand Down

0 comments on commit 9009909

Please sign in to comment.