Skip to content

Commit

Permalink
Merge pull request #24 from hyp3rd/development
Browse files Browse the repository at this point in the history
Cache size handling - Redis full implementation
  • Loading branch information
hyp3rd authored Jan 25, 2023
2 parents 253b27a + a38a8c5 commit 5648487
Show file tree
Hide file tree
Showing 26 changed files with 1,087 additions and 239 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## Synopsis

HyperCache is a **thread-safe** **high-performance** cache implementation in Go that supports multiple backends with the expiration and eviction of items supporting custom algorithms alongside the defaults. It can be used as a standalone cache or as a cache middleware for a service. It can implement a [service interface](./service.go) to intercept cache methods and decorate em with middleware (default or custom).
HyperCache is a **thread-safe** **high-performance** cache implementation in `Go` that supports multiple backends with optional size limit, expiration and eviction of items supporting custom algorithms alongside the defaults. It can be used as a standalone cache or as a cache middleware for a service. It can implement a [service interface](./service.go) to intercept and decorate the cache methods with middleware (default or custom).
It is optimized for performance and flexibility allowing to specify the expiration and eviction intervals, provide and register new eviction algorithms, stats collectors, middleware(s).
It ships with a default [historigram stats collector](./stats/statscollector.go) and several eviction algorithms, but you can develop and register your own as long as it implements the [Eviction Algorithm interface](./eviction/eviction.go).:

Expand Down Expand Up @@ -50,12 +50,13 @@ goos: darwin
goarch: amd64
pkg: github.com/hyp3rd/hypercache/tests/benchmark
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
BenchmarkHyperCache_Get-16 38833602 123.9 ns/op 0 B/op 0 allocs/op
BenchmarkHyperCache_Get_ProactiveEviction-16 38079158 124.4 ns/op 0 B/op 0 allocs/op
BenchmarkHyperCache_Set-16 4361000 1217 ns/op 203 B/op 3 allocs/op
BenchmarkHyperCache_Set_Proactive_Eviction-16 4343996 1128 ns/op 92 B/op 3 allocs/op
BenchmarkHyperCache_Get-16 39429110 115.7 ns/op 0 B/op 0 allocs/op
BenchmarkHyperCache_Get_ProactiveEviction-16 42094736 118.0 ns/op 0 B/op 0 allocs/op
BenchmarkHyperCache_List-16 10898176 437.0 ns/op 85 B/op 1 allocs/op
BenchmarkHyperCache_Set-16 3034786 1546 ns/op 252 B/op 4 allocs/op
BenchmarkHyperCache_Set_Proactive_Eviction-16 2725557 1833 ns/op 162 B/op 3 allocs/op
PASS
ok github.com/hyp3rd/hypercache/tests/benchmark 23.723s
ok github.com/hyp3rd/hypercache/tests/benchmark 30.031s
```

### Examples
Expand Down
4 changes: 2 additions & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type IBackend[T IBackendConstrain] interface {
Capacity() int
// SetCapacity sets the maximum number of items that can be stored in the cache.
SetCapacity(capacity int)
// Size returns the number of items currently stored in the cache.
Size() int
// Count returns the number of items currently stored in the cache.
Count() int
// Remove deletes the item with the given key from the cache.
Remove(keys ...string) error
}
Expand Down
62 changes: 32 additions & 30 deletions backend/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@ import (
"sort"
"sync"

"github.com/hyp3rd/hypercache/datastructure"
datastructure "github.com/hyp3rd/hypercache/datastructure/v3"
"github.com/hyp3rd/hypercache/errors"
"github.com/hyp3rd/hypercache/models"
"github.com/hyp3rd/hypercache/types"
)

// InMemory is a cache backend that stores the items in memory, leveraging a custom `ConcurrentMap`.
type InMemory struct {
items datastructure.ConcurrentMap[string, *models.Item] // map to store the items in the cache
capacity int // capacity of the cache, limits the number of items that can be stored in the cache
mutex sync.RWMutex // mutex to protect the cache from concurrent access
SortFilters // filters applied when listing the items in the cache
// items datastructure.ConcurrentMap[string, *models.Item] // map to store the items in the cache
items datastructure.ConcurrentMap // map to store the items in the cache
capacity int // capacity of the cache, limits the number of items that can be stored in the cache
mutex sync.RWMutex // mutex to protect the cache from concurrent access
SortFilters // filters applied when listing the items in the cache
}

// NewInMemory creates a new in-memory cache with the given options.
func NewInMemory[T InMemory](opts ...Option[InMemory]) (backend IInMemory[T], err error) {

InMemory := &InMemory{
items: datastructure.New[*models.Item](),
items: datastructure.New(),
}

// Apply the backend options
ApplyOptions(InMemory, opts...)

// Check if the `capacity` is valid
if InMemory.capacity < 0 {
return nil, errors.ErrInvalidCapacity
}
Expand All @@ -40,12 +40,16 @@ func (cacheBackend *InMemory) SetCapacity(capacity int) {
if capacity < 0 {
return
}

cacheBackend.capacity = capacity
}

// itemCount returns the number of items in the cache.
func (cacheBackend *InMemory) itemCount() int {
// Capacity returns the capacity of the cacheBackend.
func (cacheBackend *InMemory) Capacity() int {
return cacheBackend.capacity
}

// Count returns the number of items in the cache.
func (cacheBackend *InMemory) Count() int {
return cacheBackend.items.Count()
}

Expand All @@ -55,7 +59,6 @@ func (cacheBackend *InMemory) Get(key string) (item *models.Item, ok bool) {
if !ok {
return nil, false
}

// return the item
return item, true
}
Expand All @@ -80,12 +83,19 @@ func (cacheBackend *InMemory) List(options ...FilterOption[InMemory]) ([]*models
// Apply the filter options
ApplyFilterOptions(cacheBackend, options...)

items := make([]*models.Item, 0)
items := make([]*models.Item, 0, cacheBackend.items.Count())
wg := sync.WaitGroup{}
wg.Add(cacheBackend.items.Count())
for item := range cacheBackend.items.IterBuffered() {
if cacheBackend.FilterFunc == nil || cacheBackend.FilterFunc(item.Val) {
items = append(items, item.Val)
}
// go func(item datastructure.Tuple[string, *models.Item]) {
go func(item datastructure.Tuple) {
defer wg.Done()
if cacheBackend.FilterFunc == nil || cacheBackend.FilterFunc(&item.Val) {
items = append(items, &item.Val)
}
}(item)
}
wg.Wait()

if cacheBackend.SortBy == "" {
return items, nil
Expand Down Expand Up @@ -116,6 +126,9 @@ func (cacheBackend *InMemory) List(options ...FilterOption[InMemory]) ([]*models
// Remove removes items with the given key from the cacheBackend. If an item is not found, it does nothing.
func (cacheBackend *InMemory) Remove(keys ...string) (err error) {
//TODO: determine if handling the error or not
// var ok bool
// item := models.ItemPool.Get().(*models.Item)
// defer models.ItemPool.Put(item)
for _, key := range keys {
cacheBackend.items.Remove(key)
}
Expand All @@ -124,17 +137,6 @@ func (cacheBackend *InMemory) Remove(keys ...string) (err error) {

// Clear removes all items from the cacheBackend.
func (cacheBackend *InMemory) Clear() {
for item := range cacheBackend.items.IterBuffered() {
cacheBackend.items.Remove(item.Key)
}
}

// Capacity returns the capacity of the cacheBackend.
func (cacheBackend *InMemory) Capacity() int {
return cacheBackend.capacity
}

// Size returns the number of items in the cacheBackend.
func (cacheBackend *InMemory) Size() int {
return cacheBackend.itemCount()
// clear the cacheBackend
cacheBackend.items.Clear()
}
2 changes: 1 addition & 1 deletion backend/options.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package backend

import (
"github.com/go-redis/redis/v9"
"github.com/hyp3rd/hypercache/libs/serializer"
"github.com/hyp3rd/hypercache/models"
"github.com/hyp3rd/hypercache/types"
"github.com/redis/go-redis/v9"
)

// ISortableBackend is an interface that defines the methods that a backend should implement to be sortable.
Expand Down
25 changes: 8 additions & 17 deletions backend/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"
"sort"

"github.com/go-redis/redis/v9"
"github.com/hyp3rd/hypercache/errors"
"github.com/hyp3rd/hypercache/libs/serializer"
"github.com/hyp3rd/hypercache/models"
"github.com/hyp3rd/hypercache/types"
"github.com/redis/go-redis/v9"
)

// Redis is a cache backend that stores the items in a redis implementation.
Expand All @@ -36,7 +36,6 @@ func NewRedisBackend[T Redis](redisOptions ...Option[Redis]) (backend IRedisBack
if rb.capacity < 0 {
return nil, errors.ErrInvalidCapacity
}

// Check if the `keysSetName` is empty
if rb.keysSetName == "" {
rb.keysSetName = "hypercache"
Expand All @@ -55,11 +54,6 @@ func NewRedisBackend[T Redis](redisOptions ...Option[Redis]) (backend IRedisBack
return rb, nil
}

// Capacity returns the maximum number of items that can be stored in the cache.
func (cacheBackend *Redis) Capacity() int {
return cacheBackend.capacity
}

// SetCapacity sets the capacity of the cache.
func (cacheBackend *Redis) SetCapacity(capacity int) {
if capacity < 0 {
Expand All @@ -68,23 +62,21 @@ func (cacheBackend *Redis) SetCapacity(capacity int) {
cacheBackend.capacity = capacity
}

// itemCount returns the number of items in the cache.
func (cacheBackend *Redis) itemCount() int {
count, _ := cacheBackend.rdb.DBSize(context.Background()).Result()
return int(count)
// Capacity returns the maximum number of items that can be stored in the cache.
func (cacheBackend *Redis) Capacity() int {
return cacheBackend.capacity
}

// Size returns the number of items in the cache.
func (cacheBackend *Redis) Size() int {
return cacheBackend.itemCount()
func (cacheBackend *Redis) Count() int {
count, _ := cacheBackend.rdb.DBSize(context.Background()).Result()
return int(count)
}

// Get retrieves the Item with the given key from the cacheBackend. If the item is not found, it returns nil.
func (cacheBackend *Redis) Get(key string) (item *models.Item, ok bool) {
// pipe := cacheBackend.rdb.Conn().Pipeline()
// Check if the key is in the set of keys
isMember, err := cacheBackend.rdb.SIsMember(context.Background(), cacheBackend.keysSetName, key).Result()
// isMember, err := pipe.SIsMember(context.Background(), cacheBackend.keysSetName, key).Result()
if err != nil {
return nil, false
}
Expand All @@ -96,9 +88,8 @@ func (cacheBackend *Redis) Get(key string) (item *models.Item, ok bool) {
item = models.ItemPool.Get().(*models.Item)
// Return the item to the pool
defer models.ItemPool.Put(item)

data, err := cacheBackend.rdb.HGet(context.Background(), key, "data").Bytes()
// data, _ := pipe.HGet(context.Background(), key, "data").Bytes()
// _, err = pipe.Exec(context.Background())
if err != nil {
// Check if the item is not found
if err == redis.Nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/redis/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"crypto/tls"
"time"

"github.com/go-redis/redis/v9"
"github.com/redis/go-redis/v9"
)

// Option is a function type that can be used to configure the `Redis`.
Expand Down
2 changes: 1 addition & 1 deletion backend/redis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"time"

"github.com/go-redis/redis/v9"
"github.com/redis/go-redis/v9"
)

// Store is a redis store instance with redis client
Expand Down
13 changes: 13 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func ApplyHyperCacheOptions[T backend.IBackendConstrain](cache *HyperCache[T], o
}
}

// WithMaxCacheSize is an option that sets the maximum size of the cache.
// The maximum size of the cache is the maximum number of items that can be stored in the cache.
// If the maximum size of the cache is reached, the least recently used item will be evicted from the cache.
func WithMaxCacheSize[T backend.IBackendConstrain](maxCacheSize int64) Option[T] {
return func(cache *HyperCache[T]) {
// If the max cache size is less than 0, set it to 0.
if maxCacheSize < 0 {
maxCacheSize = 0
}
cache.maxCacheSize = maxCacheSize
}
}

// WithEvictionAlgorithm is an option that sets the eviction algorithm name field of the `HyperCache` struct.
// The eviction algorithm name determines which eviction algorithm will be used to evict items from the cache.
// The eviction algorithm name must be one of the following:
Expand Down
24 changes: 12 additions & 12 deletions datastructure/cmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ type Stringer interface {

// ConcurrentMap is a "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.
type ConcurrentMap[K comparable, V any] struct {
type ConcurrentMap[K comparable, V interface{}] struct {
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
}

// ConcurrentMapShared is a "thread" safe string to anything map.
type ConcurrentMapShared[K comparable, V any] struct {
type ConcurrentMapShared[K comparable, V interface{}] struct {
items map[K]V
sync.RWMutex // Read Write mutex, guards access to internal map.
}

func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
func create[K comparable, V interface{}](sharding func(key K) uint32) ConcurrentMap[K, V] {
m := ConcurrentMap[K, V]{
sharding: sharding,
shards: make([]*ConcurrentMapShared[K, V], ShardCount),
Expand All @@ -41,17 +41,17 @@ func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V
}

// New creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
func New[V interface{}]() ConcurrentMap[string, V] {
return create[string, V](fnv32)
}

// NewStringer creates a new concurrent map.
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
func NewStringer[K Stringer, V interface{}]() ConcurrentMap[K, V] {
return create[K, V](strfnv32[K])
}

// NewWithCustomShardingFunction creates a new concurrent map.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
func NewWithCustomShardingFunction[K comparable, V interface{}](sharding func(key K) uint32) ConcurrentMap[K, V] {
return create[K, V](sharding)
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (m ConcurrentMap[K, V]) Set(key K, value V) {
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWLock is not reentrant
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V
type UpsertCb[V interface{}] func(exist bool, valueInMap V, newValue V) V

// Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
Expand Down Expand Up @@ -159,7 +159,7 @@ func (m ConcurrentMap[K, V]) Remove(key K) (err error) {

// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb[K any, V any] func(key K, v V, exists bool) bool
type RemoveCb[K interface{}, V interface{}] func(key K, v V, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
Expand Down Expand Up @@ -194,7 +194,7 @@ func (m ConcurrentMap[K, V]) IsEmpty() bool {
}

// Tuple is used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type Tuple[K comparable, V any] struct {
type Tuple[K comparable, V interface{}] struct {
Key K
Val V
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (m ConcurrentMap[K, V]) Clear() {
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
func snapshot[K comparable, V interface{}](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
//When you access map items before initializing.
if len(m.shards) == 0 {
panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
Expand All @@ -249,7 +249,7 @@ func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K,
}

// fanIn reads elements from channels `chans` into channel `out`
func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
func fanIn[K comparable, V interface{}](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (m ConcurrentMap[K, V]) Items() map[K]V {
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb[K comparable, V any] func(key K, v V)
type IterCb[K comparable, V interface{}] func(key K, v V)

// Callback based iterator, cheapest way to read
// all elements in a map.
Expand Down
Loading

0 comments on commit 5648487

Please sign in to comment.