From 3cb1506b52f3ff502a4d52ff6910611cb7008e54 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Sun, 7 Apr 2024 18:39:21 +1000 Subject: [PATCH 1/3] backport logger and network changes --- export/export.go | 2 - internal/network/network.go | 50 ++++++++------ internal/network/network_test.go | 112 +++++++++++++++++-------------- logger/logger.go | 42 ++++++++++-- logger/logger_test.go | 16 +++++ slackdump.go | 2 - 6 files changed, 147 insertions(+), 77 deletions(-) create mode 100644 logger/logger_test.go diff --git a/export/export.go b/export/export.go index 547a9f5f..f8fd949b 100644 --- a/export/export.go +++ b/export/export.go @@ -14,7 +14,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/rusq/slackdump/v2" - "github.com/rusq/slackdump/v2/internal/network" "github.com/rusq/slackdump/v2/internal/structures" "github.com/rusq/slackdump/v2/internal/structures/files/dl" "github.com/rusq/slackdump/v2/logger" @@ -38,7 +37,6 @@ func New(sd *slackdump.Session, fs fsadapter.FS, cfg Options) *Export { if cfg.Logger == nil { cfg.Logger = logger.Default } - network.SetLogger(cfg.Logger) se := &Export{ fs: fs, diff --git a/internal/network/network.go b/internal/network/network.go index c42fbaa0..b63151b5 100644 --- a/internal/network/network.go +++ b/internal/network/network.go @@ -26,8 +26,8 @@ var ( // The wait time for a transient error depends on the current retry // attempt number and is calculated as: (attempt+2)^3 seconds, capped at // maxAllowedWaitTime. - maxAllowedWaitTime = 5 * time.Minute - lg logger.Interface = logger.Default + maxAllowedWaitTime = 5 * time.Minute + // waitFn returns the amount of time to wait before retrying depending on // the current attempt. This variable exists to reduce the test time. waitFn = cubicWait @@ -38,17 +38,36 @@ var ( // ErrRetryFailed is returned if number of retry attempts exceeded the retry attempts limit and // function wasn't able to complete without errors. -var ErrRetryFailed = errors.New("callback was unable to complete without errors within the allowed number of retries") +type ErrRetryFailed struct { + Err error +} + +func (e *ErrRetryFailed) Error() string { + return fmt.Sprintf("callback was unable to complete without errors within the allowed number of retries: %s", e.Err) +} + +func (e *ErrRetryFailed) Unwrap() error { + return e.Err +} + +func (e *ErrRetryFailed) Is(target error) bool { + _, ok := target.(*ErrRetryFailed) + return ok +} // WithRetry will run the callback function fn. If the function returns // slack.RateLimitedError, it will delay, and then call it again up to // maxAttempts times. It will return an error if it runs out of attempts. func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func() error) error { + tracelogf(ctx, "info", "maxAttempts=%d", maxAttempts) var ok bool if maxAttempts == 0 { maxAttempts = defNumAttempts } + + var lastErr error for attempt := 0; attempt < maxAttempts; attempt++ { + // calling wait to ensure that we don't exceed the rate limit var err error trace.WithRegion(ctx, "WithRetry.wait", func() { err = lim.Wait(ctx) @@ -59,9 +78,11 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func( cbErr := fn() if cbErr == nil { + tracelogf(ctx, "info", "success") ok = true break } + lastErr = cbErr tracelogf(ctx, "error", "WithRetry: %[1]s (%[1]T) after %[2]d attempts", cbErr, attempt+1) var ( @@ -71,14 +92,14 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func( ) switch { case errors.As(cbErr, &rle): - tracelogf(ctx, "info", "got rate limited, sleeping %s", rle.RetryAfter) + tracelogf(ctx, "info", "got rate limited, sleeping %s (%s)", rle.RetryAfter, cbErr) time.Sleep(rle.RetryAfter) continue case errors.As(cbErr, &sce): if isRecoverable(sce.Code) { // possibly transient error delay := waitFn(attempt) - tracelogf(ctx, "info", "got server error %d, sleeping %s", sce.Code, delay) + tracelogf(ctx, "info", "got server error %d, sleeping %s (%s)", sce.Code, delay, cbErr) time.Sleep(delay) continue } @@ -86,7 +107,7 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func( if ne.Op == "read" || ne.Op == "write" { // possibly transient error delay := netWaitFn(attempt) - tracelogf(ctx, "info", "got network error %s, sleeping %s", ne.Op, delay) + tracelogf(ctx, "info", "got network error %s on %q, sleeping %s", cbErr, ne.Op, delay) time.Sleep(delay) continue } @@ -95,7 +116,7 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func( return fmt.Errorf("callback error: %w", cbErr) } if !ok { - return ErrRetryFailed + return &ErrRetryFailed{Err: lastErr} } return nil } @@ -109,7 +130,7 @@ func isRecoverable(statusCode int) bool { // where x is the current attempt number. The maximum wait time is capped at 5 // minutes. func cubicWait(attempt int) time.Duration { - x := attempt + 2 // this is to ensure that we sleep at least 8 seconds. + x := attempt + 1 // this is to ensure that we sleep at least a second. delay := time.Duration(x*x*x) * time.Second if delay > maxAllowedWaitTime { return maxAllowedWaitTime @@ -128,22 +149,11 @@ func expWait(attempt int) time.Duration { func tracelogf(ctx context.Context, category string, fmt string, a ...any) { mu.RLock() defer mu.RUnlock() - + lg := logger.FromContext(ctx) trace.Logf(ctx, category, fmt, a...) lg.Debugf(fmt, a...) } -// SetLogger sets the package logger. -func SetLogger(l logger.Interface) { - mu.Lock() - defer mu.Unlock() - if l == nil { - l = logger.Default - return - } - lg = l -} - // SetMaxAllowedWaitTime sets the maximum time to wait for a transient error. func SetMaxAllowedWaitTime(d time.Duration) { mu.Lock() diff --git a/internal/network/network_test.go b/internal/network/network_test.go index 306f6a94..84ce994f 100644 --- a/internal/network/network_test.go +++ b/internal/network/network_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/slack-go/slack" + "github.com/stretchr/testify/assert" "golang.org/x/time/rate" ) @@ -121,7 +122,7 @@ func Test_withRetry(t *testing.T) { true, calcRunDuration(testRateLimit, 2), }, - {"rate limiter test 4 limited attempts, 100 ms each", + {"rate limiter test 4 lmited attempts, 100 ms each", args{ context.Background(), rate.NewLimiter(10.0, 1), @@ -177,36 +178,70 @@ func Test_withRetry(t *testing.T) { } }) } -} + t.Run("500 error handling", func(t *testing.T) { + waitFn = func(attempt int) time.Duration { return 50 * time.Millisecond } + defer func() { + waitFn = cubicWait + }() + + var codes = []int{500, 502, 503, 504, 598} + for _, code := range codes { + var thisCode = code + // This test is to ensure that we handle 500 errors correctly. + t.Run(fmt.Sprintf("%d error", code), func(t *testing.T) { + + const ( + testRetryCount = 1 + waitThreshold = 100 * time.Millisecond + ) + + // Create a test server that returns a 500 error. + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(thisCode) + })) + defer ts.Close() + + // Create a new client with the test server as the endpoint. + client := slack.New("token", slack.OptionAPIURL(ts.URL+"/")) + + start := time.Now() + // Call the client with a retry. + err := WithRetry(context.Background(), rate.NewLimiter(1, 1), testRetryCount, func() error { + _, err := client.GetConversationHistory(&slack.GetConversationHistoryParameters{}) + if err == nil { + return errors.New("expected error, got nil") + } + return err + }) + if err == nil { + t.Fatal("expected error, got nil") + } -func Test500ErrorHandling(t *testing.T) { - waitFn = func(attempt int) time.Duration { return 50 * time.Millisecond } - defer func() { - waitFn = cubicWait - }() + dur := time.Since(start) + if dur < waitFn(testRetryCount-1)-waitThreshold || waitFn(testRetryCount-1)+waitThreshold < dur { + t.Errorf("expected duration to be around %s, got %s", waitFn(testRetryCount), dur) + } - var codes = []int{500, 502, 503, 504, 598} - for _, code := range codes { - var thisCode = code - // This test is to ensure that we handle 500 errors correctly. - t.Run(fmt.Sprintf("%d error", code), func(t *testing.T) { + }) + } + t.Run("404 error", func(t *testing.T) { + t.Parallel() const ( testRetryCount = 1 - waitThreshold = 100 * time.Millisecond ) - // Create a test server that returns a 500 error. + // Create a test server that returns a 404 error. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(thisCode) + w.WriteHeader(404) })) defer ts.Close() // Create a new client with the test server as the endpoint. client := slack.New("token", slack.OptionAPIURL(ts.URL+"/")) - start := time.Now() // Call the client with a retry. + start := time.Now() err := WithRetry(context.Background(), rate.NewLimiter(1, 1), testRetryCount, func() error { _, err := client.GetConversationHistory(&slack.GetConversationHistoryParameters{}) if err == nil { @@ -217,46 +252,25 @@ func Test500ErrorHandling(t *testing.T) { if err == nil { t.Fatal("expected error, got nil") } - dur := time.Since(start) - if dur < waitFn(testRetryCount-1)-waitThreshold || waitFn(testRetryCount-1)+waitThreshold < dur { - t.Errorf("expected duration to be around %s, got %s", waitFn(testRetryCount), dur) + if dur > 500*time.Millisecond { // 404 error should not be retried + t.Errorf("expected no sleep, but slept for %s", dur) } - }) - } - t.Run("404 error", func(t *testing.T) { + }) + t.Run("meaningful error message", func(t *testing.T) { t.Parallel() - - const ( - testRetryCount = 1 - ) - - // Create a test server that returns a 404 error. - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(404) - })) - defer ts.Close() - - // Create a new client with the test server as the endpoint. - client := slack.New("token", slack.OptionAPIURL(ts.URL+"/")) - - // Call the client with a retry. - start := time.Now() - err := WithRetry(context.Background(), rate.NewLimiter(1, 1), testRetryCount, func() error { - _, err := client.GetConversationHistory(&slack.GetConversationHistoryParameters{}) - if err == nil { - return errors.New("expected error, got nil") - } - return err - }) + var errFunc = func() error { + return slack.StatusCodeError{Code: 500, Status: "Internal Server Error"} + } + err := WithRetry(context.Background(), rate.NewLimiter(1, 1), 1, errFunc) if err == nil { t.Fatal("expected error, got nil") } - dur := time.Since(start) - if dur > 500*time.Millisecond { // 404 error should not be retried - t.Errorf("expected no sleep, but slept for %s", dur) - } + assert.ErrorContains(t, err, "Internal Server Error") + assert.ErrorIs(t, err, &ErrRetryFailed{}) + var sce slack.StatusCodeError + assert.ErrorAs(t, errors.Unwrap(err), &sce) }) } diff --git a/logger/logger.go b/logger/logger.go index a7ce0b49..23a2d653 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -1,23 +1,57 @@ package logger import ( - "io" + "context" "log" "os" "github.com/rusq/dlog" ) +// Interface is the interface for a logger. type Interface interface { Debug(...any) Debugf(fmt string, a ...any) + Debugln(...any) Print(...any) Printf(fmt string, a ...any) Println(...any) + IsDebug() bool } +// Default is the default logger. It logs to stderr and debug logging can be +// enabled by setting the DEBUG environment variable to 1. For example: +// +// DEBUG=1 slackdump var Default = dlog.New(log.Default().Writer(), "", log.LstdFlags, os.Getenv("DEBUG") == "1") -// note: previously ioutil.Discard which is not deprecated in favord of io.Discard -// so this is valid only from go1.16 -var Silent = dlog.New(io.Discard, "", log.LstdFlags, false) +// Silent is a logger that does not log anything. +var Silent = silent{} + +// Silent is a logger that does not log anything. +type silent struct{} + +func (s silent) Debug(...any) {} +func (s silent) Debugf(fmt string, a ...any) {} +func (s silent) Debugln(...any) {} +func (s silent) Print(...any) {} +func (s silent) Printf(fmt string, a ...any) {} +func (s silent) Println(...any) {} +func (s silent) IsDebug() bool { return false } + +type logCtx uint8 + +const ( + logCtxKey logCtx = iota +) + +func NewContext(ctx context.Context, l Interface) context.Context { + return context.WithValue(ctx, logCtxKey, l) +} + +func FromContext(ctx context.Context) Interface { + if l, ok := ctx.Value(logCtxKey).(Interface); ok { + return l + } + return Default +} diff --git a/logger/logger_test.go b/logger/logger_test.go new file mode 100644 index 00000000..fc5f53c8 --- /dev/null +++ b/logger/logger_test.go @@ -0,0 +1,16 @@ +package logger + +import "testing" + +func BenchmarkSlientPrintf(b *testing.B) { + var l = Silent + for i := 0; i < b.N; i++ { + l.Printf("hello world, %s, %d", "foo", i) + } + // This benchmark compares the performance of the Silent logger when + // using io.Discard, and when using a no-op function. + // io.Discard: BenchmarkSlientPrintf-16 93075956 12.92 ns/op 8 B/op 0 allocs/op + // no-op func: BenchmarkSlientPrintf-16 1000000000 0.2364 ns/op 0 B/op 0 allocs/op + // + // Oh, look! We have an WINNER. The no-op function wins, no surprises. +} diff --git a/slackdump.go b/slackdump.go index f0de8f65..989cd450 100644 --- a/slackdump.go +++ b/slackdump.go @@ -107,8 +107,6 @@ func NewWithOptions(ctx context.Context, authProvider auth.Provider, opts Option fs: fsadapter.NewDirectory("."), // default is to save attachments to the current directory. } - network.SetLogger(sd.l()) - if err := os.MkdirAll(opts.CacheDir, 0700); err != nil { return nil, fmt.Errorf("failed to create the cache directory: %s", err) } From 3b5dd14c4f657852fb7b2eabbdde292f67e91a68 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Sun, 7 Apr 2024 19:01:04 +1000 Subject: [PATCH 2/3] fix cubicwait test --- .gitignore | 2 ++ internal/network/network_test.go | 11 ++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 9fe97e7d..1d905c94 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,5 @@ dist/ /tmp *.dot *.gz + +*.html diff --git a/internal/network/network_test.go b/internal/network/network_test.go index 84ce994f..c76618ce 100644 --- a/internal/network/network_test.go +++ b/internal/network/network_test.go @@ -283,11 +283,12 @@ func Test_cubicWait(t *testing.T) { args args want time.Duration }{ - {"attempt 0", args{0}, 8 * time.Second}, - {"attempt 1", args{1}, 27 * time.Second}, - {"attempt 2", args{2}, 64 * time.Second}, - {"attempt 2", args{4}, 216 * time.Second}, - {"attempt 100", args{5}, maxAllowedWaitTime}, // check if capped properly + {"attempt 0", args{0}, 1 * time.Second}, + {"attempt 1", args{1}, 8 * time.Second}, + {"attempt 2", args{2}, 27 * time.Second}, + {"attempt 4", args{4}, 125 * time.Second}, + {"attempt 5", args{5}, 216 * time.Second}, + {"attempt 6", args{6}, maxAllowedWaitTime}, // check if capped properly {"attempt 100", args{1000}, maxAllowedWaitTime}, } for _, tt := range tests { From 3c8cd36052ddf0e3ea84c0dc8a0e00d2e4117d5a Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Sun, 7 Apr 2024 19:02:03 +1000 Subject: [PATCH 3/3] spelling check --- internal/network/network_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/network/network_test.go b/internal/network/network_test.go index c76618ce..3927f30c 100644 --- a/internal/network/network_test.go +++ b/internal/network/network_test.go @@ -122,7 +122,7 @@ func Test_withRetry(t *testing.T) { true, calcRunDuration(testRateLimit, 2), }, - {"rate limiter test 4 lmited attempts, 100 ms each", + {"rate limiter test 4 limited attempts, 100 ms each", args{ context.Background(), rate.NewLimiter(10.0, 1),