From 4967696da856349257c92d54b5b64bdb8a925d47 Mon Sep 17 00:00:00 2001 From: Vikrant Gupta Date: Sat, 4 Jan 2025 01:28:54 +0530 Subject: [PATCH] feat: added new cache package for query service (#6733) * feat: added new cache package for query service * feat: handle type checking for inmemory * feat: some copy corrections * feat: added inmemory test cases * chore: some renaming * feat: added redis handling * chore: add redis tests * feat(cache): refactor the code * feat(cache): refactor the code * feat(cache): added defaults for redis config * feat(cache): update makefile to run all tetss * feat(cache): update tests and docs * feat(cache): update tests and docs * feat(cache): handle signoz web flag * feat(cache): handle signoz web flag * feat(cache): handle signoz web flag --- Makefile | 2 +- conf/defaults.yaml | 23 +- ee/query-service/app/server.go | 6 +- ee/query-service/main.go | 11 +- pkg/cache/cache.go | 71 ++++++ pkg/cache/config.go | 49 +++++ pkg/cache/strategy/memory/memory.go | 96 +++++++++ pkg/cache/strategy/memory/memory_test.go | 264 +++++++++++++++++++++++ pkg/cache/strategy/redis/redis.go | 120 +++++++++++ pkg/cache/strategy/redis/redis_test.go | 139 ++++++++++++ pkg/config/config.go | 3 + pkg/config/config_test.go | 43 ++-- pkg/signoz/signoz.go | 37 ++++ 13 files changed, 830 insertions(+), 34 deletions(-) create mode 100644 pkg/cache/cache.go create mode 100644 pkg/cache/config.go create mode 100644 pkg/cache/strategy/memory/memory.go create mode 100644 pkg/cache/strategy/memory/memory_test.go create mode 100644 pkg/cache/strategy/redis/redis.go create mode 100644 pkg/cache/strategy/redis/redis_test.go create mode 100644 pkg/signoz/signoz.go diff --git a/Makefile b/Makefile index c7dd0e4a9a..8e460b3042 100644 --- a/Makefile +++ b/Makefile @@ -190,4 +190,4 @@ check-no-ee-references: fi test: - go test ./pkg/query-service/... + go test ./pkg/... diff --git a/conf/defaults.yaml b/conf/defaults.yaml index dd571e89fb..9239005fdf 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -8,4 +8,25 @@ web: # The prefix to serve web on prefix: / # The directory containing the static build files. - directory: /etc/signoz/web \ No newline at end of file + directory: /etc/signoz/web + +##################### Cache ##################### +cache: + # specifies the caching provider to use. + provider: memory + # memory: Uses in-memory caching. + memory: + # Time-to-live for cache entries in memory. Specify the duration in ns + ttl: 60000000000 + # The interval at which the cache will be cleaned up + cleanupInterval: + # redis: Uses Redis as the caching backend. + redis: + # The hostname or IP address of the Redis server. + host: localhost + # The port on which the Redis server is running. Default is usually 6379. + port: 6379 + # The password for authenticating with the Redis server, if required. + password: + # The Redis database number to use + db: 0 \ No newline at end of file diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index afd9dad4c5..1b17ca43d0 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -32,6 +32,7 @@ import ( baseauth "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/migrate" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/signoz" "go.signoz.io/signoz/pkg/web" licensepkg "go.signoz.io/signoz/ee/query-service/license" @@ -62,6 +63,7 @@ import ( const AppDbEngine = "sqlite" type ServerOptions struct { + SigNoz *signoz.SigNoz PromConfigPath string SkipTopLvlOpsPath string HTTPHostPort string @@ -109,7 +111,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status { } // NewServer creates and initializes Server -func NewServer(serverOptions *ServerOptions, web *web.Web) (*Server, error) { +func NewServer(serverOptions *ServerOptions) (*Server, error) { modelDao, err := dao.InitDao("sqlite", baseconst.RELATIONAL_DATASOURCE_PATH) if err != nil { @@ -291,7 +293,7 @@ func NewServer(serverOptions *ServerOptions, web *web.Web) (*Server, error) { usageManager: usageManager, } - httpServer, err := s.createPublicServer(apiHandler, web) + httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web) if err != nil { return nil, err diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 3514376213..f9713c22bd 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -20,7 +20,7 @@ import ( baseconst "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/migrate" "go.signoz.io/signoz/pkg/query-service/version" - signozweb "go.signoz.io/signoz/pkg/web" + "go.signoz.io/signoz/pkg/signoz" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -148,12 +148,13 @@ func main() { zap.L().Fatal("Failed to create config", zap.Error(err)) } - web, err := signozweb.New(zap.L(), config.Web) - if err != nil && !skipWebFrontend { - zap.L().Fatal("Failed to create web", zap.Error(err)) + signoz, err := signoz.New(config, skipWebFrontend) + if err != nil { + zap.L().Fatal("Failed to create signoz struct", zap.Error(err)) } serverOptions := &app.ServerOptions{ + SigNoz: signoz, HTTPHostPort: baseconst.HTTPHostPort, PromConfigPath: promConfigPath, SkipTopLvlOpsPath: skipTopLvlOpsPath, @@ -188,7 +189,7 @@ func main() { zap.L().Info("Migration successful") } - server, err := app.NewServer(serverOptions, web) + server, err := app.NewServer(serverOptions) if err != nil { zap.L().Fatal("Failed to create server", zap.Error(err)) } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go new file mode 100644 index 0000000000..c064883f96 --- /dev/null +++ b/pkg/cache/cache.go @@ -0,0 +1,71 @@ +package cache + +import ( + "context" + "encoding" + "fmt" + "reflect" + "time" +) + +// cacheable entity +type CacheableEntity interface { + encoding.BinaryMarshaler + encoding.BinaryUnmarshaler +} + +func WrapCacheableEntityErrors(rt reflect.Type, caller string) error { + if rt == nil { + return fmt.Errorf("%s: (nil)", caller) + } + + if rt.Kind() != reflect.Pointer { + return fmt.Errorf("%s: (non-pointer \"%s\")", caller, rt.String()) + } + + return fmt.Errorf("%s: (nil \"%s\")", caller, rt.String()) + +} + +// cache status +type RetrieveStatus int + +const ( + RetrieveStatusHit = RetrieveStatus(iota) + RetrieveStatusPartialHit + RetrieveStatusRangeMiss + RetrieveStatusKeyMiss + RetrieveStatusRevalidated + + RetrieveStatusError +) + +func (s RetrieveStatus) String() string { + switch s { + case RetrieveStatusHit: + return "hit" + case RetrieveStatusPartialHit: + return "partial hit" + case RetrieveStatusRangeMiss: + return "range miss" + case RetrieveStatusKeyMiss: + return "key miss" + case RetrieveStatusRevalidated: + return "revalidated" + case RetrieveStatusError: + return "error" + default: + return "unknown" + } +} + +// cache interface +type Cache interface { + Connect(ctx context.Context) error + Store(ctx context.Context, cacheKey string, data CacheableEntity, ttl time.Duration) error + Retrieve(ctx context.Context, cacheKey string, dest CacheableEntity, allowExpired bool) (RetrieveStatus, error) + SetTTL(ctx context.Context, cacheKey string, ttl time.Duration) + Remove(ctx context.Context, cacheKey string) + BulkRemove(ctx context.Context, cacheKeys []string) + Close(ctx context.Context) error +} diff --git a/pkg/cache/config.go b/pkg/cache/config.go new file mode 100644 index 0000000000..213fcaba0e --- /dev/null +++ b/pkg/cache/config.go @@ -0,0 +1,49 @@ +package cache + +import ( + "time" + + go_cache "github.com/patrickmn/go-cache" + "go.signoz.io/signoz/pkg/confmap" +) + +// Config satisfies the confmap.Config interface +var _ confmap.Config = (*Config)(nil) + +type Memory struct { + TTL time.Duration `mapstructure:"ttl"` + CleanupInterval time.Duration `mapstructure:"cleanupInterval"` +} + +type Redis struct { + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + Password string `mapstructure:"password"` + DB int `mapstructure:"db"` +} + +type Config struct { + Provider string `mapstructure:"provider"` + Memory Memory `mapstructure:"memory"` + Redis Redis `mapstructure:"redis"` +} + +func (c *Config) NewWithDefaults() confmap.Config { + return &Config{ + Provider: "memory", + Memory: Memory{ + TTL: go_cache.NoExpiration, + CleanupInterval: 1 * time.Minute, + }, + Redis: Redis{ + Host: "localhost", + Port: 6379, + Password: "", + DB: 0, + }, + } +} + +func (c *Config) Validate() error { + return nil +} diff --git a/pkg/cache/strategy/memory/memory.go b/pkg/cache/strategy/memory/memory.go new file mode 100644 index 0000000000..5649eecf54 --- /dev/null +++ b/pkg/cache/strategy/memory/memory.go @@ -0,0 +1,96 @@ +package memory + +import ( + "context" + "fmt" + "reflect" + "time" + + go_cache "github.com/patrickmn/go-cache" + _cache "go.signoz.io/signoz/pkg/cache" +) + +type cache struct { + cc *go_cache.Cache +} + +func New(opts *_cache.Memory) *cache { + return &cache{cc: go_cache.New(opts.TTL, opts.CleanupInterval)} +} + +// Connect does nothing +func (c *cache) Connect(_ context.Context) error { + return nil +} + +// Store stores the data in the cache +func (c *cache) Store(_ context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error { + // check if the data being passed is a pointer and is not nil + rv := reflect.ValueOf(data) + if rv.Kind() != reflect.Pointer || rv.IsNil() { + return _cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory") + } + + c.cc.Set(cacheKey, data, ttl) + return nil +} + +// Retrieve retrieves the data from the cache +func (c *cache) Retrieve(_ context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) { + // check if the destination being passed is a pointer and is not nil + dstv := reflect.ValueOf(dest) + if dstv.Kind() != reflect.Pointer || dstv.IsNil() { + return _cache.RetrieveStatusError, _cache.WrapCacheableEntityErrors(reflect.TypeOf(dest), "inmemory") + } + + // check if the destination value is settable + if !dstv.Elem().CanSet() { + return _cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem()) + } + + data, found := c.cc.Get(cacheKey) + if !found { + return _cache.RetrieveStatusKeyMiss, nil + } + + // check the type compatbility between the src and dest + srcv := reflect.ValueOf(data) + if !srcv.Type().AssignableTo(dstv.Type()) { + return _cache.RetrieveStatusError, fmt.Errorf("src type is not assignable to dst type") + } + + // set the value to from src to dest + dstv.Elem().Set(srcv.Elem()) + return _cache.RetrieveStatusHit, nil +} + +// SetTTL sets the TTL for the cache entry +func (c *cache) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) { + item, found := c.cc.Get(cacheKey) + if !found { + return + } + c.cc.Replace(cacheKey, item, ttl) +} + +// Remove removes the cache entry +func (c *cache) Remove(_ context.Context, cacheKey string) { + c.cc.Delete(cacheKey) +} + +// BulkRemove removes the cache entries +func (c *cache) BulkRemove(_ context.Context, cacheKeys []string) { + for _, cacheKey := range cacheKeys { + c.cc.Delete(cacheKey) + } +} + +// Close does nothing +func (c *cache) Close(_ context.Context) error { + return nil +} + +// Configuration returns the cache configuration +func (c *cache) Configuration() *_cache.Memory { + return nil +} diff --git a/pkg/cache/strategy/memory/memory_test.go b/pkg/cache/strategy/memory/memory_test.go new file mode 100644 index 0000000000..d8434e6b2e --- /dev/null +++ b/pkg/cache/strategy/memory/memory_test.go @@ -0,0 +1,264 @@ +package memory + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + _cache "go.signoz.io/signoz/pkg/cache" +) + +// TestNew tests the New function +func TestNew(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + assert.NotNil(t, c) + assert.NotNil(t, c.cc) + assert.NoError(t, c.Connect(context.Background())) +} + +type CacheableEntity struct { + Key string + Value int + Expiry time.Duration +} + +func (ce CacheableEntity) MarshalBinary() ([]byte, error) { + return json.Marshal(ce) +} + +func (ce CacheableEntity) UnmarshalBinary(data []byte) error { + return nil +} + +type DCacheableEntity struct { + Key string + Value int + Expiry time.Duration +} + +func (dce DCacheableEntity) MarshalBinary() ([]byte, error) { + return json.Marshal(dce) +} + +func (dce DCacheableEntity) UnmarshalBinary(data []byte) error { + return nil +} + +// TestStore tests the Store function +// this should fail because of nil pointer error +func TestStoreWithNilPointer(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + var storeCacheableEntity *CacheableEntity + assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) +} + +// this should fail because of no pointer error +func TestStoreWithStruct(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + var storeCacheableEntity CacheableEntity + assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) +} + +func TestStoreWithNonNilPointer(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) +} + +// TestRetrieve tests the Retrieve function +func TestRetrieveWithNilPointer(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) + + var retrieveCacheableEntity *CacheableEntity + + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.Error(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusError) +} + +func TestRetrieveWitNonPointer(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) + + var retrieveCacheableEntity CacheableEntity + + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.Error(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusError) +} + +func TestRetrieveWithDifferentTypes(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) + + retrieveCacheableEntity := new(DCacheableEntity) + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.Error(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusError) +} + +func TestRetrieveWithSameTypes(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) + + retrieveCacheableEntity := new(CacheableEntity) + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusHit) + assert.Equal(t, storeCacheableEntity, retrieveCacheableEntity) +} + +// TestSetTTL tests the SetTTL function +func TestSetTTL(t *testing.T) { + c := New(&_cache.Memory{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second}) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + retrieveCacheableEntity := new(CacheableEntity) + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 2*time.Second)) + time.Sleep(3 * time.Second) + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss) + assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity) + + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 2*time.Second)) + c.SetTTL(context.Background(), "key", 4*time.Second) + time.Sleep(3 * time.Second) + retrieveStatus, err = c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusHit) + assert.Equal(t, retrieveCacheableEntity, storeCacheableEntity) +} + +// TestRemove tests the Remove function +func TestRemove(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + retrieveCacheableEntity := new(CacheableEntity) + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) + c.Remove(context.Background(), "key") + + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss) + assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity) +} + +// TestBulkRemove tests the BulkRemove function +func TestBulkRemove(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + retrieveCacheableEntity := new(CacheableEntity) + assert.NoError(t, c.Store(context.Background(), "key1", storeCacheableEntity, 10*time.Second)) + assert.NoError(t, c.Store(context.Background(), "key2", storeCacheableEntity, 10*time.Second)) + c.BulkRemove(context.Background(), []string{"key1", "key2"}) + + retrieveStatus, err := c.Retrieve(context.Background(), "key1", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss) + assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity) + + retrieveStatus, err = c.Retrieve(context.Background(), "key2", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss) + assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity) +} + +// TestCache tests the cache +func TestCache(t *testing.T) { + opts := &_cache.Memory{ + TTL: 10 * time.Second, + CleanupInterval: 10 * time.Second, + } + c := New(opts) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + retrieveCacheableEntity := new(CacheableEntity) + assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) + retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + assert.NoError(t, err) + assert.Equal(t, retrieveStatus, _cache.RetrieveStatusHit) + assert.Equal(t, storeCacheableEntity, retrieveCacheableEntity) + c.Remove(context.Background(), "key") +} diff --git a/pkg/cache/strategy/redis/redis.go b/pkg/cache/strategy/redis/redis.go new file mode 100644 index 0000000000..0309072656 --- /dev/null +++ b/pkg/cache/strategy/redis/redis.go @@ -0,0 +1,120 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/go-redis/redis/v8" + _cache "go.signoz.io/signoz/pkg/cache" + "go.uber.org/zap" +) + +type cache struct { + client *redis.Client + opts *_cache.Redis +} + +func New(opts *_cache.Redis) *cache { + return &cache{opts: opts} +} + +// WithClient creates a new cache with the given client +func WithClient(client *redis.Client) *cache { + return &cache{client: client} +} + +// Connect connects to the redis server +func (c *cache) Connect(_ context.Context) error { + c.client = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:%d", c.opts.Host, c.opts.Port), + Password: c.opts.Password, + DB: c.opts.DB, + }) + return nil +} + +// Store stores the data in the cache +func (c *cache) Store(ctx context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error { + return c.client.Set(ctx, cacheKey, data, ttl).Err() +} + +// Retrieve retrieves the data from the cache +func (c *cache) Retrieve(ctx context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) { + err := c.client.Get(ctx, cacheKey).Scan(dest) + if err != nil { + if errors.Is(err, redis.Nil) { + return _cache.RetrieveStatusKeyMiss, nil + } + return _cache.RetrieveStatusError, err + } + return _cache.RetrieveStatusHit, nil +} + +// SetTTL sets the TTL for the cache entry +func (c *cache) SetTTL(ctx context.Context, cacheKey string, ttl time.Duration) { + err := c.client.Expire(ctx, cacheKey, ttl).Err() + if err != nil { + zap.L().Error("error setting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Duration("ttl", ttl), zap.Error(err)) + } +} + +// Remove removes the cache entry +func (c *cache) Remove(ctx context.Context, cacheKey string) { + c.BulkRemove(ctx, []string{cacheKey}) +} + +// BulkRemove removes the cache entries +func (c *cache) BulkRemove(ctx context.Context, cacheKeys []string) { + if err := c.client.Del(ctx, cacheKeys...).Err(); err != nil { + zap.L().Error("error deleting cache keys", zap.Strings("cacheKeys", cacheKeys), zap.Error(err)) + } +} + +// Close closes the connection to the redis server +func (c *cache) Close(_ context.Context) error { + return c.client.Close() +} + +// Ping pings the redis server +func (c *cache) Ping(ctx context.Context) error { + return c.client.Ping(ctx).Err() +} + +// GetClient returns the redis client +func (c *cache) GetClient() *redis.Client { + return c.client +} + +// GetOptions returns the options +func (c *cache) GetOptions() *_cache.Redis { + return c.opts +} + +// GetTTL returns the TTL for the cache entry +func (c *cache) GetTTL(ctx context.Context, cacheKey string) time.Duration { + ttl, err := c.client.TTL(ctx, cacheKey).Result() + if err != nil { + zap.L().Error("error getting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Error(err)) + } + return ttl +} + +// GetKeys returns the keys matching the pattern +func (c *cache) GetKeys(ctx context.Context, pattern string) ([]string, error) { + return c.client.Keys(ctx, pattern).Result() +} + +// GetKeysWithTTL returns the keys matching the pattern with their TTL +func (c *cache) GetKeysWithTTL(ctx context.Context, pattern string) (map[string]time.Duration, error) { + keys, err := c.GetKeys(ctx, pattern) + if err != nil { + return nil, err + } + result := make(map[string]time.Duration) + for _, key := range keys { + result[key] = c.GetTTL(ctx, key) + } + return result, nil +} diff --git a/pkg/cache/strategy/redis/redis_test.go b/pkg/cache/strategy/redis/redis_test.go new file mode 100644 index 0000000000..2b1539f2bd --- /dev/null +++ b/pkg/cache/strategy/redis/redis_test.go @@ -0,0 +1,139 @@ +package redis + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/go-redis/redismock/v8" + "github.com/stretchr/testify/assert" + _cache "go.signoz.io/signoz/pkg/cache" +) + +type CacheableEntity struct { + Key string + Value int + Expiry time.Duration +} + +func (ce *CacheableEntity) MarshalBinary() ([]byte, error) { + return json.Marshal(ce) +} + +func (ce *CacheableEntity) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, ce) +} + +func TestStore(t *testing.T) { + db, mock := redismock.NewClientMock() + cache := WithClient(db) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + + mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil() + cache.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestRetrieve(t *testing.T) { + db, mock := redismock.NewClientMock() + cache := WithClient(db) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + retrieveCacheableEntity := new(CacheableEntity) + + mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil() + cache.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second) + + data, err := storeCacheableEntity.MarshalBinary() + assert.NoError(t, err) + + mock.ExpectGet("key").SetVal(string(data)) + retrieveStatus, err := cache.Retrieve(context.Background(), "key", retrieveCacheableEntity, false) + if err != nil { + t.Errorf("unexpected error: %s", err) + } + + if retrieveStatus != _cache.RetrieveStatusHit { + t.Errorf("expected status %d, got %d", _cache.RetrieveStatusHit, retrieveStatus) + } + + assert.Equal(t, storeCacheableEntity, retrieveCacheableEntity) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestSetTTL(t *testing.T) { + db, mock := redismock.NewClientMock() + cache := WithClient(db) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + + mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil() + cache.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second) + + mock.ExpectExpire("key", 4*time.Second).RedisNil() + cache.SetTTL(context.Background(), "key", 4*time.Second) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestRemove(t *testing.T) { + db, mock := redismock.NewClientMock() + c := WithClient(db) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + + mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil() + c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second) + + mock.ExpectDel("key").RedisNil() + c.Remove(context.Background(), "key") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} + +func TestBulkRemove(t *testing.T) { + db, mock := redismock.NewClientMock() + c := WithClient(db) + storeCacheableEntity := &CacheableEntity{ + Key: "some-random-key", + Value: 1, + Expiry: time.Microsecond, + } + + mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil() + c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second) + + mock.ExpectSet("key2", storeCacheableEntity, 10*time.Second).RedisNil() + c.Store(context.Background(), "key2", storeCacheableEntity, 10*time.Second) + + mock.ExpectDel("key", "key2").RedisNil() + c.BulkRemove(context.Background(), []string{"key", "key2"}) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 6d88cacb61..a1333a89da 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "context" + "go.signoz.io/signoz/pkg/cache" signozconfmap "go.signoz.io/signoz/pkg/confmap" "go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/web" @@ -13,6 +14,7 @@ var ( defaults = map[string]signozconfmap.Config{ "instrumentation": &instrumentation.Config{}, "web": &web.Config{}, + "cache": &cache.Config{}, } ) @@ -20,6 +22,7 @@ var ( type Config struct { Instrumentation instrumentation.Config `mapstructure:"instrumentation"` Web web.Config `mapstructure:"web"` + Cache cache.Config `mapstructure:"cache"` } func New(ctx context.Context, settings ProviderSettings) (*Config, error) { diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index b3e3007bb4..ac2ce3e762 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -3,22 +3,22 @@ package config import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/confmap" - contribsdkconfig "go.opentelemetry.io/contrib/config" + "go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/confmap/provider/signozenvprovider" - "go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/web" ) func TestNewWithSignozEnvProvider(t *testing.T) { - t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__ENABLED", "true") - t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__PROCESSORS__BATCH__EXPORTER__OTLP__ENDPOINT", "0.0.0.0:4317") - t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__PROCESSORS__BATCH__EXPORT_TIMEOUT", "10") + t.Setenv("SIGNOZ__WEB__PREFIX", "/web") t.Setenv("SIGNOZ__WEB__DIRECTORY", "/build") + t.Setenv("SIGNOZ__CACHE__PROVIDER", "redis") + t.Setenv("SIGNOZ__CACHE__REDIS__HOST", "127.0.0.1") config, err := New(context.Background(), ProviderSettings{ ResolverSettings: confmap.ResolverSettings{ @@ -30,31 +30,24 @@ func TestNewWithSignozEnvProvider(t *testing.T) { }) require.NoError(t, err) - i := 10 expected := &Config{ - Instrumentation: instrumentation.Config{ - Logs: instrumentation.LogsConfig{ - Enabled: true, - LoggerProvider: contribsdkconfig.LoggerProvider{ - Processors: []contribsdkconfig.LogRecordProcessor{ - { - Batch: &contribsdkconfig.BatchLogRecordProcessor{ - ExportTimeout: &i, - Exporter: contribsdkconfig.LogRecordExporter{ - OTLP: &contribsdkconfig.OTLP{ - Endpoint: "0.0.0.0:4317", - }, - }, - }, - }, - }, - }, - }, - }, Web: web.Config{ Prefix: "/web", Directory: "/build", }, + Cache: cache.Config{ + Provider: "redis", + Memory: cache.Memory{ + TTL: time.Duration(-1), + CleanupInterval: 1 * time.Minute, + }, + Redis: cache.Redis{ + Host: "127.0.0.1", + Port: 6379, + Password: "", + DB: 0, + }, + }, } assert.Equal(t, expected, config) diff --git a/pkg/signoz/signoz.go b/pkg/signoz/signoz.go new file mode 100644 index 0000000000..a2a42d3073 --- /dev/null +++ b/pkg/signoz/signoz.go @@ -0,0 +1,37 @@ +package signoz + +import ( + "go.signoz.io/signoz/pkg/cache" + "go.signoz.io/signoz/pkg/cache/strategy/memory" + "go.signoz.io/signoz/pkg/cache/strategy/redis" + "go.signoz.io/signoz/pkg/config" + "go.signoz.io/signoz/pkg/web" + "go.uber.org/zap" +) + +type SigNoz struct { + Cache cache.Cache + Web *web.Web +} + +func New(config *config.Config, skipWebFrontend bool) (*SigNoz, error) { + var cache cache.Cache + + // init for the cache + switch config.Cache.Provider { + case "memory": + cache = memory.New(&config.Cache.Memory) + case "redis": + cache = redis.New(&config.Cache.Redis) + } + + web, err := web.New(zap.L(), config.Web) + if err != nil && !skipWebFrontend { + return nil, err + } + + return &SigNoz{ + Cache: cache, + Web: web, + }, nil +}