Skip to content

Commit

Permalink
feat: introduce a ttl cache for resources (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored May 29, 2024
1 parent 3689ae2 commit 2d5c6b2
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 6 deletions.
20 changes: 14 additions & 6 deletions cachettl/cachettl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
// the tail node (end) is the node with the highest expiration time
// Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory.
type Cache[K comparable, V any] struct {
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]
now func() time.Time
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]
now func() time.Time
onEvicted func(key K, value V)
}

type node[K comparable, V any] struct {
Expand Down Expand Up @@ -49,8 +50,11 @@ func (c *Cache[K, V]) Get(key K) (zero V) {
cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail
for cn != nil && cn != c.root {
if c.now().After(cn.expiration) {
cn.remove() // removes a node from the linked list (leaves the map untouched)
delete(c.m, cn.key) // remove node from map too
cn.remove() // removes a node from the linked list (leaves the map untouched)
delete(c.m, cn.key) // remove node from map too
if c.onEvicted != nil { // call the OnEvicted callback if it's set
c.onEvicted(cn.key, cn.value)
}
} else { // there is nothing else to clean up, no need to iterate further
break
}
Expand Down Expand Up @@ -101,6 +105,10 @@ func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) {
c.add(n)
}

func (c *Cache[K, V]) OnEvicted(onEvicted func(key K, value V)) {
c.onEvicted = onEvicted
}

func (c *Cache[K, V]) add(n *node[K, V]) {
cn := c.root.prev // tail
for cn != nil { // iterate from tail to root because we have expiring nodes towards the tail
Expand Down
178 changes: 178 additions & 0 deletions resourcettl/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package resourcettl

import (
"fmt"
"sync"
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/cachettl"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
)

// NewCache creates a new resource cache.
//
// - ttl - is the time after which the resource is considered expired and cleaned up.
//
// A resource's ttl is extended every time it is checked out.
//
// The cache keeps track of the resources' usage and makes sure that
// expired resources are not cleaned up while they are still in use
// and cleaned up only when they are no longer needed (zero checkouts).
//
// Resources with any of following methods can be cleaned up:
// - Cleanup()
// - Close()
// - Close() error
// - Stop()
// - Stop() error
func NewCache[K comparable, R any](ttl time.Duration) *Cache[K, R] {
c := &Cache[K, R]{
keyMu: kitsync.NewPartitionLocker(),
resources: make(map[string]R),
checkouts: make(map[string]int),
expiries: make(map[string]struct{}),
ttl: ttl,
ttlcache: cachettl.New[K, string](),
}
c.ttlcache.OnEvicted(c.onEvicted)
return c
}

// Cache is a cache for resources that need to be closed/cleaned-up after expiration.
//
// The cache keeps track of the resources' usage and makes sure that
// expired resources are not cleaned up while they are still in use
// and cleaned up only when they are no longer needed (zero checkouts).
//
// Resources with any of following methods can be cleaned up:
// - Cleanup()
// - Close()
// - Close() error
// - Stop()
// - Stop() error
type Cache[K comparable, R any] struct {
// synchronizes access to the cache for a given key. This is to
// allow multiple go-routines to access the cache concurrently for different keys, but still
// avoid multiple go-routines creating multiple resources for the same key.
keyMu *kitsync.PartitionLocker

mu sync.RWMutex // protects the following maps
resources map[string]R // maps an resourceID to its resource
checkouts map[string]int // keeps track of how many checkouts are active for a given resourceID
expiries map[string]struct{} // keeps track of resources that are expired and need to be cleaned up after all checkouts are done

ttl time.Duration
ttlcache *cachettl.Cache[K, string]
}

// Checkout returns a resource for the given key. If the resource is not available, it creates a new one, using the new function.
// The caller must call the returned checkin function when the resource is no longer needed, to release the resource.
// Multiple checkouts for the same key are allowed and they can all share the same resource. The resource is cleaned up
// only when all checkouts are checked-in and the resource's ttl has expired (or its key has been invalidated through [Invalidate]).
func (c *Cache[K, R]) Checkout(key K, new func() (R, error)) (resource R, checkin func(), err error) {
defer c.lockKey(key)()

if resourceID := c.ttlcache.Get(key); resourceID != "" {
c.mu.Lock()
defer c.mu.Unlock()
r := c.resources[resourceID]
c.checkouts[resourceID]++
return r, c.checkinFunc(r, resourceID), nil
}
return c.newResource(key, new)
}

// Invalidate invalidates the resource for the given key.
func (c *Cache[K, R]) Invalidate(key K) {
defer c.lockKey(key)()
resourceID := c.ttlcache.Get(key)
if resourceID != "" {
c.ttlcache.Put(key, "", -1)
}
if resourceID != "" {
c.onEvicted(key, resourceID)
}
}

// newResource creates a new resource for the given key.
func (c *Cache[K, R]) newResource(key K, new func() (R, error)) (R, func(), error) {
r, err := new()
if err != nil {
return r, nil, err
}

resourceID := uuid.NewString()
c.mu.Lock()
defer c.mu.Unlock()
c.resources[resourceID] = r
c.checkouts[resourceID]++
c.ttlcache.Put(key, resourceID, c.ttl)
return r, c.checkinFunc(r, resourceID), nil
}

// checkinFunc returns a function that decrements the checkout count and cleans up the resource if it is no longer needed.
func (c *Cache[K, R]) checkinFunc(r R, resourceID string) func() {
var once sync.Once
return func() {
once.Do(func() {
c.mu.Lock()
defer c.mu.Unlock()
c.checkouts[resourceID]--
if _, ok := c.expiries[resourceID]; ok && // resource is expired
c.checkouts[resourceID] == 0 { // no more checkouts
delete(c.expiries, resourceID)
go c.cleanup(r)
}
})
}
}

// onEvicted is called when a key is evicted from the cache. It cleans up the resource if it is not checked out.
func (c *Cache[K, R]) onEvicted(_ K, resourceID string) {
c.mu.Lock()
defer c.mu.Unlock()
checkouts, ok := c.checkouts[resourceID]
if !ok {
return // already cleaned up through Invalidate
}
if checkouts == 0 {
r := c.resources[resourceID]
delete(c.resources, resourceID)
delete(c.checkouts, resourceID)
go c.cleanup(r)
} else { // mark the resource for cleanup
c.expiries[resourceID] = struct{}{}
}
}

// cleanup cleans up the resource if it implements the cleanup interface or io.Closer.
func (c *Cache[K, R]) cleanup(r R) {
cleanup := func() {}
var v any = r
switch v := v.(type) {
case interface{ Cleanup() }:
cleanup = v.Cleanup
case interface{ Cleanup() error }:
cleanup = func() { _ = v.Cleanup() }
case interface{ Close() }:
cleanup = v.Close
case interface{ Close() error }:
cleanup = func() { _ = v.Close() }
case interface{ Stop() }:
cleanup = v.Stop
case interface{ Stop() error }:
cleanup = func() { _ = v.Stop() }
}
cleanup()
}

// lockKey locks the key for exclusive access and returns a function to unlock it, which can be deferred.
func (c *Cache[K, R]) lockKey(key K) func() {
k := fmt.Sprintf("%v", key)
c.keyMu.Lock(k)
return func() {
c.keyMu.Unlock(k)
}
}
174 changes: 174 additions & 0 deletions resourcettl/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package resourcettl_test

import (
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/resourcettl"
)

func TestCache(t *testing.T) {
const key = "key"
ttl := 10 * time.Millisecond
t.Run("checkout, checkin, then expire", func(t *testing.T) {
t.Run("using cleanup", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

time.Sleep(ttl + time.Millisecond)
checkin1()
checkin2()

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})

t.Run("using closer", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *closer](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCloser)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCloser)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

time.Sleep(ttl + time.Millisecond)
checkin1()
checkin2()

r3, checkin3, err3 := c.Checkout(key, producer.NewCloser)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})
})

t.Run("expire while being used", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

time.Sleep(ttl + time.Millisecond) // wait for expiration

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to return a resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients")
checkin1()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 clients")
checkin2()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource since it is not being used by any client")
checkin3()
})

t.Run("invalidate", func(t *testing.T) {
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl)

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

c.Invalidate(key)

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one was invalidated")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients")
checkin1()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 client")
checkin2()
time.Sleep(time.Millisecond) // wait for async cleanup
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})
}

type MockProducer struct {
instances atomic.Int32
}

func (m *MockProducer) NewCleanuper() (*cleanuper, error) {
m.instances.Add(1)
return &cleanuper{id: uuid.NewString()}, nil
}

func (m *MockProducer) NewCloser() (*closer, error) {
m.instances.Add(1)
return &closer{id: uuid.NewString()}, nil
}

type cleanuper struct {
id string
cleanups atomic.Int32
}

func (m *cleanuper) Cleanup() {
m.cleanups.Add(1)
}

type closer struct {
id string
cleanups atomic.Int32
}

func (m *closer) Close() error {
m.cleanups.Add(1)
return nil
}

0 comments on commit 2d5c6b2

Please sign in to comment.