Skip to content

Commit

Permalink
Reduce amount of anonymous analytics events (#1310)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec authored Nov 17, 2023
1 parent 0fd6ce2 commit b8dc1bb
Show file tree
Hide file tree
Showing 14 changed files with 582 additions and 142 deletions.
54 changes: 31 additions & 23 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ func run(ctx context.Context) (err error) {
logger.Warnf("Configuration validation warnings: %v", confDetails.ValidateWarnings.Error())
}
// Set up analytics reporter
reporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger)
analyticsReporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger)
if err != nil {
return fmt.Errorf("while creating analytics reporter: %w", err)
}
defer func() {
err := reporter.Close()
err := analyticsReporter.Close()
if err != nil {
logger.Errorf("while closing reporter: %s", err.Error())
}
}()
// from now on recover from any panic, report it and close reader and app.
// The reader must be not closed to report the panic properly.
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)

reportFatalError := reportFatalErrFn(logger, reporter, statusReporter)
reportFatalError := reportFatalErrFn(logger, analyticsReporter, statusReporter)
// Prepare K8s clients and mapper
kubeConfig, err := kubex.BuildConfigFromFlags("", conf.Settings.Kubeconfig, conf.Settings.SACredentialsPathPrefix)
if err != nil {
Expand All @@ -148,7 +148,7 @@ func run(ctx context.Context) (err error) {
if err = statusReporter.ReportDeploymentConnectionInit(ctx, k8sVer); err != nil {
return reportFatalError("while reporting botkube connection initialization", err)
}
err = reporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier)
err = analyticsReporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier)
if err != nil {
return reportFatalError("while registering current identity", err)
}
Expand Down Expand Up @@ -183,6 +183,14 @@ func run(ctx context.Context) (err error) {
err = reportFatalError("while waiting for goroutines to finish gracefully", multiErr.ErrorOrNil())
}()

errGroup.Go(func() error {
err := analyticsReporter.Run(ctx)
if err != nil {
logger.Errorf("while closing reporter: %s", err.Error())
}
return err
})

schedulerChan := make(chan string)
pluginHealthStats := plugin.NewHealthStats(conf.Plugins.RestartPolicy.Threshold)
collector := plugin.NewCollector(logger)
Expand All @@ -193,7 +201,7 @@ func run(ctx context.Context) (err error) {
healthChecker := health.NewChecker(ctx, conf, pluginHealthStats)
healthSrv := healthChecker.NewServer(logger.WithField(componentLogFieldKey, "Health server"), conf.Settings.HealthPort)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return healthSrv.Serve(ctx)
})

Expand All @@ -206,7 +214,7 @@ func run(ctx context.Context) (err error) {
// Prometheus metrics
metricsSrv := newMetricsServer(logger.WithField(componentLogFieldKey, "Metrics server"), conf.Settings.MetricsPort)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return metricsSrv.Serve(ctx)
})

Expand All @@ -218,7 +226,7 @@ func run(ctx context.Context) (err error) {
Log: logger.WithField(componentLogFieldKey, "Executor"),
Cfg: *conf,
CfgManager: cfgManager,
AnalyticsReporter: reporter,
AnalyticsReporter: analyticsReporter,
CommandGuard: cmdGuard,
PluginManager: pluginManager,
BotKubeVersion: botkubeVersion,
Expand Down Expand Up @@ -253,62 +261,62 @@ func run(ctx context.Context) (err error) {
scheduleBotNotifier := func(in bot.Bot) {
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter)
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
})
}

// Run bots
if commGroupCfg.Slack.Enabled {
sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, reporter)
sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Slack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.SocketSlack.Enabled {
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, reporter)
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating SocketSlack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.CloudSlack.Enabled {
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter)
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.Mattermost.Enabled {
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, reporter)
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Mattermost bot", err)
}
scheduleBotNotifier(mb)
}

if commGroupCfg.Teams.Enabled {
tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter)
tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Teams bot", err)
}
scheduleBotNotifier(tb)
}

if commGroupCfg.CloudTeams.Enabled {
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, reporter)
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(ctb)
}

if commGroupCfg.Discord.Enabled {
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, reporter)
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Discord bot", err)
}
Expand All @@ -317,15 +325,15 @@ func run(ctx context.Context) (err error) {

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, reporter)
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
return reportFatalError("while creating Elasticsearch sink", err)
}
sinkNotifiers = append(sinkNotifiers, es)
}

if commGroupCfg.Webhook.Enabled {
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, reporter)
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
return reportFatalError("while creating Webhook sink", err)
}
Expand All @@ -352,7 +360,7 @@ func run(ctx context.Context) (err error) {
deployClient,
dynamicCli,
restarter,
reporter,
analyticsReporter,
*conf,
cfgVersion,
cfgManager,
Expand All @@ -361,7 +369,7 @@ func run(ctx context.Context) (err error) {
return reportFatalError("while creating config reloader", err)
}
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return cfgReloader.Do(ctx)
})
}
Expand All @@ -384,14 +392,14 @@ func run(ctx context.Context) (err error) {
ghCli.Repositories,
)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return upgradeChecker.Run(ctx)
})
}

actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory)

sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, reporter, auditReporter, kubeConfig)
sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, analyticsReporter, auditReporter, kubeConfig)
scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan)
err = scheduler.Start(ctx)
if err != nil {
Expand All @@ -407,7 +415,7 @@ func run(ctx context.Context) (err error) {
)

errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
defer analytics.ReportPanicIfOccurs(logger, analyticsReporter)
return incomingWebhookSrv.Serve(ctx)
})
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/mattermost/mattermost/server/public v0.0.6
github.com/mattn/go-isatty v0.0.19
github.com/mattn/go-shellwords v1.0.12
github.com/mitchellh/mapstructure v1.5.0
github.com/morikuni/aec v1.0.0
github.com/muesli/reflow v0.3.0
github.com/olekukonko/tablewriter v0.0.5
Expand Down Expand Up @@ -202,7 +203,6 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/locker v1.0.1 // indirect
github.com/moby/spdystream v0.2.0 // indirect
Expand Down
71 changes: 71 additions & 0 deletions internal/analytics/batched/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package batched

import (
"sync"
)

const (

// Segment limits the API calls to 32kB per request: https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/
// We save 2kB (2048 characters) for general metadata. The rest of 30kB we can spend for sending source event details.
// Average event details size is 300 characters. So in theory we could include 30*1024/300=102.4 events.
// As the plugin name and additional labels don't have fixed size, we limit the number of events to 75 to be on the safe side.
maxEventDetailsCount = 75
)

// Data is a struct that holds data for batched reporting
type Data struct {
mutex sync.RWMutex

defaultTimeWindowInHours int
heartbeatProperties HeartbeatProperties
}

func NewData(defaultTimeWindowInHours int) *Data {
return &Data{
defaultTimeWindowInHours: defaultTimeWindowInHours,
heartbeatProperties: HeartbeatProperties{
TimeWindowInHours: defaultTimeWindowInHours,
EventsCount: 0,
Sources: make(map[string]SourceProperties),
}}
}

func (d *Data) IncrementTimeWindowInHours() {
d.mutex.Lock()
defer d.mutex.Unlock()

d.heartbeatProperties.TimeWindowInHours++
}

func (d *Data) Reset() {
d.mutex.Lock()
defer d.mutex.Unlock()

d.heartbeatProperties.TimeWindowInHours = d.defaultTimeWindowInHours
d.heartbeatProperties.Sources = make(map[string]SourceProperties)
d.heartbeatProperties.EventsCount = 0
}

func (d *Data) HeartbeatProperties() HeartbeatProperties {
d.mutex.RLock()
defer d.mutex.RUnlock()

return d.heartbeatProperties
}

func (d *Data) AddSourceEvent(in SourceEvent) {
d.mutex.Lock()
defer d.mutex.Unlock()

d.heartbeatProperties.EventsCount++

key := in.PluginName
sourceProps := d.heartbeatProperties.Sources[key]
sourceProps.EventsCount++
if d.heartbeatProperties.EventsCount <= maxEventDetailsCount {
// save event details only if we didn't exceed the limit
sourceProps.Events = append(sourceProps.Events, in)
}
d.heartbeatProperties.Sources[key] = sourceProps
}
Loading

0 comments on commit b8dc1bb

Please sign in to comment.