Skip to content

Commit

Permalink
Hearbeat reporting (#1469)
Browse files Browse the repository at this point in the history
* MOD healthChecker notifiers settings
MOD extend bots and sinks with errorMsg

* MOD Update report heartbeat mutation by sending also health status

* MOD unify notificationKey

* FIX lint

* FIX CR
  • Loading branch information
madebyrogal authored Jul 9, 2024
1 parent af46051 commit 70803a4
Show file tree
Hide file tree
Showing 18 changed files with 234 additions and 141 deletions.
123 changes: 51 additions & 72 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,82 +262,78 @@ func run(ctx context.Context) (err error) {
Index: commGroupIdx + 1,
}

scheduleBotNotifier := func(in bot.Bot) {
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
})
scheduleNotifier := func(provider func() (notifier.Platform, error)) {
app, err := provider()
key := fmt.Sprintf("%s-%s", commGroupName, app.IntegrationName())
if err != nil {
commGroupLogger.WithError(err).Errorf("while creating %s bot", app.IntegrationName())
healthChecker.AddNotifier(key, health.NewFailed(health.FailureReasonConnectionError, err.Error()))
return
}

healthChecker.AddNotifier(key, app)

switch platform := app.(type) {
case notifier.Sink:
sinkNotifiers = append(sinkNotifiers, platform)
case bot.Bot:
bots[key] = platform
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return platform.Start(ctx)
})
}
}

// Run bots
if commGroupCfg.SocketSlack.Enabled {
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating SocketSlack bot", err)
}
scheduleBotNotifier(sb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudSlack.Enabled {
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(sb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Mattermost.Enabled {
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Mattermost bot", err)
}
scheduleBotNotifier(mb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudTeams.Enabled {
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudTeams bot", err)
}
scheduleBotNotifier(ctb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Discord.Enabled {
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Discord bot", err)
}
scheduleBotNotifier(db)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
})
}

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
return reportFatalError("while creating Elasticsearch sink", err)
}
sinkNotifiers = append(sinkNotifiers, es)
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
})
}

if commGroupCfg.Webhook.Enabled {
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
return reportFatalError("while creating Webhook sink", err)
}

sinkNotifiers = append(sinkNotifiers, wh)
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
})
}
if commGroupCfg.PagerDuty.Enabled {
pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
if err != nil {
return reportFatalError("while creating PagerDuty sink", err)
}

sinkNotifiers = append(sinkNotifiers, pd)
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
})
}
}
healthChecker.SetNotifiers(getHealthNotifiers(bots, sinkNotifiers))

if conf.ConfigWatcher.Enabled {
restarter := reloader.NewRestarter(
Expand Down Expand Up @@ -448,7 +444,7 @@ func run(ctx context.Context) (err error) {
logger.Errorf("while reporting fatal error: %s", reportErr.Error())
}
}()
heartbeatReporter := heartbeat.GetReporter(logger, gqlClient)
heartbeatReporter := heartbeat.GetReporter(logger, gqlClient, healthChecker)
k8sCollector := insights.NewK8sCollector(k8sCli, heartbeatReporter, logger, reportHeartbeatInterval, reportHeartbeatMaxRetries)
return k8sCollector.Start(ctx)
})
Expand Down Expand Up @@ -496,12 +492,7 @@ func getAnalyticsReporter(disableAnalytics bool, logger logrus.FieldLogger) (ana
return nil, fmt.Errorf("while creating new Analytics Client: %w", err)
}

analyticsReporter := analytics.NewSegmentReporter(wrappedLogger, segmentCli)
if err != nil {
return nil, err
}

return analyticsReporter, nil
return analytics.NewSegmentReporter(wrappedLogger, segmentCli), nil
}

func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInterface, error) {
Expand Down Expand Up @@ -555,15 +546,15 @@ func sendHelp(ctx context.Context, s *storage.Help, clusterName string, executor

var sent []string

for key, notifier := range notifiers {
for key, notifierItem := range notifiers {
if alreadySentHelp[key] {
continue
}

help := interactive.NewHelpMessage(notifier.IntegrationName(), clusterName, executors).Build(true)
err := notifier.SendMessageToAll(ctx, help)
help := interactive.NewHelpMessage(notifierItem.IntegrationName(), clusterName, executors).Build(true)
err := notifierItem.SendMessageToAll(ctx, help)
if err != nil {
return fmt.Errorf("while sending help message for %s: %w", notifier.IntegrationName(), err)
return fmt.Errorf("while sending help message for %s: %w", notifierItem.IntegrationName(), err)
}
sent = append(sent, key)
}
Expand All @@ -584,15 +575,3 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) {

return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil
}

func getHealthNotifiers(bots map[string]bot.Bot, sinks []notifier.Sink) map[string]health.Notifier {
notifiers := make(map[string]health.Notifier)
for key, botInstance := range bots {
notifiers[key] = botInstance
}
for key, sinkInstance := range sinks {
notifiers[fmt.Sprintf("%s-%d", sinkInstance.IntegrationName(), key)] = sinkInstance
}

return notifiers
}
27 changes: 27 additions & 0 deletions internal/health/failed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package health

// Failed represents failed platform.
type Failed struct {
status PlatformStatusMsg
failureReason FailureReasonMsg
errorMsg string
}

// NewFailed creates a new Failed instance.
func NewFailed(failureReason FailureReasonMsg, errorMsg string) *Failed {
return &Failed{
status: StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus gets bot status.
func (b *Failed) GetStatus() PlatformStatus {
return PlatformStatus{
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
25 changes: 13 additions & 12 deletions internal/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewChecker(ctx context.Context, config *config.Config, stats *plugin.Health
ctx: ctx,
config: config,
pluginHealthStats: stats,
notifiers: map[string]Notifier{},
}
}

Expand All @@ -60,7 +61,7 @@ func (h *Checker) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
}
resp.Header().Set("Content-Type", "application/json")

status := h.getStatus()
status := h.GetStatus()
respJSon, err := json.Marshal(status)
if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError)
Expand All @@ -79,26 +80,26 @@ func (h *Checker) NewServer(log logrus.FieldLogger, port string) *httpx.Server {
return httpx.NewServer(log, addr, router)
}

// SetNotifiers sets platform bots instances.
func (h *Checker) SetNotifiers(notifiers map[string]Notifier) {
h.notifiers = notifiers
// AddNotifier add platform bot instance
func (h *Checker) AddNotifier(key string, notifier Notifier) {
h.notifiers[key] = notifier
}

func (h *Checker) getStatus() *status {
pluginsStats := make(map[string]pluginStatuses)
func (h *Checker) GetStatus() *Status {
pluginsStats := make(map[string]PluginStatus)
h.collectSourcePluginsStatuses(pluginsStats)
h.collectExecutorPluginsStatuses(pluginsStats)

return &status{
Botkube: botStatus{
return &Status{
Botkube: BotStatus{
Status: h.getBotkubeStatus(),
},
Plugins: pluginsStats,
Platforms: h.getPlatformsStatus(),
}
}

func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses) {
func (h *Checker) collectSourcePluginsStatuses(plugins map[string]PluginStatus) {
if h.config == nil {
return
}
Expand All @@ -109,7 +110,7 @@ func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses
}
}

func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatuses) {
func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]PluginStatus) {
if h.config == nil {
return
}
Expand All @@ -120,9 +121,9 @@ func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatus
}
}

func (h *Checker) collectPluginStatus(plugins map[string]pluginStatuses, pluginConfigName string, pluginName string, enabled bool) {
func (h *Checker) collectPluginStatus(plugins map[string]PluginStatus, pluginConfigName string, pluginName string, enabled bool) {
status, restarts, threshold, _ := h.pluginHealthStats.GetStats(pluginName)
plugins[pluginConfigName] = pluginStatuses{
plugins[pluginConfigName] = PluginStatus{
Enabled: enabled,
Status: status,
Restarts: fmt.Sprintf("%d/%d", restarts, threshold),
Expand Down
8 changes: 4 additions & 4 deletions internal/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestServeHTTPUnavailable(t *testing.T) {
// given
checker := NewChecker(context.TODO(), &config.Config{}, nil)
expectedStatus := checker.getStatus()
expectedStatus := checker.GetStatus()

req, err := http.NewRequest("GET", "/", nil)
require.NoError(t, err)
Expand All @@ -29,7 +29,7 @@ func TestServeHTTPUnavailable(t *testing.T) {
assert.Equal(t, http.StatusServiceUnavailable, rr.Code)
assert.Equal(t, "application/json", rr.Header().Get("Content-Type"))

var resp status
var resp Status
err = json.Unmarshal(rr.Body.Bytes(), &resp)
require.NoError(t, err)

Expand All @@ -41,7 +41,7 @@ func TestServeHTTPOK(t *testing.T) {
// given
checker := NewChecker(context.TODO(), &config.Config{}, nil)
checker.MarkAsReady()
expectedStatus := checker.getStatus()
expectedStatus := checker.GetStatus()

req, err := http.NewRequest("GET", "/", nil)
require.NoError(t, err)
Expand All @@ -54,7 +54,7 @@ func TestServeHTTPOK(t *testing.T) {
assert.Equal(t, http.StatusOK, rr.Code)
assert.Equal(t, "application/json", rr.Header().Get("Content-Type"))

var resp status
var resp Status
err = json.Unmarshal(rr.Body.Bytes(), &resp)
require.NoError(t, err)

Expand Down
15 changes: 8 additions & 7 deletions internal/health/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ type PlatformStatus struct {
Status PlatformStatusMsg `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
Reason FailureReasonMsg `json:"reason,omitempty"`
ErrorMsg string `json:"errorMsg,omitempty"`
}

// status defines bot agent status.
type status struct {
Botkube botStatus `json:"botkube"`
Plugins map[string]pluginStatuses `json:"plugins,omitempty"`
Platforms platformStatuses `json:"platforms,omitempty"`
// Status defines bot agent status.
type Status struct {
Botkube BotStatus `json:"botkube"`
Plugins map[string]PluginStatus `json:"plugins,omitempty"`
Platforms platformStatuses `json:"platforms,omitempty"`
}

type platformStatuses map[string]PlatformStatus

type pluginStatuses struct {
type PluginStatus struct {
Enabled bool `json:"enabled,omitempty"`
Status string `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
}

type botStatus struct {
type BotStatus struct {
Status BotkubeStatus `json:"status,omitempty"`
}
Loading

0 comments on commit 70803a4

Please sign in to comment.