diff --git a/events/upstream_push.go b/events/upstream_push.go index 9f5bc70ec..9708416c3 100644 --- a/events/upstream_push.go +++ b/events/upstream_push.go @@ -66,91 +66,106 @@ func addErrorToFailedEvents(events []api.Event, err error) []*api.Event { // Run pushes data from decentralized instances to central incident commander func (t *pushToUpstreamEventHandler) Run(ctx *api.Context, events []api.Event) []*api.Event { - upstreamMsg := &upstream.PushData{ + gormDB := ctx.DB() + upstreamMsgAggr := &upstream.PushData{ AgentName: t.conf.AgentName, } - gormDB := ctx.DB() - var failedEvents []*api.Event for _, cl := range GroupChangelogsByTables(events) { + upstreamMsg := &upstream.PushData{ + AgentName: t.conf.AgentName, + } + switch cl.tableName { case "topologies": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Topologies).Error; err != nil { errMsg := fmt.Errorf("error fetching topologies: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.Topologies = append(upstreamMsgAggr.Topologies, upstreamMsg.Topologies...) case "components": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Components).Error; err != nil { errMsg := fmt.Errorf("error fetching components: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.Components = append(upstreamMsgAggr.Components, upstreamMsg.Components...) case "canaries": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Canaries).Error; err != nil { errMsg := fmt.Errorf("error fetching canaries: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.Canaries = append(upstreamMsgAggr.Canaries, upstreamMsg.Canaries...) case "checks": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.Checks).Error; err != nil { errMsg := fmt.Errorf("error fetching checks: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.Checks = append(upstreamMsgAggr.Checks, upstreamMsg.Checks...) case "config_scrapers": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigAnalysis).Error; err != nil { errMsg := fmt.Errorf("error fetching config_scrapers: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ConfigScrapers = append(upstreamMsgAggr.ConfigScrapers, upstreamMsg.ConfigScrapers...) case "config_analysis": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigAnalysis).Error; err != nil { errMsg := fmt.Errorf("error fetching config_analysis: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ConfigAnalysis = append(upstreamMsgAggr.ConfigAnalysis, upstreamMsg.ConfigAnalysis...) case "config_changes": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigChanges).Error; err != nil { errMsg := fmt.Errorf("error fetching config_changes: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ConfigChanges = append(upstreamMsgAggr.ConfigChanges, upstreamMsg.ConfigChanges...) case "config_items": if err := gormDB.Where("id IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigItems).Error; err != nil { errMsg := fmt.Errorf("error fetching config_items: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ConfigItems = append(upstreamMsgAggr.ConfigItems, upstreamMsg.ConfigItems...) case "check_statuses": if err := gormDB.Where(`(check_id, "time") IN ?`, cl.itemIDs).Find(&upstreamMsg.CheckStatuses).Error; err != nil { errMsg := fmt.Errorf("error fetching check_statuses: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.CheckStatuses = append(upstreamMsgAggr.CheckStatuses, upstreamMsg.CheckStatuses...) case "config_component_relationships": if err := gormDB.Where("(component_id, config_id) IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigComponentRelationships).Error; err != nil { errMsg := fmt.Errorf("error fetching config_component_relationships: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ConfigComponentRelationships = append(upstreamMsgAggr.ConfigComponentRelationships, upstreamMsg.ConfigComponentRelationships...) case "component_relationships": if err := gormDB.Where("(component_id, relationship_id, selector_id) IN ?", cl.itemIDs).Find(&upstreamMsg.ComponentRelationships).Error; err != nil { errMsg := fmt.Errorf("error fetching component_relationships: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ComponentRelationships = append(upstreamMsgAggr.ComponentRelationships, upstreamMsg.ComponentRelationships...) case "config_relationships": if err := gormDB.Where("(related_id, config_id, selector_id) IN ?", cl.itemIDs).Find(&upstreamMsg.ConfigRelationships).Error; err != nil { errMsg := fmt.Errorf("error fetching config_relationships: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(cl.events, errMsg)...) } + upstreamMsgAggr.ConfigRelationships = append(upstreamMsgAggr.ConfigRelationships, upstreamMsg.ConfigRelationships...) } } - upstreamMsg.ApplyLabels(t.conf.LabelsMap()) - if err := upstream.Push(ctx, t.conf, upstreamMsg); err != nil { + upstreamMsgAggr.ApplyLabels(t.conf.LabelsMap()) + if err := upstream.Push(ctx, t.conf, upstreamMsgAggr); err != nil { errMsg := fmt.Errorf("failed to push to upstream: %w", err) failedEvents = append(failedEvents, addErrorToFailedEvents(events, errMsg)...) }