From 190c23ec85c42465d066844f52291630e5dff88b Mon Sep 17 00:00:00 2001 From: Ogban Ugot Date: Mon, 7 Mar 2022 20:08:02 +0100 Subject: [PATCH] Add metrics eventDelivery status (#506) * add metrics eventdelivery status * remove unwanted files * remove eventstatus struct * fix: memqueue test * chore: clean up --- datastore/badger/event_delivery.go | 9 ++++ datastore/mongo/event_delivery.go | 13 +++++ datastore/mongo/mongo.go | 2 +- datastore/repository.go | 1 + mocks/repository.go | 19 +++++++- queue/memqueue/client_test.go | 9 ---- server/metrics.go | 76 ++++++++++++++++++++++++++++++ server/route.go | 1 + 8 files changed, 118 insertions(+), 12 deletions(-) diff --git a/datastore/badger/event_delivery.go b/datastore/badger/event_delivery.go index c6eb6e4c98..a9a26b2ef4 100644 --- a/datastore/badger/event_delivery.go +++ b/datastore/badger/event_delivery.go @@ -51,6 +51,15 @@ func (e *eventDeliveryRepo) FindEventDeliveriesByEventID(ctx context.Context, ev return deliveries, err } +func (e *eventDeliveryRepo) CountDeliveriesByStatus(ctx context.Context, status datastore.EventDeliveryStatus) (int64, error) { + + count, err := e.db.Count(&datastore.EventDelivery{}, badgerhold.Where("Status").Eq(status)) + if err != nil { + return 0, err + } + return int64(count), err +} + func (e *eventDeliveryRepo) UpdateStatusOfEventDelivery(ctx context.Context, delivery datastore.EventDelivery, status datastore.EventDeliveryStatus) error { delivery.Status = status delivery.UpdatedAt = primitive.NewDateTimeFromTime(time.Now()) diff --git a/datastore/mongo/event_delivery.go b/datastore/mongo/event_delivery.go index 7325d4b959..5d79c78c18 100644 --- a/datastore/mongo/event_delivery.go +++ b/datastore/mongo/event_delivery.go @@ -117,6 +117,19 @@ func (db *eventDeliveryRepo) FindEventDeliveriesByEventID(ctx context.Context, return deliveries, nil } +func (db *eventDeliveryRepo) CountDeliveriesByStatus(ctx context.Context, + status datastore.EventDeliveryStatus) (int64, error) { + + filter := bson.M{"status": status, "document_status": datastore.ActiveDocumentStatus} + + count, err := db.inner.CountDocuments(ctx, filter, nil) + if err != nil { + return 0, err + } + + return count, nil +} + func (db *eventDeliveryRepo) UpdateStatusOfEventDelivery(ctx context.Context, e datastore.EventDelivery, status datastore.EventDeliveryStatus) error { diff --git a/datastore/mongo/mongo.go b/datastore/mongo/mongo.go index 14e0a66034..90d390382b 100644 --- a/datastore/mongo/mongo.go +++ b/datastore/mongo/mongo.go @@ -108,7 +108,7 @@ func (c *Client) ensureMongoIndices() { c.ensureIndex(EventCollection, "event_type", false, nil) c.ensureIndex(EventCollection, "app_metadata.uid", false, nil) c.ensureIndex(AppCollections, "group_id", false, nil) - + c.ensureIndex(EventDeliveryCollection, "status", false, nil) c.ensureCompoundIndex(AppCollections) c.ensureCompoundIndex(EventCollection) c.ensureCompoundIndex(EventDeliveryCollection) diff --git a/datastore/repository.go b/datastore/repository.go index 25b5996332..01687037e4 100644 --- a/datastore/repository.go +++ b/datastore/repository.go @@ -19,6 +19,7 @@ type EventDeliveryRepository interface { FindEventDeliveryByID(context.Context, string) (*EventDelivery, error) FindEventDeliveriesByIDs(context.Context, []string) ([]EventDelivery, error) FindEventDeliveriesByEventID(context.Context, string) ([]EventDelivery, error) + CountDeliveriesByStatus(context.Context, EventDeliveryStatus) (int64, error) UpdateStatusOfEventDelivery(context.Context, EventDelivery, EventDeliveryStatus) error UpdateEventDeliveryWithAttempt(context.Context, EventDelivery, DeliveryAttempt) error LoadEventDeliveriesPaged(context.Context, string, string, string, []EventDeliveryStatus, SearchParams, Pageable) ([]EventDelivery, PaginationData, error) diff --git a/mocks/repository.go b/mocks/repository.go index 926ffdf608..7f4720a4e7 100644 --- a/mocks/repository.go +++ b/mocks/repository.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: datastore/repository.go +// Source: ./datastore/repository.go -// Package mocks is a generated GoMock package. +// Package mock_datastore is a generated GoMock package. package mocks import ( @@ -161,6 +161,21 @@ func (m *MockEventDeliveryRepository) EXPECT() *MockEventDeliveryRepositoryMockR return m.recorder } +// CountDeliveriesByStatus mocks base method. +func (m *MockEventDeliveryRepository) CountDeliveriesByStatus(arg0 context.Context, arg1 datastore.EventDeliveryStatus) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountDeliveriesByStatus", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountDeliveriesByStatus indicates an expected call of CountDeliveriesByStatus. +func (mr *MockEventDeliveryRepositoryMockRecorder) CountDeliveriesByStatus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountDeliveriesByStatus", reflect.TypeOf((*MockEventDeliveryRepository)(nil).CountDeliveriesByStatus), arg0, arg1) +} + // CreateEventDelivery mocks base method. func (m *MockEventDeliveryRepository) CreateEventDelivery(arg0 context.Context, arg1 *datastore.EventDelivery) error { m.ctrl.T.Helper() diff --git a/queue/memqueue/client_test.go b/queue/memqueue/client_test.go index 736574427a..ba6c3acade 100644 --- a/queue/memqueue/client_test.go +++ b/queue/memqueue/client_test.go @@ -56,15 +56,6 @@ func TestWrite(t *testing.T) { if err != nil { t.Fatalf("Failed to write to queue: %v", err) } - queueLength, err := eventQueue.Consumer().Queue().Len() - - if err != nil { - t.Fatalf("Failed to get queue length: %v", err) - } - if queueLength != tc.queueLen { - t.Fatalf("Length = %q, Want: %v", queueLength, tc.queueLen) - - } }) } diff --git a/server/metrics.go b/server/metrics.go index ec4ce71795..0a2136962d 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -4,6 +4,7 @@ import ( "context" "github.com/frain-dev/convoy/config" + "github.com/frain-dev/convoy/datastore" "github.com/frain-dev/convoy/queue" memqueue "github.com/frain-dev/convoy/queue/memqueue" redisqueue "github.com/frain-dev/convoy/queue/redis" @@ -72,6 +73,81 @@ func RegisterQueueMetrics(q queue.Queuer, cfg config.Configuration) { } } +func RegisterDBMetrics(app *applicationHandler) { + ctx := context.Background() + err := prometheus.Register(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Subsystem: "eventdelivery", + Name: "scheduled", + Help: "Number of eventDeliveries in the Scheduled state.", + }, + func() float64 { + count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.ScheduledEventStatus) + if err != nil { + log.Errorf("Error fetching eventdelivery status scheduled: %v", err) + } + return float64(count) + }, + )) + if err != nil { + log.Errorf("Error registering eventdelivery Scheduled: %v", err) + } + + err = prometheus.Register(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Subsystem: "eventdelivery", + Name: "processing", + Help: "Number of eventDeliveries in the Processing state.", + }, + func() float64 { + count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.ProcessingEventStatus) + if err != nil { + log.Errorf("Error fetching eventdelivery status Processing: %v", err) + } + return float64(count) + }, + )) + if err != nil { + log.Errorf("Error registering eventdelivery Processing: %v", err) + } + + err = prometheus.Register(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Subsystem: "eventdelivery", + Name: "retry", + Help: "Number of eventDeliveries in the Retry state.", + }, + func() float64 { + count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.RetryEventStatus) + if err != nil { + log.Errorf("Error fetching eventdelivery status Retry: %v", err) + } + return float64(count) + }, + )) + if err != nil { + log.Errorf("Error registering eventdelivery Retry: %v", err) + } + + err = prometheus.Register(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Subsystem: "eventdelivery", + Name: "discarded", + Help: "Number of eventDeliveries in the Discarded state.", + }, + func() float64 { + count, err := app.eventDeliveryRepo.CountDeliveriesByStatus(ctx, datastore.DiscardedEventStatus) + if err != nil { + log.Errorf("Error fetching eventdelivery status Discarded: %v", err) + } + return float64(count) + }, + )) + if err != nil { + log.Errorf("Error registering eventdelivery Discarded: %v", err) + } +} + func queueLength(q queue.Queuer, cfg config.Configuration) (int, error) { switch cfg.Queue.Type { case config.RedisQueueProvider: diff --git a/server/route.go b/server/route.go index d4537387d2..3295a81606 100644 --- a/server/route.go +++ b/server/route.go @@ -354,6 +354,7 @@ func New(cfg config.Configuration, Addr: fmt.Sprintf(":%d", cfg.Server.HTTP.Port), } + RegisterDBMetrics(app) RegisterQueueMetrics(eventQueue, cfg) worker.RegisterWorkerMetrics(eventQueue, cfg) prometheus.MustRegister(requestDuration)