Skip to content

Commit

Permalink
MOD Update report heartbeat mutation by sending also health status
Browse files Browse the repository at this point in the history
  • Loading branch information
madebyrogal committed Jul 5, 2024
1 parent 87e88de commit 385bf80
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func run(ctx context.Context) (err error) {
logger.Errorf("while reporting fatal error: %s", reportErr.Error())
}
}()
heartbeatReporter := heartbeat.GetReporter(logger, gqlClient)
heartbeatReporter := heartbeat.GetReporter(logger, gqlClient, healthChecker)
k8sCollector := insights.NewK8sCollector(k8sCli, heartbeatReporter, logger, reportHeartbeatInterval, reportHeartbeatMaxRetries)
return k8sCollector.Start(ctx)
})
Expand Down
18 changes: 9 additions & 9 deletions internal/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *Checker) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
}
resp.Header().Set("Content-Type", "application/json")

status := h.getStatus()
status := h.GetStatus()
respJSon, err := json.Marshal(status)
if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError)
Expand All @@ -84,21 +84,21 @@ func (h *Checker) SetNotifiers(notifiers map[string]Notifier) {
h.notifiers = notifiers
}

func (h *Checker) getStatus() *status {
pluginsStats := make(map[string]pluginStatuses)
func (h *Checker) GetStatus() *Status {
pluginsStats := make(map[string]PluginStatus)
h.collectSourcePluginsStatuses(pluginsStats)
h.collectExecutorPluginsStatuses(pluginsStats)

return &status{
Botkube: botStatus{
return &Status{
Botkube: BotStatus{
Status: h.getBotkubeStatus(),
},
Plugins: pluginsStats,
Platforms: h.getPlatformsStatus(),
}
}

func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses) {
func (h *Checker) collectSourcePluginsStatuses(plugins map[string]PluginStatus) {
if h.config == nil {
return
}
Expand All @@ -109,7 +109,7 @@ func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses
}
}

func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatuses) {
func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]PluginStatus) {
if h.config == nil {
return
}
Expand All @@ -120,9 +120,9 @@ func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatus
}
}

func (h *Checker) collectPluginStatus(plugins map[string]pluginStatuses, pluginConfigName string, pluginName string, enabled bool) {
func (h *Checker) collectPluginStatus(plugins map[string]PluginStatus, pluginConfigName string, pluginName string, enabled bool) {
status, restarts, threshold, _ := h.pluginHealthStats.GetStats(pluginName)
plugins[pluginConfigName] = pluginStatuses{
plugins[pluginConfigName] = PluginStatus{
Enabled: enabled,
Status: status,
Restarts: fmt.Sprintf("%d/%d", restarts, threshold),
Expand Down
16 changes: 8 additions & 8 deletions internal/health/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ type PlatformStatus struct {
Status PlatformStatusMsg `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
Reason FailureReasonMsg `json:"reason,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
ErrorMsg string `json:"errorMsg,omitempty"`
}

// status defines bot agent status.
type status struct {
Botkube botStatus `json:"botkube"`
Plugins map[string]pluginStatuses `json:"plugins,omitempty"`
Platforms platformStatuses `json:"platforms,omitempty"`
// Status defines bot agent status.
type Status struct {
Botkube BotStatus `json:"botkube"`
Plugins map[string]PluginStatus `json:"plugins,omitempty"`
Platforms platformStatuses `json:"platforms,omitempty"`
}

type platformStatuses map[string]PlatformStatus

type pluginStatuses struct {
type PluginStatus struct {
Enabled bool `json:"enabled,omitempty"`
Status string `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
}

type botStatus struct {
type BotStatus struct {
Status BotkubeStatus `json:"status,omitempty"`
}
44 changes: 35 additions & 9 deletions internal/heartbeat/gql_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/hasura/go-graphql-client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/kubeshop/botkube/internal/health"
)

var _ HeartbeatReporter = (*GraphQLHeartbeatReporter)(nil)
Expand All @@ -18,29 +20,53 @@ type GraphQLClient interface {

// GraphQLHeartbeatReporter reports heartbeat to GraphQL server.
type GraphQLHeartbeatReporter struct {
log logrus.FieldLogger
gql GraphQLClient
log logrus.FieldLogger
gql GraphQLClient
healthChecker health.Checker
}

func newGraphQLHeartbeatReporter(logger logrus.FieldLogger, client GraphQLClient) *GraphQLHeartbeatReporter {
func newGraphQLHeartbeatReporter(logger logrus.FieldLogger, client GraphQLClient, healthChecker health.Checker) *GraphQLHeartbeatReporter {
return &GraphQLHeartbeatReporter{
log: logger,
gql: client,
log: logger,
gql: client,
healthChecker: healthChecker,
}
}

func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat DeploymentHeartbeatInput) error {
func (r *GraphQLHeartbeatReporter) ReportHeartbeat(ctx context.Context, heartbeat ReportHeartBeat) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID(),
"heartbeat": heartbeat,
})
logger.Debug("Sending heartbeat...")
var mutation struct {
Success bool `graphql:"reportDeploymentHeartbeat(id: $id, in: $heartbeat)"`
Success bool `graphql:"reportDeploymentHeartbeat(id: $id, in: $input)"`
}
status := r.healthChecker.GetStatus()
var pluginsStatuses []DeploymentHeartbeatHealthPluginInput
var platformsStatuses []DeploymentHeartbeatHealthPlatformInput
for pluginKey, pluginStatus := range status.Plugins {
pluginsStatuses = append(pluginsStatuses, DeploymentHeartbeatHealthPluginInput{
Key: pluginKey,
Value: pluginStatus,
})
}
for platformKey, platformStatus := range status.Platforms {
platformsStatuses = append(platformsStatuses, DeploymentHeartbeatHealthPlatformInput{
Key: platformKey,
Value: platformStatus,
})
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID()),
"heartbeat": heartbeat,
"id": graphql.ID(r.gql.DeploymentID()),
"input": DeploymentHeartbeatInput{
NodeCount: heartbeat.NodeCount,
Health: &DeploymentHeartbeatHealthInput{
Botkube: status.Botkube,
Plugins: pluginsStatuses,
Platforms: platformsStatuses,
},
},
}
err := r.gql.Client().Mutate(ctx, &mutation, variables)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/heartbeat/noop_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ var _ HeartbeatReporter = (*NoopHeartbeatReporter)(nil)

type NoopHeartbeatReporter struct{}

func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, DeploymentHeartbeatInput) error {
func (n NoopHeartbeatReporter) ReportHeartbeat(context.Context, ReportHeartBeat) error {
return nil
}
26 changes: 24 additions & 2 deletions internal/heartbeat/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,42 @@ package heartbeat
import (
"context"

"github.com/kubeshop/botkube/internal/health"

"github.com/sirupsen/logrus"
)

type DeploymentHeartbeatInput struct {
NodeCount int `json:"nodeCount"`
Health *DeploymentHeartbeatHealthInput `json:"health,omitempty"`
}

type DeploymentHeartbeatHealthPluginInput struct {
Key string `json:"key"`
Value health.PluginStatus `json:"value"`
}
type DeploymentHeartbeatHealthPlatformInput struct {
Key string `json:"key"`
Value health.PlatformStatus `json:"value"`
}
type DeploymentHeartbeatHealthInput struct {
Botkube health.BotStatus `json:"botkube"`
Plugins []DeploymentHeartbeatHealthPluginInput `json:"plugins,omitempty"`
Platforms []DeploymentHeartbeatHealthPlatformInput `json:"platforms,omitempty"`
}

type ReportHeartBeat struct {
NodeCount int `json:"nodeCount"`
}

type HeartbeatReporter interface {
ReportHeartbeat(ctx context.Context, heartBeat DeploymentHeartbeatInput) error
ReportHeartbeat(ctx context.Context, heartBeat ReportHeartBeat) error
}

func GetReporter(logger logrus.FieldLogger, gql GraphQLClient) HeartbeatReporter {
func GetReporter(logger logrus.FieldLogger, gql GraphQLClient, healthChecker health.Checker) HeartbeatReporter {
return newGraphQLHeartbeatReporter(
logger.WithField("component", "GraphQLHeartbeatReporter"),
gql,
healthChecker,
)
}
2 changes: 1 addition & 1 deletion internal/insights/k8s_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (k *K8sCollector) Start(ctx context.Context) error {
k.failureCount.Add(1)
} else {
k.failureCount.Store(0)
err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.DeploymentHeartbeatInput{NodeCount: len(list.Items)})
err = k.heartbeatReporter.ReportHeartbeat(ctx, heartbeat.ReportHeartBeat{NodeCount: len(list.Items)})
if err != nil {
k.logger.Errorf("while reporting heartbeat: %s", err.Error())
}
Expand Down

0 comments on commit 385bf80

Please sign in to comment.