From 6a526b7a2ef6252442d7f19af6e10bbd458fcb42 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Wed, 12 Jun 2024 15:20:36 +0200 Subject: [PATCH] Send ack when the new cfg 'resourceVersion' was loaded without restart --- cmd/botkube-agent/main.go | 5 ++- internal/config/reloader/get.go | 5 ++- internal/config/reloader/remote.go | 34 +++++++++++------ internal/config/reloader/remote_test.go | 17 +++++---- internal/status/gql_reporter.go | 50 ++++++++++++++++++------- internal/status/noop_reporter.go | 6 ++- internal/status/reporter.go | 5 ++- pkg/controller/controller.go | 4 +- 8 files changed, 85 insertions(+), 41 deletions(-) diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index d9cb70ab5..c3d662df2 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -353,13 +353,14 @@ func run(ctx context.Context) (err error) { cfgReloader, err := reloader.Get( remoteCfgEnabled, logger.WithField(componentLogFieldKey, "Config Reloader"), + statusReporter, deployClient, dynamicCli, restarter, analyticsReporter, *conf, cfgVersion, - cfgManager, + cfgManager, statusReporter, ) if err != nil { return reportFatalError("while creating config reloader", err) @@ -519,7 +520,7 @@ func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInte return dynamicK8sCli, discoCacheClient, nil } -func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.StatusReporter) func(ctx string, err error) error { +func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.Reporter) func(ctx string, err error) error { return func(ctx string, err error) error { if err == nil { return nil diff --git a/internal/config/reloader/get.go b/internal/config/reloader/get.go index b748354f3..470ee3f7f 100644 --- a/internal/config/reloader/get.go +++ b/internal/config/reloader/get.go @@ -5,6 +5,7 @@ import ( "k8s.io/client-go/dynamic" "github.com/kubeshop/botkube/internal/analytics" + 
"github.com/kubeshop/botkube/internal/status" "github.com/kubeshop/botkube/pkg/config" ) @@ -13,10 +14,10 @@ const ( ) // Get returns Reloader based on remoteCfgEnabled flag. -func Get(remoteCfgEnabled bool, log logrus.FieldLogger, deployCli DeploymentClient, dynamicCli dynamic.Interface, restarter *Restarter, reporter analytics.Reporter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) (Reloader, error) { +func Get(remoteCfgEnabled bool, log logrus.FieldLogger, statusReporter status.Reporter, deployCli DeploymentClient, dynamicCli dynamic.Interface, restarter *Restarter, reporter analytics.Reporter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) (Reloader, error) { if remoteCfgEnabled { log = log.WithField(typeKey, "remote") - return NewRemote(log, deployCli, restarter, cfg, cfgVer, resVerHolders...), nil + return NewRemote(log, statusReporter, deployCli, restarter, cfg, cfgVer, resVerHolders...), nil } log = log.WithField(typeKey, "in-cluster") diff --git a/internal/config/reloader/remote.go b/internal/config/reloader/remote.go index a73ecc030..dcb5c4f61 100644 --- a/internal/config/reloader/remote.go +++ b/internal/config/reloader/remote.go @@ -11,6 +11,7 @@ import ( "gopkg.in/yaml.v3" "github.com/kubeshop/botkube/internal/config/remote" + "github.com/kubeshop/botkube/internal/status" "github.com/kubeshop/botkube/pkg/config" ) @@ -23,15 +24,16 @@ type DeploymentClient interface { } // NewRemote returns new RemoteConfigReloader. 
-func NewRemote(log logrus.FieldLogger, deployCli DeploymentClient, restarter *Restarter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) *RemoteConfigReloader { +func NewRemote(log logrus.FieldLogger, statusReporter status.Reporter, deployCli DeploymentClient, restarter *Restarter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) *RemoteConfigReloader { return &RemoteConfigReloader{ - log: log, - currentCfg: cfg, - resVersion: cfgVer, - interval: cfg.ConfigWatcher.Remote.PollInterval, - deployCli: deployCli, - resVerHolders: resVerHolders, - restarter: restarter, + log: log, + currentCfg: cfg, + resVersion: cfgVer, + interval: cfg.ConfigWatcher.Remote.PollInterval, + deployCli: deployCli, + resVerHolders: resVerHolders, + restarter: restarter, + statusReporter: statusReporter, } } @@ -44,8 +46,9 @@ type RemoteConfigReloader struct { currentCfg config.Config resVersion int - deployCli DeploymentClient - restarter *Restarter + deployCli DeploymentClient + restarter *Restarter + statusReporter status.Reporter } // Do starts the remote config reloader. 
@@ -88,7 +91,7 @@ func (u *RemoteConfigReloader) Do(ctx context.Context) error { continue } - cfgDiff, err := u.processNewConfig(cfgBytes, resVer) + cfgDiff, err := u.processNewConfig(ctx, cfgBytes, resVer) if err != nil { wrappedErr := fmt.Errorf("while processing new config: %w", err) u.log.Error(wrappedErr.Error()) @@ -148,7 +151,7 @@ type configDiff struct { shouldRestart bool } -func (u *RemoteConfigReloader) processNewConfig(newCfgBytes []byte, newResVer int) (configDiff, error) { +func (u *RemoteConfigReloader) processNewConfig(ctx context.Context, newCfgBytes []byte, newResVer int) (configDiff, error) { // another resource version check, because it can change between the first and second query shouldUpdate, err := u.compareResVer(newResVer) if err != nil { @@ -176,6 +179,9 @@ func (u *RemoteConfigReloader) processNewConfig(newCfgBytes []byte, newResVer in if len(changelog) == 0 { u.log.Debugf("Config with higher version (%d) is the same as the latest one. No need to reload config", newResVer) + if err := u.statusReporter.AckNewResourceVersion(ctx); err != nil { + return configDiff{}, fmt.Errorf("while reporting config reload: %w", err) + } return configDiff{}, nil } @@ -200,3 +206,7 @@ func (u *RemoteConfigReloader) setResourceVersionForAll(resVersion int) { h.SetResourceVersion(u.resVersion) } } + +func (u *RemoteConfigReloader) SetResourceVersion(resourceVersion int) { + u.setResourceVersionForAll(resourceVersion) +} diff --git a/internal/config/reloader/remote_test.go b/internal/config/reloader/remote_test.go index d50ba0346..4b2a7449f 100644 --- a/internal/config/reloader/remote_test.go +++ b/internal/config/reloader/remote_test.go @@ -1,6 +1,7 @@ package reloader import ( + "context" "testing" "time" @@ -8,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/kubeshop/botkube/internal/status" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/loggerx" ) @@ -79,15 +81,16 @@ 
func TestRemote_ProcessConfig(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { resVerHldr := &sampleResVerHolder{testCase.InitialResVer} remoteReloader := RemoteConfigReloader{ - log: loggerx.NewNoop(), - interval: time.Minute, - deployCli: nil, - resVerHolders: []ResourceVersionHolder{resVerHldr}, - currentCfg: testCase.InitialCfg, - resVersion: testCase.InitialResVer, + log: loggerx.NewNoop(), + interval: time.Minute, + deployCli: nil, + resVerHolders: []ResourceVersionHolder{resVerHldr}, + currentCfg: testCase.InitialCfg, + resVersion: testCase.InitialResVer, + statusReporter: status.NoopStatusReporter{}, } - cfgDiff, err := remoteReloader.processNewConfig([]byte(testCase.NewConfig), testCase.NewResVer) + cfgDiff, err := remoteReloader.processNewConfig(context.Background(), []byte(testCase.NewConfig), testCase.NewResVer) if testCase.ExpectedErrMessage != "" { require.Error(t, err) assert.EqualError(t, err, testCase.ExpectedErrMessage) diff --git a/internal/status/gql_reporter.go b/internal/status/gql_reporter.go index 79b84b1ef..1e7e76df1 100644 --- a/internal/status/gql_reporter.go +++ b/internal/status/gql_reporter.go @@ -14,7 +14,7 @@ import ( "github.com/kubeshop/botkube/pkg/version" ) -var _ StatusReporter = (*GraphQLStatusReporter)(nil) +var _ Reporter = (*GraphQLStatusReporter)(nil) // GraphQLClient defines GraphQL client. 
type GraphQLClient interface { @@ -87,18 +87,7 @@ func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) err }) logger.Debug("Reporting...") - err := r.withRetry(ctx, logger, func() error { - var mutation struct { - Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion, botkubeVersion: $botkubeVersion)"` - } - variables := map[string]interface{}{ - "id": graphql.ID(r.gql.DeploymentID()), - "resourceVersion": r.getResourceVersion(), - "botkubeVersion": version.Info().Version, - } - err := r.gql.Client().Mutate(ctx, &mutation, variables) - return err - }) + err := r.reportDeploymentStartup(ctx, logger) if err != nil { return errors.Wrap(err, "while reporting deployment startup") } @@ -107,6 +96,26 @@ func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) err return nil } +// AckNewResourceVersion reports that the Agent received a new configuration resource version and acknowledges it. +// It's used when no reload is needed because, from the Agent's point of view, the configuration did not change. +func (r *GraphQLStatusReporter) AckNewResourceVersion(ctx context.Context) error { + logger := r.log.WithFields(logrus.Fields{ + "deploymentID": r.gql.DeploymentID(), + "resourceVersion": r.getResourceVersion(), + "type": "config-reload", + }) + logger.Debug("Reporting...") + + // we reuse the same mutation as for startup, as we want to report that the new resource version is in use. + err := r.reportDeploymentStartup(ctx, logger) + if err != nil { + return errors.Wrap(err, "while reporting deployment config reload") + } + + logger.Debug("Reporting successful.") + return nil +} + // ReportDeploymentShutdown reports deployment shutdown to GraphQL server. 
func (r *GraphQLStatusReporter) ReportDeploymentShutdown(ctx context.Context) error { logger := r.log.WithFields(logrus.Fields{ @@ -209,3 +218,18 @@ func (r *GraphQLStatusReporter) getResourceVersion() int { defer r.resVerMutex.RUnlock() return r.resourceVersion } + +func (r *GraphQLStatusReporter) reportDeploymentStartup(ctx context.Context, logger logrus.FieldLogger) error { + return r.withRetry(ctx, logger, func() error { + var mutation struct { + Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion, botkubeVersion: $botkubeVersion)"` + } + variables := map[string]interface{}{ + "id": graphql.ID(r.gql.DeploymentID()), + "resourceVersion": r.getResourceVersion(), + "botkubeVersion": version.Info().Version, + } + err := r.gql.Client().Mutate(ctx, &mutation, variables) + return err + }) +} diff --git a/internal/status/noop_reporter.go b/internal/status/noop_reporter.go index 54852c654..1e428c691 100644 --- a/internal/status/noop_reporter.go +++ b/internal/status/noop_reporter.go @@ -6,7 +6,7 @@ import ( "github.com/sirupsen/logrus" ) -var _ StatusReporter = (*NoopStatusReporter)(nil) +var _ Reporter = (*NoopStatusReporter)(nil) type NoopStatusReporter struct{} @@ -18,6 +18,10 @@ func (n NoopStatusReporter) ReportDeploymentStartup(context.Context) error { return nil } +func (n NoopStatusReporter) AckNewResourceVersion(context.Context) error { + return nil +} + func (n NoopStatusReporter) ReportDeploymentShutdown(context.Context) error { return nil } diff --git a/internal/status/reporter.go b/internal/status/reporter.go index 2a0d1c981..ee477d382 100644 --- a/internal/status/reporter.go +++ b/internal/status/reporter.go @@ -9,16 +9,17 @@ import ( "github.com/kubeshop/botkube/pkg/loggerx" ) -type StatusReporter interface { +type Reporter interface { ReportDeploymentConnectionInit(ctx context.Context, k8sVer string) error ReportDeploymentStartup(ctx context.Context) error + AckNewResourceVersion(ctx context.Context) error 
ReportDeploymentShutdown(ctx context.Context) error ReportDeploymentFailure(ctx context.Context, errMsg string) error SetResourceVersion(resourceVersion int) SetLogger(logger logrus.FieldLogger) } -func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) StatusReporter { +func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) Reporter { if remoteCfgEnabled { log = withDefaultLogger(log) return newGraphQLStatusReporter( diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 390e94ef8..603847454 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,11 +25,11 @@ type Controller struct { log logrus.FieldLogger conf *config.Config notifiers map[string]bot.Bot - statusReporter status.StatusReporter + statusReporter status.Reporter } // New create a new Controller instance. -func New(log logrus.FieldLogger, conf *config.Config, notifiers map[string]bot.Bot, reporter status.StatusReporter) *Controller { +func New(log logrus.FieldLogger, conf *config.Config, notifiers map[string]bot.Bot, reporter status.Reporter) *Controller { return &Controller{ log: log, conf: conf,