Skip to content

Commit

Permalink
feat(chore): allow chained storages
Browse files Browse the repository at this point in the history
  • Loading branch information
darkweak committed Aug 19, 2023
1 parent 71d93e6 commit ad59438
Show file tree
Hide file tree
Showing 27 changed files with 221 additions and 32 deletions.
7 changes: 7 additions & 0 deletions configurationtypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ type DefaultCache struct {
Port Port `json:"port" yaml:"port"`
Regex Regex `json:"regex" yaml:"regex"`
Stale Duration `json:"stale" yaml:"stale"`
Storers []string `json:"storers" yaml:"storers"`
Timeout Timeout `json:"timeout" yaml:"timeout"`
TTL Duration `json:"ttl" yaml:"ttl"`
DefaultCacheControl string `json:"default_cache_control" yaml:"default_cache_control"`
Expand Down Expand Up @@ -314,6 +315,11 @@ func (d *DefaultCache) GetStale() time.Duration {
return d.Stale.Duration
}

// GetStale returns the stale duration
func (d *DefaultCache) GetStorers() []string {
return d.Storers
}

// GetDefaultCacheControl returns the default Cache-Control response header value when empty
func (d *DefaultCache) GetDefaultCacheControl() string {
return d.DefaultCacheControl
Expand All @@ -335,6 +341,7 @@ type DefaultCacheInterface interface {
GetKey() Key
GetRegex() Regex
GetStale() time.Duration
GetStorers() []string
GetTimeout() Timeout
GetTTL() time.Duration
GetDefaultCacheControl() string
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type MapHandler struct {
// GenerateHandlerMap generate the MapHandler
func GenerateHandlerMap(
configuration configurationtypes.AbstractConfigurationInterface,
storer storage.Storer,
storers []storage.Storer,
surrogateStorage providers.SurrogateInterface,
) *MapHandler {
hm := make(map[string]http.HandlerFunc)
Expand All @@ -30,7 +30,7 @@ func GenerateHandlerMap(
basePathAPIS = "/souin-api"
}

for _, endpoint := range Initialize(configuration, storer, surrogateStorage) {
for _, endpoint := range Initialize(configuration, storers, surrogateStorage) {
if endpoint.IsEnabled() {
shouldEnable = true
hm[basePathAPIS+endpoint.GetBasePath()] = endpoint.HandleRequest
Expand All @@ -45,7 +45,7 @@ func GenerateHandlerMap(
}

// Initialize contains all apis that should be enabled
func Initialize(c configurationtypes.AbstractConfigurationInterface, storer storage.Storer, surrogateStorage providers.SurrogateInterface) []EndpointInterface {
return []EndpointInterface{initializeSouin(c, storer,
func Initialize(c configurationtypes.AbstractConfigurationInterface, storers []storage.Storer, surrogateStorage providers.SurrogateInterface) []EndpointInterface {
return []EndpointInterface{initializeSouin(c, storers,
surrogateStorage), debug.InitializeDebug(c), prometheus.InitializePrometheus(c)}
}
29 changes: 21 additions & 8 deletions pkg/api/souin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
type SouinAPI struct {
basePath string
enabled bool
storer storage.Storer
storers []storage.Storer
surrogateStorage providers.SurrogateInterface
}

func initializeSouin(
configuration configurationtypes.AbstractConfigurationInterface,
storer storage.Storer,
storers []storage.Storer,
surrogateStorage providers.SurrogateInterface,
) *SouinAPI {
basePath := configuration.GetAPI().Souin.BasePath
Expand All @@ -31,24 +31,33 @@ func initializeSouin(
return &SouinAPI{
basePath,
configuration.GetAPI().Souin.Enable,
storer,
storers,
surrogateStorage,
}
}

// BulkDelete allow user to delete multiple items with regexp
func (s *SouinAPI) BulkDelete(key string) {
s.storer.DeleteMany(key)
for _, current := range s.storers {
current.DeleteMany(key)
}
}

// Delete will delete a record into the provider cache system and will update the Souin API if enabled
func (s *SouinAPI) Delete(key string) {
s.storer.Delete(key)
for _, current := range s.storers {
current.Delete(key)
}
}

// GetAll will retrieve all stored keys in the provider
func (s *SouinAPI) GetAll() []string {
return s.storer.ListKeys()
keys := []string{}
for _, current := range s.storers {
keys = append(keys, current.ListKeys()...)
}

return keys
}

// GetBasePath will return the basepath for this resource
Expand Down Expand Up @@ -100,7 +109,9 @@ func (s *SouinAPI) HandleRequest(w http.ResponseWriter, r *http.Request) {
flushRg := regexp.MustCompile(s.GetBasePath() + "/flush$")

if flushRg.FindString(r.RequestURI) != "" {
s.storer.DeleteMany(".+")
for _, current := range s.storers {
current.DeleteMany(".+")
}
e := s.surrogateStorage.Destruct()
if e != nil {
fmt.Printf("Error while purging the surrogate keys: %+v.", e)
Expand All @@ -113,7 +124,9 @@ func (s *SouinAPI) HandleRequest(w http.ResponseWriter, r *http.Request) {
} else {
ck, _ := s.surrogateStorage.Purge(r.Header)
for _, k := range ck {
s.storer.Delete(k)
for _, current := range s.storers {
current.Delete(k)
}
}
}
w.WriteHeader(http.StatusNoContent)
Expand Down
65 changes: 55 additions & 10 deletions pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewHTTPCacheHandler(c configurationtypes.AbstractConfigurationInterface) *S
c.SetLogger(logger)
}

storer, err := storage.NewStorage(c)
storers, err := storage.NewStorages(c)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -87,20 +87,22 @@ func NewHTTPCacheHandler(c configurationtypes.AbstractConfigurationInterface) *S

return &SouinBaseHandler{
Configuration: c,
Storer: storer,
InternalEndpointHandlers: api.GenerateHandlerMap(c, storer, surrogateStorage),
Storers: storers,
InternalEndpointHandlers: api.GenerateHandlerMap(c, storers, surrogateStorage),
ExcludeRegex: excludedRegexp,
RegexpUrls: regexpUrls,
DefaultMatchedUrl: defaultMatchedUrl,
SurrogateKeyStorer: surrogateStorage,
context: ctx,
bufPool: bufPool,
storersLen: len(storers),
}
}

type SouinBaseHandler struct {
Configuration configurationtypes.AbstractConfigurationInterface
Storer storage.Storer
Storers []storage.Storer
InternalEndpointHandlers *api.MapHandler
ExcludeRegex *regexp.Regexp
RegexpUrls regexp.Regexp
Expand All @@ -109,6 +111,7 @@ type SouinBaseHandler struct {
DefaultMatchedUrl configurationtypes.URL
context *context.Context
bufPool *sync.Pool
storersLen int
}

type upsreamError struct{}
Expand Down Expand Up @@ -234,16 +237,46 @@ func (s *SouinBaseHandler) Store(
if err == nil {
variedHeaders := rfc.HeaderAllCommaSepValues(res.Header)
cachedKey += rfc.GetVariedCacheKey(rq, variedHeaders)
s.Configuration.GetLogger().Sugar().Infof("Store the response %+v with duration %v", res, ma)
if s.Storer.Set(cachedKey, response, currentMatchedURL, ma) == nil {
s.Configuration.GetLogger().Sugar().Debugf("Store the cache key %s into the surrogate keys from the following headers %v", cachedKey, res)
s.Configuration.GetLogger().Sugar().Debugf("Store the response %+v with duration %v", res, ma)

var wg sync.WaitGroup
mu := sync.Mutex{}
fails := []string{}
for _, storer := range s.Storers {
wg.Add(1)
go func(currentStorer storage.Storer) {
defer wg.Done()
if currentStorer.Set(cachedKey, response, currentMatchedURL, ma) == nil {
s.Configuration.GetLogger().Sugar().Infof("Stored the key %s in the %s provider", cachedKey, currentStorer.Name())
} else {
mu.Lock()
fails = append(fails, fmt.Sprintf("; detail=%s-INSERTION-ERROR", currentStorer.Name()))
mu.Unlock()
}
}(storer)
}

wg.Wait()
if len(fails) < s.storersLen {
go func(rs http.Response, key string) {
_ = s.SurrogateKeyStorer.Store(&rs, key)
}(res, cachedKey)
status += "; stored"
} else {
status += "; detail=STORAGE-INSERTION-ERROR"
}

if len(fails) > 0 {
status += strings.Join(fails, "")
}

// if s.Storer.Set(cachedKey, response, currentMatchedURL, ma) == nil {
// s.Configuration.GetLogger().Sugar().Debugf("Store the cache key %s into the surrogate keys from the following headers %v", cachedKey, res)
// go func(rs http.Response, key string) {
// _ = s.SurrogateKeyStorer.Store(&rs, key)
// }(res, cachedKey)
// status += "; stored"
// } else {
// status += "; detail=STORAGE-INSERTION-ERROR"
// }
}
} else {
status += "; detail=NO-STORE-DIRECTIVE"
Expand Down Expand Up @@ -394,7 +427,14 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
s.Configuration.GetLogger().Sugar().Debugf("Request cache-control %+v", requestCc)
if modeContext.Bypass_request || !requestCc.NoCache {
validator := rfc.ParseRequest(rq)
response := s.Storer.Prefix(cachedKey, rq, validator)
var response *http.Response
for _, currentStorer := range s.Storers {
response = currentStorer.Prefix(cachedKey, rq, validator)
if response != nil {
s.Configuration.GetLogger().Sugar().Debugf("Found response in the %s storage", currentStorer.Name())
break
}
}

if response != nil && (!modeContext.Strict || rfc.ValidateCacheControl(response, requestCc)) {
if validator.ResponseETag != "" && validator.Matched {
Expand Down Expand Up @@ -425,7 +465,12 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
return err
}
} else if response == nil && !requestCc.OnlyIfCached && (requestCc.MaxStaleSet || requestCc.MaxStale > -1) {
response = s.Storer.Prefix(storage.StalePrefix+cachedKey, rq, validator)
for _, currentStorer := range s.Storers {
response := currentStorer.Prefix(storage.StalePrefix+cachedKey, rq, validator)
if response != nil {
break
}
}
if nil != response && (!modeContext.Strict || rfc.ValidateCacheControl(response, requestCc)) {
addTime, _ := time.ParseDuration(response.Header.Get(rfc.StoredTTLHeader))
rfc.SetCacheStatusHeader(response)
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/badgerProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func BadgerConnectionFactory(c t.AbstractConfigurationInterface) (Storer, error)
return i, nil
}

// Name returns the storer name
func (provider *Badger) Name() string {
return "BADGER"
}

// ListKeys method returns the list of existing keys
func (provider *Badger) ListKeys() []string {
keys := []string{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/embeddedOlricProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func EmbeddedOlricConnectionFactory(configuration t.AbstractConfigurationInterfa
}, e
}

// Name returns the storer name
func (provider *EmbeddedOlric) Name() string {
return "EMBEDDED_OLRIC"
}

// ListKeys method returns the list of existing keys
func (provider *EmbeddedOlric) ListKeys() []string {

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/etcdProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func EtcdConnectionFactory(c t.AbstractConfigurationInterface) (Storer, error) {
}, nil
}

// Name returns the storer name
func (provider *Etcd) Name() string {
return "ETCD"
}

// ListKeys method returns the list of existing keys
func (provider *Etcd) ListKeys() []string {
if provider.reconnecting {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/nutsProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func NutsConnectionFactory(c t.AbstractConfigurationInterface) (Storer, error) {
}, nil
}

// Name returns the storer name
func (provider *Nuts) Name() string {
return "NUTS"
}

// ListKeys method returns the list of existing keys
func (provider *Nuts) ListKeys() []string {
keys := []string{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/olricProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func OlricConnectionFactory(configuration t.AbstractConfigurationInterface) (Sto
}, nil
}

// Name returns the storer name
func (provider *Olric) Name() string {
return "OLRIC"
}

// ListKeys method returns the list of existing keys
func (provider *Olric) ListKeys() []string {
if provider.reconnecting {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/redisProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func RedisConnectionFactory(c t.AbstractConfigurationInterface) (Storer, error)
}, nil
}

// Name returns the storer name
func (provider *Redis) Name() string {
return "REDIS"
}

// ListKeys method returns the list of existing keys
func (provider *Redis) ListKeys() []string {
if provider.reconnecting {
Expand Down
50 changes: 50 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Storer interface {
Delete(key string)
DeleteMany(key string)
Init() error
Name() string
Reset() error
}

Expand Down Expand Up @@ -67,6 +68,55 @@ func NewStorage(configuration configurationtypes.AbstractConfigurationInterface)
return nil, errors.New("Storer with name" + storerName + " not found")
}

func uniqueStorers(storers []string) []string {
storerPresent := make(map[string]bool)
s := []string{}

for _, current := range storers {
if _, found := storerPresent[current]; !found {
storerPresent[current] = true
s = append(s, current)
}
}

return s
}

func NewStorages(configuration configurationtypes.AbstractConfigurationInterface) ([]Storer, error) {
storers := []Storer{}
for _, storerName := range uniqueStorers(configuration.GetDefaultCache().GetStorers()) {
if newStorage, found := storageMap[storerName]; found {
instance, err := newStorage(configuration)
if err != nil {
configuration.GetLogger().Sugar().Debugf("Cannot load configuration for the chianed provider %s: %+v", storerName, err)
continue
}

configuration.GetLogger().Sugar().Debugf("Append storer %s to the chain", storerName)
storers = append(storers, instance)
} else {
configuration.GetLogger().Sugar().Debugf("Storer with name %s not found", storerName)
}
}

if len(storers) == 0 {
configuration.GetLogger().Debug("Not able to create storers chain from the storers slice, fallback to the default storer creation")
instance, err := NewStorage(configuration)
if err != nil || instance == nil {
return nil, err
}

storers = append(storers, instance)
}

names := []string{}
for _, s := range storers {
names = append(names, s.Name())
}
configuration.GetLogger().Sugar().Debugf("Run with %d chained providers with the given order %s", len(storers), strings.Join(names, ", "))
return storers, nil
}

func varyVoter(baseKey string, req *http.Request, currentKey string) bool {
if currentKey == baseKey {
return true
Expand Down
Loading

0 comments on commit ad59438

Please sign in to comment.