From 4f7b42c9c6410196eea9a38e24c27fb029853f94 Mon Sep 17 00:00:00 2001 From: Jack Murdock Date: Fri, 28 Sep 2018 15:32:59 -0700 Subject: [PATCH 01/15] adding docker build to travis build --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 9cb306a4..5b0348ae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,8 @@ after_success: - docker cp build:/versionno.txt . - BINARY_NAME=`ls x86_64/` - TRAVIS_TAG=`cat versionno.txt` + - 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then docker build -t caduceus:local .; fi' + deploy: provider: releases From 73e19fd763d32bfff7996b8368f147df5e96f877 Mon Sep 17 00:00:00 2001 From: Jack Murdock Date: Mon, 8 Oct 2018 10:11:52 -0700 Subject: [PATCH 02/15] adding Dockerfile.local for faster local development --- Dockerfile.local | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 Dockerfile.local diff --git a/Dockerfile.local b/Dockerfile.local new file mode 100644 index 00000000..3323c826 --- /dev/null +++ b/Dockerfile.local @@ -0,0 +1,21 @@ +FROM golang:alpine as builder +MAINTAINER Jack Murdock + +# build the binary +WORKDIR /go/src +COPY src/ /go/src/ + +RUN go build -o caduceus_linux_amd64 caduceus + +EXPOSE 6000 6001 6002 +RUN mkdir -p /etc/caduceus +VOLUME /etc/caduceus + +# the actual image +FROM alpine:latest +RUN apk --no-cache add ca-certificates +RUN mkdir -p /etc/caduceus +VOLUME /etc/caduceus +WORKDIR /root/ +COPY --from=builder /go/src/caduceus_linux_amd64 . +ENTRYPOINT ["./caduceus_linux_amd64"] From dc0dbe605a29f258517e0a5ccc1e508526f170f4 Mon Sep 17 00:00:00 2001 From: Nicholas Harter Date: Tue, 23 Oct 2018 17:01:22 -0700 Subject: [PATCH 03/15] removing old signal wait line. removing old signal wait line. this line is block when we don't want it to the code right above is already doing that. --- src/caduceus/caduceus.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/caduceus/caduceus.go b/src/caduceus/caduceus.go index 30e7fdfc..07151503 100644 --- a/src/caduceus/caduceus.go +++ b/src/caduceus/caduceus.go @@ -261,8 +261,7 @@ func caduceus(arguments []string) int { exit = true } } - s := server.SignalWait(infoLog, signals, os.Interrupt, os.Kill) - errorLog.Log(logging.MessageKey(), "exiting due to signal", "signal", s) + close(shutdown) waitGroup.Wait() From 1de18de10a73b222c79023dde5d9ff3bca4093b2 Mon Sep 17 00:00:00 2001 From: Kristina Spring Date: Wed, 24 Oct 2018 13:34:56 -0700 Subject: [PATCH 04/15] Added/Modified Configuration Options --- example-caduceus.yaml | 14 +++++++++----- src/caduceus/caduceus.go | 21 ++++++++++----------- src/caduceus/caduceus_type.go | 27 ++++++++++++++++++--------- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/example-caduceus.yaml b/example-caduceus.yaml index 8e4c309b..90f857c6 100644 --- a/example-caduceus.yaml +++ b/example-caduceus.yaml @@ -17,11 +17,15 @@ soa: numWorkerThreads: 10 jobQueueSize: 10 -senderNumWorkersPerSender: 10 -senderQueueSizePerSender: 50 -senderCutOffPeriod: 30 -senderLinger: 180 -senderClientTimeout: 60 +sender: + numWorkersPerSender: 10 + queueSizePerSender: 50 + cutOffPeriod: 30s + linger: 180s + clientTimeout: 60s + deliveryRetries: 1 + deliveryInterval: 10ms + responseHeaderTimeout: 10s profilerFrequency: 15 profilerDuration: 15 profilerQueueSize: 100 diff --git a/src/caduceus/caduceus.go b/src/caduceus/caduceus.go index 07151503..5ffbed22 100644 --- a/src/caduceus/caduceus.go +++ b/src/caduceus/caduceus.go @@ -124,24 +124,23 @@ func caduceus(arguments []string) int { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - MaxIdleConnsPerHost: caduceusConfig.SenderNumWorkersPerSender, - ResponseHeaderTimeout: 10 * time.Second, // TODO Make this configurable + MaxIdleConnsPerHost: caduceusConfig.Sender.NumWorkersPerSender, + ResponseHeaderTimeout: caduceusConfig.Sender.ResponseHeaderTimeout, + IdleConnTimeout: caduceusConfig.Sender.IdleConnTimeout, } - timeout := time.Duration(caduceusConfig.SenderClientTimeout) * time.Second - caduceusSenderWrapper, err := SenderWrapperFactory{ - NumWorkersPerSender: caduceusConfig.SenderNumWorkersPerSender, - QueueSizePerSender: caduceusConfig.SenderQueueSizePerSender, - CutOffPeriod: time.Duration(caduceusConfig.SenderCutOffPeriod) * time.Second, - Linger: time.Duration(caduceusConfig.SenderLinger) * time.Second, - DeliveryRetries: 1, // TODO Make this configurable - DeliveryInterval: 10 * time.Millisecond, // TODO Make this configurable + NumWorkersPerSender: caduceusConfig.Sender.NumWorkersPerSender, + QueueSizePerSender: caduceusConfig.Sender.QueueSizePerSender, + CutOffPeriod: caduceusConfig.Sender.CutOffPeriod, + Linger: caduceusConfig.Sender.Linger, + DeliveryRetries: caduceusConfig.Sender.DeliveryRetries, + DeliveryInterval: caduceusConfig.Sender.DeliveryInterval, MetricsRegistry: metricsRegistry, Logger: logger, Sender: (&http.Client{ Transport: tr, - Timeout: timeout, + Timeout: caduceusConfig.Sender.ClientTimeout, }).Do, }.New() diff --git a/src/caduceus/caduceus_type.go b/src/caduceus/caduceus_type.go index 0c79892a..8a3d3d41 100644 --- a/src/caduceus/caduceus_type.go +++ b/src/caduceus/caduceus_type.go @@ -24,20 +24,29 @@ import ( "github.com/Comcast/webpa-common/wrp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" + "time" ) // Below is the struct we're using to contain the data from a provided config file // TODO: Try to figure out how to make bucket ranges configurable type CaduceusConfig struct { - AuthHeader []string - NumWorkerThreads int - JobQueueSize int - SenderNumWorkersPerSender int - SenderQueueSizePerSender int - SenderCutOffPeriod int - SenderLinger int - SenderClientTimeout int - JWTValidators []JWTValidator + AuthHeader []string + NumWorkerThreads int + JobQueueSize int + Sender SenderConfig + JWTValidators []JWTValidator +} + +type SenderConfig struct { + NumWorkersPerSender int + QueueSizePerSender int + CutOffPeriod time.Duration + Linger time.Duration + ClientTimeout time.Duration + ResponseHeaderTimeout time.Duration + IdleConnTimeout time.Duration + DeliveryRetries int + DeliveryInterval time.Duration } type JWTValidator struct { From 7ca7d0ed26ba80134cc8ed961ca4cad24248bcae Mon Sep 17 00:00:00 2001 From: Jack Murdock Date: Tue, 4 Dec 2018 13:46:55 -0800 Subject: [PATCH 05/15] adding webhook sending of full wrp message --- src/caduceus/outboundSender.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index 6eab8e4a..d9389909 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -517,7 +517,22 @@ func (obs *CaduceusOutboundSender) worker(id int) { if now.After(dropUntil) { if now.Before(deliverUntil) { payload := msg.Payload - payloadReader := bytes.NewReader(payload) + var payloadReader *bytes.Reader + if obs.listener.Config.ContentType == "wrp" { + buffer := bytes.NewBuffer([]byte{}) + var f wrp.Format + switch msg.ContentType { + case "json": + f = wrp.JSON + default: + f = wrp.Msgpack + } + encoder := wrp.NewEncoder(buffer, f) + encoder.Encode(msg) + payloadReader = bytes.NewReader(buffer.Bytes()) + } else { + payloadReader = bytes.NewReader(payload) + } req, err := http.NewRequest("POST", obs.id, payloadReader) if nil != err { // Report drop From e992219cbddc1312dbc9c436f775114c72be3bde Mon Sep 17 00:00:00 2001 From: Nicholas Harter Date: Wed, 30 Jan 2019 16:13:36 -0800 Subject: [PATCH 06/15] changed required go version from 1.8 to 1.11 --- caduceus.spec.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caduceus.spec.in b/caduceus.spec.in index f355b721..c7951f3e 100644 --- a/caduceus.spec.in +++ b/caduceus.spec.in @@ -10,7 +10,7 @@ License: ASL 2.0 URL: https://github.com/Comcast/%{name} Source0: %{name}-%{_fullver}.tar.gz -BuildRequires: golang >= 1.8 +BuildRequires: golang >= 1.11 Requires: supervisor Provides: %{name} From 988c69015354a457a5f858c115513143d0d414e8 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Thu, 14 Feb 2019 14:10:02 -0800 Subject: [PATCH 07/15] Fix a shallow copy bug by not using that pattern before updating the object. --- src/caduceus/outboundSender.go | 70 +++++++++++++++++----------------- src/glide.yaml | 1 - 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index 6eab8e4a..f20f28be 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -239,50 +239,42 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { // Update applies user configurable values for the outbound sender when a // webhook is registered func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { - // make a copy - obsCopy := *obs - // set events & matchers to empty - obsCopy.events = []*regexp.Regexp{} - obsCopy.matcher = []*regexp.Regexp{} - - obsCopy.listener = wh - obsCopy.failureMsg.Original = wh - - // Don't share the secret with others when there is an error. - obsCopy.failureMsg.Original.Config.Secret = "XxxxxX" - - if "" != obsCopy.listener.Config.Secret { - obsCopy.secret = []byte(obsCopy.listener.Config.Secret) - } - - if "" != obsCopy.listener.FailureURL { - if _, err = url.ParseRequestURI(obsCopy.listener.FailureURL); nil != err { + // Validate the failure URL, if present + if "" != wh.FailureURL { + if _, err = url.ParseRequestURI(wh.FailureURL); nil != err { return } } - obsCopy.deliverUntil = obsCopy.listener.Until - - // Create the event regex objects + // Create and validate the event regex objects + var events []*regexp.Regexp for _, event := range wh.Events { var re *regexp.Regexp if re, err = regexp.Compile(event); nil != err { return } - obsCopy.events = append(obsCopy.events, re) + events = append(events, re) } - if nil == obsCopy.events || len(obsCopy.events) == 0 { + if len(events) < 1 { err = errors.New("Events must not be empty.") return } + // Make the secret bytes if available + var secret []byte + if "" != wh.Config.Secret { + secret = []byte(wh.Config.Secret) + } + // Create the matcher regex objects + matcher := []*regexp.Regexp{} + matchEverything := false for _, item := range wh.Matcher.DeviceId { if ".*" == item { // Match everything - skip the filtering - obsCopy.matcher = nil + matchEverything = true break } @@ -291,28 +283,34 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { err = fmt.Errorf("Invalid matcher item: '%s'", item) return } - obsCopy.matcher = append(obsCopy.matcher, re) + matcher = append(matcher, re) } // if matcher list is empty set it nil for Queue() logic - if len(obsCopy.matcher) == 0 { - obsCopy.matcher = nil + if len(matcher) == 0 { + matchEverything = true } // write/update obs obs.mutex.Lock() - obs.listener = obsCopy.listener - obs.failureMsg.Original = obsCopy.failureMsg.Original - obs.failureMsg.Original.Config.Secret = obsCopy.failureMsg.Original.Config.Secret - obs.secret = obsCopy.secret - obs.listener.FailureURL = obsCopy.listener.FailureURL - obs.deliverUntil = obsCopy.deliverUntil - obs.events = obsCopy.events - obs.matcher = obsCopy.matcher + obs.listener = wh + + obs.failureMsg.Original = wh + // Don't share the secret with others when there is an error. + obs.failureMsg.Original.Config.Secret = "XxxxxX" + + obs.secret = secret + obs.listener.FailureURL = wh.FailureURL + obs.deliverUntil = wh.Until + obs.events = events + obs.matcher = nil + if false == matchEverything { + obs.matcher = matcher + } obs.mutex.Unlock() - obs.secretChan <- obsCopy.secret + obs.secretChan <- secret return } diff --git a/src/glide.yaml b/src/glide.yaml index 535e5ee7..682a6609 100644 --- a/src/glide.yaml +++ b/src/glide.yaml @@ -4,4 +4,3 @@ import: version: 6ca7d6c5e78ebba17483f5d0e61c7c119b43619a - package: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 -- package: github.com/gorilla/websocket From 800e305042c7f872db32cf6a188fea5aa28b12fe Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Thu, 14 Feb 2019 14:12:39 -0800 Subject: [PATCH 08/15] Simply the match everything logic. --- src/caduceus/outboundSender.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index f20f28be..a985f601 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -270,11 +270,10 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { // Create the matcher regex objects matcher := []*regexp.Regexp{} - matchEverything := false for _, item := range wh.Matcher.DeviceId { if ".*" == item { // Match everything - skip the filtering - matchEverything = true + matcher = []*regexp.Regexp{} break } @@ -285,10 +284,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { } matcher = append(matcher, re) } - // if matcher list is empty set it nil for Queue() logic - if len(matcher) == 0 { - matchEverything = true - } // write/update obs obs.mutex.Lock() @@ -303,8 +298,10 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { obs.listener.FailureURL = wh.FailureURL obs.deliverUntil = wh.Until obs.events = events + + // if matcher list is empty set it nil for Queue() logic obs.matcher = nil - if false == matchEverything { + if 0 < len(matcher) { obs.matcher = matcher } From 5fc50ec748a167b24f0cd8584b2ae235f5483acd Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Fri, 15 Feb 2019 18:15:31 -0800 Subject: [PATCH 09/15] Remove the outbound worker pool and replace it with short lived goroutines and a maximum limit configuration. --- CHANGELOG.md | 4 + src/caduceus/metrics.go | 2 +- src/caduceus/outboundSender.go | 295 ++++++++++------------------ src/caduceus/outboundSender_test.go | 13 +- src/caduceus/senderWrapper_test.go | 17 +- src/caduceus/simpleCounter.go | 38 ---- src/caduceus/simpleCounter_test.go | 40 ---- src/glide.yaml | 2 +- 8 files changed, 125 insertions(+), 286 deletions(-) delete mode 100644 src/caduceus/simpleCounter.go delete mode 100644 src/caduceus/simpleCounter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8be62ec2..91e6accd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Remove the worker pool as a fixed number of workers per endpoint and simply cap + the maximum number. +- Fix for webhook shallow copy bug. +- Fix for delivering events as json or msgpack based events - Fix for webhook update for all fields - Fix for retry logic so all failures are retried the specified number of times - Fix for waiting for DNS to resolve prior to listening for webhook updates diff --git a/src/caduceus/metrics.go b/src/caduceus/metrics.go index 521d4578..10d2b0e9 100644 --- a/src/caduceus/metrics.go +++ b/src/caduceus/metrics.go @@ -49,7 +49,7 @@ func Metrics() []xmetrics.Metric { Name: DeliveryRetryCounter, Help: "Number of delivery retries made", Type: "counter", - LabelNames: []string{"url", "code", "event"}, + LabelNames: []string{"url", "event"}, }, { Name: DeliveryCounter, diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index efafd843..f7bac4de 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -24,7 +24,6 @@ import ( "encoding/json" "errors" "fmt" - "hash" "io" "io/ioutil" "net/http" @@ -33,11 +32,11 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/Comcast/webpa-common/device" "github.com/Comcast/webpa-common/logging" + "github.com/Comcast/webpa-common/semaphore" "github.com/Comcast/webpa-common/webhook" "github.com/Comcast/webpa-common/wrp" "github.com/Comcast/webpa-common/wrp/wrphttp" @@ -125,8 +124,6 @@ type CaduceusOutboundSender struct { deliverUntil time.Time dropUntil time.Time sender func(*http.Request) (*http.Response, error) - secret []byte - secretChan chan []byte events []*regexp.Regexp matcher []*regexp.Regexp queueSize int @@ -144,10 +141,11 @@ type CaduceusOutboundSender struct { queueDepthGauge metrics.Gauge wg sync.WaitGroup cutOffPeriod time.Duration + workers semaphore.Interface + maxWorkers int failureMsg FailureMessage logger log.Logger mutex sync.RWMutex - shutdownChan chan bool } // New creates a new OutboundSender object from the factory, or returns an error. @@ -175,13 +173,13 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { id: osf.Listener.Config.URL, listener: osf.Listener, sender: osf.Sender, - secretChan: make(chan []byte, 10), queueSize: osf.QueueSize, cutOffPeriod: osf.CutOffPeriod, deliverUntil: osf.Listener.Until, logger: osf.Logger, deliveryRetries: osf.DeliveryRetries, deliveryInterval: osf.DeliveryInterval, + maxWorkers: osf.NumWorkers, failureMsg: FailureMessage{ Original: osf.Listener, Text: failureText, @@ -189,7 +187,6 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { QueueSize: osf.QueueSize, Workers: osf.NumWorkers, }, - shutdownChan: make(chan bool, 10), } // Don't share the secret with others when there is an error. @@ -227,12 +224,12 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } - caduceusOutboundSender.wg.Add(osf.NumWorkers) - for i := 0; i < osf.NumWorkers; i++ { - go caduceusOutboundSender.worker(i) - } + caduceusOutboundSender.workers = semaphore.New(caduceusOutboundSender.maxWorkers) + caduceusOutboundSender.wg.Add(1) + go caduceusOutboundSender.dispatcher() obs = caduceusOutboundSender + return } @@ -262,12 +259,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { return } - // Make the secret bytes if available - var secret []byte - if "" != wh.Config.Secret { - secret = []byte(wh.Config.Secret) - } - // Create the matcher regex objects matcher := []*regexp.Regexp{} for _, item := range wh.Matcher.DeviceId { @@ -294,7 +285,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { // Don't share the secret with others when there is an error. obs.failureMsg.Original.Config.Secret = "XxxxxX" - obs.secret = secret obs.listener.FailureURL = wh.FailureURL obs.deliverUntil = wh.Until obs.events = events @@ -307,8 +297,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { obs.mutex.Unlock() - obs.secretChan <- secret - return } @@ -316,11 +304,7 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { // abruptly based on the gentle parameter. If gentle is false, all queued // messages will be dropped without an attempt to send made. func (obs *CaduceusOutboundSender) Shutdown(gentle bool) { - obs.shutdownChan <- true - close(obs.queue) - close(obs.secretChan) - close(obs.shutdownChan) obs.mutex.Lock() if false == gentle { @@ -425,189 +409,124 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { } } -// helper function to get the right delivery counter to increment -func (obs *CaduceusOutboundSender) getCounter(c metrics.Counter, status int) metrics.Counter { - if -1 == status { - return c.With("url", obs.id, "code", "failure") - } - - s := strconv.Itoa(status) - return c.With("url", obs.id, "code", s) -} - -type secretHash struct { - value atomic.Value -} - -func (sh *secretHash) set(h *hash.Hash) { - sh.value.Store(h) -} - -func (sh *secretHash) get() *hash.Hash { - if h, ok := sh.value.Load().(*hash.Hash); ok { - return h - } - - return nil -} - -// worker is the routine that actually takes the queued messages and delivers -// them to the listeners outside webpa -func (obs *CaduceusOutboundSender) worker(id int) { +func (obs *CaduceusOutboundSender) dispatcher() { defer obs.wg.Done() - // Make a local copy of the hmac - var h secretHash - h.set(new(hash.Hash)) - - // routine that will listen for secret changes - // if a change comes in, both the local secret copy and the hash are updated - go func(sc chan []byte, shutdown chan bool) { - for { - select { - case secret := <-sc: - // Create the base sha1 hash object for each thread - if nil != secret { - t := hmac.New(sha1.New, secret) - h.set(&t) - } else { - h.set(new(hash.Hash)) - } - case <-shutdown: - return - } - } - }(obs.secretChan, obs.shutdownChan) - - // Setup the retry structs once - simpleCounter := &SimpleCounter{} - - retryOptions := xhttp.RetryOptions{ - Logger: obs.logger, - Retries: obs.deliveryRetries, - Interval: obs.deliveryInterval, - Counter: simpleCounter, - // Always retry on failures up to the max count. - ShouldRetry: func(error) bool { return true }, - } - - // Only optimize the successful answers - delivered200 := obs.getCounter(obs.deliveryCounter, 200) - delivered201 := obs.getCounter(obs.deliveryCounter, 201) - delivered202 := obs.getCounter(obs.deliveryCounter, 202) - delivered204 := obs.getCounter(obs.deliveryCounter, 204) - retries200 := obs.getCounter(obs.deliveryRetryCounter, 200) - retries201 := obs.getCounter(obs.deliveryRetryCounter, 201) - retries202 := obs.getCounter(obs.deliveryRetryCounter, 202) - retries204 := obs.getCounter(obs.deliveryRetryCounter, 204) - for msg := range obs.queue { obs.queueDepthGauge.Add(-1.0) + obs.mutex.RLock() deliverUntil := obs.deliverUntil dropUntil := obs.dropUntil + secret := obs.listener.Config.Secret obs.mutex.RUnlock() now := time.Now() + if now.After(dropUntil) { if now.Before(deliverUntil) { - payload := msg.Payload - var payloadReader *bytes.Reader - if obs.listener.Config.ContentType == "wrp" { - buffer := bytes.NewBuffer([]byte{}) - var f wrp.Format - switch msg.ContentType { - case "json": - f = wrp.JSON - default: - f = wrp.Msgpack - } - encoder := wrp.NewEncoder(buffer, f) - encoder.Encode(msg) - payloadReader = bytes.NewReader(buffer.Bytes()) - } else { - payloadReader = bytes.NewReader(payload) - } - req, err := http.NewRequest("POST", obs.id, payloadReader) - if nil != err { - // Report drop - obs.droppedInvalidConfig.Add(1.0) - logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id, - logging.ErrorKey(), err) - } else { - req.Header.Set("Content-Type", msg.ContentType) + obs.workers.Acquire() + go obs.send(secret, msg) + } else { + obs.droppedExpiredCounter.Add(1.0) + } + } else { + obs.droppedCutoffCounter.Add(1.0) + } + } - // Add x-Midt-* headers - wrphttp.AddMessageHeaders(req.Header, msg) + // Grab all the workers to make sure they are done. + for i := 0; i < obs.maxWorkers; i++ { + obs.workers.Acquire() + } +} - // Provide the old headers for now - req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) - req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) +// worker is the routine that actually takes the queued messages and delivers +// them to the listeners outside webpa +func (obs *CaduceusOutboundSender) send(secret string, msg *wrp.Message) { + defer obs.workers.Release() + + payload := msg.Payload + var payloadReader *bytes.Reader + if obs.listener.Config.ContentType == "wrp" { + // WTS - I'm not sure if this is correct. + buffer := bytes.NewBuffer([]byte{}) + var f wrp.Format + switch msg.ContentType { + case "json": + f = wrp.JSON + default: + f = wrp.Msgpack + } + encoder := wrp.NewEncoder(buffer, f) + encoder.Encode(msg) + payloadReader = bytes.NewReader(buffer.Bytes()) + } else { + payloadReader = bytes.NewReader(payload) + } - // Add the device id without the trailing service - id, _ := device.ParseID(msg.Source) - req.Header.Set("X-Webpa-Device-Id", string(id)) - req.Header.Set("X-Webpa-Device-Name", string(id)) + req, err := http.NewRequest("POST", obs.id, payloadReader) + if nil != err { + // Report drop + obs.droppedInvalidConfig.Add(1.0) + logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id, + logging.ErrorKey(), err) + } else { + req.Header.Set("Content-Type", msg.ContentType) - // get the latest secret hash - sh := *h.get() + // Add x-Midt-* headers + wrphttp.AddMessageHeaders(req.Header, msg) - if nil != sh { - sh.Reset() - sh.Write(payload) - sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(sh.Sum(nil))) - req.Header.Set("X-Webpa-Signature", sig) - } + // Provide the old headers for now + req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) + req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) - // Setup the retry logic - simpleCounter.Count = 0.0 + // Add the device id without the trailing service + id, _ := device.ParseID(msg.Source) + req.Header.Set("X-Webpa-Device-Id", string(id)) + req.Header.Set("X-Webpa-Device-Name", string(id)) - // find the event "short name" - match := eventPattern.FindStringSubmatch(msg.Destination) - event := "unknown" - if match != nil { - event = match[1] - } + // Apply the secret - // Send it - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) - if nil != err { - // Report failure - obs.getCounter(obs.deliveryCounter, -1).With("event", event).Add(1.0) - obs.droppedNetworkErrCounter.Add(1.0) - } else { - // Report Result - switch resp.StatusCode { - case 200: - delivered200.With("event", event).Add(1.0) - retries200.With("event", event).Add(simpleCounter.Count) - case 201: - delivered201.With("event", event).Add(1.0) - retries201.With("event", event).Add(simpleCounter.Count) - case 202: - delivered202.With("event", event).Add(1.0) - retries202.With("event", event).Add(simpleCounter.Count) - case 204: - delivered204.With("event", event).Add(1.0) - retries204.With("event", event).Add(simpleCounter.Count) - default: - obs.getCounter(obs.deliveryCounter, resp.StatusCode).With("event", event).Add(1.0) - obs.getCounter(obs.deliveryRetryCounter, resp.StatusCode).With("event", event).Add(simpleCounter.Count) - } + if "" != secret { + s := hmac.New(sha1.New, []byte(secret)) + s.Write(payload) + sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) + req.Header.Set("X-Webpa-Signature", sig) + } - // read until the response is complete before closing to allow - // connection reuse - if nil != resp.Body { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - } - } - } - } else { - obs.droppedExpiredCounter.Add(1.0) - } + // find the event "short name" + match := eventPattern.FindStringSubmatch(msg.Destination) + event := "unknown" + if match != nil { + event = match[1] + } + + retryOptions := xhttp.RetryOptions{ + Logger: obs.logger, + Retries: obs.deliveryRetries, + Interval: obs.deliveryInterval, + Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), + // Always retry on failures up to the max count. + ShouldRetry: func(error) bool { return true }, + } + + // Send it + resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + if nil != err { + // Report failure + obs.deliveryCounter.With("url", obs.id, "code", "failure", "event", event).Add(1.0) + obs.droppedNetworkErrCounter.Add(1.0) } else { - obs.droppedCutoffCounter.Add(1.0) + // Report Result + obs.deliveryCounter.With("url", obs.id, "code", strconv.Itoa(resp.StatusCode), "event", event).Add(1.0) + + // read until the response is complete before closing to allow + // connection reuse + if nil != resp.Body { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } } } } @@ -616,7 +535,7 @@ func (obs *CaduceusOutboundSender) worker(id int) { func (obs *CaduceusOutboundSender) queueOverflow() { obs.mutex.Lock() obs.dropUntil = time.Now().Add(obs.cutOffPeriod) - secret := obs.secret + secret := obs.listener.Config.Secret failureMsg := obs.failureMsg failureURL := obs.listener.FailureURL obs.mutex.Unlock() @@ -648,8 +567,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } else { req.Header.Set("Content-Type", "application/json") - if nil != obs.secret { - h := hmac.New(sha1.New, secret) + if "" != secret { + h := hmac.New(sha1.New, []byte(secret)) h.Write(msg) sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) req.Header.Set("X-Webpa-Signature", sig) diff --git a/src/caduceus/outboundSender_test.go b/src/caduceus/outboundSender_test.go index f290e7c6..eb4e3d91 100644 --- a/src/caduceus/outboundSender_test.go +++ b/src/caduceus/outboundSender_test.go @@ -76,14 +76,17 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] w.Matcher.DeviceId = matcher fakeDC := new(mockCounter) - fakeDC.On("With", []string{"url", w.Config.URL, "code", "200"}).Return(fakeDC). + fakeDC.On("With", []string{"url", w.Config.URL, "code", "200", "event", "test"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "code", "200", "event", "iot"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "code", "200", "event", "unknown"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "code", "failure", "event", "iot"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "event", "test"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "event", "iot"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "event", "unknown"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "201"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "202"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "204"}).Return(fakeDC). - On("With", []string{"url", w.Config.URL, "code", "failure"}).Return(fakeDC). - On("With", []string{"event", "iot"}).Return(fakeDC). - On("With", []string{"event", "unknown"}).Return(fakeDC). - On("With", []string{"event", "test"}).Return(fakeDC) + On("With", []string{"url", w.Config.URL, "code", "failure"}).Return(fakeDC) fakeDC.On("Add", 1.0).Return() fakeDC.On("Add", 0.0).Return() diff --git a/src/caduceus/senderWrapper_test.go b/src/caduceus/senderWrapper_test.go index 88fb206d..db08df37 100644 --- a/src/caduceus/senderWrapper_test.go +++ b/src/caduceus/senderWrapper_test.go @@ -79,9 +79,10 @@ func getFakeFactory() *SenderWrapperFactory { fakeIgnore := new(mockCounter) fakeIgnore.On("Add", 1.0).Return().On("Add", 0.0).Return(). - On("With", []string{"url", "unknown"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:8888/foo", "event", "unknown"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:9999/foo", "event", "unknown"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "cut_off"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "queue_full"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired"}).Return(fakeIgnore). @@ -92,18 +93,8 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "201"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "202"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "204"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "201"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "202"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "204"}).Return(fakeIgnore). - On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore). - On("With", []string{"event", "wrp"}).Return(fakeIgnore). - On("With", []string{"event", "unknown"}).Return(fakeIgnore). - On("With", []string{"event", "iot"}).Return(fakeIgnore) + On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore) fakeRegistry := new(mockCaduceusMetricsRegistry) fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC) diff --git a/src/caduceus/simpleCounter.go b/src/caduceus/simpleCounter.go deleted file mode 100644 index cbf69513..00000000 --- a/src/caduceus/simpleCounter.go +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2018 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package main - -import "github.com/go-kit/kit/metrics" - -// This is a non-concurrent safe counter that lets a single goroutine aggregate -// a metric before adding them to a larger correlated metric. -type SimpleCounter struct { - // The active count - Count float64 -} - -// With implements Counter. -func (s *SimpleCounter) With(labelValues ...string) metrics.Counter { - return s -} - -// Add implements Counter. -func (s *SimpleCounter) Add(delta float64) { - if 0.0 < delta { - s.Count += delta - } -} diff --git a/src/caduceus/simpleCounter_test.go b/src/caduceus/simpleCounter_test.go deleted file mode 100644 index 08701795..00000000 --- a/src/caduceus/simpleCounter_test.go +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright 2018 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package main - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSimpleCounter(t *testing.T) { - assert := assert.New(t) - - s := &SimpleCounter{Count: 0} - newS := s.With("foo", "bar") - assert.True(s == newS) - - s.Add(1.0) - assert.True(1.0 == s.Count) - - s.Add(10.0) - assert.True(11.0 == s.Count) - - s.Add(-10.0) - assert.True(11.0 == s.Count) -} diff --git a/src/glide.yaml b/src/glide.yaml index 682a6609..504c57c0 100644 --- a/src/glide.yaml +++ b/src/glide.yaml @@ -1,6 +1,6 @@ package: . import: - package: github.com/Comcast/webpa-common - version: 6ca7d6c5e78ebba17483f5d0e61c7c119b43619a + version: 0f087c5390cb60f4bdff2a27d1858c5e44755ce4 - package: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 From e3d47f19ee65369e18913bfb06194a30415b7a91 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Fri, 15 Feb 2019 18:28:16 -0800 Subject: [PATCH 10/15] Add the glide lock file. --- src/glide.lock | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/glide.lock b/src/glide.lock index 3aed32b1..7014496f 100644 --- a/src/glide.lock +++ b/src/glide.lock @@ -1,5 +1,5 @@ -hash: 5c9aa43d5410004080968bf0948bf1f1ee619d6269cb56ed1e499e57cd3a4fe7 -updated: 2018-10-11T15:02:53.818112-07:00 +hash: 98a3ab8aa409ebc6af88888bdb48f1cf25af970ed8ea0a3bbcaae1d01f7d51dc +updated: 2019-02-14T18:47:17.113403999-08:00 imports: - name: github.com/aws/aws-sdk-go version: 7be45195c3af1b54a609812f90c05a7e492e2491 @@ -38,11 +38,12 @@ imports: subpackages: - linux - name: github.com/Comcast/webpa-common - version: 6ca7d6c5e78ebba17483f5d0e61c7c119b43619a + version: 0f087c5390cb60f4bdff2a27d1858c5e44755ce4 subpackages: - concurrent - convey - convey/conveyhttp + - convey/conveymetric - device - health - logging @@ -50,17 +51,19 @@ imports: - secure - secure/handler - secure/key + - semaphore - server - types - webhook - webhook/aws - wrp - wrp/wrphttp + - wrp/wrpmeta - xhttp - xlistener - xmetrics - name: github.com/davecgh/go-spew - version: 346938d642f2ec3594ed81d874461961cd0faa76 + version: 8991bc29aa16c548c550c7ff78260e27b9ab7c73 subpackages: - spew - name: github.com/fsnotify/fsnotify @@ -68,7 +71,7 @@ imports: - name: github.com/go-ini/ini version: bda519ae5f4cbc60d391ff8610711627a08b86ae - name: github.com/go-kit/kit - version: 4dc7be5d2d12881735283bcab7352178e190fc71 + version: 12210fb6ace19e0496167bb3e667dcd91fa9f69b subpackages: - endpoint - log @@ -89,10 +92,8 @@ imports: - util/conn - name: github.com/go-logfmt/logfmt version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -- name: github.com/go-stack/stack - version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf - name: github.com/golang/protobuf - version: b4deda0973fb4c70b50d226b1af49f3da59f5265 + version: 9eb2c01ac278a5d89ce4b2be68fe4500955d8179 subpackages: - proto - name: github.com/gorilla/context @@ -106,6 +107,7 @@ imports: subpackages: - hcl/ast - hcl/parser + - hcl/printer - hcl/scanner - hcl/strconv - hcl/token @@ -113,7 +115,7 @@ imports: - json/scanner - json/token - name: github.com/influxdata/influxdb - version: d977c0ac2494a59d72f41dc277771a3d297b8e98 + version: 48797873ee24fc1dcb5a63f23474d3210f6d68c5 subpackages: - client/v2 - models @@ -135,11 +137,11 @@ imports: - name: github.com/miekg/dns version: ba6747e8a94115e9dc7738afb87850687611df1b - name: github.com/mitchellh/mapstructure - version: f15292f7a699fcc1a38a80977f80a046874ba8ac + version: bb74f1db0675b241733089d5a1faa5dd8b0ef57b - name: github.com/pelletier/go-toml - version: 603baefff989777996bf283da430d693e78eba3a + version: c01d1270ff3e442a8a57cddc1c92dc1138598194 - name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: - difflib - name: github.com/prometheus/client_golang @@ -148,7 +150,7 @@ imports: - prometheus - prometheus/promhttp - name: github.com/prometheus/client_model - version: 5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f + version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c subpackages: - go - name: github.com/prometheus/common @@ -182,9 +184,9 @@ imports: - name: github.com/spf13/pflag version: e57e3eeb33f795204c1ca35f56c44f83227c6e66 - name: github.com/spf13/viper - version: 25b30aa063fc18e48662b86996252eabdcf2f0c7 + version: 6d33b5a963d922d182c91e8a1c88d81fd150cfd4 - name: github.com/stretchr/objx - version: cbeaeb16a013161a98496fad62933b1d21786672 + version: 9e1dfc121bca96d392da5d00591953bdb54ab306 - name: github.com/stretchr/testify version: 12b6f73e6084dad08a7c6e575284b177ecafbc71 subpackages: @@ -193,13 +195,13 @@ imports: - mock - require - name: github.com/ugorji/go - version: 00a57e09e383d445aeef6c6cd642969dc4360231 + version: e5e69e061d4f7ee3a69b793cf9c1b41afe21918e subpackages: - codec - name: github.com/VividCortex/gohistogram version: 51564d9861991fb0ad0f531c99ef602d0f9866e6 - name: golang.org/x/crypto - version: e3636079e1a4c1f337f212cc5cd2aca108f6c900 + version: 74369b46fc6756741c016591724fd1cb8e26845f subpackages: - ed25519 - ed25519/internal/edwards25519 @@ -212,11 +214,11 @@ imports: - ipv4 - ipv6 - name: golang.org/x/sys - version: ac767d655b305d4e9612f5f6e33120b9176c4ad4 + version: 7138fd3d9dc8335c567ca206f4333fb75eb05d56 subpackages: - unix - name: golang.org/x/text - version: f21a4dfb5e38f5895301dc265a8def02365cc3d0 + version: 5cec4b58c438bd98288aeb248bab2c1840713d21 subpackages: - transform - unicode/norm From 9ea0513f49cd63b6b49469b576b76f9a9d180f98 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Mon, 18 Feb 2019 09:37:44 -0800 Subject: [PATCH 11/15] Fix the race condition. --- src/caduceus/outboundSender.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index f7bac4de..7de36aae 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -419,6 +419,7 @@ func (obs *CaduceusOutboundSender) dispatcher() { deliverUntil := obs.deliverUntil dropUntil := obs.dropUntil secret := obs.listener.Config.Secret + accept := obs.listener.Config.ContentType obs.mutex.RUnlock() now := time.Now() @@ -426,7 +427,7 @@ func (obs *CaduceusOutboundSender) dispatcher() { if now.After(dropUntil) { if now.Before(deliverUntil) { obs.workers.Acquire() - go obs.send(secret, msg) + go obs.send(secret, accept, msg) } else { obs.droppedExpiredCounter.Add(1.0) } @@ -443,22 +444,17 @@ func (obs *CaduceusOutboundSender) dispatcher() { // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa -func (obs *CaduceusOutboundSender) send(secret string, msg *wrp.Message) { +func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Message) { defer obs.workers.Release() payload := msg.Payload var payloadReader *bytes.Reader - if obs.listener.Config.ContentType == "wrp" { - // WTS - I'm not sure if this is correct. + + if acceptType == "wrp" { + // WTS - We should pass the original, raw WRP event instead of + // re-encoding it. buffer := bytes.NewBuffer([]byte{}) - var f wrp.Format - switch msg.ContentType { - case "json": - f = wrp.JSON - default: - f = wrp.Msgpack - } - encoder := wrp.NewEncoder(buffer, f) + encoder := wrp.NewEncoder(buffer, wrp.Msgpack) encoder.Encode(msg) payloadReader = bytes.NewReader(buffer.Bytes()) } else { From 07c37ed8022dd97dcbb2f5fedec44be1c456901d Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Mon, 18 Feb 2019 13:06:03 -0800 Subject: [PATCH 12/15] Simplify if/else condition statements to make the code more readable. --- src/caduceus/outboundSender.go | 289 +++++++++++++++++---------------- 1 file changed, 153 insertions(+), 136 deletions(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index 7de36aae..a0accb64 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -343,70 +343,83 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { var debugLog = logging.Debug(obs.logger) - if now.After(dropUntil) { - if now.Before(deliverUntil) { - for _, eventRegex := range events { - if eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { - matchDevice := (nil == matcher) - if nil != matcher { - for _, deviceRegex := range matcher { - if deviceRegex.MatchString(msg.Source) { + if false == obs.isValidTimeWindow(now, dropUntil, deliverUntil) { + return + } + + for _, eventRegex := range events { + if false == eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { + debugLog.Log(logging.MessageKey(), + fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", + msg.Destination, eventRegex.String())) + continue + } + + matchDevice := (nil == matcher) + if nil != matcher { + for _, deviceRegex := range matcher { + if deviceRegex.MatchString(msg.Source) { + matchDevice = true + break + } + } + } + /* + // if the device id matches then we want to look through all the metadata + // and make sure that the obs metadata matches the metadata provided + if matchDevice { + for key, val := range metaData { + if matchers, ok := matcher[key]; ok { + for _, deviceRegex := range matchers { + matchDevice = false + if deviceRegex.MatchString(val) { matchDevice = true break } } - } - /* - // if the device id matches then we want to look through all the metadata - // and make sure that the obs metadata matches the metadata provided - if matchDevice { - for key, val := range metaData { - if matchers, ok := matcher[key]; ok { - for _, deviceRegex := range matchers { - matchDevice = false - if deviceRegex.MatchString(val) { - matchDevice = true - break - } - } - - // metadata was provided but did not match our expectations, - // so it is time to drop the message - if !matchDevice { - break - } - } - } - } - */ - if matchDevice { - if len(obs.queue) < obs.queueSize { - obs.queueDepthGauge.Add(1.0) - obs.queue <- msg - debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id) - // a regex was matched, no need to check further matches + + // metadata was provided but did not match our expectations, + // so it is time to drop the message + if !matchDevice { break - } else { - obs.queueOverflow() - obs.droppedQueueFullCounter.Add(1.0) } } - } else { - debugLog.Log(logging.MessageKey(), - fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", - msg.Destination, eventRegex.String())) } } - } else { - debugLog.Log(logging.MessageKey(), "Outside delivery window", - "now", now, "before", deliverUntil, "after", dropUntil) - obs.droppedExpiredCounter.Add(1.0) + */ + if matchDevice { + if len(obs.queue) < obs.queueSize { + obs.queueDepthGauge.Add(1.0) + obs.queue <- msg + debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id) + // a regex was matched, no need to check further matches + break + } else { + obs.queueOverflow() + obs.droppedQueueFullCounter.Add(1.0) + } } - } else { + } +} + +func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { + var debugLog = logging.Debug(obs.logger) + + if false == now.After(dropUntil) { debugLog.Log(logging.MessageKey(), "Client has been cut off", "now", now, "before", deliverUntil, "after", dropUntil) obs.droppedCutoffCounter.Add(1.0) + return false + } + + if false == now.Before(deliverUntil) { + debugLog.Log(logging.MessageKey(), "Outside delivery window", + "now", now, "before", deliverUntil, "after", dropUntil) + obs.droppedExpiredCounter.Add(1.0) + return false } + + return true } func (obs *CaduceusOutboundSender) dispatcher() { @@ -467,64 +480,66 @@ func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Mess obs.droppedInvalidConfig.Add(1.0) logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id, logging.ErrorKey(), err) - } else { - req.Header.Set("Content-Type", msg.ContentType) + return + } - // Add x-Midt-* headers - wrphttp.AddMessageHeaders(req.Header, msg) + req.Header.Set("Content-Type", msg.ContentType) - // Provide the old headers for now - req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) - req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) + // Add x-Midt-* headers + wrphttp.AddMessageHeaders(req.Header, msg) - // Add the device id without the trailing service - id, _ := device.ParseID(msg.Source) - req.Header.Set("X-Webpa-Device-Id", string(id)) - req.Header.Set("X-Webpa-Device-Name", string(id)) + // Provide the old headers for now + req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) + req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) - // Apply the secret + // Add the device id without the trailing service + id, _ := device.ParseID(msg.Source) + req.Header.Set("X-Webpa-Device-Id", string(id)) + req.Header.Set("X-Webpa-Device-Name", string(id)) - if "" != secret { - s := hmac.New(sha1.New, []byte(secret)) - s.Write(payload) - sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) - req.Header.Set("X-Webpa-Signature", sig) - } + // Apply the secret - // find the event "short name" - match := eventPattern.FindStringSubmatch(msg.Destination) - event := "unknown" - if match != nil { - event = match[1] - } + if "" != secret { + s := hmac.New(sha1.New, []byte(secret)) + s.Write(payload) + sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) + req.Header.Set("X-Webpa-Signature", sig) + } - retryOptions := xhttp.RetryOptions{ - Logger: obs.logger, - Retries: obs.deliveryRetries, - Interval: obs.deliveryInterval, - Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), - // Always retry on failures up to the max count. - ShouldRetry: func(error) bool { return true }, - } + // find the event "short name" + match := eventPattern.FindStringSubmatch(msg.Destination) + event := "unknown" + if match != nil { + event = match[1] + } - // Send it - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) - if nil != err { - // Report failure - obs.deliveryCounter.With("url", obs.id, "code", "failure", "event", event).Add(1.0) - obs.droppedNetworkErrCounter.Add(1.0) - } else { - // Report Result - obs.deliveryCounter.With("url", obs.id, "code", strconv.Itoa(resp.StatusCode), "event", event).Add(1.0) - - // read until the response is complete before closing to allow - // connection reuse - if nil != resp.Body { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - } + retryOptions := xhttp.RetryOptions{ + Logger: obs.logger, + Retries: obs.deliveryRetries, + Interval: obs.deliveryInterval, + Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), + // Always retry on failures up to the max count. + ShouldRetry: func(error) bool { return true }, + } + + // Send it + resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + code := "failure" + if nil != err { + // Report failure + obs.droppedNetworkErrCounter.Add(1.0) + } else { + // Report Result + code = strconv.Itoa(resp.StatusCode) + + // read until the response is complete before closing to allow + // connection reuse + if nil != resp.Body { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() } } + obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) } // queueOverflow handles the logic of what to do when a queue overflows @@ -548,47 +563,49 @@ func (obs *CaduceusOutboundSender) queueOverflow() { if nil != err { errorLog.Log(logging.MessageKey(), "Cut-off notification json.Marshall failed", "failureMessage", obs.failureMsg, "for", obs.id, logging.ErrorKey(), err) - } else { - errorLog.Log(logging.MessageKey(), "Cut-off notification", "failureMessage", msg, "for", obs.id) + return + } + errorLog.Log(logging.MessageKey(), "Cut-off notification", "failureMessage", msg, "for", obs.id) - // Send a "you've been cut off" warning message - if "" != failureURL { + // Send a "you've been cut off" warning message + if "" == failureURL { + errorLog.Log(logging.MessageKey(), "No cut-off notification URL specified", "for", obs.id) + return + } - payload := bytes.NewReader(msg) - req, err := http.NewRequest("POST", failureURL, payload) - if nil != err { - // Failure - errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", - failureURL, "for", obs.id, logging.ErrorKey(), err) - } else { - req.Header.Set("Content-Type", "application/json") + payload := bytes.NewReader(msg) + req, err := http.NewRequest("POST", failureURL, payload) + if nil != err { + // Failure + errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", + failureURL, "for", obs.id, logging.ErrorKey(), err) + return + } + req.Header.Set("Content-Type", "application/json") - if "" != secret { - h := hmac.New(sha1.New, []byte(secret)) - h.Write(msg) - sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) - req.Header.Set("X-Webpa-Signature", sig) - } + if "" != secret { + h := hmac.New(sha1.New, []byte(secret)) + h.Write(msg) + sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) + req.Header.Set("X-Webpa-Signature", sig) + } - resp, err := obs.sender(req) - if nil != err { - // Failure - errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", - failureURL, "for", obs.id, logging.ErrorKey(), err) - } else { - if nil == resp { - // Failure - errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification, nil response", - "notification", failureURL) - } else { - // Success - logging.Info(obs.logger).Log("Able to send cut-off notification", "url", failureURL, - "status", resp.Status) - } - } - } - } else { - errorLog.Log(logging.MessageKey(), "No cut-off notification URL specified", "for", obs.id) - } + resp, err := obs.sender(req) + if nil != err { + // Failure + errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", + failureURL, "for", obs.id, logging.ErrorKey(), err) + return } + + if nil == resp { + // Failure + errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification, nil response", + "notification", failureURL) + return + } + + // Success + logging.Info(obs.logger).Log("Able to send cut-off notification", "url", failureURL, + "status", resp.Status) } From 5b4486232b2c48ab0db54a9e3eae96471db01955 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Tue, 19 Feb 2019 17:25:27 -0800 Subject: [PATCH 13/15] Prune some dead code. --- src/caduceus/caduceus_type.go | 41 ------------------------------ src/caduceus/caduceus_type_test.go | 39 ---------------------------- 2 files changed, 80 deletions(-) diff --git a/src/caduceus/caduceus_type.go b/src/caduceus/caduceus_type.go index 8a3d3d41..67189ba7 100644 --- a/src/caduceus/caduceus_type.go +++ b/src/caduceus/caduceus_type.go @@ -17,7 +17,6 @@ package main import ( - "errors" "github.com/Comcast/webpa-common/logging" "github.com/Comcast/webpa-common/secure" "github.com/Comcast/webpa-common/secure/key" @@ -78,43 +77,3 @@ func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { " to sender") ch.senderWrapper.Queue(msg) } - -// Below is the struct and implementation of our worker pool factory -type WorkerPoolFactory struct { - NumWorkers int - QueueSize int -} - -func (wpf WorkerPoolFactory) New() (wp *WorkerPool) { - jobs := make(chan func(workerID int), wpf.QueueSize) - - for i := 0; i < wpf.NumWorkers; i++ { - go func(id int) { - for f := range jobs { - f(id) - } - }(i) - } - - wp = &WorkerPool{ - jobs: jobs, - } - - return -} - -// Below is the struct and implementation of our worker pool -// It utilizes a non-blocking channel, so we throw away any requests that exceed -// the channel's limit (indicated by its buffer size) -type WorkerPool struct { - jobs chan func(workerID int) -} - -func (wp *WorkerPool) Send(inFunc func(workerID int)) error { - select { - case wp.jobs <- inFunc: - return nil - default: - return errors.New("Worker pool channel full.") - } -} diff --git a/src/caduceus/caduceus_type_test.go b/src/caduceus/caduceus_type_test.go index d51bde3d..fcfcb253 100644 --- a/src/caduceus/caduceus_type_test.go +++ b/src/caduceus/caduceus_type_test.go @@ -17,52 +17,13 @@ package main import ( - "sync" "testing" "github.com/Comcast/webpa-common/logging" "github.com/Comcast/webpa-common/wrp" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) -func TestWorkerPool(t *testing.T) { - assert := assert.New(t) - - workerPool := WorkerPoolFactory{ - NumWorkers: 1, - QueueSize: 1, - }.New() - - t.Run("TestWorkerPoolSend", func(t *testing.T) { - testWG := new(sync.WaitGroup) - testWG.Add(1) - - require.NotNil(t, workerPool) - err := workerPool.Send(func(workerID int) { - testWG.Done() - }) - - testWG.Wait() - assert.Nil(err) - }) - - workerPool = WorkerPoolFactory{ - NumWorkers: 0, - QueueSize: 0, - }.New() - - t.Run("TestWorkerPoolFullQueue", func(t *testing.T) { - require.NotNil(t, workerPool) - err := workerPool.Send(func(workerID int) { - assert.Fail("This should not execute because our worker queue is full and we have no workers.") - }) - - assert.NotNil(err) - }) -} - func TestCaduceusHandler(t *testing.T) { logger := logging.DefaultLogger() From b778bba4b98e109112d7dfd35d70815f1984bf14 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Thu, 21 Feb 2019 10:17:49 -0800 Subject: [PATCH 14/15] Upgrade to the lastest webpa-common. --- src/glide.lock | 9 +++++---- src/glide.yaml | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/glide.lock b/src/glide.lock index 7014496f..7e6c4a55 100644 --- a/src/glide.lock +++ b/src/glide.lock @@ -1,5 +1,5 @@ -hash: 98a3ab8aa409ebc6af88888bdb48f1cf25af970ed8ea0a3bbcaae1d01f7d51dc -updated: 2019-02-14T18:47:17.113403999-08:00 +hash: 8e0b5dc0f4fcee573e27644aa4b665502fecb68c941dad21430d3f491ee8519d +updated: 2019-02-21T10:14:46.1291513-08:00 imports: - name: github.com/aws/aws-sdk-go version: 7be45195c3af1b54a609812f90c05a7e492e2491 @@ -38,7 +38,7 @@ imports: subpackages: - linux - name: github.com/Comcast/webpa-common - version: 0f087c5390cb60f4bdff2a27d1858c5e44755ce4 + version: ee36b7e7561c779aa65a66fbef35b46720486987 subpackages: - concurrent - convey @@ -145,9 +145,10 @@ imports: subpackages: - difflib - name: github.com/prometheus/client_golang - version: 967789050ba94deca04a5e84cce8ad472ce313c1 + version: 505eaef017263e299324067d40ca2c48f6a2cf50 subpackages: - prometheus + - prometheus/internal - prometheus/promhttp - name: github.com/prometheus/client_model version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c diff --git a/src/glide.yaml b/src/glide.yaml index 504c57c0..f22625c4 100644 --- a/src/glide.yaml +++ b/src/glide.yaml @@ -1,6 +1,6 @@ package: . import: - package: github.com/Comcast/webpa-common - version: 0f087c5390cb60f4bdff2a27d1858c5e44755ce4 + version: ee36b7e7561c779aa65a66fbef35b46720486987 - package: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 From 1d9ea83f231986f43542b0f14a8486108194803c Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Thu, 21 Feb 2019 11:44:36 -0800 Subject: [PATCH 15/15] Release 0.1.2 --- CHANGELOG.md | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 91e6accd..df3f31db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,13 +5,21 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- + +## [0.1.2] - 2019-02-21 +### Added +- Fix for delivering events as json or msgpack based events [issue 113](https://github.com/Comcast/caduceus/issues/113) + +### Changed +- Updated to new version of webpa-common library - Remove the worker pool as a fixed number of workers per endpoint and simply cap - the maximum number. -- Fix for webhook shallow copy bug. -- Fix for delivering events as json or msgpack based events -- Fix for webhook update for all fields -- Fix for retry logic so all failures are retried the specified number of times -- Fix for waiting for DNS to resolve prior to listening for webhook updates + the maximum number. Partial fix for [issue 115](https://github.com/Comcast/caduceus/issues/115), [issue 103](https://github.com/Comcast/caduceus/issues/103) +- Fix for webhook shallow copy bug. Partial fix for [issue 115](https://github.com/Comcast/caduceus/issues/115) +- Fix for webhook update for all fields (updated webpa-common code to bring in fix) +- Fix for retry logic so all failures are retried the specified number of times - [issue 91](https://github.com/Comcast/caduceus/issues/91) +- Fix for waiting for DNS to resolve prior to listening for webhook updates - [issue 111](https://github.com/Comcast/caduceus/issues/111) - Fix for cpu spike after about 10 mintues due to worker go routines not finishing. - Fix logic for updating webhooks - Fix for sending the same event multiple times to the same webhook. @@ -30,6 +38,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Initial creation -[Unreleased]: https://github.com/Comcast/caduceus/compare/0.1.1...HEAD +[Unreleased]: https://github.com/Comcast/caduceus/compare/0.1.2...HEAD +[0.1.2]: https://github.com/Comcast/caduceus/compare/0.1.1...0.1.2 [0.1.1]: https://github.com/Comcast/caduceus/compare/0.0.1...0.1.1 [0.0.1]: https://github.com/Comcast/caduceus/compare/0.0.0...0.0.1