Skip to content

Commit

Permalink
feat(chore): singleflight when request backend multiple time at once
Browse files Browse the repository at this point in the history
  • Loading branch information
darkweak committed Oct 8, 2023
1 parent f45816f commit 29597e5
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 175 deletions.
49 changes: 0 additions & 49 deletions cache/coalescing/requestCoalescing.go

This file was deleted.

17 changes: 0 additions & 17 deletions cache/coalescing/types.go

This file was deleted.

3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/dgraph-io/ristretto v0.1.1
github.com/fsnotify/fsnotify v1.6.0
github.com/go-chi/stampede v0.5.1
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.13
github.com/nutsdb/nutsdb v0.14.0
Expand All @@ -17,6 +16,7 @@ require (
github.com/redis/go-redis/v9 v9.1.0
go.etcd.io/etcd/client/v3 v3.5.9
go.uber.org/zap v1.26.0
golang.org/x/sync v0.2.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -58,7 +58,6 @@ require (
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/go-chi/cors v1.2.0 h1:tV1g1XENQ8ku4Bq3K9ub2AtgG+p16SmzeMSGTwrOKdE=
github.com/go-chi/cors v1.2.0/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-chi/stampede v0.5.1 h1:faVKiLt5dCFrfTmAOzWxcW/YLfKUNJCNbbfUgGn8ESU=
github.com/go-chi/stampede v0.5.1/go.mod h1:lrMOBraJLDgizaoAD+LQoC/sVB2t9mYsGwqDTjUHPCE=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
Expand Down
42 changes: 0 additions & 42 deletions pkg/coalescing/requestCoalescing.go

This file was deleted.

13 changes: 0 additions & 13 deletions pkg/coalescing/types.go

This file was deleted.

146 changes: 98 additions & 48 deletions pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pquerna/cachecontrol/cacheobject"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/singleflight"
)

func NewHTTPCacheHandler(c configurationtypes.AbstractConfigurationInterface) *SouinBaseHandler {
Expand Down Expand Up @@ -96,6 +97,7 @@ func NewHTTPCacheHandler(c configurationtypes.AbstractConfigurationInterface) *S
context: ctx,
bufPool: bufPool,
storersLen: len(storers),
singleflightPool: singleflight.Group{},
}
}

Expand All @@ -109,6 +111,7 @@ type SouinBaseHandler struct {
SurrogateKeyStorer providers.SurrogateInterface
DefaultMatchedUrl configurationtypes.URL
context *context.Context
singleflightPool singleflight.Group
bufPool *sync.Pool
storersLen int
}
Expand Down Expand Up @@ -275,6 +278,12 @@ func (s *SouinBaseHandler) Store(
return nil
}

type singleflightValue struct {
body []byte
headers http.Header
code int
}

func (s *SouinBaseHandler) Upstream(
customWriter *CustomWriter,
rq *http.Request,
Expand All @@ -284,73 +293,114 @@ func (s *SouinBaseHandler) Upstream(
) error {
s.Configuration.GetLogger().Sugar().Debug("Request the upstream server")
prometheus.Increment(prometheus.RequestCounter)
if err := next(customWriter, rq); err != nil {
customWriter.Header().Set("Cache-Status", fmt.Sprintf("%s; fwd=uri-miss; key=%s; detail=SERVE-HTTP-ERROR", rq.Context().Value(context.CacheName), rfc.GetCacheKeyFromCtx(rq.Context())))
return err
}
shared := true

sfValue, err, _ := s.singleflightPool.Do(cachedKey, func() (interface{}, error) {
shared = false
if e := next(customWriter, rq); e != nil {
customWriter.Header().Set("Cache-Status", fmt.Sprintf("%s; fwd=uri-miss; key=%s; detail=SERVE-HTTP-ERROR", rq.Context().Value(context.CacheName), rfc.GetCacheKeyFromCtx(rq.Context())))
return nil, e
}

s.SurrogateKeyStorer.Invalidate(rq.Method, customWriter.Header())
if !isCacheableCode(customWriter.statusCode) {
customWriter.Headers.Set("Cache-Status", fmt.Sprintf("%s; fwd=uri-miss; key=%s; detail=UNCACHEABLE-STATUS-CODE", rq.Context().Value(context.CacheName), rfc.GetCacheKeyFromCtx(rq.Context())))
s.SurrogateKeyStorer.Invalidate(rq.Method, customWriter.Header())
if !isCacheableCode(customWriter.statusCode) {
customWriter.Headers.Set("Cache-Status", fmt.Sprintf("%s; fwd=uri-miss; key=%s; detail=UNCACHEABLE-STATUS-CODE", rq.Context().Value(context.CacheName), rfc.GetCacheKeyFromCtx(rq.Context())))

switch customWriter.statusCode {
case 500, 502, 503, 504:
return new(upsreamError)
switch customWriter.statusCode {
case 500, 502, 503, 504:
return nil, new(upsreamError)
}
}

return nil
}
if customWriter.Header().Get("Cache-Control") == "" {
customWriter.Header().Set("Cache-Control", s.DefaultMatchedUrl.DefaultCacheControl)
}

if customWriter.Header().Get("Cache-Control") == "" {
// TODO see with @mnot if mandatory to not store the response when no Cache-Control given.
// if s.DefaultMatchedUrl.DefaultCacheControl == "" {
// customWriter.Headers.Set("Cache-Status", fmt.Sprintf("%s; fwd=uri-miss; key=%s; detail=EMPTY-RESPONSE-CACHE-CONTROL", rq.Context().Value(context.CacheName), rfc.GetCacheKeyFromCtx(rq.Context())))
// return nil
// }
customWriter.Header().Set("Cache-Control", s.DefaultMatchedUrl.DefaultCacheControl)
}
select {
case <-rq.Context().Done():
return nil, baseCtx.Canceled
default:
err := s.Store(customWriter, rq, requestCc, cachedKey)

select {
case <-rq.Context().Done():
return baseCtx.Canceled
default:
return s.Store(customWriter, rq, requestCc, cachedKey)
return singleflightValue{
body: customWriter.Buf.Bytes(),
headers: customWriter.Headers,
code: customWriter.statusCode,
}, err
}
})

if err != nil {
return err
}
if sfWriter, ok := sfValue.(singleflightValue); ok && shared {
s.Configuration.GetLogger().Sugar().Infof("Reused response from concurrent request with the key %s", cachedKey)
customWriter.Buf.Reset()
customWriter.Buf.Write(sfWriter.body)
customWriter.Headers = sfWriter.headers
customWriter.statusCode = sfWriter.code
}

return nil
}

func (s *SouinBaseHandler) Revalidate(validator *rfc.Revalidator, next handlerFunc, customWriter *CustomWriter, rq *http.Request, requestCc *cacheobject.RequestCacheDirectives, cachedKey string) error {
s.Configuration.GetLogger().Sugar().Debug("Revalidate the request with the upstream server")
prometheus.Increment(prometheus.RequestRevalidationCounter)
err := next(customWriter, rq)
s.SurrogateKeyStorer.Invalidate(rq.Method, customWriter.Header())

if err == nil {
if validator.IfUnmodifiedSincePresent && customWriter.statusCode != http.StatusNotModified {
customWriter.Buf.Reset()
for h, v := range customWriter.Headers {
if len(v) > 0 {
customWriter.Rw.Header().Set(h, strings.Join(v, ", "))

shared := true

sfValue, err, _ := s.singleflightPool.Do(cachedKey, func() (interface{}, error) {
shared = false
err := next(customWriter, rq)
s.SurrogateKeyStorer.Invalidate(rq.Method, customWriter.Header())

if err == nil {
if validator.IfUnmodifiedSincePresent && customWriter.statusCode != http.StatusNotModified {
customWriter.Buf.Reset()
for h, v := range customWriter.Headers {
if len(v) > 0 {
customWriter.Rw.Header().Set(h, strings.Join(v, ", "))
}
}
customWriter.Rw.WriteHeader(http.StatusPreconditionFailed)

return nil, errors.New("")
}
customWriter.Rw.WriteHeader(http.StatusPreconditionFailed)

return errors.New("")
if customWriter.statusCode != http.StatusNotModified {
err = s.Store(customWriter, rq, requestCc, cachedKey)
}
}

if customWriter.statusCode != http.StatusNotModified {
err = s.Store(customWriter, rq, requestCc, cachedKey)
}
customWriter.Header().Set(
"Cache-Status",
fmt.Sprintf(
"%s; fwd=request; fwd-status=%d; key=%s; detail=REQUEST-REVALIDATION",
rq.Context().Value(context.CacheName),
customWriter.statusCode,
rfc.GetCacheKeyFromCtx(rq.Context()),
),
)

return singleflightValue{
body: customWriter.Buf.Bytes(),
headers: customWriter.Headers,
code: customWriter.statusCode,
}, err
})

if err != nil {
return err
}
if sfWriter, ok := sfValue.(singleflightValue); ok && shared {
s.Configuration.GetLogger().Sugar().Infof("Reused response from concurrent request with the key %s", cachedKey)
customWriter.Buf.Reset()
customWriter.Buf.Write(sfWriter.body)
customWriter.Headers = sfWriter.headers
customWriter.statusCode = sfWriter.code
}

customWriter.Header().Set(
"Cache-Status",
fmt.Sprintf(
"%s; fwd=request; fwd-status=%d; key=%s; detail=REQUEST-REVALIDATION",
rq.Context().Value(context.CacheName),
customWriter.statusCode,
rfc.GetCacheKeyFromCtx(rq.Context()),
),
)
return err
}

Expand Down
Loading

0 comments on commit 29597e5

Please sign in to comment.