Skip to content

Commit

Permalink
fix: upstream message aggregation logic in events
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Aug 4, 2023
1 parent 151b0de commit 46b8e07
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions events/upstream_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,91 +66,106 @@ func addErrorToFailedEvents(events []api.Event, err error) []*api.Event {

// Run pushes data from decentralized instances to central incident commander
func (t *pushToUpstreamEventHandler) Run(ctx *api.Context, events []api.Event) []*api.Event {
upstreamMsg := &upstream.PushData{
gormDB := ctx.DB()
upstreamMsgAggr := &upstream.PushData{
AgentName: t.conf.AgentName,
}

gormDB := ctx.DB()

var failedEvents []*api.Event
for _, cl := range GroupChangelogsByTables(events) {
upstreamMsg := &upstream.PushData{
AgentName: t.conf.AgentName,
}

switch cl.tableName {
case "topologies":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Topologies).Error; err != nil {
errMsg := fmt.Errorf("error fetching topologies: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.Topologies = append(upstreamMsgAggr.Topologies, upstreamMsg.Topologies...)

case "components":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Components).Error; err != nil {
errMsg := fmt.Errorf("error fetching components: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.Components = append(upstreamMsgAggr.Components, upstreamMsg.Components...)

case "canaries":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Canaries).Error; err != nil {
errMsg := fmt.Errorf("error fetching canaries: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.Canaries = append(upstreamMsgAggr.Canaries, upstreamMsg.Canaries...)

case "checks":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Checks).Error; err != nil {
errMsg := fmt.Errorf("error fetching checks: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.Checks = append(upstreamMsgAggr.Checks, upstreamMsg.Checks...)

case "config_scrapers":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigAnalysis).Error; err != nil {
errMsg := fmt.Errorf("error fetching config_scrapers: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ConfigScrapers = append(upstreamMsgAggr.ConfigScrapers, upstreamMsg.ConfigScrapers...)

case "config_analysis":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigAnalysis).Error; err != nil {
errMsg := fmt.Errorf("error fetching config_analysis: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ConfigAnalysis = append(upstreamMsgAggr.ConfigAnalysis, upstreamMsg.ConfigAnalysis...)

case "config_changes":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigChanges).Error; err != nil {
errMsg := fmt.Errorf("error fetching config_changes: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ConfigChanges = append(upstreamMsgAggr.ConfigChanges, upstreamMsg.ConfigChanges...)

case "config_items":
if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigItems).Error; err != nil {
errMsg := fmt.Errorf("error fetching config_items: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ConfigItems = append(upstreamMsgAggr.ConfigItems, upstreamMsg.ConfigItems...)

case "check_statuses":
if err := gormDB.Where(`(check_id, "time") IN ?`, cl.itemIDs).Find(&upstreamMsg.CheckStatuses).Error; err != nil {
errMsg := fmt.Errorf("error fetching check_statuses: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.CheckStatuses = append(upstreamMsgAggr.CheckStatuses, upstreamMsg.CheckStatuses...)

case "config_component_relationships":
if err := gormDB.Where("(component_id, config_id) IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigComponentRelationships).Error; err != nil {
errMsg := fmt.Errorf("error fetching config_component_relationships: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ConfigComponentRelationships = append(upstreamMsgAggr.ConfigComponentRelationships, upstreamMsg.ConfigComponentRelationships...)

case "component_relationships":
if err := gormDB.Where("(component_id, relationship_id, selector_id) IN ?", cl.itemIDs).Find(&upstreamMsg.ComponentRelationships).Error; err != nil {
errMsg := fmt.Errorf("error fetching component_relationships: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ComponentRelationships = append(upstreamMsgAggr.ComponentRelationships, upstreamMsg.ComponentRelationships...)

case "config_relationships":
if err := gormDB.Where("(related_id, config_id, selector_id) IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigRelationships).Error; err != nil {
errMsg := fmt.Errorf("error fetching config_relationships: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...)
}
upstreamMsgAggr.ConfigRelationships = append(upstreamMsgAggr.ConfigRelationships, upstreamMsg.ConfigRelationships...)
}
}

upstreamMsg.ApplyLabels(t.conf.LabelsMap())
if err := upstream.Push(ctx, t.conf, upstreamMsg); err != nil {
upstreamMsgAggr.ApplyLabels(t.conf.LabelsMap())
if err := upstream.Push(ctx, t.conf, upstreamMsgAggr); err != nil {
errMsg := fmt.Errorf("failed to push to upstream: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(events, errMsg)...)
}
Expand Down

0 comments on commit 46b8e07

Please sign in to comment.