Skip to content

Commit

Permalink
Merge pull request #39 from Comcast/feature/profiler-work
Browse files Browse the repository at this point in the history
Feature/profiler work
  • Loading branch information
schmidtw authored Aug 9, 2017
2 parents f40b441 + 266c639 commit 32cd31a
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 54 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ _testmain.go
*.exe
*.test
*.prof

# IDE files
.idea/*
22 changes: 13 additions & 9 deletions src/caduceus/caduceus.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,23 @@ func caduceus(arguments []string) int {
QueueSize: caduceusConfig.JobQueueSize,
}.New()

caduceusProfilerFactory := ServerProfilerFactory{
mainCaduceusProfilerFactory := ServerProfilerFactory{
Frequency: caduceusConfig.ProfilerFrequency,
Duration: caduceusConfig.ProfilerDuration,
QueueSize: caduceusConfig.ProfilerQueueSize,
Logger: logger,
}

// here we create a profiler specifically for our main server handler
caduceusHandlerProfiler, err := mainCaduceusProfilerFactory.New("main")
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to profiler for main caduceus handler: %s\n", err)
return 1
}

childCaduceusProfilerFactory := mainCaduceusProfilerFactory
childCaduceusProfilerFactory.Parent = caduceusHandlerProfiler

tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
timeout := time.Duration(caduceusConfig.SenderClientTimeout) * time.Second

Expand All @@ -122,7 +133,7 @@ func caduceus(arguments []string) int {
QueueSizePerSender: caduceusConfig.SenderQueueSizePerSender,
CutOffPeriod: time.Duration(caduceusConfig.SenderCutOffPeriod) * time.Second,
Linger: time.Duration(caduceusConfig.SenderLinger) * time.Second,
ProfilerFactory: caduceusProfilerFactory,
ProfilerFactory: childCaduceusProfilerFactory,
Logger: logger,
Client: &http.Client{Transport: tr, Timeout: timeout},
}.New()
Expand All @@ -132,13 +143,6 @@ func caduceus(arguments []string) int {
return 1
}

// here we create a profiler specifically for our main server handler
caduceusHandlerProfiler, err := caduceusProfilerFactory.New()
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to profiler for main caduceus handler: %s\n", err)
return 1
}

serverWrapper := &ServerHandler{
Logger: logger,
caduceusHandler: &CaduceusHandler{
Expand Down
119 changes: 112 additions & 7 deletions src/caduceus/caduceusProfiler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package main

import (
"encoding/json"
"errors"
"github.com/Comcast/webpa-common/logging"
"math"
"sort"
"sync"
"time"
)
Expand All @@ -10,22 +14,27 @@ type ServerProfilerFactory struct {
Frequency int
Duration int
QueueSize int
Parent ServerProfiler
Logger logging.Logger
}

// New will be used to initialize a new server profiler for caduceus and get
// the gears in motion for aggregating data
func (spf ServerProfilerFactory) New() (serverProfiler ServerProfiler, err error) {
func (spf ServerProfilerFactory) New(name string) (serverProfiler ServerProfiler, err error) {
if spf.Frequency < 1 || spf.Duration < 1 || spf.QueueSize < 1 {
err = errors.New("No parameter to the ServerProfilerFactory can be less than 1.")
return
}

newCaduceusProfiler := &caduceusProfiler{
name: name,
frequency: spf.Frequency,
profilerRing: NewCaduceusRing(spf.Duration),
inChan: make(chan interface{}, spf.QueueSize),
quit: make(chan struct{}),
rwMutex: new(sync.RWMutex),
parent: spf.Parent,
logger: spf.Logger,
}

go newCaduceusProfiler.aggregate(newCaduceusProfiler.quit)
Expand All @@ -43,12 +52,15 @@ type ServerProfiler interface {
type Tick func(time.Duration) <-chan time.Time

type caduceusProfiler struct {
name string
frequency int
tick Tick
profilerRing ServerRing
inChan chan interface{}
quit chan struct{}
rwMutex *sync.RWMutex
parent ServerProfiler
logger logging.Logger
}

// Send will add data that we retrieve onto the
Expand Down Expand Up @@ -89,19 +101,112 @@ func (cp *caduceusProfiler) aggregate(quit <-chan struct{}) {
ticker = cp.tick(time.Duration(cp.frequency) * time.Second)
}

// Send out a stat at the start of time.
if nil != cp.parent {
cp.parent.Send(cp.process(data))
}

for {
select {
case <-ticker:
// add the data to the ring and clear the temporary structure
cp.rwMutex.Lock()
cp.profilerRing.Add(data)
cp.rwMutex.Unlock()
if nil != cp.parent {
// perform some analysis
cp.parent.Send(cp.process(data))
}
data = nil
case inData := <-cp.inChan:
// add the data to a temporary structure
data = append(data, inData)
if nil != cp.parent {
// add the data to a temporary structure
data = append(data, inData)
} else {
// add the data to the ring and clear the temporary structure
cp.rwMutex.Lock()
cp.profilerRing.Add(inData)
cp.rwMutex.Unlock()
}
case <-quit:
return
}
}
}

type int64Array []int64

func (a int64Array) Len() int { return len(a) }
func (a int64Array) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a int64Array) Less(i, j int) bool { return a[i] < a[j] }

func (cp *caduceusProfiler) process(raw []interface{}) (rv interface{}) {

raw = filterNonTelemetryElements(raw)
n := len(raw)

cs := CaduceusStats{
Name: cp.name,
Time: time.Now().String(),
}

if 0 < n {
// in nanoseconds
latency := make([]int64, n)
processingTime := make([]int64, n)
responseTime := make([]int64, n)
tonnage := 0
var responseTotal, processingTotal, latencyTotal int64

for i, rawElement := range raw {
telemetryData := rawElement.(CaduceusTelemetry)

tonnage += telemetryData.PayloadSize

latency[i] = telemetryData.TimeSent.Sub(telemetryData.TimeReceived).Nanoseconds()
processingTime[i] = telemetryData.TimeOutboundAccepted.Sub(telemetryData.TimeReceived).Nanoseconds()
responseTime[i] = telemetryData.TimeResponded.Sub(telemetryData.TimeSent).Nanoseconds()

latencyTotal += latency[i]
processingTotal += processingTime[i]
responseTotal += responseTime[i]
}
sort.Sort(int64Array(latency))
sort.Sort(int64Array(processingTime))
sort.Sort(int64Array(responseTime))

// TODO There is a pattern for time based stats calculations that should be made common

// get98th returns the 98% indice value.
// example: in an array with length of 100. index 97 would be the 98th.
get98th := func(list []int64) int64 {
return int64(math.Ceil(float64(len(list))*0.98) - 1)
}

cs.EventsSent = n
cs.ProcessingTimePerc98 = time.Duration(processingTime[get98th(processingTime)]).String()
cs.ProcessingTimeAvg = time.Duration(processingTotal / int64(n)).String()
cs.LatencyPerc98 = time.Duration(latency[get98th(latency)]).String()
cs.LatencyAvg = time.Duration(latencyTotal / int64(n)).String()
cs.ResponsePerc98 = time.Duration(responseTime[get98th(responseTime)]).String()
cs.ResponseAvg = time.Duration(responseTotal / int64(n)).String()
}

rv = &cs

b, err := json.Marshal(cs)
if nil == err {
cp.logger.Error("Endpoint Delivery Stats: %s", string(b))
} else {
cp.logger.Error("Endpoint Delivery Stats: %+v", cs)
}

return
}

//Input: An array A of interfaces
//Output: An array A' containing those elements in A that cast to type CaduceusTelemetry
func filterNonTelemetryElements(elements []interface{}) (output []interface{}) {
for _, element := range elements {
if _, isCaduceusTelemetry := element.(CaduceusTelemetry); isCaduceusTelemetry {
output = append(output, element)
}
}
return
}
51 changes: 45 additions & 6 deletions src/caduceus/caduceusProfiler_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
//"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -41,23 +42,23 @@ func TestCaduceusProfilerFactory(t *testing.T) {

t.Run("TestCaduceusProfilerFactoryNew", func(t *testing.T) {
require.NotNil(t, testFactory)
testProfiler, err := testFactory.New()
testProfiler, err := testFactory.New("dogbert")
assert.NotNil(testProfiler)
assert.Nil(err)
})

t.Run("TestCaduceusProfilerFactoryNewInvalidParameters", func(t *testing.T) {
require.NotNil(t, testFactory)
testFactory.Frequency = 0
testProfiler, err := testFactory.New()
testProfiler, err := testFactory.New("dogbert")
assert.Nil(testProfiler)
assert.NotNil(err)
})
}

func TestCaduceusProfiler(t *testing.T) {
assert := assert.New(t)
testMsg := "test"
testMsg := CaduceusTelemetry{PayloadSize: 12}
testData := make([]interface{}, 0)
testData = append(testData, testMsg)

Expand All @@ -72,14 +73,15 @@ func TestCaduceusProfiler(t *testing.T) {

// used to mock out a ring that the server profiler uses
fakeRing := new(mockRing)
fakeRing.On("Add", mock.AnythingOfType("[]interface {}")).Run(
fakeRing.On("Add", mock.AnythingOfType("main.CaduceusTelemetry")).Run(
func(args mock.Arguments) {
testWG.Done()
}).Once()
fakeRing.On("Snapshot").Return(testData).Once()

// what we'll use for most of the tests
testProfiler := caduceusProfiler{
name: "catbert",
frequency: 1,
tick: testFunc,
profilerRing: fakeRing,
Expand All @@ -88,6 +90,8 @@ func TestCaduceusProfiler(t *testing.T) {
rwMutex: new(sync.RWMutex),
}

testWG.Add(1)

// start this up for later
go testProfiler.aggregate(testProfiler.quit)

Expand All @@ -99,6 +103,7 @@ func TestCaduceusProfiler(t *testing.T) {

t.Run("TestCaduceusProfilerSendFullQueue", func(t *testing.T) {
fullQueueProfiler := caduceusProfiler{
name: "catbert",
frequency: 1,
profilerRing: NewCaduceusRing(1),
inChan: make(chan interface{}, 1),
Expand All @@ -119,16 +124,50 @@ func TestCaduceusProfiler(t *testing.T) {
// check to see if the data that we put on to the queue earlier is still there
t.Run("TestCaduceusProfilerReport", func(t *testing.T) {
require.NotNil(t, testProfiler)
testWG.Add(1)
testChan <- time.Now()
testWG.Wait()
testResults := testProfiler.Report()

assert.Equal(1, len(testResults))
assert.Equal("test", testResults[0].(string))
assert.Equal(CaduceusTelemetry{PayloadSize: 12}, testResults[0].(CaduceusTelemetry))

fakeRing.AssertExpectations(t)
})

testProfiler.Close()
}

/*
func TestCaduceusProfilerProcess(t *testing.T) {
set := []CaduceusTelemetry{
{
PayloadSize: 100,
TimeReceived: time.Unix(1000, 0),
TimeSent: time.Unix(1000, 10),
},
{
PayloadSize: 200,
TimeReceived: time.Unix(1010, 0),
TimeSent: time.Unix(1010, 12),
},
}
cp := caduceusProfiler{
name: "catbert",
frequency: 1,
profilerRing: NewCaduceusRing(1),
inChan: make(chan interface{}, 1),
quit: make(chan struct{}),
rwMutex: new(sync.RWMutex),
}
inputSet := make([]interface{}, len(set))
for i, v := range set {
inputSet[i] = v
}
out := cp.process(inputSet)
// TODO this is not a test, just a hack - fix me!
fmt.Printf("out: %+v\n", out[0])
}
*/
1 change: 0 additions & 1 deletion src/caduceus/caduceus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func TestCaduceusHandler(t *testing.T) {
fakeSenderWrapper.On("Queue", mock.AnythingOfType("CaduceusRequest")).Return().Once()

fakeProfiler := new(mockServerProfiler)
fakeProfiler.On("Send", mock.AnythingOfType("CaduceusRequest")).Return(nil).Once()

testHandler := CaduceusHandler{
handlerProfiler: fakeProfiler,
Expand Down
Loading

0 comments on commit 32cd31a

Please sign in to comment.