Skip to content

Commit

Permalink
Merge branch 'master' into sendStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidtw authored Feb 22, 2019
2 parents b67dce5 + 1d9ea83 commit 399d299
Show file tree
Hide file tree
Showing 16 changed files with 337 additions and 510 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ after_success:
- docker cp build:/versionno.txt .
- BINARY_NAME=`ls x86_64/`
- TRAVIS_TAG=`cat versionno.txt`
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then docker build -t caduceus:local .; fi'


deploy:
provider: releases
Expand Down
21 changes: 17 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,21 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Fix for webhook update for all fields
- Fix for retry logic so all failures are retried the specified number of times
- Fix for waiting for DNS to resolve prior to listening for webhook updates
### Changed
-

## [0.1.2] - 2019-02-21
### Added
- Fix for delivering events as json or msgpack based events [issue 113](https://github.com/Comcast/caduceus/issues/113)

### Changed
- Updated to new version of webpa-common library
- Removed the worker pool's fixed number of workers per endpoint; now only the
maximum number of workers is capped. Partial fix for [issue 115](https://github.com/Comcast/caduceus/issues/115), [issue 103](https://github.com/Comcast/caduceus/issues/103)
- Fix for webhook shallow copy bug. Partial fix for [issue 115](https://github.com/Comcast/caduceus/issues/115)
- Fix for webhook update for all fields (updated webpa-common code to bring in fix)
- Fix for retry logic so all failures are retried the specified number of times - [issue 91](https://github.com/Comcast/caduceus/issues/91)
- Fix for waiting for DNS to resolve prior to listening for webhook updates - [issue 111](https://github.com/Comcast/caduceus/issues/111)
- Fix for CPU spike after about 10 minutes due to worker goroutines not finishing.
- Fix logic for updating webhooks
- Fix for sending the same event multiple times to the same webhook.
Expand All @@ -26,6 +38,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- Initial creation

[Unreleased]: https://github.com/Comcast/caduceus/compare/0.1.1...HEAD
[Unreleased]: https://github.com/Comcast/caduceus/compare/0.1.2...HEAD
[0.1.2]: https://github.com/Comcast/caduceus/compare/0.1.1...0.1.2
[0.1.1]: https://github.com/Comcast/caduceus/compare/0.0.1...0.1.1
[0.0.1]: https://github.com/Comcast/caduceus/compare/0.0.0...0.0.1
21 changes: 21 additions & 0 deletions Dockerfile.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Build stage: compile the caduceus binary with the Go toolchain.
FROM golang:alpine as builder
# MAINTAINER is deprecated; LABEL is the supported replacement.
LABEL maintainer="Jack Murdock <jack_murdock@comcast.com>"

# build the binary
WORKDIR /go/src
COPY src/ /go/src/

RUN go build -o caduceus_linux_amd64 caduceus

# the actual image: minimal alpine plus CA certs for outbound TLS
FROM alpine:latest
RUN apk --no-cache add ca-certificates
RUN mkdir -p /etc/caduceus
VOLUME /etc/caduceus
# Service ports -- assumed primary/health/pprof from webpa defaults; confirm against config.
# EXPOSE must live in the final stage; in the builder stage it is discarded.
EXPOSE 6000 6001 6002
WORKDIR /root/
COPY --from=builder /go/src/caduceus_linux_amd64 .
ENTRYPOINT ["./caduceus_linux_amd64"]
2 changes: 1 addition & 1 deletion caduceus.spec.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ License: ASL 2.0
URL: https://github.com/Comcast/%{name}
Source0: %{name}-%{_fullver}.tar.gz

BuildRequires: golang >= 1.8
BuildRequires: golang >= 1.11
Requires: supervisor

Provides: %{name}
Expand Down
14 changes: 9 additions & 5 deletions example-caduceus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ soa:

numWorkerThreads: 10
jobQueueSize: 10
senderNumWorkersPerSender: 10
senderQueueSizePerSender: 50
senderCutOffPeriod: 30
senderLinger: 180
senderClientTimeout: 60
sender:
numWorkersPerSender: 10
queueSizePerSender: 50
cutOffPeriod: 30s
linger: 180s
clientTimeout: 60s
deliveryRetries: 1
deliveryInterval: 10ms
responseHeaderTimeout: 10s
profilerFrequency: 15
profilerDuration: 15
profilerQueueSize: 100
Expand Down
24 changes: 11 additions & 13 deletions src/caduceus/caduceus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,23 @@ func caduceus(arguments []string) int {

tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConnsPerHost: caduceusConfig.SenderNumWorkersPerSender,
ResponseHeaderTimeout: 10 * time.Second, // TODO Make this configurable
MaxIdleConnsPerHost: caduceusConfig.Sender.NumWorkersPerSender,
ResponseHeaderTimeout: caduceusConfig.Sender.ResponseHeaderTimeout,
IdleConnTimeout: caduceusConfig.Sender.IdleConnTimeout,
}

timeout := time.Duration(caduceusConfig.SenderClientTimeout) * time.Second

caduceusSenderWrapper, err := SenderWrapperFactory{
NumWorkersPerSender: caduceusConfig.SenderNumWorkersPerSender,
QueueSizePerSender: caduceusConfig.SenderQueueSizePerSender,
CutOffPeriod: time.Duration(caduceusConfig.SenderCutOffPeriod) * time.Second,
Linger: time.Duration(caduceusConfig.SenderLinger) * time.Second,
DeliveryRetries: 1, // TODO Make this configurable
DeliveryInterval: 10 * time.Millisecond, // TODO Make this configurable
NumWorkersPerSender: caduceusConfig.Sender.NumWorkersPerSender,
QueueSizePerSender: caduceusConfig.Sender.QueueSizePerSender,
CutOffPeriod: caduceusConfig.Sender.CutOffPeriod,
Linger: caduceusConfig.Sender.Linger,
DeliveryRetries: caduceusConfig.Sender.DeliveryRetries,
DeliveryInterval: caduceusConfig.Sender.DeliveryInterval,
MetricsRegistry: metricsRegistry,
Logger: logger,
Sender: (&http.Client{
Transport: tr,
Timeout: timeout,
Timeout: caduceusConfig.Sender.ClientTimeout,
}).Do,
}.New()

Expand Down Expand Up @@ -297,8 +296,7 @@ func caduceus(arguments []string) int {
exit = true
}
}
s := server.SignalWait(infoLog, signals, os.Interrupt, os.Kill)
errorLog.Log(logging.MessageKey(), "exiting due to signal", "signal", s)

close(shutdown)
waitGroup.Wait()

Expand Down
68 changes: 18 additions & 50 deletions src/caduceus/caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,35 @@
package main

import (
"errors"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/secure"
"github.com/Comcast/webpa-common/secure/key"
"github.com/Comcast/webpa-common/wrp"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"time"
)

// Below is the struct we're using to contain the data from a provided config file
// TODO: Try to figure out how to make bucket ranges configurable
type CaduceusConfig struct {
AuthHeader []string
NumWorkerThreads int
JobQueueSize int
SenderNumWorkersPerSender int
SenderQueueSizePerSender int
SenderCutOffPeriod int
SenderLinger int
SenderClientTimeout int
JWTValidators []JWTValidator
AuthHeader []string
NumWorkerThreads int
JobQueueSize int
Sender SenderConfig
JWTValidators []JWTValidator
}

// SenderConfig collects the sender-related settings that control how outbound
// events are delivered to webhook endpoints. It is populated from the
// "sender" block of the configuration file (see example-caduceus.yaml).
type SenderConfig struct {
	// NumWorkersPerSender caps the delivery workers per endpoint; also used
	// as the HTTP transport's MaxIdleConnsPerHost.
	NumWorkersPerSender int
	// QueueSizePerSender bounds the outbound event queue for one endpoint.
	QueueSizePerSender int
	// CutOffPeriod -- assumed: how long deliveries to an endpoint are cut off
	// after a failure threshold; confirm against the sender wrapper code.
	CutOffPeriod time.Duration
	// Linger -- assumed: how long an idle sender is retained before teardown;
	// confirm against the sender wrapper code.
	Linger time.Duration
	// ClientTimeout is the overall http.Client timeout used for deliveries.
	ClientTimeout time.Duration
	// ResponseHeaderTimeout bounds the wait for an endpoint's response headers
	// (applied to the HTTP transport).
	ResponseHeaderTimeout time.Duration
	// IdleConnTimeout bounds how long idle keep-alive connections are held
	// (applied to the HTTP transport).
	IdleConnTimeout time.Duration
	// DeliveryRetries is the number of retry attempts per event delivery.
	DeliveryRetries int
	// DeliveryInterval is the pause between delivery retry attempts.
	DeliveryInterval time.Duration
}

type JWTValidator struct {
Expand Down Expand Up @@ -69,43 +77,3 @@ func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) {
" to sender")
ch.senderWrapper.Queue(msg)
}

// WorkerPoolFactory configures and builds a WorkerPool.
type WorkerPoolFactory struct {
	NumWorkers int // number of goroutines consuming the job queue
	QueueSize  int // buffered capacity of the job queue
}

// New starts NumWorkers goroutines draining a shared job channel and returns
// the WorkerPool wrapping that channel. Each worker receives its index as
// workerID. The goroutines exit when the jobs channel is closed; note no
// close is performed here, so the caller owns the pool's lifetime.
func (wpf WorkerPoolFactory) New() (wp *WorkerPool) {
	jobs := make(chan func(workerID int), wpf.QueueSize)

	for i := 0; i < wpf.NumWorkers; i++ {
		go func(id int) {
			for f := range jobs {
				f(id)
			}
		}(i)
	}

	// Explicit return instead of a naked return for clarity.
	return &WorkerPool{jobs: jobs}
}

// WorkerPool fans submitted jobs out to a fixed set of worker goroutines.
// It uses a non-blocking send, so jobs submitted while the buffered channel
// is full are rejected rather than queued (indicated by Send's error).
type WorkerPool struct {
	jobs chan func(workerID int)
}

// Send enqueues inFunc for execution by some worker, which invokes it with
// that worker's id. Send never blocks: if the queue is full, an error is
// returned and the job is dropped.
func (wp *WorkerPool) Send(inFunc func(workerID int)) error {
	select {
	case wp.jobs <- inFunc:
		return nil
	default:
		// Error strings are lowercase without trailing punctuation (staticcheck ST1005).
		return errors.New("worker pool channel full")
	}
}
39 changes: 0 additions & 39 deletions src/caduceus/caduceus_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,13 @@
package main

import (
"sync"
"testing"

"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// TestWorkerPool exercises WorkerPool through its factory: a pool with one
// worker must accept and execute a submitted job, and a pool with zero
// workers and a zero-size queue must reject every submission.
func TestWorkerPool(t *testing.T) {
	assert := assert.New(t)

	// Pool with one worker and room for one queued job.
	workerPool := WorkerPoolFactory{
		NumWorkers: 1,
		QueueSize:  1,
	}.New()

	t.Run("TestWorkerPoolSend", func(t *testing.T) {
		testWG := new(sync.WaitGroup)
		testWG.Add(1)

		require.NotNil(t, workerPool)
		// The job must be accepted (nil error) and eventually executed by the
		// worker; the WaitGroup blocks until it actually runs.
		err := workerPool.Send(func(workerID int) {
			testWG.Done()
		})

		testWG.Wait()
		assert.Nil(err)
	})

	// Pool with no workers and no queue capacity: Send must always fail
	// because the non-blocking enqueue has nowhere to put the job.
	workerPool = WorkerPoolFactory{
		NumWorkers: 0,
		QueueSize:  0,
	}.New()

	t.Run("TestWorkerPoolFullQueue", func(t *testing.T) {
		require.NotNil(t, workerPool)
		err := workerPool.Send(func(workerID int) {
			assert.Fail("This should not execute because our worker queue is full and we have no workers.")
		})

		assert.NotNil(err)
	})
}

func TestCaduceusHandler(t *testing.T) {
logger := logging.DefaultLogger()

Expand Down
2 changes: 1 addition & 1 deletion src/caduceus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Metrics() []xmetrics.Metric {
Name: DeliveryRetryCounter,
Help: "Number of delivery retries made",
Type: "counter",
LabelNames: []string{"url", "code", "event"},
LabelNames: []string{"url", "event"},
},
{
Name: DeliveryCounter,
Expand Down
Loading

0 comments on commit 399d299

Please sign in to comment.