diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c7f0d76b4..b1e63c414c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,8 @@ Main (unreleased) - Support TLS client settings for clustering (@tiagorossig) +- Add support for `not_modified` response in `remotecfg`. (@spartan0x117) + v1.4.2 ----------------- diff --git a/go.mod b/go.mod index 5a880856f3..ffc0e088c9 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/google/renameio/v2 v2.0.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 - github.com/grafana/alloy-remote-config v0.0.8 + github.com/grafana/alloy-remote-config v0.0.9 github.com/grafana/alloy/syntax v0.1.0 github.com/grafana/beyla v1.8.2 github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d diff --git a/go.sum b/go.sum index 39e11916b0..eb68d61c0a 100644 --- a/go.sum +++ b/go.sum @@ -1192,6 +1192,8 @@ github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099o github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/alloy-remote-config v0.0.8 h1:bQTk7rkR1Hykss+bfMv7CucpF/fRsi2lixJHfIcOMnc= github.com/grafana/alloy-remote-config v0.0.8/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k= +github.com/grafana/alloy-remote-config v0.0.9 h1:gy34SxZ8Iq/HrDTIFZi80+8BlT+FnJhKiP9mryHNEUE= +github.com/grafana/alloy-remote-config v0.0.9/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k= github.com/grafana/beyla v1.8.2 h1:AkHpUFnfX2SaRsLZkMtC8BPRtfEZRfP7A7ewRr3ruS0= github.com/grafana/beyla v1.8.2/go.mod h1:82jt8ZJA50qq7R5Ri8tHcGFJ6vJmqDexprVTYSdu6cY= github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 h1:ju6EcY2aEobeBg185ETtFCKj5WzaQ48qfkbsSRRQrF4= diff --git a/internal/service/remotecfg/remotecfg.go b/internal/service/remotecfg/remotecfg.go index 2687f7621e..cbe5dcedbd 100644 --- a/internal/service/remotecfg/remotecfg.go +++ b/internal/service/remotecfg/remotecfg.go @@ -2,6 +2,7 @@ package remotecfg import ( "context" + "errors" "fmt" "hash/fnv" "maps" @@ -40,6 +41,8 @@ func getHash(in []byte) string { const baseJitter = 100 * time.Millisecond +var errNotModified = errors.New("config not modified since last fetch") + // Service implements a service for remote configuration. // The default value of ch is nil; this means it will block forever if the // remotecfg service is not configured. In addition, we're keeping track of @@ -60,10 +63,15 @@ type Service struct { systemAttrs map[string]string attrs map[string]string metrics *metrics + + // This is the hash received from the API. It is used to determine if + // the configuration has changed since the last fetch + remoteHash string } type metrics struct { lastFetchSuccess prometheus.Gauge + lastFetchNotModified prometheus.Gauge totalFailures prometheus.Counter configHash *prometheus.GaugeVec lastFetchSuccessTime prometheus.Gauge @@ -178,6 +186,12 @@ func (s *Service) registerMetrics() { Help: "Remote config loaded successfully", }, ), + lastFetchNotModified: prom.NewGauge( + prometheus.GaugeOpts{ + Name: "remotecfg_last_load_not_modified", + Help: "Remote config not modified since last fetch", + }, + ), totalFailures: prom.NewCounter( prometheus.CounterOpts{ Name: "remotecfg_load_failures_total", @@ -345,15 +359,27 @@ func (s *Service) fetchRemote() error { return nil } + level.Debug(s.opts.Logger).Log("msg", "fetching remote configuration") + b, err := s.getAPIConfig() s.metrics.totalAttempts.Add(1) - if err != nil { + + if err == nil || err == errNotModified { + s.metrics.lastFetchSuccess.Set(1) + s.metrics.lastFetchSuccessTime.SetToCurrentTime() + } else { s.metrics.totalFailures.Add(1) s.metrics.lastFetchSuccess.Set(0) return err } - s.metrics.lastFetchSuccess.Set(1) - s.metrics.lastFetchSuccessTime.SetToCurrentTime() + + if err == errNotModified { + level.Debug(s.opts.Logger).Log("msg", "skipping over API response since it has not been modified since last fetch") + s.metrics.lastFetchNotModified.Set(1) + return nil + } else { + s.metrics.lastFetchNotModified.Set(0) + } // API return the same configuration, no need to reload. newConfigHash := getHash(b) @@ -391,6 +417,7 @@ func (s *Service) getAPIConfig() ([]byte, error) { req := connect.NewRequest(&collectorv1.GetConfigRequest{ Id: s.args.ID, Attributes: s.attrs, + Hash: s.remoteHash, }) client := s.asClient s.mut.RUnlock() @@ -401,6 +428,14 @@ func (s *Service) getAPIConfig() ([]byte, error) { return nil, err } s.metrics.getConfigTime.Observe(time.Since(start).Seconds()) + if gcr.Msg.NotModified { + return nil, errNotModified + } + if gcr.Msg.Hash != "" { + s.mut.Lock() + s.remoteHash = gcr.Msg.Hash + s.mut.Unlock() + } return []byte(gcr.Msg.GetContent()), nil } diff --git a/internal/service/remotecfg/remotecfg_test.go b/internal/service/remotecfg/remotecfg_test.go index 0bdfc7b81d..171046c08c 100644 --- a/internal/service/remotecfg/remotecfg_test.go +++ b/internal/service/remotecfg/remotecfg_test.go @@ -48,7 +48,7 @@ func TestOnDiskCache(t *testing.T) { client.registerCollectorFunc = buildRegisterCollectorFunc(®isterCalled) // Mock client to return an unparseable response. - client.getConfigFunc = buildGetConfigHandler("unparseable config") + client.getConfigFunc = buildGetConfigHandler("unparseable config", "", false) // Write the cache contents, and run the service. err := os.WriteFile(env.svc.dataPath, []byte(cacheContents), 0644) @@ -84,7 +84,7 @@ func TestAPIResponse(t *testing.T) { // Mock client to return a valid response. var registerCalled atomic.Bool client.mut.Lock() - client.getConfigFunc = buildGetConfigHandler(cfg1) + client.getConfigFunc = buildGetConfigHandler(cfg1, "", false) client.registerCollectorFunc = buildRegisterCollectorFunc(®isterCalled) client.mut.Unlock() @@ -103,7 +103,7 @@ func TestAPIResponse(t *testing.T) { // Update the response returned by the API. client.mut.Lock() - client.getConfigFunc = buildGetConfigHandler(cfg2) + client.getConfigFunc = buildGetConfigHandler(cfg2, "", false) client.mut.Unlock() // Verify that the service has loaded the updated response. @@ -114,11 +114,61 @@ func TestAPIResponse(t *testing.T) { cancel() } -func buildGetConfigHandler(in string) func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) { +func TestAPIResponseNotModified(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + url := "https://example.com/" + cfg1 := `loki.process "default" { forward_to = [] }` + + // Create a new service. + env := newTestEnvironment(t) + require.NoError(t, env.ApplyConfig(fmt.Sprintf(` + url = "%s" + poll_frequency = "10s" + `, url))) + + client := &collectorClient{} + env.svc.asClient = client + + // Mock client to return a valid response. + var registerCalled atomic.Bool + client.mut.Lock() + client.getConfigFunc = buildGetConfigHandler(cfg1, "12345", false) + client.registerCollectorFunc = buildRegisterCollectorFunc(®isterCalled) + client.mut.Unlock() + + // Run the service. + go func() { + require.NoError(t, env.Run(ctx)) + }() + + require.Eventually(t, func() bool { return registerCalled.Load() }, 1*time.Second, 10*time.Millisecond) + + // As the API response was successful, verify that the service has loaded + // the valid response. + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, getHash([]byte(cfg1)), env.svc.getCfgHash()) + }, time.Second, 10*time.Millisecond) + + // Update the response returned by the API. + client.mut.Lock() + client.getConfigFunc = buildGetConfigHandler("", "12345", true) + client.mut.Unlock() + + // Verify that the service has loaded the updated response. + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, getHash([]byte(cfg1)), env.svc.getCfgHash()) + }, 1*time.Second, 10*time.Millisecond) + + cancel() +} + +func buildGetConfigHandler(in string, hash string, notModified bool) func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) { return func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) { rsp := &connect.Response[collectorv1.GetConfigResponse]{ Msg: &collectorv1.GetConfigResponse{ - Content: in, + Content: in, + NotModified: notModified, + Hash: hash, }, } return rsp, nil