Skip to content

Commit

Permalink
Merge pull request #133 from githubsands/metric_for_content_type
Browse files Browse the repository at this point in the history
caduceus: add metric - incoming content type
  • Loading branch information
schmidtw authored Jun 5, 2019
2 parents f07deb3 + a242e6d commit 5d93a81
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/caduceus/caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type CaduceusHandler struct {
}

func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) {

logging.Info(ch).Log("workerID", workerID, logging.MessageKey(), "Worker received a request, now passing"+
" to sender")

ch.senderWrapper.Queue(msg)
}
3 changes: 2 additions & 1 deletion src/caduceus/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/Comcast/wrp-go/wrp"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"
)

// Below is the struct that will implement our ServeHTTP method
Expand Down Expand Up @@ -89,6 +89,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
debugLog.Log(messageKey, "Invalid payload format.\n")
return
}

sh.caduceusHandler.HandleRequest(0, fixWrp(msg))

// return a 202
Expand Down
18 changes: 18 additions & 0 deletions src/caduceus/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -65,6 +66,8 @@ func exampleRequest(list ...string) *http.Request {
}

func TestServerHandler(t *testing.T) {
fmt.Print("TestingeServerHandler")

assert := assert.New(t)

logger := logging.DefaultLogger()
Expand Down Expand Up @@ -100,6 +103,8 @@ func TestServerHandler(t *testing.T) {
}

func TestServerHandlerFixWrp(t *testing.T) {
fmt.Printf("TestServerHandlerFixWrp")

assert := assert.New(t)

logger := logging.DefaultLogger()
Expand All @@ -113,6 +118,11 @@ func TestServerHandlerFixWrp(t *testing.T) {
fakeQueueDepth := new(mockGauge)
fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2)

fakeIncomingContentTypeCount := new(mockCounter)
fakeIncomingContentTypeCount.On("With", []string{"content_type", "application/msgpack"}).Return(fakeIncomingContentTypeCount)
fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount)
fakeIncomingContentTypeCount.On("Add", 1.0).Return()

serverWrapper := &ServerHandler{
Logger: logger,
caduceusHandler: fakeHandler,
Expand All @@ -135,6 +145,8 @@ func TestServerHandlerFixWrp(t *testing.T) {
}

func TestServerHandlerFull(t *testing.T) {
fmt.Printf("TestServerHandlerFull")

assert := assert.New(t)

logger := logging.DefaultLogger()
Expand Down Expand Up @@ -167,6 +179,8 @@ func TestServerHandlerFull(t *testing.T) {
}

func TestServerEmptyPayload(t *testing.T) {
fmt.Printf("TestServerEmptyPayLoad")

assert := assert.New(t)

var buffer bytes.Buffer
Expand Down Expand Up @@ -204,6 +218,8 @@ func TestServerEmptyPayload(t *testing.T) {
}

func TestServerUnableToReadBody(t *testing.T) {
fmt.Printf("TestServerUnableToReadBody")

assert := assert.New(t)

var buffer bytes.Buffer
Expand Down Expand Up @@ -243,6 +259,8 @@ func TestServerUnableToReadBody(t *testing.T) {
}

func TestServerInvalidBody(t *testing.T) {
fmt.Printf("TestServerInvalidBody")

assert := assert.New(t)

r := bytes.NewReader([]byte("Invalid payload."))
Expand Down
13 changes: 13 additions & 0 deletions src/caduceus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,16 @@ func Metrics() []xmetrics.Metric {
},
}
}

func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "network_err")
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.contentTypeCounter = m.NewCounter(IncomingContentTypeCounter)
}
30 changes: 7 additions & 23 deletions src/caduceus/outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type CaduceusOutboundSender struct {
droppedNetworkErrCounter metrics.Counter
droppedInvalidConfig metrics.Counter
cutOffCounter metrics.Counter
contentTypeCounter metrics.Counter
queueDepthGauge metrics.Gauge
eventType metrics.Counter
wg sync.WaitGroup
Expand Down Expand Up @@ -187,29 +188,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
// Don't share the secret with others when there is an error.
caduceusOutboundSender.failureMsg.Original.Config.Secret = "XxxxxX"

caduceusOutboundSender.deliveryCounter = osf.MetricsRegistry.NewCounter(DeliveryCounter)
caduceusOutboundSender.deliveryRetryCounter = osf.MetricsRegistry.NewCounter(DeliveryRetryCounter)

caduceusOutboundSender.cutOffCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerCounter).With("url", caduceusOutboundSender.id)

caduceusOutboundSender.droppedQueueFullCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "queue_full")

caduceusOutboundSender.droppedExpiredCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "expired")

caduceusOutboundSender.droppedCutoffCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "cut_off")

caduceusOutboundSender.droppedInvalidConfig = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "invalid_config")

caduceusOutboundSender.droppedNetworkErrCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "network_err")

caduceusOutboundSender.queueDepthGauge = osf.MetricsRegistry.
NewGauge(OutgoingQueueDepth).With("url", caduceusOutboundSender.id)
CreateOutbounderMetrics(osf.MetricsRegistry, caduceusOutboundSender)

// Give us some head room so that we don't block when we get near the
// completely full point.
Expand Down Expand Up @@ -441,6 +420,7 @@ func (obs *CaduceusOutboundSender) dispatcher() {
continue
}
obs.workers.Acquire()

go obs.send(secret, accept, msg)
}

Expand All @@ -459,6 +439,8 @@ func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Mess
body := payload
var payloadReader *bytes.Reader

obs.contentTypeCounter.With("content_type", strings.TrimLeft(msg.ContentType, "application/")).Add(1.0)

// Use the internal content type unless the accept type is wrp
contentType := msg.ContentType
switch acceptType {
Expand Down Expand Up @@ -588,6 +570,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() {
req.Header.Set("X-Webpa-Signature", sig)
}

// record content type, json.
obs.contentTypeCounter.With("content_type", "json").Add(1.0)
resp, err := obs.sender(req)
if nil != err {
// Failure
Expand Down
53 changes: 41 additions & 12 deletions src/caduceus/outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"

//"github.com/stretchr/testify/mock"
"io"
"io/ioutil"
Expand Down Expand Up @@ -123,18 +124,30 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("Add", 1.0).Return()

// test queue depth
// IncomingContentType cases
fakeContentType := new(mockCounter)
fakeContentType.On("With", []string{"content_type", "msgpack"}).Return(fakeContentType)
fakeContentType.On("With", []string{"content_type", "json"}).Return(fakeContentType)
fakeContentType.On("With", []string{"content_type", "http"}).Return(fakeContentType)
fakeContentType.On("With", []string{"content_type", "other"}).Return(fakeContentType)
fakeContentType.On("Add", 1.0).Return()

// QueueDepth case
fakeQdepth := new(mockGauge)
fakeQdepth.On("With", []string{"url", w.Config.URL}).Return(fakeQdepth)
fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return()

// build a registry and register all fake metrics
// Build a registry and register all fake metrics, these are synymous with the metrics in
// metrics.go
//
// If a new metric within outboundsender is created it must be added here
fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
//fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeEventType)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeContentType)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth)

return &OutboundSenderFactory{
Expand Down Expand Up @@ -175,19 +188,35 @@ func TestSimpleWrp(t *testing.T) {
// queue case 1
req := simpleRequest()
req.Destination = "event:iot"
fmt.Printf("Queue case 1:\n %v\n", spew.Sprint(req.Destination))
fmt.Printf("Queue case 1:\n %v\n", spew.Sprint(req))
obs.Queue(req)

r2 := simpleRequest()
r2.Destination = "event:test"
fmt.Printf("\nQueue case 2:\n %v\n", spew.Sprint(r2.Destination))
obs.Queue(r2)
req = simpleRequest()
req.Destination = "event:test"
fmt.Printf("\nQueue case 2:\n %v\n", spew.Sprint(req))
obs.Queue(req)

// queue case 3
r3 := simpleRequest()
r3.Destination = "event:no-match"
fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(r3.Destination))
obs.Queue(r3)
req = simpleRequest()
req.Destination = "event:no-match"
fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(req))
obs.Queue(req)

// queue case 4
req = simpleRequest()
req.ContentType = "application/json"
fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(req))
obs.Queue(req)

req = simpleRequest()
req.ContentType = "application/http"
fmt.Printf("\nQueue case 4:\n %v\n", spew.Sprint(req))
obs.Queue(req)

req = simpleRequest()
req.ContentType = "unknown"
fmt.Printf("\nQueue case 4:\n %v\n", spew.Sprint(req))
obs.Queue(req)

obs.Shutdown(true)

Expand Down
2 changes: 0 additions & 2 deletions src/caduceus/senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ type SenderWrapperFactory struct {
// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry

ContentTypeCounter metrics.Counter

// The metrics counter for dropped messages due to invalid payloads
DroppedMsgCounter metrics.Counter

Expand Down
12 changes: 6 additions & 6 deletions src/caduceus/senderWrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ func (t *swTransport) RoundTrip(req *http.Request) (*http.Response, error) {
}

func getFakeFactory() *SenderWrapperFactory {
fakeICTC := new(mockCounter)
fakeICTC.On("With", []string{"content_type", "msgpack"}).Return(fakeICTC).
On("With", []string{"content_type", "unknown"}).Return(fakeICTC).
On("Add", 1.0).Return()

fakeDDTIP := new(mockCounter)
fakeDDTIP.On("Add", 1.0).Return()
Expand Down Expand Up @@ -101,15 +97,19 @@ func getFakeFactory() *SenderWrapperFactory {
On("With", []string{"event", "iot"}).Return(fakeIgnore).
On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore).
On("With", []string{"event", "bob/magic/dog"}).Return(fakeIgnore).
On("With", []string{"event", "unknown"}).Return(fakeIgnore)
On("With", []string{"event", "unknown"}).Return(fakeIgnore).
On("With", []string{"content_type", "msgpack"}).Return(fakeIgnore).
On("With", []string{"content_type", "json"}).Return(fakeIgnore).
On("With", []string{"content_type", "http"}).Return(fakeIgnore).
On("With", []string{"content_type", "other"}).Return(fakeIgnore)

fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", DropsDueToInvalidPayload).Return(fakeDDTIP)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge)

Expand Down

0 comments on commit 5d93a81

Please sign in to comment.