Skip to content

Commit

Permalink
fix: save notification repeat interval & groupBy via CRD (#1338)
Browse files Browse the repository at this point in the history
* fix: save notification repeat interval & groupBy via CRD

* fix: make groupBy no-op for now

* fix: test
  • Loading branch information
adityathebe authored Sep 3, 2024
1 parent fa5391a commit ac18b28
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 82 deletions.
7 changes: 7 additions & 0 deletions api/v1/notification_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ type NotificationSpec struct {
// Cel-expression used to decide whether this notification client should send the notification
Filter string `json:"filter,omitempty" yaml:"filter,omitempty"`

// RepeatInterval is the waiting time to resend a notification after it has been succefully sent.
RepeatInterval string `json:"repeatInterval,omitempty" yaml:"repeatInterval,omitempty"`

// RepeatGroup allows notifications to be grouped by certain set of keys and only send
// one per group within the specified repeat interval.
RepeatGroup []string `json:"repeatGroup,omitempty" yaml:"repeatGroup,omitempty"`

// Specify the recipient
To NotificationRecipientSpec `json:"to" yaml:"to"`
}
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions config/crds/mission-control.flanksource.com_notifications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ spec:
description: Cel-expression used to decide whether this notification
client should send the notification
type: string
repeatGroup:
description: |-
RepeatGroup allows notifications to be grouped by certain set of keys and only send
one per group within the specified repeat interval.
items:
type: string
type: array
repeatInterval:
description: RepeatInterval is the waiting time to resend a notification
after it has been succefully sent.
type: string
template:
description: Template is the notification body in markdown
type: string
Expand Down
9 changes: 9 additions & 0 deletions config/schemas/notification.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@
"filter": {
"type": "string"
},
"repeatInterval": {
"type": "string"
},
"repeatGroup": {
"items": {
"type": "string"
},
"type": "array"
},
"to": {
"$ref": "#/$defs/NotificationRecipientSpec"
}
Expand Down
16 changes: 9 additions & 7 deletions db/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ func PersistNotificationFromCRD(ctx context.Context, obj *v1.Notification) error
}

dbObj := models.Notification{
ID: uid,
Events: obj.Spec.Events,
Title: obj.Spec.Title,
Template: obj.Spec.Template,
Filter: obj.Spec.Filter,
Properties: obj.Spec.To.Properties,
Source: models.SourceCRD,
ID: uid,
Events: obj.Spec.Events,
Title: obj.Spec.Title,
Template: obj.Spec.Template,
Filter: obj.Spec.Filter,
Properties: obj.Spec.To.Properties,
Source: models.SourceCRD,
RepeatInterval: obj.Spec.RepeatInterval,
GroupBy: obj.Spec.RepeatGroup,
}

switch {
Expand Down
52 changes: 19 additions & 33 deletions notification/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,43 +84,28 @@ type notificationHandler struct {
Ring *events.EventRing
}

// Check if notification can be send in the interval based on group by, returns true if it can sent
// Check if notification can be sent in the interval based on group by, returns true if it can be sent
func checkRepeatInterval(ctx context.Context, n NotificationWithSpec, event models.Event) (bool, error) {
validKeys := map[string]string{
"notification_id": n.ID.String(),
"resource_id": event.Properties["id"],
"source_event": event.Name,
}
var clauses []clause.Expression
for _, g := range n.GroupBy {
if val, exists := validKeys[g]; exists {
clauses = append(clauses, clause.Eq{Column: g, Value: val})
}
interval, err := text.ParseDuration(n.RepeatInterval)
if err != nil {
return false, fmt.Errorf("error parsing repeat interval[%s] to time.Duration: %w", n.RepeatInterval, err)
}
if len(clauses) > 0 {
interval, err := text.ParseDuration(n.RepeatInterval)
if err != nil {
return false, fmt.Errorf("error parsing repeat interval[%s] to time.Duration: %w", n.RepeatInterval, err)
}

var exists bool
tx := ctx.DB().Model(&models.NotificationSendHistory{}).Clauses(clauses...).
Select(fmt.Sprintf("(NOW() - created_at) > '%d minutes'::INTERVAL", int(interval.Minutes()))).
Order("created_at DESC").Limit(1).Scan(&exists)
if tx.Error != nil {
return false, fmt.Errorf("error querying db for last send notification[%s]: %w", n.ID, err)
}
// Allow notification to be sent as there is no history
if tx.RowsAffected == 0 {
return true, nil
}
clauses := []clause.Expression{
clause.Eq{Column: "notification_id", Value: n.ID.String()},
clause.Eq{Column: "resource_id", Value: event.Properties["id"]},
clause.Eq{Column: "source_event", Value: event.Name},
}

if !exists {
return false, nil
}
var exists bool
tx := ctx.DB().Model(&models.NotificationSendHistory{}).Clauses(clauses...).
Select(fmt.Sprintf("(NOW() - created_at) <= '%d minutes'::INTERVAL", int(interval.Minutes()))).
Order("created_at DESC").Limit(1).Scan(&exists)
if tx.Error != nil {
return false, fmt.Errorf("error querying db for last send notification[%s]: %w", n.ID, err)
}

return true, nil
return !exists, nil
}

// addNotificationEvent responds to a event that can possibly generate a notification.
Expand Down Expand Up @@ -160,9 +145,9 @@ func (t *notificationHandler) addNotificationEvent(ctx context.Context, event mo
}

// Repeat interval check
if n.RepeatInterval != "" && n.GroupBy != nil {
if n.RepeatInterval != "" {
allow, err := checkRepeatInterval(ctx, *n, event)
// If there are any errors in calculating interval, we sent the notification
// If there are any errors in calculating interval, we send the notification
// and log the error
if err != nil {
ctx.Errorf("error checking repeat interval for notification[%s]: %v", n.ID, err)
Expand All @@ -171,6 +156,7 @@ func (t *notificationHandler) addNotificationEvent(ctx context.Context, event mo
continue
}
}

if valid, err := expression.Eval(n.Filter, celEnv, allEnvVars); err != nil {
logs.IfError(db.UpdateNotificationError(ctx, id, err.Error()), "failed to update notification")
continue
Expand Down
81 changes: 39 additions & 42 deletions notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package notification_test
import (
"encoding/json"
"fmt"
"time"

"github.com/flanksource/commons/collections"
"github.com/flanksource/duty/models"
Expand Down Expand Up @@ -122,85 +121,83 @@ var _ = ginkgo.Describe("Notification on incident creation", ginkgo.Ordered, fun
Expect(webhookPostdata["message"]).To(Equal(fmt.Sprintf("Severity: %s", incident.Severity)))
Expect(webhookPostdata["title"]).To(Equal(fmt.Sprintf("New incident: %s", incident.Title)))
})
})

ginkgo.It("should create a new notification with repeat interval", func() {
n := models.Notification{
var _ = ginkgo.Describe("repeat interval test", ginkgo.Ordered, func() {
var n models.Notification
var config models.ConfigItem

ginkgo.BeforeAll(func() {
customReceiver := []api.NotificationConfig{
{
URL: fmt.Sprintf("generic+%s", webhookEndpoint),
Properties: map[string]string{
"disabletls": "yes",
"template": "json",
},
},
}
customReceiverJson, err := json.Marshal(customReceiver)
Expect(err).To(BeNil())

n = models.Notification{
ID: uuid.New(),
Events: pq.StringArray([]string{"config.created"}),
Template: "Config created",
TeamID: &team.ID,
Events: pq.StringArray([]string{"config.updated"}),
Source: models.SourceCRD,
Title: "Dummy",
Template: "dummy",
CustomServices: types.JSON(customReceiverJson),
RepeatInterval: "4h",
GroupBy: pq.StringArray([]string{"notification_id", "source_event"}),
}
err := DefaultContext.DB().Create(&n).Error

err = DefaultContext.DB().Create(&n).Error
Expect(err).To(BeNil())

config1 := &models.ConfigItem{
config = models.ConfigItem{
ID: uuid.New(),
Name: lo.ToPtr("Deployment1"),
ConfigClass: models.ConfigClassDeployment,
Config: lo.ToPtr(`{"color": "red"}`),
Type: lo.ToPtr("Kubernetes::Deployment"),
}
err = DefaultContext.DB().Create(config1).Error
Expect(err).To(BeNil())

events.ConsumeAll(DefaultContext)
Eventually(func() int64 {
var c int64
DefaultContext.DB().Model(&models.Event{}).Count(&c)
return c
}, "10s", "200ms").Should(Equal(int64(0)))

// Check send history
var sentHistoryCount int64
err = DefaultContext.DB().Model(&models.NotificationSendHistory{}).Where("notification_id = ?", n.ID).Count(&sentHistoryCount).Error
err = DefaultContext.DB().Create(&config).Error
Expect(err).To(BeNil())
Expect(sentHistoryCount).To(Equal(int64(1)))
})

config2 := &models.ConfigItem{
ID: uuid.New(),
Name: lo.ToPtr("Deployment2"),
ConfigClass: models.ConfigClassDeployment,
Type: lo.ToPtr("Kubernetes::Deployment"),
}
err = DefaultContext.DB().Create(config2).Error
ginkgo.It("should have sent a notification for a config update", func() {
err := DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", config.ID).UpdateColumn("config", `{"color": "blue"}`).Error
Expect(err).To(BeNil())

events.ConsumeAll(DefaultContext)
Eventually(func() int64 {
var c int64
DefaultContext.DB().Model(&models.Event{}).Count(&c)
DefaultContext.DB().Model(&models.Event{}).Where("name = 'config.updated'").Count(&c)
return c
}, "10s", "200ms").Should(Equal(int64(0)))

// Check send history
var sentHistoryCount int64
err = DefaultContext.DB().Model(&models.NotificationSendHistory{}).Where("notification_id = ?", n.ID).Count(&sentHistoryCount).Error
Expect(err).To(BeNil())
Expect(sentHistoryCount).To(Equal(int64(1)))
})

err = DefaultContext.DB().Model(&models.NotificationSendHistory{}).Where("notification_id = ?", n.ID).Update("created_at", time.Now().Add(-10*time.Hour)).Error
Expect(err).To(BeNil())

config3 := &models.ConfigItem{
ID: uuid.New(),
Name: lo.ToPtr("Deployment3"),
ConfigClass: models.ConfigClassDeployment,
Type: lo.ToPtr("Kubernetes::Deployment"),
}
err = DefaultContext.DB().Create(config3).Error
ginkgo.It("should NOT have sent a notification for a subsequent config update", func() {
err := DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", config.ID).UpdateColumn("config", `{"color": "yellow"}`).Error
Expect(err).To(BeNil())

events.ConsumeAll(DefaultContext)
Eventually(func() int64 {
var c int64
DefaultContext.DB().Model(&models.Event{}).Count(&c)
DefaultContext.DB().Model(&models.Event{}).Where("name = 'config.updated'").Count(&c)
return c
}, "10s", "200ms").Should(Equal(int64(0)))

// Check send history
var sentHistoryCount int64
err = DefaultContext.DB().Model(&models.NotificationSendHistory{}).Where("notification_id = ?", n.ID).Count(&sentHistoryCount).Error
Expect(err).To(BeNil())
Expect(sentHistoryCount).To(Equal(int64(2)))
Expect(sentHistoryCount).To(Equal(int64(1)))
})
})

0 comments on commit ac18b28

Please sign in to comment.