Skip to content

Commit

Permalink
Add metrics eventDelivery status (#506)
Browse files Browse the repository at this point in the history
* add metrics eventdelivery status

* remove unwanted files

* remove eventstatus struct

* fix: memqueue test

* chore: clean up
  • Loading branch information
ogbanugot authored Mar 7, 2022
1 parent b3b6016 commit 190c23e
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 12 deletions.
9 changes: 9 additions & 0 deletions datastore/badger/event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ func (e *eventDeliveryRepo) FindEventDeliveriesByEventID(ctx context.Context, ev
return deliveries, err
}

func (e *eventDeliveryRepo) CountDeliveriesByStatus(ctx context.Context, status datastore.EventDeliveryStatus) (int64, error) {

count, err := e.db.Count(&datastore.EventDelivery{}, badgerhold.Where("Status").Eq(status))
if err != nil {
return 0, err
}
return int64(count), err
}

func (e *eventDeliveryRepo) UpdateStatusOfEventDelivery(ctx context.Context, delivery datastore.EventDelivery, status datastore.EventDeliveryStatus) error {
delivery.Status = status
delivery.UpdatedAt = primitive.NewDateTimeFromTime(time.Now())
Expand Down
13 changes: 13 additions & 0 deletions datastore/mongo/event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ func (db *eventDeliveryRepo) FindEventDeliveriesByEventID(ctx context.Context,
return deliveries, nil
}

func (db *eventDeliveryRepo) CountDeliveriesByStatus(ctx context.Context,
status datastore.EventDeliveryStatus) (int64, error) {

filter := bson.M{"status": status, "document_status": datastore.ActiveDocumentStatus}

count, err := db.inner.CountDocuments(ctx, filter, nil)
if err != nil {
return 0, err
}

return count, nil
}

func (db *eventDeliveryRepo) UpdateStatusOfEventDelivery(ctx context.Context,
e datastore.EventDelivery, status datastore.EventDeliveryStatus) error {

Expand Down
2 changes: 1 addition & 1 deletion datastore/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *Client) ensureMongoIndices() {
c.ensureIndex(EventCollection, "event_type", false, nil)
c.ensureIndex(EventCollection, "app_metadata.uid", false, nil)
c.ensureIndex(AppCollections, "group_id", false, nil)

c.ensureIndex(EventDeliveryCollection, "status", false, nil)
c.ensureCompoundIndex(AppCollections)
c.ensureCompoundIndex(EventCollection)
c.ensureCompoundIndex(EventDeliveryCollection)
Expand Down
1 change: 1 addition & 0 deletions datastore/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type EventDeliveryRepository interface {
FindEventDeliveryByID(context.Context, string) (*EventDelivery, error)
FindEventDeliveriesByIDs(context.Context, []string) ([]EventDelivery, error)
FindEventDeliveriesByEventID(context.Context, string) ([]EventDelivery, error)
CountDeliveriesByStatus(context.Context, EventDeliveryStatus) (int64, error)
UpdateStatusOfEventDelivery(context.Context, EventDelivery, EventDeliveryStatus) error
UpdateEventDeliveryWithAttempt(context.Context, EventDelivery, DeliveryAttempt) error
LoadEventDeliveriesPaged(context.Context, string, string, string, []EventDeliveryStatus, SearchParams, Pageable) ([]EventDelivery, PaginationData, error)
Expand Down
19 changes: 17 additions & 2 deletions mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions queue/memqueue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,6 @@ func TestWrite(t *testing.T) {
if err != nil {
t.Fatalf("Failed to write to queue: %v", err)
}
queueLength, err := eventQueue.Consumer().Queue().Len()

if err != nil {
t.Fatalf("Failed to get queue length: %v", err)
}
if queueLength != tc.queueLen {
t.Fatalf("Length = %q, Want: %v", queueLength, tc.queueLen)

}

})
}
Expand Down
76 changes: 76 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/queue"
memqueue "github.com/frain-dev/convoy/queue/memqueue"
redisqueue "github.com/frain-dev/convoy/queue/redis"
Expand Down Expand Up @@ -72,6 +73,81 @@ func RegisterQueueMetrics(q queue.Queuer, cfg config.Configuration) {
}
}

func RegisterDBMetrics(app *applicationHandler) {
ctx := context.Background()
err := prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Subsystem: "eventdelivery",
Name: "scheduled",
Help: "Number of eventDeliveries in the Scheduled state.",
},
func() float64 {
count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.ScheduledEventStatus)
if err != nil {
log.Errorf("Error fetching eventdelivery status scheduled: %v", err)
}
return float64(count)
},
))
if err != nil {
log.Errorf("Error registering eventdelivery Scheduled: %v", err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Subsystem: "eventdelivery",
Name: "processing",
Help: "Number of eventDeliveries in the Processing state.",
},
func() float64 {
count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.ProcessingEventStatus)
if err != nil {
log.Errorf("Error fetching eventdelivery status Processing: %v", err)
}
return float64(count)
},
))
if err != nil {
log.Errorf("Error registering eventdelivery Processing: %v", err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Subsystem: "eventdelivery",
Name: "retry",
Help: "Number of eventDeliveries in the Retry state.",
},
func() float64 {
count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.RetryEventStatus)
if err != nil {
log.Errorf("Error fetching eventdelivery status Retry: %v", err)
}
return float64(count)
},
))
if err != nil {
log.Errorf("Error registering eventdelivery Retry: %v", err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Subsystem: "eventdelivery",
Name: "discarded",
Help: "Number of eventDeliveries in the Discarded state.",
},
func() float64 {
count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.DiscardedEventStatus)
if err != nil {
log.Errorf("Error fetching eventdelivery status Discarded: %v", err)
}
return float64(count)
},
))
if err != nil {
log.Errorf("Error registering eventdelivery Discarded: %v", err)
}
}

func queueLength(q queue.Queuer, cfg config.Configuration) (int, error) {
switch cfg.Queue.Type {
case config.RedisQueueProvider:
Expand Down
1 change: 1 addition & 0 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ func New(cfg config.Configuration,
Addr: fmt.Sprintf(":%d", cfg.Server.HTTP.Port),
}

RegisterDBMetrics(app)
RegisterQueueMetrics(eventQueue, cfg)
worker.RegisterWorkerMetrics(eventQueue, cfg)
prometheus.MustRegister(requestDuration)
Expand Down

0 comments on commit 190c23e

Please sign in to comment.