Skip to content

Commit

Permalink
feat: Add support of multiple kind of cache for relabeling components
Browse files Browse the repository at this point in the history
  • Loading branch information
pbailhache committed Sep 30, 2024
1 parent 8f1be0e commit b74427d
Show file tree
Hide file tree
Showing 9 changed files with 605 additions and 57 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo=
github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
Expand Down
1 change: 0 additions & 1 deletion internal/component/faro/receiver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (s *server) Run(ctx context.Context) error {
})

mw := middleware.Instrument{
RouteMatcher: r,
Duration: s.metrics.requestDuration,
RequestBodySize: s.metrics.rxMessageSize,
ResponseBodySize: s.metrics.txMessageSize,
Expand Down
87 changes: 58 additions & 29 deletions internal/component/prometheus/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/cache"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
lru "github.com/hashicorp/golang-lru/v2"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -20,11 +20,19 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"

"go.uber.org/atomic"
)

const name = "prometheus.relabel"

// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have
// to be recalculated again.
type labelAndID struct {
Labels labels.Labels `json:"labels"`
ID uint64 `json:"id"`
}

func init() {
component.Register(component.Registration{
Name: name,
Expand All @@ -47,22 +55,38 @@ type Arguments struct {
// The relabelling rules to apply to each metric before it's forwarded.
MetricRelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"`

// Cache size to use for LRU cache.
CacheSize int `alloy:"max_cache_size,attr,optional"`
// DEPRECATED Use type = inmemory and cache_size field.
InMemoryCacheSizeDeprecated int `alloy:"max_cache_size,attr,optional"`

// Cache backend configuration.
CacheConfig cache.CacheConfig `alloy:"cache,block,optional"`
}

// SetToDefault implements syntax.Defaulter.
func (arg *Arguments) SetToDefault() {
*arg = Arguments{
CacheSize: 100_000,
CacheConfig: cache.CacheConfig{
Backend: cache.InMemory,
InMemory: cache.InMemoryCacheConfig{
CacheSize: 100_000,
},
},
}
}

// Validate implements syntax.Validator.
func (arg *Arguments) Validate() error {
if arg.CacheSize <= 0 {
return fmt.Errorf("max_cache_size must be greater than 0 and is %d", arg.CacheSize)
switch arg.CacheConfig.Backend {
case cache.InMemory:
if arg.CacheConfig.InMemory.CacheSize <= 0 {
return fmt.Errorf("cache_size must be greater than 0 and is %d", arg.CacheConfig.InMemory.CacheSize)
}
case cache.Memcached:
case cache.Redis:
default:
return fmt.Errorf("unknown cache backend, should be one of %s", cache.SupportedCaches)
}

return nil
}

Expand Down Expand Up @@ -91,7 +115,7 @@ type Component struct {
debugDataPublisher livedebugging.DebugDataPublisher

cacheMut sync.RWMutex
cache *lru.Cache[uint64, *labelAndID]
cache cache.Cache[labelAndID]
}

var (
Expand All @@ -101,7 +125,13 @@ var (

// New creates a new prometheus.relabel component.
func New(o component.Options, args Arguments) (*Component, error) {
cache, err := lru.New[uint64, *labelAndID](args.CacheSize)
// to be removed after deprecation of max cache size
if args.CacheConfig.Backend == "" && args.InMemoryCacheSizeDeprecated != 0 {
args.CacheConfig.Backend = cache.InMemory
args.CacheConfig.InMemory.CacheSize = args.InMemoryCacheSizeDeprecated
}

relabelCache, err := cache.NewCache[labelAndID](args.CacheConfig)
if err != nil {
return nil, err
}
Expand All @@ -117,7 +147,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
}
c := &Component{
opts: o,
cache: cache,
cache: relabelCache,
ls: data.(labelstore.LabelStore),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
Expand Down Expand Up @@ -230,7 +260,11 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.clearCache(newArgs.CacheSize)

// in case of in_memory cache we need to clean the cache
if newArgs.CacheConfig.Backend == cache.InMemory {
c.clearCache(newArgs.CacheConfig.InMemory.CacheSize)
}
c.mrc = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs)
c.fanout.UpdateChildren(newArgs.ForwardTo)

Expand All @@ -253,7 +287,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
c.cacheHits.Inc()
// If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels
if newLbls != nil {
relabelled = newLbls.labels
relabelled = newLbls.Labels
}
} else {
// Relabel against a copy of the labels to prevent modifying the original
Expand All @@ -271,7 +305,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
}
// Set the cache size to the cache.len
// TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance.
c.cacheSize.Set(float64(c.cache.Len()))
// c.cacheSize.Set(float64(c.cache.GetCacheSize()))

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
Expand All @@ -285,44 +319,39 @@ func (c *Component) getFromCache(id uint64) (*labelAndID, bool) {
c.cacheMut.RLock()
defer c.cacheMut.RUnlock()

fm, found := c.cache.Get(id)
return fm, found
value, err := c.cache.Get(fmt.Sprintf("%d", id))

return value, err == nil
}

func (c *Component) deleteFromCache(id uint64) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()
c.cacheDeletes.Inc()
c.cache.Remove(id)

c.cache.Remove(fmt.Sprintf("%d", id))
}

func (c *Component) clearCache(cacheSize int) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()
cache, _ := lru.New[uint64, *labelAndID](cacheSize)
c.cache = cache
_ = c.cache.Clear(cacheSize)
}

func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()

if !keep {
c.cache.Add(originalID, nil)
_ = c.cache.Set(fmt.Sprintf("%d", originalID), nil, 0)
return
}
newGlobal := c.ls.GetOrAddGlobalRefID(lbls)
c.cache.Add(originalID, &labelAndID{
labels: lbls,
id: newGlobal,
})

_ = c.cache.Set(fmt.Sprintf("%d", originalID), &labelAndID{
Labels: lbls,
ID: newGlobal,
}, 0)
}

func (c *Component) LiveDebugging(_ int) {}

// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have
// to be recalculated again.
type labelAndID struct {
labels labels.Labels
id uint64
}
50 changes: 24 additions & 26 deletions internal/component/prometheus/relabel/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/service/cache"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util"
Expand All @@ -25,39 +26,35 @@ import (
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
func TestLRUCache(t *testing.T) {
lc := labelstore.New(nil, prom.DefaultRegisterer)
relabeller := generateRelabel(t)
relabeller := generateRelabelWithLRUCache(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
require.True(t, relabeller.cache.Len() == 1)
require.True(t, relabeller.cache.GetCacheSize() == 1)
entry, found := relabeller.getFromCache(lc.GetOrAddGlobalRefID(lbls))
require.True(t, found)
require.NotNil(t, entry)
require.True(
t,
lc.GetOrAddGlobalRefID(entry.labels) != lc.GetOrAddGlobalRefID(lbls),
lc.GetOrAddGlobalRefID(entry.Labels) != lc.GetOrAddGlobalRefID(lbls),
)
}

func TestUpdateReset(t *testing.T) {
relabeller := generateRelabel(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
require.True(t, relabeller.cache.Len() == 1)
_ = relabeller.Update(Arguments{
CacheSize: 100000,
MetricRelabelConfigs: []*alloy_relabel.Config{},
})
require.True(t, relabeller.cache.Len() == 0)
}

func TestValidator(t *testing.T) {
args := Arguments{CacheSize: 0}
args := Arguments{
CacheConfig: cache.CacheConfig{
Backend: "unknown",
},
}
err := args.Validate()
require.Error(t, err)

args.CacheSize = 1
args.CacheConfig.Backend = cache.InMemory
err = args.Validate()
require.Error(t, err)

args.CacheConfig.InMemory.CacheSize = 1
err = args.Validate()
require.NoError(t, err)
}
Expand All @@ -83,7 +80,7 @@ func TestNil(t *testing.T) {
Action: "drop",
},
},
CacheSize: 100000,
InMemoryCacheSizeDeprecated: 100000,
})
require.NotNil(t, relabeller)
require.NoError(t, err)
Expand All @@ -93,22 +90,22 @@ func TestNil(t *testing.T) {
}

func TestLRU(t *testing.T) {
relabeller := generateRelabel(t)
relabeller := generateRelabelWithLRUCache(t)

for i := 0; i < 600_000; i++ {
lbls := labels.FromStrings("__address__", "localhost", "inc", strconv.Itoa(i))
relabeller.relabel(0, lbls)
}
require.True(t, relabeller.cache.Len() == 100_000)
require.True(t, relabeller.cache.GetCacheSize() == 100_000)
}

func TestLRUNaN(t *testing.T) {
relabeller := generateRelabel(t)
relabeller := generateRelabelWithLRUCache(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
require.True(t, relabeller.cache.Len() == 1)
require.True(t, relabeller.cache.GetCacheSize() == 1)
relabeller.relabel(math.Float64frombits(value.StaleNaN), lbls)
require.True(t, relabeller.cache.Len() == 0)
require.True(t, relabeller.cache.GetCacheSize() == 0)
}

func BenchmarkCache(b *testing.B) {
Expand Down Expand Up @@ -147,7 +144,7 @@ func BenchmarkCache(b *testing.B) {
app.Commit()
}

func generateRelabel(t *testing.T) *Component {
func generateRelabelWithLRUCache(t *testing.T) *Component {
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(t, l.Has("new_label"))
Expand All @@ -170,8 +167,9 @@ func generateRelabel(t *testing.T) *Component {
Action: "replace",
},
},
CacheSize: 100_000,
InMemoryCacheSizeDeprecated: 100_000,
})

require.NotNil(t, relabeller)
require.NoError(t, err)
return relabeller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/alloy/internal/component/prometheus/relabel"
"github.com/grafana/alloy/internal/converter/internal/common"
"github.com/grafana/alloy/internal/converter/internal/prometheusconvert/build"
"github.com/grafana/alloy/internal/service/cache"
prom_relabel "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
)
Expand Down Expand Up @@ -36,7 +37,12 @@ func toRelabelArguments(relabelConfigs []*prom_relabel.Config, forwardTo []stora
return &relabel.Arguments{
ForwardTo: forwardTo,
MetricRelabelConfigs: ToAlloyRelabelConfigs(relabelConfigs),
CacheSize: 100_000,
CacheConfig: cache.CacheConfig{
Backend: cache.InMemory,
InMemory: cache.InMemoryCacheConfig{
CacheSize: 100_000,
},
},
}
}

Expand Down
Loading

0 comments on commit b74427d

Please sign in to comment.