diff --git a/cachettl/cachettl.go b/cachettl/cachettl.go
index 71b5c30d..4da67293 100644
--- a/cachettl/cachettl.go
+++ b/cachettl/cachettl.go
@@ -10,10 +10,11 @@ import (
 // the tail node (end) is the node with the highest expiration time
 // Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory.
 type Cache[K comparable, V any] struct {
-	root *node[K, V]
-	mu   sync.Mutex
-	m    map[K]*node[K, V]
-	now  func() time.Time
+	root      *node[K, V]
+	mu        sync.Mutex
+	m         map[K]*node[K, V]
+	now       func() time.Time
+	onEvicted func(key K, value V)
 }
 
 type node[K comparable, V any] struct {
@@ -49,8 +50,11 @@ func (c *Cache[K, V]) Get(key K) (zero V) {
 	cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail
 	for cn != nil && cn != c.root {
 		if c.now().After(cn.expiration) {
-			cn.remove()         // removes a node from the linked list (leaves the map untouched)
-			delete(c.m, cn.key) // remove node from map too
+			cn.remove()             // removes a node from the linked list (leaves the map untouched)
+			delete(c.m, cn.key)     // remove node from map too
+			if c.onEvicted != nil { // call the OnEvicted callback if it's set
+				c.onEvicted(cn.key, cn.value)
+			}
 		} else { // there is nothing else to clean up, no need to iterate further
 			break
 		}
@@ -101,6 +105,10 @@ func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) {
 	c.add(n)
 }
 
+func (c *Cache[K, V]) OnEvicted(onEvicted func(key K, value V)) {
+	c.onEvicted = onEvicted
+}
+
 func (c *Cache[K, V]) add(n *node[K, V]) {
 	cn := c.root.prev // tail
 	for cn != nil { // iterate from tail to root because we have expiring nodes towards the tail
diff --git a/resourcettl/cache.go b/resourcettl/cache.go
new file mode 100644
index 00000000..737447bf
--- /dev/null
+++ b/resourcettl/cache.go
@@ -0,0 +1,178 @@
+package resourcettl
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+
+	"github.com/rudderlabs/rudder-go-kit/cachettl"
+	kitsync "github.com/rudderlabs/rudder-go-kit/sync"
+)
+
+// NewCache creates a new resource cache.
+//
+//   - ttl - is the time after which the resource is considered expired and cleaned up.
+//
+// A resource's ttl is extended every time it is checked out.
+//
+// The cache keeps track of the resources' usage and makes sure that
+// expired resources are not cleaned up while they are still in use
+// and cleaned up only when they are no longer needed (zero checkouts).
+//
+// Resources with any of the following methods can be cleaned up:
+//   - Cleanup()
+//   - Close()
+//   - Close() error
+//   - Stop()
+//   - Stop() error
+func NewCache[K comparable, R any](ttl time.Duration) *Cache[K, R] {
+	c := &Cache[K, R]{
+		keyMu:     kitsync.NewPartitionLocker(),
+		resources: make(map[string]R),
+		checkouts: make(map[string]int),
+		expiries:  make(map[string]struct{}),
+		ttl:       ttl,
+		ttlcache:  cachettl.New[K, string](),
+	}
+	c.ttlcache.OnEvicted(c.onEvicted)
+	return c
+}
+
+// Cache is a cache for resources that need to be closed/cleaned-up after expiration.
+//
+// The cache keeps track of the resources' usage and makes sure that
+// expired resources are not cleaned up while they are still in use
+// and cleaned up only when they are no longer needed (zero checkouts).
+//
+// Resources with any of the following methods can be cleaned up:
+//   - Cleanup()
+//   - Close()
+//   - Close() error
+//   - Stop()
+//   - Stop() error
+type Cache[K comparable, R any] struct {
+	// synchronizes access to the cache for a given key. This is to
+	// allow multiple go-routines to access the cache concurrently for different keys, but still
+	// avoid multiple go-routines creating multiple resources for the same key.
+	keyMu *kitsync.PartitionLocker
+
+	mu        sync.RWMutex        // protects the following maps
+	resources map[string]R        // maps a resourceID to its resource
+	checkouts map[string]int      // keeps track of how many checkouts are active for a given resourceID
+	expiries  map[string]struct{} // keeps track of resources that are expired and need to be cleaned up after all checkouts are done
+
+	ttl      time.Duration
+	ttlcache *cachettl.Cache[K, string]
+}
+
+// Checkout returns a resource for the given key. If the resource is not available, it creates a new one, using the new function.
+// The caller must call the returned checkin function when the resource is no longer needed, to release the resource.
+// Multiple checkouts for the same key are allowed and they can all share the same resource. The resource is cleaned up
+// only when all checkouts are checked-in and the resource's ttl has expired (or its key has been invalidated through [Invalidate]).
+func (c *Cache[K, R]) Checkout(key K, new func() (R, error)) (resource R, checkin func(), err error) {
+	defer c.lockKey(key)()
+
+	if resourceID := c.ttlcache.Get(key); resourceID != "" {
+		c.mu.Lock()
+		defer c.mu.Unlock()
+		r := c.resources[resourceID]
+		c.checkouts[resourceID]++
+		return r, c.checkinFunc(r, resourceID), nil
+	}
+	return c.newResource(key, new)
+}
+
+// Invalidate invalidates the resource for the given key.
+func (c *Cache[K, R]) Invalidate(key K) {
+	defer c.lockKey(key)()
+	resourceID := c.ttlcache.Get(key)
+	if resourceID != "" {
+		c.ttlcache.Put(key, "", -1)
+	}
+	if resourceID != "" {
+		c.onEvicted(key, resourceID)
+	}
+}
+
+// newResource creates a new resource for the given key.
+func (c *Cache[K, R]) newResource(key K, new func() (R, error)) (R, func(), error) {
+	r, err := new()
+	if err != nil {
+		return r, nil, err
+	}
+
+	resourceID := uuid.NewString()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.resources[resourceID] = r
+	c.checkouts[resourceID]++
+	c.ttlcache.Put(key, resourceID, c.ttl)
+	return r, c.checkinFunc(r, resourceID), nil
+}
+
+// checkinFunc returns a function that decrements the checkout count and cleans up the resource if it is no longer needed.
+func (c *Cache[K, R]) checkinFunc(r R, resourceID string) func() {
+	var once sync.Once
+	return func() {
+		once.Do(func() {
+			c.mu.Lock()
+			defer c.mu.Unlock()
+			c.checkouts[resourceID]--
+			if _, ok := c.expiries[resourceID]; ok && // resource is expired
+				c.checkouts[resourceID] == 0 { // no more checkouts
+				delete(c.expiries, resourceID)
+				go c.cleanup(r)
+			}
+		})
+	}
+}
+
+// onEvicted is called when a key is evicted from the cache. It cleans up the resource if it is not checked out.
+func (c *Cache[K, R]) onEvicted(_ K, resourceID string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	checkouts, ok := c.checkouts[resourceID]
+	if !ok {
+		return // already cleaned up through Invalidate
+	}
+	if checkouts == 0 {
+		r := c.resources[resourceID]
+		delete(c.resources, resourceID)
+		delete(c.checkouts, resourceID)
+		go c.cleanup(r)
+	} else { // mark the resource for cleanup
+		c.expiries[resourceID] = struct{}{}
+	}
+}
+
+// cleanup cleans up the resource if it implements one of the supported cleanup methods (Cleanup, Close or Stop, with or without an error return).
+func (c *Cache[K, R]) cleanup(r R) {
+	cleanup := func() {}
+	var v any = r
+	switch v := v.(type) {
+	case interface{ Cleanup() }:
+		cleanup = v.Cleanup
+	case interface{ Cleanup() error }:
+		cleanup = func() { _ = v.Cleanup() }
+	case interface{ Close() }:
+		cleanup = v.Close
+	case interface{ Close() error }:
+		cleanup = func() { _ = v.Close() }
+	case interface{ Stop() }:
+		cleanup = v.Stop
+	case interface{ Stop() error }:
+		cleanup = func() { _ = v.Stop() }
+	}
+	cleanup()
+}
+
+// lockKey locks the key for exclusive access and returns a function to unlock it, which can be deferred.
+func (c *Cache[K, R]) lockKey(key K) func() {
+	k := fmt.Sprintf("%v", key)
+	c.keyMu.Lock(k)
+	return func() {
+		c.keyMu.Unlock(k)
+	}
+}
diff --git a/resourcettl/cache_test.go b/resourcettl/cache_test.go
new file mode 100644
index 00000000..e83ba3d6
--- /dev/null
+++ b/resourcettl/cache_test.go
@@ -0,0 +1,174 @@
+package resourcettl_test
+
+import (
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+
+	"github.com/rudderlabs/rudder-go-kit/resourcettl"
+)
+
+func TestCache(t *testing.T) {
+	const key = "key"
+	ttl := 10 * time.Millisecond
+	t.Run("checkout, checkin, then expire", func(t *testing.T) {
+		t.Run("using cleanup", func(t *testing.T) {
+			producer := &MockProducer{}
+			c := resourcettl.NewCache[string, *cleanuper](ttl)
+
+			r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
+			require.NoError(t, err1, "it should be able to create a new resource")
+			require.NotNil(t, r1, "it should return a resource")
+			require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")
+
+			r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
+			require.NoError(t, err2, "it should be able to checkout the same resource")
+			require.NotNil(t, r2, "it should return a resource")
+			require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
+			require.Equal(t, r1.id, r2.id, "it should return the same resource")
+
+			time.Sleep(ttl + time.Millisecond)
+			checkin1()
+			checkin2()
+
+			r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
+			require.NoError(t, err3, "it should be able to create a new resource")
+			require.NotNil(t, r3, "it should return a resource")
+			require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
+			require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
+			time.Sleep(time.Millisecond) // wait for async cleanup
+			require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
+			checkin3()
+		})
+
+		t.Run("using closer", func(t *testing.T) {
+			producer := &MockProducer{}
+			c := resourcettl.NewCache[string, *closer](ttl)
+
+			r1, checkin1, err1 := c.Checkout(key, producer.NewCloser)
+			require.NoError(t, err1, "it should be able to create a new resource")
+			require.NotNil(t, r1, "it should return a resource")
+			require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")
+
+			r2, checkin2, err2 := c.Checkout(key, producer.NewCloser)
+			require.NoError(t, err2, "it should be able to checkout the same resource")
+			require.NotNil(t, r2, "it should return a resource")
+			require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
+			require.Equal(t, r1.id, r2.id, "it should return the same resource")
+
+			time.Sleep(ttl + time.Millisecond)
+			checkin1()
+			checkin2()
+
+			r3, checkin3, err3 := c.Checkout(key, producer.NewCloser)
+			require.NoError(t, err3, "it should be able to create a new resource")
+			require.NotNil(t, r3, "it should return a resource")
+			require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
+			require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
+			time.Sleep(time.Millisecond) // wait for async cleanup
+			require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
+			checkin3()
+		})
+	})
+
+	t.Run("expire while being used", func(t *testing.T) {
+		producer := &MockProducer{}
+		c := resourcettl.NewCache[string, *cleanuper](ttl)
+
+		r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
+		require.NoError(t, err1, "it should be able to create a new resource")
+		require.NotNil(t, r1, "it should return a resource")
+		require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")
+
+		r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
+		require.NoError(t, err2, "it should be able to checkout the same resource")
+		require.NotNil(t, r2, "it should return a resource")
+		require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
+		require.Equal(t, r1.id, r2.id, "it should return the same resource")
+
+		time.Sleep(ttl + time.Millisecond) // wait for expiration
+
+		r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
+		require.NoError(t, err3, "it should be able to return a resource")
+		require.NotNil(t, r3, "it should return a resource")
+		require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
+		require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
+		require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 2 clients")
+		checkin1()
+		time.Sleep(time.Millisecond) // wait for async cleanup
+		require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the expired resource yet since it is being used by 1 client")
+		checkin2()
+		time.Sleep(time.Millisecond) // wait for async cleanup
+		require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource since it is not being used by any client")
+		checkin3()
+	})
+
+	t.Run("invalidate", func(t *testing.T) {
+		producer := &MockProducer{}
+		c := resourcettl.NewCache[string, *cleanuper](ttl)
+
+		r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
+		require.NoError(t, err1, "it should be able to create a new resource")
+		require.NotNil(t, r1, "it should return a resource")
+		require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")
+
+		r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
+		require.NoError(t, err2, "it should be able to checkout the same resource")
+		require.NotNil(t, r2, "it should return a resource")
+		require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
+		require.Equal(t, r1.id, r2.id, "it should return the same resource")
+
+		c.Invalidate(key)
+
+		r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
+		require.NoError(t, err3, "it should be able to create a new resource")
+		require.NotNil(t, r3, "it should return a resource")
+		require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one was invalidated")
+		require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
+		time.Sleep(time.Millisecond) // wait for async cleanup
+		require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the invalidated resource yet since it is being used by 2 clients")
+		checkin1()
+		time.Sleep(time.Millisecond) // wait for async cleanup
+		require.EqualValues(t, 0, r1.cleanups.Load(), "it shouldn't cleanup the invalidated resource yet since it is being used by 1 client")
+		checkin2()
+		time.Sleep(time.Millisecond) // wait for async cleanup
+		require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the invalidated resource")
+		checkin3()
+	})
+}
+
+type MockProducer struct {
+	instances atomic.Int32
+}
+
+func (m *MockProducer) NewCleanuper() (*cleanuper, error) {
+	m.instances.Add(1)
+	return &cleanuper{id: uuid.NewString()}, nil
+}
+
+func (m *MockProducer) NewCloser() (*closer, error) {
+	m.instances.Add(1)
+	return &closer{id: uuid.NewString()}, nil
+}
+
+type cleanuper struct {
+	id       string
+	cleanups atomic.Int32
+}
+
+func (m *cleanuper) Cleanup() {
+	m.cleanups.Add(1)
+}
+
+type closer struct {
+	id       string
+	cleanups atomic.Int32
+}
+
+func (m *closer) Close() error {
+	m.cleanups.Add(1)
+	return nil
+}
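
For reviewers who want to try the cachettl change in isolation: eviction is lazy and happens inside Get(), as the existing package comment notes, so the new OnEvicted callback fires on the next Get() after an entry's TTL has passed. A minimal sketch, assuming nothing beyond the API shown in this diff (key/value names are made up for illustration):

```go
package main

import (
	"fmt"
	"time"

	"github.com/rudderlabs/rudder-go-kit/cachettl"
)

func main() {
	c := cachettl.New[string, string]()

	// Register the callback before putting entries; it runs inside Get()
	// whenever an expired node is unlinked from the list and removed from the map.
	c.OnEvicted(func(key, value string) {
		fmt.Printf("evicted %s=%s\n", key, value)
	})

	c.Put("greeting", "hello", 10*time.Millisecond)
	time.Sleep(20 * time.Millisecond)

	_ = c.Get("greeting") // the expired entry is cleaned up here, triggering the callback
}
```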
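And a hedged end-to-end sketch of the new resourcettl API (Checkout/checkin/Invalidate). The *client type, its Close method and the "workspace-1" key are hypothetical stand-ins, not part of this change; only NewCache, Checkout, the checkin function and Invalidate come from the diff:

```go
package main

import (
	"net/http"
	"time"

	"github.com/rudderlabs/rudder-go-kit/resourcettl"
)

// client is a hypothetical resource; anything exposing Cleanup/Close/Stop
// (with or without an error return) is cleaned up by the cache once its
// ttl expires and no checkouts remain.
type client struct{ http *http.Client }

func (c *client) Close() { c.http.CloseIdleConnections() }

func main() {
	cache := resourcettl.NewCache[string, *client](10 * time.Minute)

	// The first Checkout for a key creates the resource; concurrent checkouts
	// for the same key share it, and each checkout extends its ttl.
	c, checkin, err := cache.Checkout("workspace-1", func() (*client, error) {
		return &client{http: &http.Client{Timeout: 30 * time.Second}}, nil
	})
	if err != nil {
		panic(err)
	}
	defer checkin() // release the checkout; cleanup only happens at zero checkouts

	_ = c // use the shared resource...

	// If the resource turns out to be broken, drop it eagerly so the next
	// Checkout builds a fresh one instead of waiting for the ttl to expire.
	cache.Invalidate("workspace-1")
}
```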