diff --git a/pkg/cloudevents/transport/pubsub/transport.go b/pkg/cloudevents/transport/pubsub/transport.go index 5b02f8d73..85939d3e5 100644 --- a/pkg/cloudevents/transport/pubsub/transport.go +++ b/pkg/cloudevents/transport/pubsub/transport.go @@ -263,6 +263,7 @@ func (t *Transport) StartReceiver(ctx context.Context) error { } cctx, cancel := context.WithCancel(ctx) + defer cancel() n := len(t.subscriptions) // Make the channels for quit and errors. @@ -280,22 +281,26 @@ func (t *Transport) StartReceiver(ctx context.Context) error { }) } - // Block for parent context to finish. - <-ctx.Done() - cancel() - // Collect errors and done calls until we have n of them. errs := []string(nil) for success := 0; success < n; success++ { var err error select { - case err = <-errc: + case <-ctx.Done(): // Block for parent context to finish. + success-- + case err = <-errc: // Collect errors case <-quit: } + if cancel != nil { + // Stop all other subscriptions. + cancel() + cancel = nil + } if err != nil { errs = append(errs, err.Error()) } } + close(quit) close(errc)