Skip to content

Commit

Permalink
[patch] delete update inprogress key even if update fails
Browse files Browse the repository at this point in the history
[patch] assign the err watcher from config to cache
[patch] added tests to cover more scenarios, debounce, err watcher
[-] updated README for clarity
  • Loading branch information
bnkamalesh committed Oct 11, 2024
1 parent 63eacd0 commit 6d7cc6b
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 41 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Add key here Get key within window Key expires

When a key is fetched between 9-10 minutes (within the threshold window), Pocache initiates an update for that key (_preemptive_). This ensures fresh data availability, anticipating future usage (_optimistic_).

## Why Use Preemptive Updates?
## Why use preemptive updates?

In highly concurrent environments (e.g., web servers), multiple requests might try to access the same cache entry simultaneously. Without preemptive updates, the system would query the underlying database multiple times until the cache is refreshed.

Expand Down
33 changes: 18 additions & 15 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ type Cache[K comparable, T any] struct {

deleteQ chan<- K

// following configurations are used only when threshold update is enabled
// following configurations are used only when an updater & threshold update are enabled
// threshold is the duration within which if the cache is about to expire, it is eligible to be updated
threshold time.Duration
updateQ chan<- K
updater updater[K, T]
errWatcher ErrOnUpdate
updaterTimeout time.Duration
threshold time.Duration
updateQ chan<- K
updater updater[K, T]
updaterTimeout time.Duration
// updateInProgress is used to handle update debounce
updateInProgress *sync.Map
errWatcher ErrOnUpdate
}

// initUpdater initializes all configuration required for threshold based update
Expand All @@ -143,19 +144,22 @@ func (ch *Cache[K, T]) initUpdater(cfg *Config[K, T]) {
}

ch.threshold = cfg.Threshold.Abs()
ch.updaterTimeout = cfg.UpdaterTimeout
ch.updateInProgress = &sync.Map{}
ch.updater = cfg.Updater

updateQ := make(chan K, cfg.QLength)
ch.updateQ = updateQ

ch.updater = cfg.Updater
ch.updaterTimeout = cfg.UpdaterTimeout
ch.updateInProgress = new(sync.Map)
ch.errWatcher = cfg.ErrWatcher

go ch.updateListener(updateQ)
}

func (ch *Cache[K, T]) errCallback(err error) {
if err == nil || ch.errWatcher == nil {
return
}

ch.errWatcher(err)
}

Expand Down Expand Up @@ -195,13 +199,13 @@ func (ch *Cache[K, T]) update(key K) {
defer cancel()

value, err := ch.updater(ctx, key)
ch.updateInProgress.Delete(key)
if err != nil {
ch.errCallback(err)
return
}

ch.Add(key, value)
ch.updateInProgress.Delete(key)
}

func (ch *Cache[K, T]) Get(key K) Value[T] {
Expand Down Expand Up @@ -283,12 +287,11 @@ func New[K comparable, T any](cfg Config[K, T]) (*Cache[K, T], error) {

deleteQ := make(chan K, cfg.QLength)
ch := &Cache[K, T]{
store: cstore,
deleteQ: deleteQ,
cacheAge: cfg.CacheAge.Abs(),
isDisabled: cfg.DisableCache,
disableServeStale: !cfg.ServeStale,
errWatcher: cfg.ErrWatcher,
store: cstore,
cacheAge: cfg.CacheAge.Abs(),
deleteQ: deleteQ,
}

ch.initUpdater(&cfg)
Expand Down
172 changes: 147 additions & 25 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
func TestCache(tt *testing.T) {
var (
prefix = "prefix"
value = "value"
requirer = require.New(t)
requirer = require.New(tt)
asserter = require.New(tt)
)

t.Run("found", func(t *testing.T) {
tt.Run("found", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Expand All @@ -28,11 +29,11 @@ func TestCache(t *testing.T) {

cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
v := cache.Get(prefix)
requirer.True(v.Found)
requirer.Equal(v.V, value)
asserter.True(v.Found)
asserter.Equal(v.V, value)
})

t.Run("not found", func(t *testing.T) {
tt.Run("not found", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Expand All @@ -42,11 +43,11 @@ func TestCache(t *testing.T) {

cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
v := cache.Get(prefix + "_does_not_exist")
requirer.False(v.Found)
requirer.Equal(v.V, nil)
asserter.False(v.Found)
asserter.Equal(v.V, nil)
})

t.Run("cache age expired", func(t *testing.T) {
tt.Run("cache age expired", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 1,
CacheAge: time.Nanosecond,
Expand All @@ -57,11 +58,11 @@ func TestCache(t *testing.T) {
cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
time.Sleep(time.Millisecond)
v := cache.Get(prefix)
requirer.False(v.Found)
requirer.Equal(v.V, nil)
asserter.False(v.Found)
asserter.Equal(v.V, nil)
})

t.Run("update cache", func(t *testing.T) {
tt.Run("update cache", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Expand All @@ -71,17 +72,17 @@ func TestCache(t *testing.T) {

cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
v := cache.Get(prefix)
requirer.True(v.Found)
requirer.Equal(v.V, value)
asserter.True(v.Found)
asserter.Equal(v.V, value)

value = "new_value"
cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
v = cache.Get(prefix)
requirer.True(v.Found)
requirer.Equal(v.V, value)
asserter.True(v.Found)
asserter.Equal(v.V, value)
})

t.Run("multiple Add/Get to check if channel blocks", func(t *testing.T) { //nolint:govet
tt.Run("multiple Add/Get to check if channel blocks", func(t *testing.T) { //nolint:govet
// limit should be greater than the channel buffer for updateQ & deleteQ
limit := 200
cache, err := New(Config[string, any]{
Expand All @@ -101,16 +102,125 @@ func TestCache(t *testing.T) {
prefix := fmt.Sprintf("%s_%d", prefix, i)
value := fmt.Sprintf("%s_%d", value, i)
v := cache.Get(prefix)
requirer.True(v.Found)
requirer.Equal(v.V, value)
asserter.True(v.Found)
asserter.Equal(v.V, value)
}
})

tt.Run("serve stale", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Second * 2,
DisableCache: false,
ServeStale: true,
})
requirer.NoError(err)

cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for cache to expire
time.Sleep(time.Second * 3)

v := cache.Get(prefix)
asserter.True(v.Found)
asserter.Equal(v.V, value)
})

tt.Run("debounce", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
// intentional delay in updater to retain debounce key
// in the map long enough to be tested
time.Sleep(time.Second * 3)
return key, nil
},
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// re-trigger auto update within threshold window
_ = cache.Get(prefix)
// check if key added to debounce checker map
_, found := cache.updateInProgress.Load(prefix)
asserter.True(found)
})

tt.Run("err watcher", func(t *testing.T) {
forcedErr := fmt.Errorf("forced error")
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
return nil, forcedErr
},
ErrWatcher: func(watcherErr error) {
ranErrWatcher.Store(true)
asserter.ErrorIs(watcherErr, forcedErr)
},
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.True(ranErrWatcher.Load())
})

tt.Run("no err watcher", func(t *testing.T) {
forcedErr := fmt.Errorf("forced error")
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
return nil, forcedErr
},
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.False(ranErrWatcher.Load())
})

}

func TestThresholdUpdater(t *testing.T) {
func TestThresholdUpdater(tt *testing.T) {
var (
requirer = require.New(t)
asserter = require.New(t)
requirer = require.New(tt)
asserter = require.New(tt)
cacheAge = time.Second
threshold = time.Millisecond * 500
)
Expand All @@ -126,7 +236,7 @@ func TestThresholdUpdater(t *testing.T) {
},
})
requirer.NoError(err)
t.Run("before threshold", func(t *testing.T) {
tt.Run("before threshold", func(t *testing.T) {
ranUpdater.Store(false)
key := "key_1"
ch.Add(key, key)
Expand All @@ -138,7 +248,7 @@ func TestThresholdUpdater(t *testing.T) {
asserter.EqualValues(key, v.V)
})

t.Run("during threshold", func(t *testing.T) {
tt.Run("during threshold", func(t *testing.T) {
ranUpdater.Store(false)
key := "key_2"

Expand All @@ -152,7 +262,7 @@ func TestThresholdUpdater(t *testing.T) {
asserter.True(ranUpdater.Load())
})

t.Run("after threshold (cache expired)", func(t *testing.T) {
tt.Run("after threshold (cache expired)", func(t *testing.T) {
ranUpdater.Store(false)
key := "key_3"

Expand Down Expand Up @@ -199,3 +309,15 @@ func TestValidate(tt *testing.T) {
requirer.Nil(err)
})
}

func TestSanitize(tt *testing.T) {
asserter := assert.New(tt)

cfg := Config[string, string]{}
cfg.Sanitize()
asserter.Equal(cfg.LRUCacheSize, uint(1000))
asserter.Equal(cfg.QLength, uint(1000))
asserter.Equal(cfg.CacheAge, time.Minute)
asserter.Equal(cfg.Threshold, time.Second*59)
asserter.Equal(cfg.UpdaterTimeout, time.Second)
}

0 comments on commit 6d7cc6b

Please sign in to comment.