Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
Merge pull request #24 from uber/pause
Browse files Browse the repository at this point in the history
add pause/resume APIs to publisher and consumer
  • Loading branch information
datoug authored May 23, 2017
2 parents ccfe5e5 + d0fedb3 commit 7a6f614
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 46 deletions.
2 changes: 1 addition & 1 deletion client/cherami/authprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type (
}

// BypassAuthProvider is a dummy implementation for AuthProvider
BypassAuthProvider struct {}
BypassAuthProvider struct{}
)

// NewBypassAuthProvider creates a dummy AuthProvider instance
Expand Down
18 changes: 9 additions & 9 deletions client/cherami/basePublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ import (

"github.com/uber-common/bark"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/backoff"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-thrift/.generated/go/cherami"
)

type (
// basePublisher contains the data/code
// common to all types of Publisher
// implementations
basePublisher struct {
idCounter int64
path string
client Client
logger bark.Logger
reporter metrics.Reporter
retryPolicy backoff.RetryPolicy
checksumOption cherami.ChecksumOption
reconfigurationPollingInterval time.Duration
idCounter int64
path string
client Client
logger bark.Logger
reporter metrics.Reporter
retryPolicy backoff.RetryPolicy
checksumOption cherami.ChecksumOption
reconfigurationPollingInterval time.Duration
}

// publishError represents a message publishing error
Expand Down
10 changes: 5 additions & 5 deletions client/cherami/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,17 +357,17 @@ func getDefaultLogger() bark.Logger {

func getDefaultOptions() *ClientOptions {
return &ClientOptions{
Timeout: time.Minute,
Logger: getDefaultLogger(),
MetricsReporter: metrics.NewNullReporter(),
Timeout: time.Minute,
Logger: getDefaultLogger(),
MetricsReporter: metrics.NewNullReporter(),
ReconfigurationPollingInterval: defaultReconfigurationPollingInterval,
}
}

// verifyOptions is used to verify if we have a metrics reporter and
// a logger. If not, just setup a default logger and a null reporter
// it also validate the timeout is sane
func verifyOptions(opts *ClientOptions) (*ClientOptions, error){
func verifyOptions(opts *ClientOptions) (*ClientOptions, error) {
if opts == nil {
opts = getDefaultOptions()
}
Expand Down Expand Up @@ -417,4 +417,4 @@ func isTransientError(err error) bool {

func getMetricTagValueForPath(path string) string {
return strings.Replace(path, "/", "_", -1)
}
}
34 changes: 24 additions & 10 deletions client/cherami/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
reconfigureCh chan reconfigureInfo
closingCh chan struct{}
isClosing int32
paused uint32
logger bark.Logger
reporter metrics.Reporter

Expand Down Expand Up @@ -191,6 +192,14 @@ func (c *consumerImpl) Close() {
c.opened = false
}

func (c *consumerImpl) Pause() {
atomic.StoreUint32(&c.paused, 1)
}

func (c *consumerImpl) Resume() {
atomic.StoreUint32(&c.paused, 0)
}

func (c *consumerImpl) AckDelivery(token string) error {
acknowledger, id, err := c.getAcknowledger(token)
if err != nil {
Expand All @@ -213,22 +222,27 @@ func (c *consumerImpl) reconfigureConsumer() {
c.lk.Lock()
defer c.lk.Unlock()

var err error

select {
case <-c.closingCh:
c.logger.Info("Consumer is closing. Ignore reconfiguration.")
default:
var conn *outputHostConnection

consumerOptions, err := c.client.ReadConsumerGroupHosts(c.path, c.consumerGroupName)
if err != nil {
c.logger.Warnf("Error resolving output hosts: %v", err)
if _, ok := err.(*cherami.EntityNotExistsError); ok {
// ConsumerGroup is deleted. Continue with reconfigure and close all connections
consumerOptions = &cherami.ReadConsumerGroupHostsResult_{}
} else {
// This is a potentially a transient error.
// Retry on next reconfigure
return
consumerOptions := &cherami.ReadConsumerGroupHostsResult_{}
if atomic.LoadUint32(&c.paused) == 0 {
consumerOptions, err = c.client.ReadConsumerGroupHosts(c.path, c.consumerGroupName)
if err != nil {
c.logger.Warnf("Error resolving output hosts: %v", err)
if _, ok := err.(*cherami.EntityNotExistsError); ok {
// ConsumerGroup is deleted. Continue with reconfigure and close all connections
consumerOptions = &cherami.ReadConsumerGroupHostsResult_{}
} else {
// This is a potentially a transient error.
// Retry on next reconfigure
return
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion client/cherami/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

"github.com/stretchr/testify/suite"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-thrift/.generated/go/cherami"

"github.com/stretchr/testify/require"
)
Expand Down
10 changes: 10 additions & 0 deletions client/cherami/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ type (
Publisher interface {
Open() error
Close()
// Pause publishing. All publishing will fail until Resume() is called.
// Note: Pause/Resume APIs only work for streaming publishers(i.e. publish type is PublisherTypeStreaming)
// For non-streaming publishers, Pause/Resume APIs are no-op.
Pause()
// Resume publishing.
Resume()
Publish(message *PublisherMessage) *PublisherReceipt
PublishAsync(message *PublisherMessage, done chan<- *PublisherReceipt) (string, error)
}
Expand Down Expand Up @@ -92,6 +98,10 @@ type (
Open(deliveryCh chan Delivery) (chan Delivery, error)
// Closed all the connections to Cherami nodes for this consumer
Close()
// Pause consuming messages
Pause()
// Resume consuming messages
Resume()
// AckDelivery can be used by application to Ack a message so it is not delivered to any other consumer
AckDelivery(deliveryToken string) error
// NackDelivery can be used by application to Nack a message so it can be delivered to another consumer immediately
Expand Down
46 changes: 33 additions & 13 deletions client/cherami/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
lk sync.Mutex
opened bool
closed bool
paused uint32
connections map[string]*connection
wsConnector WSConnector
reconfigurable *reconfigurable
Expand All @@ -61,14 +62,14 @@ type (
)

const (
maxDuration time.Duration = 1<<62 - 1
defaultMessageTimeout = time.Minute
defaultMessageTimeout = time.Minute
pauseError = `Cherami publisher is paused`
)

var _ Publisher = (*publisherImpl)(nil)

// NewPublisher constructs a new Publisher object
// Deprecated: NewPublisher is deprecated, please use NewPublisher2
// Deprecated: NewPublisher is deprecated, please use NewPublisherWithReporter
func NewPublisher(client *clientImpl, path string, maxInflightMessagesPerConnection int) Publisher {
client.options.Logger.Warn("NewPublisher is a depredcated method, please use the new method NewPublisherWithReporter")
return NewPublisherWithReporter(client, path, maxInflightMessagesPerConnection, client.options.MetricsReporter)
Expand Down Expand Up @@ -147,6 +148,14 @@ func (s *publisherImpl) Open() error {
return nil
}

func (s *publisherImpl) Pause() {
atomic.StoreUint32(&s.paused, 1)
}

func (s *publisherImpl) Resume() {
atomic.StoreUint32(&s.paused, 0)
}

func (s *publisherImpl) Close() {
if atomic.CompareAndSwapInt32(&s.isClosing, 0, 1) {
close(s.closingCh)
Expand Down Expand Up @@ -176,6 +185,9 @@ func (s *publisherImpl) Close() {

// Publish can be used to synchronously publish a message to Cherami
func (s *publisherImpl) Publish(message *PublisherMessage) *PublisherReceipt {
if atomic.LoadUint32(&s.paused) > 0 {
return &PublisherReceipt{Error: fmt.Errorf(pauseError)}
}
timeoutTimer := time.NewTimer(defaultMessageTimeout)
defer timeoutTimer.Stop()

Expand Down Expand Up @@ -211,6 +223,9 @@ func (s *publisherImpl) Publish(message *PublisherMessage) *PublisherReceipt {
// PublishAsync accepts a message, but returns immediately with the local
// reference ID
func (s *publisherImpl) PublishAsync(message *PublisherMessage, done chan<- *PublisherReceipt) (string, error) {
if atomic.LoadUint32(&s.paused) > 0 {
return "", fmt.Errorf(pauseError)
}

if !s.opened {
return "", fmt.Errorf("Cannot publish message to path '%s'. Publisher is not opened.", s.path)
Expand All @@ -229,22 +244,27 @@ func (s *publisherImpl) reconfigurePublisher() {
s.lk.Lock()
defer s.lk.Unlock()

var err error

select {
case <-s.closingCh:
s.logger.Info("Publisher is closing. Ignore reconfiguration.")
default:
var conn *connection

publisherOptions, err := s.readPublisherOptions()
if err != nil {
s.logger.Infof("Error resolving input hosts: %v", err)
if _, ok := err.(*cherami.EntityNotExistsError); ok {
// Destination is deleted. Continue with reconfigure and close all connections
publisherOptions = &cherami.ReadPublisherOptionsResult_{}
} else {
// This is a potentially a transient error.
// Retry on next reconfigure
return
publisherOptions := &cherami.ReadPublisherOptionsResult_{}
if atomic.LoadUint32(&s.paused) == 0 {
publisherOptions, err = s.readPublisherOptions()
if err != nil {
s.logger.Infof("Error resolving input hosts: %v", err)
if _, ok := err.(*cherami.EntityNotExistsError); ok {
// Destination is deleted. Continue with reconfigure and close all connections
publisherOptions = &cherami.ReadPublisherOptionsResult_{}
} else {
// This is a potentially a transient error.
// Retry on next reconfigure
return
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion client/cherami/reconfigurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package cherami
import (
"time"

"github.com/uber/cherami-client-go/common"
"github.com/uber-common/bark"
"github.com/uber/cherami-client-go/common"
)

type (
Expand Down
20 changes: 15 additions & 5 deletions client/cherami/tchanPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ var _ Publisher = (*tchannelBatchPublisher)(nil)

func newTChannelBatchPublisher(client Client, tchan *tchannel.Channel, path string, logger bark.Logger, metricsReporter metrics.Reporter, reconfigurationPollingInterval time.Duration) Publisher {
base := basePublisher{
client: client,
retryPolicy: createDefaultPublisherRetryPolicy(),
path: path,
logger: logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
reporter: metricsReporter,
client: client,
retryPolicy: createDefaultPublisherRetryPolicy(),
path: path,
logger: logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
reporter: metricsReporter,
reconfigurationPollingInterval: reconfigurationPollingInterval,
}
return &tchannelBatchPublisher{
Expand Down Expand Up @@ -154,6 +154,16 @@ func (p *tchannelBatchPublisher) Close() {
p.logger.Info("Publisher Closed.")
}

func (p *tchannelBatchPublisher) Pause() {
p.logger.Error("Pause() is not supported for batch publisher")
return
}

func (p *tchannelBatchPublisher) Resume() {
p.logger.Error("Resume() is not supported for batch publisher")
return
}

// Publish publishes a message to cherami
func (p *tchannelBatchPublisher) Publish(message *PublisherMessage) *PublisherReceipt {

Expand Down
2 changes: 1 addition & 1 deletion client/cherami/wsconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
"net/http"
"reflect"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/websocket"
"github.com/uber/cherami-client-go/stream"
"github.com/uber/cherami-thrift/.generated/go/cherami"
)

type (
Expand Down

0 comments on commit 7a6f614

Please sign in to comment.