Skip to content

Commit

Permalink
added metrics on deadLetterHandler, refactored HandleDeadLetter inter… (
Browse files Browse the repository at this point in the history
#122)

* added metrics on deadLetterHandler, refactored HandleDeadLetter interface to receive new DeadLetterMessageHandler type

* fix dead letter test and a build error

* added documentation for DeadLetterMessageHandler, also fixed poison spelling throughout code

* retrigger build
  • Loading branch information
yuvmendel authored and Guy Baron committed Aug 18, 2019
1 parent 6c2a9e6 commit 58b7cec
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 18 deletions.
2 changes: 1 addition & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type Saga interface {

//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
HandleDeadletter(handler DeadLetterMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

Expand Down
11 changes: 8 additions & 3 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DefaultBus struct {
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -548,8 +548,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) {
b.deadletterHandler = handler
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//ReturnDeadToQueue returns a message to its original destination
Expand Down Expand Up @@ -691,6 +691,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}

func (b *DefaultBus) bindQueue(topic, exchange string) error {
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
}
Expand Down
16 changes: 15 additions & 1 deletion gbus/message_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"
Expand All @@ -9,9 +11,21 @@ import (
//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//DeadLetterMessageHandler signature for dead letter handler
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error

//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
return nameFromFunc(mg)
}

//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
func (dlmg DeadLetterMessageHandler) Name() string {
return nameFromFunc(dlmg)
}

func nameFromFunc(function interface{}) string {
funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
splits := strings.Split(funName, ".")
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
return fn
Expand Down
7 changes: 4 additions & 3 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gbus

import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -36,7 +35,7 @@ type worker struct {
handlersLock *sync.Mutex
registrations []*Registration
rpcHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
b *DefaultBus
serializer Serializer
txProvider TxProvider
Expand Down Expand Up @@ -215,7 +214,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
_ = worker.reject(true, delivery)
return
}
err := worker.deadletterHandler(tx, delivery)
err := metrics.RunHandlerWithMetric(func() error {
return worker.deadletterHandler(tx, delivery)
}, worker.deadletterHandler.Name(), worker.log())
var reject bool
if err != nil {
worker.log().WithError(err).Error("failed handling deadletter")
Expand Down
32 changes: 25 additions & 7 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ func TestDeadlettering(t *testing.T) {

var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
poison := gbus.NewBusMessage(PoisonMessage{})
service1 := createNamedBusForTest(testSvc1)
deadletterSvc := createNamedBusForTest("deadletterSvc")

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
waitgroup.Done()
return nil
}
Expand All @@ -252,30 +252,48 @@ func TestDeadlettering(t *testing.T) {
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, poison)
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))

waitgroup.Wait()
count, _ := metrics.GetRejectedMessagesValue()
if count != 1 {
t.Error("Should have one rejected message")
}

//because deadMessageHandler is an anonymous function and is registered first its name will be "func1"
handlerMetrics := metrics.GetHandlerMetrics("func1")
if handlerMetrics == nil {
t.Fatal("DeadLetterHandler should be registered for metrics")
}
failureCount, _ := handlerMetrics.GetFailureCount()
if failureCount != 0 {
t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount)
}
handlerMetrics = metrics.GetHandlerMetrics("func2")
if handlerMetrics == nil {
t.Fatal("faulty should be registered for metrics")
}
failureCount, _ = handlerMetrics.GetFailureCount()
if failureCount == 1 {
t.Errorf("faulty should have failed once, but it failed %f times", failureCount)
}
}

func TestReturnDeadToQueue(t *testing.T) {

var visited bool
proceed := make(chan bool, 0)
poision := gbus.NewBusMessage(Command1{})
poison := gbus.NewBusMessage(Command1{})

service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poision)
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poison)
deadletterSvc.ReturnDeadToQueue(context.Background(), &pub)
return nil
}
Expand All @@ -297,7 +315,7 @@ func TestReturnDeadToQueue(t *testing.T) {
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, poison)

select {
case <-proceed:
Expand Down
6 changes: 3 additions & 3 deletions tests/testMessages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ var _ gbus.Message = &Reply2{}
var _ gbus.Message = &Event1{}
var _ gbus.Message = &Event2{}

type PoisionMessage struct {
type PoisonMessage struct {
}

func (PoisionMessage) SchemaName() string {
//an empty schema name will result in a message being treated as poision
func (PoisonMessage) SchemaName() string {
//an empty schema name will result in a message being treated as poison
return ""
}

Expand Down

0 comments on commit 58b7cec

Please sign in to comment.