Skip to content

Commit

Permalink
feat: cache support tag
Browse files Browse the repository at this point in the history
  • Loading branch information
leeqvip committed Jul 20, 2022
1 parent 72099f2 commit 6f6c610
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 14 deletions.
26 changes: 18 additions & 8 deletions cache/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type item struct {

// Expired Returns true if the item has expired.
func (item item) Expired() bool {
if item.Expiration == 0 {
if item.Expiration < 0 {
return false
}
return time.Now().UnixNano() > item.Expiration
Expand Down Expand Up @@ -65,8 +65,8 @@ func (s *MemoryStore) Get(key string, val interface{}) error {

// Put set cached value with key and expire time.
func (s *MemoryStore) Put(key string, val interface{}, timeout time.Duration) error {
var e int64
if timeout > 0 {
var e int64 = -1
if timeout >= 0 {
e = time.Now().Add(timeout).UnixNano()
}

Expand All @@ -78,7 +78,7 @@ func (s *MemoryStore) Put(key string, val interface{}, timeout time.Duration) er
Expiration: e,
}

if e > 0 {
if e >= 0 {
s.DeleteExpired()
}

Expand Down Expand Up @@ -133,6 +133,11 @@ func (s *MemoryStore) Decrement(key string, value ...int) (int, error) {
return by, nil
}

// Forever Store an item in the cache indefinitely.
func (s *MemoryStore) Forever(key string, val interface{}) error {
return s.Put(key, val, 0)
}

// Exist check cache's existence in memory.
func (s *MemoryStore) Exist(key string) bool {
s.mu.RLock()
Expand All @@ -149,8 +154,8 @@ func (s *MemoryStore) Exist(key string) bool {

// Expire set value expire time.
func (s *MemoryStore) Expire(key string, timeout time.Duration) error {
var e int64
if timeout > 0 {
var e int64 = -1
if timeout >= 0 {
e = time.Now().Add(timeout).UnixNano()
}

Expand All @@ -165,7 +170,7 @@ func (s *MemoryStore) Expire(key string, timeout time.Duration) error {
item.Expiration = e
s.items[s.prefix+key] = item

if e > 0 {
if e >= 0 {
s.DeleteExpired()
}

Expand All @@ -188,6 +193,11 @@ func (s *MemoryStore) Flush() error {
return nil
}

func (s *MemoryStore) Tags(names ...string) Store {
// tags not be supported
return s
}

func (s *MemoryStore) TTL(key string) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down Expand Up @@ -230,7 +240,7 @@ func (s *MemoryStore) DeleteExpired() {

smallestDuration := 0 * time.Nanosecond
for key, item := range s.items {
if item.Expiration == 0 {
if item.Expiration < 0 {
continue
}
// "Inlining" of expired
Expand Down
199 changes: 194 additions & 5 deletions cache/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,25 @@ package cache
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gomodule/redigo/redis"
)

// ReferenceKeyForever Forever reference key.
const ReferenceKeyForever = "forever_ref"

// ReferenceKeyStandard Standard reference key.
const ReferenceKeyStandard = "standard_ref"

type RedisStore struct {
pool *redis.Pool // redis connection pool
tagSet *TagSet
prefix string
}

// NewStore Create a redis cache store
// NewRedisStore Create a redis cache store
func NewRedisStore(pool *redis.Pool, prefix string) *RedisStore {
s := RedisStore{}
return s.SetPool(pool).SetPrefix(prefix)
Expand All @@ -38,6 +46,12 @@ func (s *RedisStore) Put(key string, val interface{}, timeout time.Duration) err
if err != nil {
return err
}

err = s.pushStandardKeys(key)
if err != nil {
return err
}

c := s.pool.Get()
defer c.Close()
_, err = c.Do("SETEX", s.prefix+key, int64(timeout/time.Second), string(b))
Expand All @@ -46,6 +60,11 @@ func (s *RedisStore) Put(key string, val interface{}, timeout time.Duration) err

// Increment the value of an item in the cache.
func (s *RedisStore) Increment(key string, value ...int) (int, error) {
err := s.pushStandardKeys(key)
if err != nil {
return 0, err
}

c := s.pool.Get()
defer c.Close()

Expand All @@ -59,6 +78,11 @@ func (s *RedisStore) Increment(key string, value ...int) (int, error) {

// Decrement the value of an item in the cache.
func (s *RedisStore) Decrement(key string, value ...int) (int, error) {
err := s.pushStandardKeys(key)
if err != nil {
return 0, err
}

c := s.pool.Get()
defer c.Close()

Expand All @@ -70,6 +94,24 @@ func (s *RedisStore) Decrement(key string, value ...int) (int, error) {
return redis.Int(c.Do("DECRBY", s.prefix+key, by))
}

// Forever Store an item in the cache indefinitely.
func (s *RedisStore) Forever(key string, val interface{}) error {
b, err := json.Marshal(val)
if err != nil {
return err
}

err = s.pushForeverKeys(key)
if err != nil {
return err
}

c := s.pool.Get()
defer c.Close()
_, err = c.Do("SET", s.prefix+key, string(b))
return err
}

// Exist check cache's existence in redis.
func (s *RedisStore) Exist(key string) bool {
c := s.pool.Get()
Expand Down Expand Up @@ -100,6 +142,26 @@ func (s *RedisStore) Forget(key string) error {

// Remove all items from the cache.
func (s *RedisStore) Flush() error {
if s.tagSet != nil {
err := s.deleteForeverKeys()
if err != nil {
return err
}
err = s.deleteStandardKeys()
if err != nil {
return err
}
err = s.tagSet.Reset()
if err != nil {
return err
}
return nil
}

return s.flush()
}

func (s *RedisStore) flush() error {
c := s.pool.Get()
defer c.Close()

Expand All @@ -121,12 +183,34 @@ func (s *RedisStore) Flush() error {
break
}
}
for _, key := range keys {
if _, err = c.Do("DEL", key); err != nil {
return err

length := len(keys)
if length == 0 {
return nil
}

var keysChunk []interface{}
for i, key := range keys {
keysChunk = append(keysChunk, key)
if i == length-1 || len(keysChunk) == 1000 {
_, err = c.Do("DEL", keysChunk...)
if err != nil {
return err
}
}
}
return err

return nil
}

func (s *RedisStore) Tags(names ...string) Store {
if len(names) == 0 {
return s
}
ss := s.clone()
ss.tagSet = NewTagSet(s, names)

return ss
}

func (s *RedisStore) TTL(key string) (int64, error) {
Expand Down Expand Up @@ -156,3 +240,108 @@ func (s *RedisStore) SetPrefix(prefix string) *RedisStore {
}
return s
}

func (s *RedisStore) clone() *RedisStore {
return &RedisStore{
pool: s.pool,
prefix: s.prefix,
}
}

func (s *RedisStore) pushStandardKeys(key string) error {
return s.pushKeys(key, ReferenceKeyStandard)
}

func (s *RedisStore) pushForeverKeys(key string) error {
return s.pushKeys(key, ReferenceKeyForever)
}

func (s *RedisStore) pushKeys(key, reference string) error {
if s.tagSet == nil {
return nil
}

namespace, err := s.tagSet.GetNamespace()
if err != nil {
return err
}

fullKey := s.prefix + key
segments := strings.Split(namespace, "|")

c := s.pool.Get()
defer c.Close()
for _, segment := range segments {
_, err = c.Do("SADD", s.referenceKey(segment, reference), fullKey)
if err != nil {
return err
}
}
return nil
}

func (s *RedisStore) deleteStandardKeys() error {
return s.deleteKeysByReference(ReferenceKeyStandard)
}

func (s *RedisStore) deleteForeverKeys() error {
return s.deleteKeysByReference(ReferenceKeyForever)
}

func (s *RedisStore) deleteKeysByReference(reference string) error {
if s.tagSet == nil {
return nil
}

namespace, err := s.tagSet.GetNamespace()
if err != nil {
return err
}
segments := strings.Split(namespace, "|")
c := s.pool.Get()
defer c.Close()

for _, segment := range segments {
segment = s.referenceKey(segment, reference)
err = s.deleteKeys(segment)
if err != nil {
return err
}
_, err = c.Do("DEL", segment)
if err != nil {
return err
}
}

return nil
}

func (s *RedisStore) deleteKeys(referenceKey string) error {
c := s.pool.Get()
defer c.Close()
keys, err := redis.Strings(c.Do("SMEMBERS", referenceKey))
if err != nil {
return err
}
var length = len(keys)
if length == 0 {
return nil
}

var keysChunk []interface{}
for i, key := range keys {
keysChunk = append(keysChunk, key)
if i == length-1 || len(keysChunk) == 1000 {
_, err = c.Do("DEL", keysChunk...)
if err != nil {
return err
}
}
}

return nil
}

func (s *RedisStore) referenceKey(segment, suffix string) string {
return s.prefix + segment + ":" + suffix
}
Loading

0 comments on commit 6f6c610

Please sign in to comment.