Skip to content

Commit

Permalink
Send ack when the new cfg 'resourceVersion' was loaded without restart
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed Jun 12, 2024
1 parent aca7405 commit 6a526b7
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 41 deletions.
5 changes: 3 additions & 2 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,14 @@ func run(ctx context.Context) (err error) {
cfgReloader, err := reloader.Get(
remoteCfgEnabled,
logger.WithField(componentLogFieldKey, "Config Reloader"),
statusReporter,
deployClient,
dynamicCli,
restarter,
analyticsReporter,
*conf,
cfgVersion,
cfgManager,
cfgManager, statusReporter,
)
if err != nil {
return reportFatalError("while creating config reloader", err)
Expand Down Expand Up @@ -519,7 +520,7 @@ func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInte
return dynamicK8sCli, discoCacheClient, nil
}

func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.StatusReporter) func(ctx string, err error) error {
func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.Reporter) func(ctx string, err error) error {
return func(ctx string, err error) error {
if err == nil {
return nil
Expand Down
5 changes: 3 additions & 2 deletions internal/config/reloader/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/client-go/dynamic"

"github.com/kubeshop/botkube/internal/analytics"
"github.com/kubeshop/botkube/internal/status"
"github.com/kubeshop/botkube/pkg/config"
)

Expand All @@ -13,10 +14,10 @@ const (
)

// Get returns Reloader based on remoteCfgEnabled flag.
func Get(remoteCfgEnabled bool, log logrus.FieldLogger, deployCli DeploymentClient, dynamicCli dynamic.Interface, restarter *Restarter, reporter analytics.Reporter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) (Reloader, error) {
func Get(remoteCfgEnabled bool, log logrus.FieldLogger, statusReporter status.Reporter, deployCli DeploymentClient, dynamicCli dynamic.Interface, restarter *Restarter, reporter analytics.Reporter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) (Reloader, error) {
if remoteCfgEnabled {
log = log.WithField(typeKey, "remote")
return NewRemote(log, deployCli, restarter, cfg, cfgVer, resVerHolders...), nil
return NewRemote(log, statusReporter, deployCli, restarter, cfg, cfgVer, resVerHolders...), nil
}

log = log.WithField(typeKey, "in-cluster")
Expand Down
34 changes: 22 additions & 12 deletions internal/config/reloader/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/kubeshop/botkube/internal/config/remote"
"github.com/kubeshop/botkube/internal/status"
"github.com/kubeshop/botkube/pkg/config"
)

Expand All @@ -23,15 +24,16 @@ type DeploymentClient interface {
}

// NewRemote returns new RemoteConfigReloader.
func NewRemote(log logrus.FieldLogger, deployCli DeploymentClient, restarter *Restarter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) *RemoteConfigReloader {
func NewRemote(log logrus.FieldLogger, statusReporter status.Reporter, deployCli DeploymentClient, restarter *Restarter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) *RemoteConfigReloader {
return &RemoteConfigReloader{
log: log,
currentCfg: cfg,
resVersion: cfgVer,
interval: cfg.ConfigWatcher.Remote.PollInterval,
deployCli: deployCli,
resVerHolders: resVerHolders,
restarter: restarter,
log: log,
currentCfg: cfg,
resVersion: cfgVer,
interval: cfg.ConfigWatcher.Remote.PollInterval,
deployCli: deployCli,
resVerHolders: resVerHolders,
restarter: restarter,
statusReporter: statusReporter,
}
}

Expand All @@ -44,8 +46,9 @@ type RemoteConfigReloader struct {
currentCfg config.Config
resVersion int

deployCli DeploymentClient
restarter *Restarter
deployCli DeploymentClient
restarter *Restarter
statusReporter status.Reporter
}

// Do starts the remote config reloader.
Expand Down Expand Up @@ -88,7 +91,7 @@ func (u *RemoteConfigReloader) Do(ctx context.Context) error {
continue
}

cfgDiff, err := u.processNewConfig(cfgBytes, resVer)
cfgDiff, err := u.processNewConfig(ctx, cfgBytes, resVer)
if err != nil {
wrappedErr := fmt.Errorf("while processing new config: %w", err)
u.log.Error(wrappedErr.Error())
Expand Down Expand Up @@ -148,7 +151,7 @@ type configDiff struct {
shouldRestart bool
}

func (u *RemoteConfigReloader) processNewConfig(newCfgBytes []byte, newResVer int) (configDiff, error) {
func (u *RemoteConfigReloader) processNewConfig(ctx context.Context, newCfgBytes []byte, newResVer int) (configDiff, error) {
// another resource version check, because it can change between the first and second query
shouldUpdate, err := u.compareResVer(newResVer)
if err != nil {
Expand Down Expand Up @@ -176,6 +179,9 @@ func (u *RemoteConfigReloader) processNewConfig(newCfgBytes []byte, newResVer in

if len(changelog) == 0 {
u.log.Debugf("Config with higher version (%d) is the same as the latest one. No need to reload config", newResVer)
if err := u.statusReporter.AckNewResourceVersion(ctx); err != nil {
return configDiff{}, fmt.Errorf("while reporting config reload: %w", err)
}
return configDiff{}, nil
}

Expand All @@ -200,3 +206,7 @@ func (u *RemoteConfigReloader) setResourceVersionForAll(resVersion int) {
h.SetResourceVersion(u.resVersion)
}
}

func (u *RemoteConfigReloader) SetResourceVersion(resourceVersion int) {
u.setResourceVersionForAll(resourceVersion)
}
17 changes: 10 additions & 7 deletions internal/config/reloader/remote_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package reloader

import (
"context"
"testing"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kubeshop/botkube/internal/status"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/loggerx"
)
Expand Down Expand Up @@ -79,15 +81,16 @@ func TestRemote_ProcessConfig(t *testing.T) {
t.Run(testCase.Name, func(t *testing.T) {
resVerHldr := &sampleResVerHolder{testCase.InitialResVer}
remoteReloader := RemoteConfigReloader{
log: loggerx.NewNoop(),
interval: time.Minute,
deployCli: nil,
resVerHolders: []ResourceVersionHolder{resVerHldr},
currentCfg: testCase.InitialCfg,
resVersion: testCase.InitialResVer,
log: loggerx.NewNoop(),
interval: time.Minute,
deployCli: nil,
resVerHolders: []ResourceVersionHolder{resVerHldr},
currentCfg: testCase.InitialCfg,
resVersion: testCase.InitialResVer,
statusReporter: status.NoopStatusReporter{},
}

cfgDiff, err := remoteReloader.processNewConfig([]byte(testCase.NewConfig), testCase.NewResVer)
cfgDiff, err := remoteReloader.processNewConfig(context.Background(), []byte(testCase.NewConfig), testCase.NewResVer)
if testCase.ExpectedErrMessage != "" {
require.Error(t, err)
assert.EqualError(t, err, testCase.ExpectedErrMessage)
Expand Down
50 changes: 37 additions & 13 deletions internal/status/gql_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/kubeshop/botkube/pkg/version"
)

var _ StatusReporter = (*GraphQLStatusReporter)(nil)
var _ Reporter = (*GraphQLStatusReporter)(nil)

// GraphQLClient defines GraphQL client.
type GraphQLClient interface {
Expand Down Expand Up @@ -87,18 +87,7 @@ func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) err
})
logger.Debug("Reporting...")

err := r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion, botkubeVersion: $botkubeVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID()),
"resourceVersion": r.getResourceVersion(),
"botkubeVersion": version.Info().Version,
}
err := r.gql.Client().Mutate(ctx, &mutation, variables)
return err
})
err := r.reportDeploymentStartup(ctx, logger)
if err != nil {
return errors.Wrap(err, "while reporting deployment startup")
}
Expand All @@ -107,6 +96,26 @@ func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) err
return nil
}

// AckNewResourceVersion reports that Agent received new configuration resource version and acknowledges it.
// It's used when the reload is not needed as there was no configuration change looking from the Agent point of view.
func (r *GraphQLStatusReporter) AckNewResourceVersion(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID(),
"resourceVersion": r.getResourceVersion(),
"type": "config-reload",
})
logger.Debug("Reporting...")

// we reuse the same mutation as for startup, as we want to report that the new resource version is in use.
err := r.reportDeploymentStartup(ctx, logger)
if err != nil {
return errors.Wrap(err, "while reporting deployment config reload")
}

logger.Debug("Reporting successful.")
return nil
}

// ReportDeploymentShutdown reports deployment shutdown to GraphQL server.
func (r *GraphQLStatusReporter) ReportDeploymentShutdown(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -209,3 +218,18 @@ func (r *GraphQLStatusReporter) getResourceVersion() int {
defer r.resVerMutex.RUnlock()
return r.resourceVersion
}

func (r *GraphQLStatusReporter) reportDeploymentStartup(ctx context.Context, logger logrus.FieldLogger) error {
return r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion, botkubeVersion: $botkubeVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID()),
"resourceVersion": r.getResourceVersion(),
"botkubeVersion": version.Info().Version,
}
err := r.gql.Client().Mutate(ctx, &mutation, variables)
return err
})
}
6 changes: 5 additions & 1 deletion internal/status/noop_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/sirupsen/logrus"
)

var _ StatusReporter = (*NoopStatusReporter)(nil)
var _ Reporter = (*NoopStatusReporter)(nil)

type NoopStatusReporter struct{}

Expand All @@ -18,6 +18,10 @@ func (n NoopStatusReporter) ReportDeploymentStartup(context.Context) error {
return nil
}

func (n NoopStatusReporter) AckNewResourceVersion(context.Context) error {
return nil
}

func (n NoopStatusReporter) ReportDeploymentShutdown(context.Context) error {
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions internal/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
"github.com/kubeshop/botkube/pkg/loggerx"
)

type StatusReporter interface {
type Reporter interface {
ReportDeploymentConnectionInit(ctx context.Context, k8sVer string) error
ReportDeploymentStartup(ctx context.Context) error
AckNewResourceVersion(ctx context.Context) error
ReportDeploymentShutdown(ctx context.Context) error
ReportDeploymentFailure(ctx context.Context, errMsg string) error
SetResourceVersion(resourceVersion int)
SetLogger(logger logrus.FieldLogger)
}

func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) StatusReporter {
func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) Reporter {
if remoteCfgEnabled {
log = withDefaultLogger(log)
return newGraphQLStatusReporter(
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type Controller struct {
log logrus.FieldLogger
conf *config.Config
notifiers map[string]bot.Bot
statusReporter status.StatusReporter
statusReporter status.Reporter
}

// New create a new Controller instance.
func New(log logrus.FieldLogger, conf *config.Config, notifiers map[string]bot.Bot, reporter status.StatusReporter) *Controller {
func New(log logrus.FieldLogger, conf *config.Config, notifiers map[string]bot.Bot, reporter status.Reporter) *Controller {
return &Controller{
log: log,
conf: conf,
Expand Down

0 comments on commit 6a526b7

Please sign in to comment.