Skip to content

Commit

Permalink
Merge branch 'main' into fix-content-types
Browse files Browse the repository at this point in the history
  • Loading branch information
matt0x6F authored Jun 2, 2024
2 parents fe4c2f0 + 0ef761c commit 7d327c7
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 17 deletions.
6 changes: 3 additions & 3 deletions cmd/proxy/actions/app_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func addProxyRoutes(

lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs)
checker := storage.WithChecker(s)
withSingleFlight, err := getSingleFlight(l, c, checker)
withSingleFlight, err := getSingleFlight(l, c, s, checker)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a
l.logger.WithContext(ctx).Printf(format, v...)
}

func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType {
case "", "memory":
return stash.WithSingleflight, nil
Expand Down Expand Up @@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (
if c.StorageType != "gcp" {
return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
}
return stash.WithGCSLock, nil
return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s)
case "azureblob":
if c.StorageType != "azureblob" {
return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType)
Expand Down
4 changes: 4 additions & 0 deletions config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ ShutdownTimeout = 60
# Max retries while acquiring the lock. Defaults to 10.
# Env override: ATHENS_REDIS_LOCK_MAX_RETRIES
MaxRetries = 10
[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
[Storage]
# Only storage backends that are specified in Proxy.StorageType are required here
[Storage.CDN]
Expand Down
11 changes: 11 additions & 0 deletions docs/content/configuration/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red
SentinelPassword = "sekret"

Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis.


### Using GCP as a singleflight mechanism

The GCP singleflight mechanism does not required configuration, and works out of the box. It has a
single option with which it can be customized:

[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func defaultConfig() *Config {
SentinelPassword: "sekret",
LockConfig: DefaultRedisLockConfig(),
},
GCP: DefaultGCPConfig(),
},
Index: &Index{
MySQL: &MySQL{
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) {
LockConfig: DefaultRedisLockConfig(),
},
Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
GCP: DefaultGCPConfig(),
}

expConf := &Config{
Expand Down Expand Up @@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string {
} else if singleFlight.Etcd != nil {
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
} else if singleFlight.GCP != nil {
envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold)
}
}
return envVars
Expand Down
13 changes: 13 additions & 0 deletions pkg/config/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type SingleFlight struct {
Etcd *Etcd
Redis *Redis
RedisSentinel *RedisSentinel
GCP *GCP
}

// Etcd holds client side configuration
Expand Down Expand Up @@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig {
MaxRetries: 10,
}
}

// GCP is the configuration for GCP locking.
type GCP struct {
StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"`
}

// DefaultGCPConfig returns the default GCP locking configuration.
func DefaultGCPConfig() *GCP {
return &GCP{
StaleThreshold: 120,
}
}
21 changes: 19 additions & 2 deletions pkg/stash/with_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,32 @@ package stash

import (
"context"
"fmt"
"time"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/gcp"
)

// WithGCSLock returns a distributed singleflight
// using a GCS backend. See the config.toml documentation for details.
func WithGCSLock(s Stasher) Stasher {
return &gcsLock{s}
func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) {
if staleThreshold <= 0 {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold"))
}
// Since we *must* be using a GCP stoagfe backend, we can abuse this
// fact to mutate it, so that we can get our threshold into Save().
// Your instincts are correct, this is kind of gross.
gs, ok := s.(*gcp.Storage)
if !ok {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage"))
}
gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second)
return func(s Stasher) Stasher {
return &gcsLock{s}
}, nil
}

type gcsLock struct {
Expand Down
79 changes: 78 additions & 1 deletion pkg/stash/with_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stash
import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
Expand All @@ -17,6 +18,12 @@ import (
"golang.org/x/sync/errgroup"
)

type failReader int

func (f *failReader) Read([]byte) (int, error) {
return 0, fmt.Errorf("failure")
}

// TestWithGCS requires a real GCP backend implementation
// and it will ensure that saving to modules at the same time
// is done synchronously so that only the first module gets saved.
Expand All @@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) {
for i := 0; i < 5; i++ {
content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
s := WithGCSLock(ms)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down Expand Up @@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) {
}
}

// TestWithGCSPartialFailure equires a real GCP backend implementation
// and ensures that if one of the non-singleflight-lock files fails to
// upload, that the cache does not remain poisoned.
func TestWithGCSPartialFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
const (
mod = "stashmod"
ver = "v1.0.0"
)
strg := getStorage(t)
strg.Delete(ctx, mod, ver)
defer strg.Delete(ctx, mod, ver)

// sanity check
_, err := strg.GoMod(ctx, mod, ver)
if !errors.Is(err, errors.KindNotFound) {
t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
}

content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
fr := new(failReader)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
// We simulate a failure by manually passing an io.Reader that will fail.
err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
if err == nil {
// We *want* to fail.
t.Fatal(err)
}

// Now try a Stash. This should upload the missing files.
_, err = s.Stash(ctx, "stashmod", "v1.0.0")
if err != nil {
t.Fatal(err)
}

info, err := strg.Info(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
modContent, err := strg.GoMod(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
zip, err := strg.Zip(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
defer zip.Close()
zipContent, err := io.ReadAll(zip)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(info, modContent) {
t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
}
if !bytes.Equal(info, zipContent) {
t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
}
}

// mockGCPStasher is like mockStasher
// but leverages in memory storage
// so that redis can determine
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (

// Storage implements the (./pkg/storage).Backend interface.
type Storage struct {
bucket *storage.BucketHandle
timeout time.Duration
bucket *storage.BucketHandle
timeout time.Duration
staleThreshold time.Duration
}

// New returns a new Storage instance backed by a Google Cloud Storage bucket.
Expand Down
Loading

0 comments on commit 7d327c7

Please sign in to comment.