diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index 847ed0dac..388afcd5e 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -61,8 +61,6 @@ const ( reportHeartbeatMaxRetries = 30 ) -var healthNotifiers = make(map[string]health.Notifier) - func main() { // Set up context ctx := signals.SetupSignalHandler() @@ -264,116 +262,78 @@ func run(ctx context.Context) (err error) { Index: commGroupIdx + 1, } - scheduleBotNotifier := func(in bot.Bot, key string) { - setHealthBotNotifier(in, key) - bots[key] = in - errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter) - return in.Start(ctx) - }) + scheduleNotifier := func(provider func() (notifier.Platform, error)) { + app, err := provider() + key := fmt.Sprintf("%s-%s", commGroupName, app.IntegrationName()) + if err != nil { + commGroupLogger.WithError(err).Errorf("while creating %s bot", app.IntegrationName()) + healthChecker.AddNotifier(key, health.NewFailed(health.FailureReasonConnectionError, err.Error())) + return + } + + healthChecker.AddNotifier(key, app) + + switch platform := app.(type) { + case notifier.Sink: + sinkNotifiers = append(sinkNotifiers, platform) + case bot.Bot: + bots[key] = platform + errGroup.Go(func() error { + defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter) + return platform.Start(ctx) + }) + } } // Run bots if commGroupCfg.SocketSlack.Enabled { - notifierKey := getNotifierKey(commGroupName, config.SocketSlackCommPlatformIntegration) - sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating SocketSlack bot: %s", err.Error()) - setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Error(errorMsg) - } else { - scheduleBotNotifier(sb, notifierKey) - } + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter) + }) } if commGroupCfg.CloudSlack.Enabled { - notifierKey := getNotifierKey(commGroupName, config.CloudSlackCommPlatformIntegration) - sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating CloudSlack bot: %s", err.Error()) - setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Error(errorMsg) - } else { - scheduleBotNotifier(sb, notifierKey) - } + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter) + }) } if commGroupCfg.Mattermost.Enabled { - notifierKey := getNotifierKey(commGroupName, config.MattermostCommPlatformIntegration) - mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating Mattermost bot: %s", err.Error()) - setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Error(errorMsg) - } else { - scheduleBotNotifier(mb, notifierKey) - } + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter) + }) } if commGroupCfg.CloudTeams.Enabled { - notifierKey := getNotifierKey(commGroupName, config.CloudTeamsCommPlatformIntegration) - ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating CloudTeams bot: %s", err.Error()) - setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Error(errorMsg) - } else { - scheduleBotNotifier(ctb, notifierKey) - } + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter) + }) } if commGroupCfg.Discord.Enabled { - notifierKey := getNotifierKey(commGroupName, config.DiscordCommPlatformIntegration) - db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating Discord bot: %s", err.Error()) - setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Error(errorMsg) - } else { - scheduleBotNotifier(db, notifierKey) - } + scheduleNotifier(func() (notifier.Platform, error) { + return bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter) + }) } // Run sinks if commGroupCfg.Elasticsearch.Enabled { - notifierKey := getNotifierKey(commGroupName, config.ElasticsearchCommPlatformIntegration) - es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating Elasticsearch sink: %s", err.Error()) - setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Errorf(errorMsg) - } else { - setHealthSinkNotifier(es, notifierKey) - sinkNotifiers = append(sinkNotifiers, es) - } + scheduleNotifier(func() (notifier.Platform, error) { + return sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter) + }) } if commGroupCfg.Webhook.Enabled { - notifierKey := getNotifierKey(commGroupName, config.WebhookCommPlatformIntegration) - wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating Webhook sink: %s", err.Error()) - setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Errorf(errorMsg) - } else { - setHealthSinkNotifier(wh, notifierKey) - sinkNotifiers = append(sinkNotifiers, wh) - } + scheduleNotifier(func() (notifier.Platform, error) { + return sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter) + }) } if commGroupCfg.PagerDuty.Enabled { - notifierKey := getNotifierKey(commGroupName, config.PagerDutyCommPlatformIntegration) - pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter) - if err != nil { - errorMsg := fmt.Sprintf("while creating PagerDuty sink: %s", err.Error()) - setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey) - logger.Errorf(errorMsg) - } else { - setHealthSinkNotifier(pd, notifierKey) - sinkNotifiers = append(sinkNotifiers, pd) - } + scheduleNotifier(func() (notifier.Platform, error) { + return sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter) + }) } } - healthChecker.SetNotifiers(healthNotifiers) if conf.ConfigWatcher.Enabled { restarter := reloader.NewRestarter( @@ -615,15 +575,3 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) { return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil } - -func setHealthBotNotifier(bot bot.HealthNotifierBot, key string) { - healthNotifiers[key] = bot -} - -func setHealthSinkNotifier(sink sink.HealthNotifierSink, key string) { - healthNotifiers[key] = sink -} - -func getNotifierKey(commGroupName string, commPlatformIntegration config.CommPlatformIntegration) string { - return fmt.Sprintf("%s-%s", commGroupName, commPlatformIntegration) -} diff --git a/internal/health/failed.go b/internal/health/failed.go new file mode 100644 index 000000000..93b8f27d8 --- /dev/null +++ b/internal/health/failed.go @@ -0,0 +1,27 @@ +package health + +// Failed represents failed platform. +type Failed struct { + status PlatformStatusMsg + failureReason FailureReasonMsg + errorMsg string +} + +// NewFailed creates a new Failed instance. +func NewFailed(failureReason FailureReasonMsg, errorMsg string) *Failed { + return &Failed{ + status: StatusUnHealthy, + failureReason: failureReason, + errorMsg: errorMsg, + } +} + +// GetStatus gets bot status. +func (b *Failed) GetStatus() PlatformStatus { + return PlatformStatus{ + Status: b.status, + Restarts: "0/0", + Reason: b.failureReason, + ErrorMsg: b.errorMsg, + } +} diff --git a/internal/health/health.go b/internal/health/health.go index 94e4cb0ae..e86f5124c 100644 --- a/internal/health/health.go +++ b/internal/health/health.go @@ -39,6 +39,7 @@ func NewChecker(ctx context.Context, config *config.Config, stats *plugin.Health ctx: ctx, config: config, pluginHealthStats: stats, + notifiers: map[string]Notifier{}, } } @@ -79,9 +80,9 @@ func (h *Checker) NewServer(log logrus.FieldLogger, port string) *httpx.Server { return httpx.NewServer(log, addr, router) } -// SetNotifiers sets platform bots instances. -func (h *Checker) SetNotifiers(notifiers map[string]Notifier) { - h.notifiers = notifiers +// AddNotifier add platform bot instance +func (h *Checker) AddNotifier(key string, notifier Notifier) { + h.notifiers[key] = notifier } func (h *Checker) GetStatus() *Status { diff --git a/internal/heartbeat/gql_reporter.go b/internal/heartbeat/gql_reporter.go index 8d7477b86..e1ccd13c4 100644 --- a/internal/heartbeat/gql_reporter.go +++ b/internal/heartbeat/gql_reporter.go @@ -33,7 +33,7 @@ func newGraphQLHeartbeatReporter(logger logrus.FieldLogger, client GraphQLClient } } -func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat ReportHeartBeat) error { +func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat ReportHeartbeat) error { logger := r.log.WithFields(logrus.Fields{ "deploymentID": r.gql.DeploymentID(), "heartbeat": heartbeat, diff --git a/internal/heartbeat/noop_reporter.go b/internal/heartbeat/noop_reporter.go index ebbe015e2..fcd44b6b0 100644 --- a/internal/heartbeat/noop_reporter.go +++ b/internal/heartbeat/noop_reporter.go @@ -8,6 +8,6 @@ var _ HeartbeatReporter = (*NoopHeartbeatReporter)(nil) type NoopHeartbeatReporter struct{} -func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, ReportHeartBeat) error { +func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, ReportHeartbeat) error { return nil } diff --git a/internal/heartbeat/reporter.go b/internal/heartbeat/reporter.go index 4c48160be..1c38cf01d 100644 --- a/internal/heartbeat/reporter.go +++ b/internal/heartbeat/reporter.go @@ -27,12 +27,12 @@ type DeploymentHeartbeatHealthInput struct { Platforms []DeploymentHeartbeatHealthPlatformInput `json:"platforms,omitempty"` } -type ReportHeartBeat struct { +type ReportHeartbeat struct { NodeCount int `json:"nodeCount"` } type HeartbeatReporter interface { - ReportHeartbeat(ctx context.Context, heartBeat ReportHeartBeat) error + ReportHeartbeat(ctx context.Context, heartBeat ReportHeartbeat) error } func GetReporter(logger logrus.FieldLogger, gql GraphQLClient, healthChecker health.Checker) HeartbeatReporter { diff --git a/internal/insights/k8s_collector.go b/internal/insights/k8s_collector.go index ca3ae761e..e28c23c69 100644 --- a/internal/insights/k8s_collector.go +++ b/internal/insights/k8s_collector.go @@ -44,7 +44,7 @@ func (k *K8sCollector) Start(ctx context.Context) error { k.failureCount.Add(1) } else { k.failureCount.Store(0) - err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.ReportHeartBeat{NodeCount: len(list.Items)}) + err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.ReportHeartbeat{NodeCount: len(list.Items)}) if err != nil { k.logger.Errorf("while reporting heartbeat: %s", err.Error()) } diff --git a/pkg/bot/bot_failed.go b/pkg/bot/bot_failed.go deleted file mode 100644 index 73703e7c8..000000000 --- a/pkg/bot/bot_failed.go +++ /dev/null @@ -1,35 +0,0 @@ -package bot - -import ( - "github.com/kubeshop/botkube/internal/health" -) - -type HealthNotifierBot interface { - GetStatus() health.PlatformStatus -} - -// FailedBot mock of bot, uses for healthChecker. -type FailedBot struct { - status health.PlatformStatusMsg - failureReason health.FailureReasonMsg - errorMsg string -} - -// NewBotFailed creates a new FailedBot instance. -func NewBotFailed(failureReason health.FailureReasonMsg, errorMsg string) *FailedBot { - return &FailedBot{ - status: health.StatusUnHealthy, - failureReason: failureReason, - errorMsg: errorMsg, - } -} - -// GetStatus gets bot status. -func (b *FailedBot) GetStatus() health.PlatformStatus { - return health.PlatformStatus{ - Status: b.status, - Restarts: "0/0", - Reason: b.failureReason, - ErrorMsg: b.errorMsg, - } -} diff --git a/pkg/notifier/platform.go b/pkg/notifier/platform.go new file mode 100644 index 000000000..ad8c5acf4 --- /dev/null +++ b/pkg/notifier/platform.go @@ -0,0 +1,12 @@ +package notifier + +import ( + "github.com/kubeshop/botkube/internal/health" + "github.com/kubeshop/botkube/pkg/config" +) + +// Platform represents platform notifier +type Platform interface { + GetStatus() health.PlatformStatus + IntegrationName() config.CommPlatformIntegration +} diff --git a/pkg/sink/types.go b/pkg/sink/types.go index fad37d160..e7d9ae31b 100644 --- a/pkg/sink/types.go +++ b/pkg/sink/types.go @@ -1,7 +1,6 @@ package sink import ( - "github.com/kubeshop/botkube/internal/health" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/notifier" ) @@ -16,33 +15,3 @@ type AnalyticsReporter interface { // ReportSinkEnabled reports an enabled sink. ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error } - -type HealthNotifierSink interface { - GetStatus() health.PlatformStatus -} - -// FailedSink mock of sink, uses for healthChecker. -type FailedSink struct { - status health.PlatformStatusMsg - failureReason health.FailureReasonMsg - errorMsg string -} - -// NewSinkFailed creates a new FailedSink instance. -func NewSinkFailed(failureReason health.FailureReasonMsg, errorMsg string) *FailedSink { - return &FailedSink{ - status: health.StatusUnHealthy, - failureReason: failureReason, - errorMsg: errorMsg, - } -} - -// GetStatus gets bot status. -func (s *FailedSink) GetStatus() health.PlatformStatus { - return health.PlatformStatus{ - Status: s.status, - Restarts: "0/0", - Reason: s.failureReason, - ErrorMsg: s.errorMsg, - } -}