Skip to content

Commit

Permalink
Reduce memory consumption for Kubernetes source configuration (#1425)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec authored Apr 3, 2024
1 parent 7d4b0e2 commit 2cb5eac
Show file tree
Hide file tree
Showing 13 changed files with 529 additions and 241 deletions.
2 changes: 1 addition & 1 deletion cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func run(ctx context.Context) (err error) {
scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan)
err = scheduler.Start(ctx)
if err != nil {
return reportFatalError("while starting source plugin event dispatcher: %w", err)
return reportFatalError("while starting source plugin event dispatcher", err)
}

if conf.Plugins.IncomingWebhook.Enabled {
Expand Down
2 changes: 0 additions & 2 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func NewDispatcher(log logrus.FieldLogger, clusterName string, notifiers map[str
}

// Dispatch starts a given plugin, watches for incoming events and calling all notifiers to dispatch received event.
// Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here:
// https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253
func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error {
log := d.log.WithFields(logrus.Fields{
"pluginName": dispatch.pluginName,
Expand Down
70 changes: 70 additions & 0 deletions internal/source/kubernetes/bg_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kubernetes

import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// backgroundProcessor is responsible for running background processes.
type backgroundProcessor struct {
mu sync.RWMutex
cancelCtxFn func()
startTime time.Time

errGroup *errgroup.Group
}

// newBackgroundProcessor creates new background processor.
func newBackgroundProcessor() *backgroundProcessor {
return &backgroundProcessor{}
}

// StartTime returns the start time of the background processor.
func (b *backgroundProcessor) StartTime() time.Time {
b.mu.RLock()
defer b.mu.RUnlock()
return b.startTime
}

// Run starts the background processes.
func (b *backgroundProcessor) Run(parentCtx context.Context, fns []func(ctx context.Context)) {
b.mu.Lock()
defer b.mu.Unlock()

b.startTime = time.Now()
ctx, cancelFn := context.WithCancel(parentCtx)
b.cancelCtxFn = cancelFn

errGroup, errGroupCtx := errgroup.WithContext(ctx)
b.errGroup = errGroup

for _, fn := range fns {
fn := fn
errGroup.Go(func() error {
fn(errGroupCtx)
return nil
})
}
}

// StopAndWait stops the background processes and waits for them to finish.
func (b *backgroundProcessor) StopAndWait(log logrus.FieldLogger) error {
b.mu.Lock()
defer b.mu.Unlock()

if b.cancelCtxFn != nil {
log.Debug("Cancelling context of the background processor...")
b.cancelCtxFn()
}

if b.errGroup == nil {
return nil
}

log.Debug("Waiting for background processor to finish...")
return b.errGroup.Wait()
}
93 changes: 93 additions & 0 deletions internal/source/kubernetes/configuration_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package kubernetes

import (
"fmt"
"sync"

"github.com/kubeshop/botkube/pkg/maputil"
)

// configurationStore stores all source configurations in a thread-safe way.
type configurationStore struct {
store map[string]SourceConfig
storeByKubeconfig map[string]map[string]struct{}

lock sync.RWMutex
}

// newConfigurations creates new empty configurationStore instance.
func newConfigurations() *configurationStore {
return &configurationStore{
store: make(map[string]SourceConfig),
storeByKubeconfig: make(map[string]map[string]struct{}),
}
}

// Store stores SourceConfig in a thread-safe way.
func (c *configurationStore) Store(sourceName string, cfg SourceConfig) {
c.lock.Lock()
defer c.lock.Unlock()

key := c.keyForStore(sourceName, cfg.isInteractivitySupported)

c.store[key] = cfg

kubeConfigKey := string(cfg.kubeConfig)
if _, ok := c.storeByKubeconfig[kubeConfigKey]; !ok {
c.storeByKubeconfig[kubeConfigKey] = make(map[string]struct{})
}
c.storeByKubeconfig[kubeConfigKey][key] = struct{}{}
}

// Get returns SourceConfig by a key.
func (c *configurationStore) Get(sourceKey string) (SourceConfig, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
val, ok := c.store[sourceKey]
return val, ok
}

// GetSystemConfig returns system Source Config.
// The system config is used for getting system (plugin-wide) logger and informer resync period.
func (c *configurationStore) GetSystemConfig() (SourceConfig, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

sortedKeys := maputil.SortKeys(c.store)
if len(sortedKeys) == 0 {
return SourceConfig{}, false
}

return c.store[sortedKeys[0]], true
}

// Len returns number of stored SourceConfigs.
func (c *configurationStore) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.store)
}

// CloneByKubeconfig returns a copy of the underlying map of source configurations grouped by kubeconfigs.
func (c *configurationStore) CloneByKubeconfig() map[string]map[string]SourceConfig {
c.lock.RLock()
defer c.lock.RUnlock()

var out = make(map[string]map[string]SourceConfig)
for kubeConfig, srcIndex := range c.storeByKubeconfig {
if out[kubeConfig] == nil {
out[kubeConfig] = make(map[string]SourceConfig)
}

for srcKey := range srcIndex {
out[kubeConfig][srcKey] = c.store[srcKey]
}
}

return out
}

// keyForStore returns a key for storing configuration in the store.
func (c *configurationStore) keyForStore(sourceName string, isInteractivitySupported bool) string {
return fmt.Sprintf("%s/%t", sourceName, isInteractivitySupported)
}
3 changes: 1 addition & 2 deletions internal/source/kubernetes/filterengine/filterengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func New(log logrus.FieldLogger) *DefaultFilterEngine {
func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.Event {
f.log.Debug("Running registered filters")
filters := f.RegisteredFilters()
f.log.Debugf("registered filters: %+v", filters)

for _, filter := range filters {
if !filter.Enabled {
Expand All @@ -59,7 +58,7 @@ func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.

err := filter.Run(ctx, &event)
if err != nil {
f.log.Errorf("while running filter %q: %w", filter.Name(), err)
f.log.Errorf("while running filter %q: %s", filter.Name(), err.Error())
}
f.log.Debugf("ran filter name: %q, event was skipped: %t", filter.Name(), event.Skip)
}
Expand Down
Loading

0 comments on commit 2cb5eac

Please sign in to comment.