Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send ack when the new cfg 'resourceVersion' was loaded without restart #1455

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,14 @@ func run(ctx context.Context) (err error) {
cfgReloader, err := reloader.Get(
remoteCfgEnabled,
logger.WithField(componentLogFieldKey, "Config Reloader"),
statusReporter,
deployClient,
dynamicCli,
restarter,
analyticsReporter,
*conf,
cfgVersion,
cfgManager,
cfgManager, statusReporter,
mszostok marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
return reportFatalError("while creating config reloader", err)
Expand Down Expand Up @@ -519,7 +520,7 @@ func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInte
return dynamicK8sCli, discoCacheClient, nil
}

func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.StatusReporter) func(ctx string, err error) error {
func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.Reporter) func(ctx string, err error) error {
return func(ctx string, err error) error {
if err == nil {
return nil
Expand Down
5 changes: 3 additions & 2 deletions internal/config/reloader/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/client-go/dynamic"

"github.com/kubeshop/botkube/internal/analytics"
"github.com/kubeshop/botkube/internal/status"
"github.com/kubeshop/botkube/pkg/config"
)

Expand All @@ -13,10 +14,10 @@ const (
)

// Get returns Reloader based on remoteCfgEnabled flag.
func Get(remoteCfgEnabled bool, log logrus.FieldLogger, deployCli DeploymentClient, dynamicCli dynamic.Interface, restarter *Restarter, reporter analytics.Reporter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) (Reloader, error) {
func Get(remoteCfgEnabled bool, log logrus.FieldLogger, statusReporter status.Reporter, deployCli DeploymentClient, dynamicCli dynamic.Interface, restarter *Restarter, reporter analytics.Reporter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) (Reloader, error) {
if remoteCfgEnabled {
log = log.WithField(typeKey, "remote")
return NewRemote(log, deployCli, restarter, cfg, cfgVer, resVerHolders...), nil
return NewRemote(log, statusReporter, deployCli, restarter, cfg, cfgVer, resVerHolders...), nil
}

log = log.WithField(typeKey, "in-cluster")
Expand Down
34 changes: 22 additions & 12 deletions internal/config/reloader/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/kubeshop/botkube/internal/config/remote"
"github.com/kubeshop/botkube/internal/status"
"github.com/kubeshop/botkube/pkg/config"
)

Expand All @@ -23,15 +24,16 @@ type DeploymentClient interface {
}

// NewRemote returns new RemoteConfigReloader.
func NewRemote(log logrus.FieldLogger, deployCli DeploymentClient, restarter *Restarter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) *RemoteConfigReloader {
func NewRemote(log logrus.FieldLogger, statusReporter status.Reporter, deployCli DeploymentClient, restarter *Restarter, cfg config.Config, cfgVer int, resVerHolders ...ResourceVersionHolder) *RemoteConfigReloader {
return &RemoteConfigReloader{
log: log,
currentCfg: cfg,
resVersion: cfgVer,
interval: cfg.ConfigWatcher.Remote.PollInterval,
deployCli: deployCli,
resVerHolders: resVerHolders,
restarter: restarter,
log: log,
currentCfg: cfg,
resVersion: cfgVer,
interval: cfg.ConfigWatcher.Remote.PollInterval,
deployCli: deployCli,
resVerHolders: resVerHolders,
restarter: restarter,
statusReporter: statusReporter,
}
}

Expand All @@ -44,8 +46,9 @@ type RemoteConfigReloader struct {
currentCfg config.Config
resVersion int

deployCli DeploymentClient
restarter *Restarter
deployCli DeploymentClient
restarter *Restarter
statusReporter status.Reporter
}

// Do starts the remote config reloader.
Expand Down Expand Up @@ -88,7 +91,7 @@ func (u *RemoteConfigReloader) Do(ctx context.Context) error {
continue
}

cfgDiff, err := u.processNewConfig(cfgBytes, resVer)
cfgDiff, err := u.processNewConfig(ctx, cfgBytes, resVer)
if err != nil {
wrappedErr := fmt.Errorf("while processing new config: %w", err)
u.log.Error(wrappedErr.Error())
Expand Down Expand Up @@ -148,7 +151,7 @@ type configDiff struct {
shouldRestart bool
}

func (u *RemoteConfigReloader) processNewConfig(newCfgBytes []byte, newResVer int) (configDiff, error) {
func (u *RemoteConfigReloader) processNewConfig(ctx context.Context, newCfgBytes []byte, newResVer int) (configDiff, error) {
// another resource version check, because it can change between the first and second query
shouldUpdate, err := u.compareResVer(newResVer)
if err != nil {
Expand Down Expand Up @@ -176,6 +179,9 @@ func (u *RemoteConfigReloader) processNewConfig(newCfgBytes []byte, newResVer in

if len(changelog) == 0 {
u.log.Debugf("Config with higher version (%d) is the same as the latest one. No need to reload config", newResVer)
if err := u.statusReporter.AckNewResourceVersion(ctx); err != nil {
return configDiff{}, fmt.Errorf("while reporting config reload: %w", err)
}
return configDiff{}, nil
}

Expand All @@ -200,3 +206,7 @@ func (u *RemoteConfigReloader) setResourceVersionForAll(resVersion int) {
h.SetResourceVersion(u.resVersion)
}
}

func (u *RemoteConfigReloader) SetResourceVersion(resourceVersion int) {
u.setResourceVersionForAll(resourceVersion)
}
17 changes: 10 additions & 7 deletions internal/config/reloader/remote_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package reloader

import (
"context"
"testing"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kubeshop/botkube/internal/status"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/loggerx"
)
Expand Down Expand Up @@ -79,15 +81,16 @@ func TestRemote_ProcessConfig(t *testing.T) {
t.Run(testCase.Name, func(t *testing.T) {
resVerHldr := &sampleResVerHolder{testCase.InitialResVer}
remoteReloader := RemoteConfigReloader{
log: loggerx.NewNoop(),
interval: time.Minute,
deployCli: nil,
resVerHolders: []ResourceVersionHolder{resVerHldr},
currentCfg: testCase.InitialCfg,
resVersion: testCase.InitialResVer,
log: loggerx.NewNoop(),
interval: time.Minute,
deployCli: nil,
resVerHolders: []ResourceVersionHolder{resVerHldr},
currentCfg: testCase.InitialCfg,
resVersion: testCase.InitialResVer,
statusReporter: status.NoopStatusReporter{},
}

cfgDiff, err := remoteReloader.processNewConfig([]byte(testCase.NewConfig), testCase.NewResVer)
cfgDiff, err := remoteReloader.processNewConfig(context.Background(), []byte(testCase.NewConfig), testCase.NewResVer)
if testCase.ExpectedErrMessage != "" {
require.Error(t, err)
assert.EqualError(t, err, testCase.ExpectedErrMessage)
Expand Down
50 changes: 37 additions & 13 deletions internal/status/gql_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/kubeshop/botkube/pkg/version"
)

var _ StatusReporter = (*GraphQLStatusReporter)(nil)
var _ Reporter = (*GraphQLStatusReporter)(nil)

// GraphQLClient defines GraphQL client.
type GraphQLClient interface {
Expand Down Expand Up @@ -87,18 +87,7 @@ func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) err
})
logger.Debug("Reporting...")

err := r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion, botkubeVersion: $botkubeVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID()),
"resourceVersion": r.getResourceVersion(),
"botkubeVersion": version.Info().Version,
}
err := r.gql.Client().Mutate(ctx, &mutation, variables)
return err
})
err := r.reportDeploymentStartup(ctx, logger)
if err != nil {
return errors.Wrap(err, "while reporting deployment startup")
}
Expand All @@ -107,6 +96,26 @@ func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) err
return nil
}

// AckNewResourceVersion reports that Agent received new configuration resource version and acknowledges it.
// It's used when the reload is not needed as there was no configuration change looking from the Agent point of view.
func (r *GraphQLStatusReporter) AckNewResourceVersion(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID(),
"resourceVersion": r.getResourceVersion(),
"type": "config-reload",
})
logger.Debug("Reporting...")

// we reuse the same mutation as for startup, as we want to report that the new resource version is in use.
err := r.reportDeploymentStartup(ctx, logger)
if err != nil {
return errors.Wrap(err, "while reporting deployment config reload")
}

logger.Debug("Reporting successful.")
return nil
}

// ReportDeploymentShutdown reports deployment shutdown to GraphQL server.
func (r *GraphQLStatusReporter) ReportDeploymentShutdown(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -209,3 +218,18 @@ func (r *GraphQLStatusReporter) getResourceVersion() int {
defer r.resVerMutex.RUnlock()
return r.resourceVersion
}

func (r *GraphQLStatusReporter) reportDeploymentStartup(ctx context.Context, logger logrus.FieldLogger) error {
return r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion, botkubeVersion: $botkubeVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID()),
"resourceVersion": r.getResourceVersion(),
"botkubeVersion": version.Info().Version,
}
err := r.gql.Client().Mutate(ctx, &mutation, variables)
return err
})
}
6 changes: 5 additions & 1 deletion internal/status/noop_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/sirupsen/logrus"
)

var _ StatusReporter = (*NoopStatusReporter)(nil)
var _ Reporter = (*NoopStatusReporter)(nil)

type NoopStatusReporter struct{}

Expand All @@ -18,6 +18,10 @@ func (n NoopStatusReporter) ReportDeploymentStartup(context.Context) error {
return nil
}

func (n NoopStatusReporter) AckNewResourceVersion(context.Context) error {
return nil
}

func (n NoopStatusReporter) ReportDeploymentShutdown(context.Context) error {
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions internal/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
"github.com/kubeshop/botkube/pkg/loggerx"
)

type StatusReporter interface {
type Reporter interface {
ReportDeploymentConnectionInit(ctx context.Context, k8sVer string) error
ReportDeploymentStartup(ctx context.Context) error
AckNewResourceVersion(ctx context.Context) error
ReportDeploymentShutdown(ctx context.Context) error
ReportDeploymentFailure(ctx context.Context, errMsg string) error
SetResourceVersion(resourceVersion int)
SetLogger(logger logrus.FieldLogger)
}

func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) StatusReporter {
func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) Reporter {
if remoteCfgEnabled {
log = withDefaultLogger(log)
return newGraphQLStatusReporter(
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type Controller struct {
log logrus.FieldLogger
conf *config.Config
notifiers map[string]bot.Bot
statusReporter status.StatusReporter
statusReporter status.Reporter
}

// New create a new Controller instance.
func New(log logrus.FieldLogger, conf *config.Config, notifiers map[string]bot.Bot, reporter status.StatusReporter) *Controller {
func New(log logrus.FieldLogger, conf *config.Config, notifiers map[string]bot.Bot, reporter status.Reporter) *Controller {
return &Controller{
log: log,
conf: conf,
Expand Down
20 changes: 11 additions & 9 deletions test/e2e/bots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,17 +853,19 @@ func runBotTest(t *testing.T,
err = botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.SecondChannel().ID(), expectedMessage)
assert.NoError(t, err)

t.Log("Starting notifier in second channel...")
command = "enable notifications"
expectedBody = codeBlock(fmt.Sprintf("Brace yourselves, incoming notifications from cluster '%s'.", appCfg.ClusterName))
expectedMessage = fmt.Sprintf("%s\n%s", cmdHeader(command), expectedBody)
if botDriver.Type() != commplatform.TeamsBot { // base on the previous assertion, we know that notifications are enabled
t.Log("Starting notifier in second channel...")
command = "enable notifications"
expectedBody = codeBlock(fmt.Sprintf("Brace yourselves, incoming notifications from cluster '%s'.", appCfg.ClusterName))
expectedMessage = fmt.Sprintf("%s\n%s", cmdHeader(command), expectedBody)

botDriver.PostMessageToBot(t, botDriver.SecondChannel().Identifier(), command)
err = botDriver.WaitForMessagePosted(botDriver.BotUserID(), botDriver.SecondChannel().ID(), limitMessages(), botDriver.AssertEquals(expectedMessage))
require.NoError(t, err)
botDriver.PostMessageToBot(t, botDriver.SecondChannel().Identifier(), command)
err = botDriver.WaitForMessagePosted(botDriver.BotUserID(), botDriver.SecondChannel().ID(), limitMessages(), botDriver.AssertEquals(expectedMessage))
require.NoError(t, err)

if botDriver.Type().IsCloud() {
waitForRestart(t, botDriver, botDriver.BotUserID(), botDriver.FirstChannel().ID(), appCfg.ClusterName)
if botDriver.Type().IsCloud() {
waitForRestart(t, botDriver, botDriver.BotUserID(), botDriver.FirstChannel().ID(), appCfg.ClusterName)
}
}

cfgMapCli := k8sCli.CoreV1().ConfigMaps(appCfg.Deployment.Namespace)
Expand Down
Loading