FIX CR
madebyrogal committed Jul 9, 2024
1 parent 006edaa commit 8466704
Showing 10 changed files with 93 additions and 171 deletions.
142 changes: 45 additions & 97 deletions cmd/botkube-agent/main.go
@@ -61,8 +61,6 @@ const (
reportHeartbeatMaxRetries = 30
)

var healthNotifiers = make(map[string]health.Notifier)

func main() {
// Set up context
ctx := signals.SetupSignalHandler()
@@ -264,116 +262,78 @@ func run(ctx context.Context) (err error) {
Index: commGroupIdx + 1,
}

scheduleBotNotifier := func(in bot.Bot, key string) {
setHealthBotNotifier(in, key)
bots[key] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
})
scheduleNotifier := func(provider func() (notifier.Platform, error)) {
app, err := provider()
key := fmt.Sprintf("%s-%s", commGroupName, app.IntegrationName())
if err != nil {
commGroupLogger.WithError(err).Errorf("while creating %s bot", app.IntegrationName())
healthChecker.AddNotifier(key, health.NewFailed(health.FailureReasonConnectionError, err.Error()))
return
}

healthChecker.AddNotifier(key, app)

switch platform := app.(type) {
case notifier.Sink:
sinkNotifiers = append(sinkNotifiers, platform)
case bot.Bot:
bots[key] = platform
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return platform.Start(ctx)
})
}
}

// Run bots
if commGroupCfg.SocketSlack.Enabled {
notifierKey := getNotifierKey(commGroupName, config.SocketSlackCommPlatformIntegration)
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating SocketSlack bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(sb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudSlack.Enabled {
notifierKey := getNotifierKey(commGroupName, config.CloudSlackCommPlatformIntegration)
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating CloudSlack bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(sb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Mattermost.Enabled {
notifierKey := getNotifierKey(commGroupName, config.MattermostCommPlatformIntegration)
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Mattermost bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(mb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudTeams.Enabled {
notifierKey := getNotifierKey(commGroupName, config.CloudTeamsCommPlatformIntegration)
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating CloudTeams bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(ctb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Discord.Enabled {
notifierKey := getNotifierKey(commGroupName, config.DiscordCommPlatformIntegration)
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Discord bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(db, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
})
}

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
notifierKey := getNotifierKey(commGroupName, config.ElasticsearchCommPlatformIntegration)
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Elasticsearch sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(es, notifierKey)
sinkNotifiers = append(sinkNotifiers, es)
}
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
})
}

if commGroupCfg.Webhook.Enabled {
notifierKey := getNotifierKey(commGroupName, config.WebhookCommPlatformIntegration)
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Webhook sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(wh, notifierKey)
sinkNotifiers = append(sinkNotifiers, wh)
}
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
})
}
if commGroupCfg.PagerDuty.Enabled {
notifierKey := getNotifierKey(commGroupName, config.PagerDutyCommPlatformIntegration)
pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating PagerDuty sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(pd, notifierKey)
sinkNotifiers = append(sinkNotifiers, pd)
}
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
})
}
}
healthChecker.SetNotifiers(healthNotifiers)

if conf.ConfigWatcher.Enabled {
restarter := reloader.NewRestarter(
@@ -615,15 +575,3 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) {

return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil
}

func setHealthBotNotifier(bot bot.HealthNotifierBot, key string) {
healthNotifiers[key] = bot
}

func setHealthSinkNotifier(sink sink.HealthNotifierSink, key string) {
healthNotifiers[key] = sink
}

func getNotifierKey(commGroupName string, commPlatformIntegration config.CommPlatformIntegration) string {
return fmt.Sprintf("%s-%s", commGroupName, commPlatformIntegration)
}
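
For illustration, a minimal, self-contained sketch of the provider-closure pattern that the new scheduleNotifier helper in this file follows: one registration function receives a constructor, records the failure if construction fails, and otherwise stores and schedules the platform. Every identifier in the sketch (Platform, slackBot, schedule, healthy, failed) is a stand-in invented for the example, not the Botkube API.

```go
package main

import (
	"errors"
	"fmt"
)

// Platform is a stand-in for notifier.Platform from the diff above.
type Platform interface {
	IntegrationName() string
}

type slackBot struct{}

func (slackBot) IntegrationName() string { return "socketSlack" }

func main() {
	healthy := map[string]Platform{}
	failed := map[string]error{}

	// schedule mirrors the shape of scheduleNotifier: it owns the error
	// handling, so each platform block shrinks to a single call.
	schedule := func(group string, provider func() (Platform, error)) {
		app, err := provider()
		if err != nil {
			failed[group] = err
			return
		}
		healthy[fmt.Sprintf("%s-%s", group, app.IntegrationName())] = app
	}

	schedule("default", func() (Platform, error) { return slackBot{}, nil })
	schedule("default", func() (Platform, error) { return nil, errors.New("missing token") })

	fmt.Println(len(healthy), len(failed)) // prints: 1 1
}
```

The real helper additionally registers each result with healthChecker.AddNotifier and dispatches on the concrete type (notifier.Sink vs. bot.Bot), as shown in the diff above.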
27 changes: 27 additions & 0 deletions internal/health/failed.go
@@ -0,0 +1,27 @@
package health

// Failed represents a failed platform.
type Failed struct {
status PlatformStatusMsg
failureReason FailureReasonMsg
errorMsg string
}

// NewFailed creates a new Failed instance.
func NewFailed(failureReason FailureReasonMsg, errorMsg string) *Failed {
return &Failed{
status: StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus returns the platform status.
func (b *Failed) GetStatus() PlatformStatus {
return PlatformStatus{
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
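
A short usage sketch for the new health.Failed type, assuming it is compiled inside the botkube module (internal/ packages cannot be imported from outside it). The constructor, the FailureReasonConnectionError value, and the PlatformStatus fields are the ones shown above; the error text is made up.

```go
package main

import (
	"fmt"

	"github.com/kubeshop/botkube/internal/health"
)

func main() {
	// Record a platform that never initialized, e.g. a rejected token.
	failed := health.NewFailed(health.FailureReasonConnectionError, "connection refused")

	st := failed.GetStatus()
	// Prints the unhealthy status, the failure reason, and the error message.
	fmt.Println(st.Status, st.Reason, st.ErrorMsg)
}
```

In cmd/botkube-agent/main.go above, a value like this is what gets handed to healthChecker.AddNotifier when a platform constructor returns an error.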
7 changes: 4 additions & 3 deletions internal/health/health.go
@@ -39,6 +39,7 @@ func NewChecker(ctx context.Context, config *config.Config, stats *plugin.Health
ctx: ctx,
config: config,
pluginHealthStats: stats,
notifiers: map[string]Notifier{},
}
}

@@ -79,9 +80,9 @@ func (h *Checker) NewServer(log logrus.FieldLogger, port string) *httpx.Server {
return httpx.NewServer(log, addr, router)
}

// SetNotifiers sets platform bots instances.
func (h *Checker) SetNotifiers(notifiers map[string]Notifier) {
h.notifiers = notifiers
// AddNotifier adds a platform notifier instance.
func (h *Checker) AddNotifier(key string, notifier Notifier) {
h.notifiers[key] = notifier
}

func (h *Checker) GetStatus() *Status {
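
A side note on the one-line change in NewChecker above: in Go, assigning to an entry of a nil map panics, so AddNotifier depends on the map being allocated up front. A tiny stand-alone illustration (all names are invented, not Botkube code):

```go
package main

import "fmt"

type notifier interface{ Name() string }

type checker struct {
	notifiers map[string]notifier
}

// newChecker mirrors the fix above: the map is allocated eagerly; without
// this, the first AddNotifier call would panic with
// "assignment to entry in nil map".
func newChecker() *checker {
	return &checker{notifiers: map[string]notifier{}}
}

func (c *checker) AddNotifier(key string, n notifier) {
	c.notifiers[key] = n
}

type webhook struct{}

func (webhook) Name() string { return "webhook" }

func main() {
	c := newChecker()
	c.AddNotifier("default-webhook", webhook{})
	fmt.Println(len(c.notifiers)) // prints: 1
}
```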
2 changes: 1 addition & 1 deletion internal/heartbeat/gql_reporter.go
@@ -33,7 +33,7 @@ func newGraphQLHeartbeatReporter(logger logrus.FieldLogger, client GraphQLClient
}
}

func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat ReportHeartBeat) error {
func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat ReportHeartbeat) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID(),
"heartbeat": heartbeat,
2 changes: 1 addition & 1 deletion internal/heartbeat/noop_reporter.go
@@ -8,6 +8,6 @@ var _ HeartbeatReporter = (*NoopHeartbeatReporter)(nil)

type NoopHeartbeatReporter struct{}

func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, ReportHeartBeat) error {
func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, ReportHeartbeat) error {
return nil
}
4 changes: 2 additions & 2 deletions internal/heartbeat/reporter.go
@@ -27,12 +27,12 @@ type DeploymentHeartbeatHealthInput struct {
Platforms []DeploymentHeartbeatHealthPlatformInput `json:"platforms,omitempty"`
}

type ReportHeartBeat struct {
type ReportHeartbeat struct {
NodeCount int `json:"nodeCount"`
}

type HeartbeatReporter interface {
ReportHeartbeat(ctx context.Context, heartBeat ReportHeartBeat) error
ReportHeartbeat(ctx context.Context, heartBeat ReportHeartbeat) error
}

func GetReporter(logger logrus.FieldLogger, gql GraphQLClient, healthChecker health.Checker) HeartbeatReporter {
2 changes: 1 addition & 1 deletion internal/insights/k8s_collector.go
@@ -44,7 +44,7 @@ func (k *K8sCollector) Start(ctx context.Context) error {
k.failureCount.Add(1)
} else {
k.failureCount.Store(0)
err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.ReportHeartBeat{NodeCount: len(list.Items)})
err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.ReportHeartbeat{NodeCount: len(list.Items)})
if err != nil {
k.logger.Errorf("while reporting heartbeat: %s", err.Error())
}
35 changes: 0 additions & 35 deletions pkg/bot/bot_failed.go

This file was deleted.

12 changes: 12 additions & 0 deletions pkg/notifier/platform.go
@@ -0,0 +1,12 @@
package notifier

import (
"github.com/kubeshop/botkube/internal/health"
"github.com/kubeshop/botkube/pkg/config"
)

// Platform represents a platform notifier.
type Platform interface {
GetStatus() health.PlatformStatus
IntegrationName() config.CommPlatformIntegration
}
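
To make the contract concrete, here is a hypothetical type that satisfies notifier.Platform. dummyPlatform is invented for illustration; the two interface methods, health.NewFailed, and config.WebhookCommPlatformIntegration all appear in the surrounding diff. As before, this assumes compilation inside the botkube module.

```go
package main

import (
	"fmt"

	"github.com/kubeshop/botkube/internal/health"
	"github.com/kubeshop/botkube/pkg/config"
	"github.com/kubeshop/botkube/pkg/notifier"
)

// dummyPlatform is a hypothetical notifier that only demonstrates which
// methods the Platform interface requires.
type dummyPlatform struct{}

func (dummyPlatform) GetStatus() health.PlatformStatus {
	// Reuse the Failed helper from internal/health to build a status value.
	return health.NewFailed(health.FailureReasonConnectionError, "not connected").GetStatus()
}

func (dummyPlatform) IntegrationName() config.CommPlatformIntegration {
	return config.WebhookCommPlatformIntegration
}

func main() {
	var p notifier.Platform = dummyPlatform{}
	fmt.Println(p.IntegrationName(), p.GetStatus().Reason)
}
```

In cmd/botkube-agent/main.go above, scheduleNotifier then narrows such a value with a type switch to either notifier.Sink or bot.Bot before scheduling it.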
31 changes: 0 additions & 31 deletions pkg/sink/types.go
@@ -1,7 +1,6 @@
package sink

import (
"github.com/kubeshop/botkube/internal/health"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/notifier"
)
@@ -16,33 +15,3 @@ type AnalyticsReporter interface {
// ReportSinkEnabled reports an enabled sink.
ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error
}

type HealthNotifierSink interface {
GetStatus() health.PlatformStatus
}

// FailedSink mock of sink, uses for healthChecker.
type FailedSink struct {
status health.PlatformStatusMsg
failureReason health.FailureReasonMsg
errorMsg string
}

// NewSinkFailed creates a new FailedSink instance.
func NewSinkFailed(failureReason health.FailureReasonMsg, errorMsg string) *FailedSink {
return &FailedSink{
status: health.StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus gets bot status.
func (s *FailedSink) GetStatus() health.PlatformStatus {
return health.PlatformStatus{
Status: s.status,
Restarts: "0/0",
Reason: s.failureReason,
ErrorMsg: s.errorMsg,
}
}
