Skip to content

Commit

Permalink
feat: promauto and pushgateway measurement middlewares (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhck authored Jun 1, 2023
1 parent 1809ff7 commit c48b800
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ jobs:
with:
fetch-depth: 0
- name: release
uses: cycjimmy/semantic-release-action@v2
uses: cycjimmy/semantic-release-action@v3
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
Expand Down
8 changes: 4 additions & 4 deletions docs/docs/middlewares/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ package main
import (
"context"
"fmt"
consumer2 "github.com/honestbank/kp/v2/consumer"
"github.com/honestbank/kp/v2/middlewares/consumer"
"time"

backoff_policy "github.com/honestbank/backoff-policy"
"github.com/honestbank/backoff-policy/policies"
v2 "github.com/honestbank/kp/v2"
"github.com/honestbank/kp/v2/middlewares/measurement"
consumer2 "github.com/honestbank/kp/v2/consumer"
"github.com/honestbank/kp/v2/middlewares/consumer"
"github.com/honestbank/kp/v2/middlewares/measurement/pushgateway"
)

type UserLoggedInEvent struct {
Expand All @@ -52,7 +52,7 @@ func main() {
applicationName := "send-login-notification-worker"
kp := v2.New[kafka.Message]()
kp.AddMiddleware(consumer.NewConsumerMiddleware(getConsumer()))
kp.AddMiddleware(measurement.NewMeasurementMiddleware("http://path/to/push/gateway", applicationName)) // simply add a measurement middleware to get free metrics
kp.AddMiddleware(pushgateway.NewMeasurementMiddleware("http://path/to/push/gateway", applicationName)) // simply add a measurement middleware to get free metrics
err := kp.Process(processUserLoggedInEvent)
if err != nil {
panic(err) // do better error handling
Expand Down
39 changes: 39 additions & 0 deletions v2/middlewares/measurement/promauto/measurement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package promauto

import (
"context"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/honestbank/kp/v2/middlewares"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var operationDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "kp_operation_duration_milliseconds",
Buckets: []float64{1, 5, 50, 200, 500, 1_000, 2_000, 5_000, 15_000, 45_000},
}, []string{"application_name", "result", "error"})

type measurementMiddleware struct {
applicationName string
}

func (m measurementMiddleware) Process(ctx context.Context, item *kafka.Message, next func(ctx context.Context, item *kafka.Message) error) error {
startTime := time.Now()
err := next(ctx, item)
if err != nil {
operationDuration.WithLabelValues(m.applicationName, "failure", err.Error()).Observe(float64(time.Since(startTime).Milliseconds()))

return err
}
operationDuration.WithLabelValues(m.applicationName, "success", "").Observe(float64(time.Since(startTime).Milliseconds()))

return err
}

func NewMeasurementMiddleware(applicationName string) middlewares.KPMiddleware[*kafka.Message] {
return measurementMiddleware{
applicationName: applicationName,
}
}
66 changes: 66 additions & 0 deletions v2/middlewares/measurement/promauto/measurement_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package promauto_test

import (
"context"
"errors"
"io"
"net/http/httptest"
"regexp"
"strings"
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/honestbank/kp/v2/middlewares/measurement/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
)

func TestMeasure(t *testing.T) {
t.Run("calls next", func(t *testing.T) {
called := false
promauto.NewMeasurementMiddleware("integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
called = true
time.Sleep(time.Millisecond * 550)
return nil
})
assert.True(t, called)
})
t.Run("works if message errors", func(t *testing.T) {
promauto.NewMeasurementMiddleware("integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
return errors.New("some error")
})
})
t.Run("works even when push gateway url is not set", func(t *testing.T) {
called := false
promauto.NewMeasurementMiddleware("integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
called = true
time.Sleep(time.Millisecond * 550)

return errors.New("some random error")
})
assert.True(t, called)
})
time.Sleep(time.Second * 6)
t.Run("pushes to prometheus", func(t *testing.T) {
success200, _ := regexp.Compile(`kp_operation_duration_milliseconds_bucket\{.+success.+le="200"\}.+`)
success500, _ := regexp.Compile(`kp_operation_duration_milliseconds_bucket\{.+success.+le="1000"\}.+`)
recorder := httptest.NewRecorder()
handler := promhttp.Handler()
promauto.NewMeasurementMiddleware("integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
time.Sleep(time.Millisecond * 550)

return nil
})
handler.ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))
bytes, err := io.ReadAll(recorder.Result().Body)
assert.NoError(t, err)
scrapedValues := string(bytes)
successMatches200 := success200.FindStringSubmatch(scrapedValues)
successMatches500 := success500.FindStringSubmatch(scrapedValues)
assert.Len(t, successMatches200, 1)
assert.True(t, strings.HasSuffix(successMatches200[0], "0"))
assert.Len(t, successMatches500, 1)
assert.False(t, strings.HasSuffix(successMatches500[0], "0"))
})
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package measurement
package pushgateway

import (
"context"
"fmt"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/honestbank/kp/v2/middlewares"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"

"github.com/honestbank/kp/v2/middlewares"
)

var operationDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build integration_test

package measurement_test
package pushgateway_test

import (
"context"
Expand All @@ -12,30 +12,29 @@ import (
"testing"
"time"

"github.com/honestbank/kp/v2/middlewares/measurement"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/honestbank/kp/v2/middlewares/measurement/pushgateway"
"github.com/stretchr/testify/assert"
)

func TestMeasure(t *testing.T) {
t.Run("calls next", func(t *testing.T) {
called := false
measurement.NewMeasurementMiddleware("localhost:9091", "integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
pushgateway.NewMeasurementMiddleware("localhost:9091", "integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
called = true
time.Sleep(time.Millisecond * 550)
return nil
})
assert.True(t, called)
})
t.Run("works if message errors", func(t *testing.T) {
measurement.NewMeasurementMiddleware("localhost:9091", "integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
pushgateway.NewMeasurementMiddleware("localhost:9091", "integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
return errors.New("some error")
})
})
t.Run("works even when push gateway url is not set", func(t *testing.T) {
called := false
measurement.NewMeasurementMiddleware("", "integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
pushgateway.NewMeasurementMiddleware("", "integration_test").Process(context.Background(), nil, func(ctx context.Context, msg *kafka.Message) error {
called = true
time.Sleep(time.Millisecond * 550)
return errors.New("some random error")
Expand Down

0 comments on commit c48b800

Please sign in to comment.