Skip to content

Commit

Permalink
Merge pull request #400 from rusq/retry-EOF
Browse files Browse the repository at this point in the history
add redownload tool and treat EOF as recoverable error
  • Loading branch information
rusq authored Jan 10, 2025
2 parents 0b96430 + 2d1bf1f commit 0e02dcb
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 12 deletions.
189 changes: 189 additions & 0 deletions cmd/slackdump/internal/diag/redownload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package diag

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"

"github.com/rusq/fsadapter"
"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/bootstrap"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/processor"
)

var cmdRedownload = &base.Command{
UsageLine: "tools redownload [flags] <archive_dir>",
Short: "attempts to redownload missing files from the archive",
Long: `# File redownload tool
Redownload tool scans the directory with Slackdump Archive, validating the files.
If a file is missing or has zero length, it will be redownloaded from the Slack API.
The tool will not overwrite existing files, so it is safe to run multiple times.
Please note:
1. It requires you to have a valid authentication in the selected workspace.
2. Ensure that you have selected the correct workspace using "slackdump workspace select".
3. It only works with Slackdump Archive directories, Slack exports and dumps
are not supported.`,
FlagMask: cfg.OmitAll &^ cfg.OmitAuthFlags,
Run: runRedownload,
PrintFlags: true,
RequireAuth: true,
}

func runRedownload(ctx context.Context, _ *base.Command, args []string) error {
if len(args) != 1 {
base.SetExitStatus(base.SInvalidParameters)
return errors.New("expected exactly one argument")
}
dir := args[0]
if fi, err := os.Stat(dir); err != nil {
base.SetExitStatus(base.SUserError)
return fmt.Errorf("error accessing the directory: %w", err)
} else if !fi.IsDir() {
base.SetExitStatus(base.SUserError)
return errors.New("expected a directory")
}
if fi, err := os.Stat(filepath.Join(dir, "workspace.json.gz")); err != nil {
base.SetExitStatus(base.SUserError)
return fmt.Errorf("error accessing the workspace file: %w", err)
} else if fi.IsDir() {
base.SetExitStatus(base.SUserError)
return errors.New("this does not look like an archive directory")
}

if n, err := redownload(ctx, dir); err != nil {
base.SetExitStatus(base.SApplicationError)
return err
} else {
if n == 0 {
slog.Info("no missing files found")
} else {
slog.Info("redownloaded missing files", "num_files", n)
}
}

return nil
}

func redownload(ctx context.Context, dir string) (int, error) {
cd, err := chunk.OpenDir(dir)
if err != nil {
return 0, fmt.Errorf("error opening directory: %w", err)
}
defer cd.Close()

channels, err := cd.Channels()
if err != nil {
return 0, fmt.Errorf("error reading channels: %w", err)
}
if len(channels) == 0 {
return 0, errors.New("no channels found")
}
slog.Info("directory opened", "num_channels", len(channels))

sess, err := bootstrap.SlackdumpSession(ctx)
if err != nil {
return 0, fmt.Errorf("error creating slackdump session: %w", err)
}
dl, stop := fileproc.NewDownloader(
ctx,
true,
sess.Client(),
fsadapter.NewDirectory(cd.Name()),
cfg.Log,
)
defer stop()
// we are using the same file subprocessor as the mattermost export.
fproc := fileproc.NewExport(fileproc.STmattermost, dl)

total := 0
for _, ch := range channels {
if n, err := redlChannel(ctx, fproc, cd, &ch); err != nil {
return total, err
} else {
total += n
}
}

return total, nil
}

func redlChannel(ctx context.Context, fp processor.Filer, cd *chunk.Directory, ch *slack.Channel) (int, error) {
slog.Info("processing channel", "channel", ch.ID)
f, err := cd.Open(chunk.FileID(ch.ID))
if err != nil {
return 0, fmt.Errorf("error reading messages: %w", err)
}
defer f.Close()
msgs, err := f.AllMessages(ch.ID)
if err != nil {
return 0, fmt.Errorf("error reading messages: %w", err)
}
if len(msgs) == 0 {
return 0, nil
}
slog.Info("scanning messages", "num_messages", len(msgs))
return scanMsgs(ctx, fp, cd, f, ch, msgs)
}

func scanMsgs(ctx context.Context, fp processor.Filer, cd *chunk.Directory, f *chunk.File, ch *slack.Channel, msgs []slack.Message) (int, error) {
lg := slog.With("channel", ch.ID)
total := 0
for _, m := range msgs {
if structures.IsThreadStart(&m) {
tm, err := f.AllThreadMessages(ch.ID, m.ThreadTimestamp)
if err != nil {
return 0, fmt.Errorf("error reading thread messages: %w", err)
}
lg.Info("scanning thread messages", "num_messages", len(tm), "thread", m.ThreadTimestamp)
if n, err := scanMsgs(ctx, fp, cd, f, ch, tm); err != nil {
return total, err
} else {
total += n
}
}

// collect all missing files from the message.
var missing []slack.File
for _, ff := range m.Files {
name := filepath.Join(cd.Name(), fileproc.MattermostFilepath(ch, &ff))
lg := lg.With("file", name)
lg.Debug("checking file")
if fi, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
// file does not exist
lg.Debug("missing file")
missing = append(missing, ff)
} else {
lg.Error("error accessing file", "error", err)
// some other error
return total, fmt.Errorf("error accessing file: %w", err)
}
} else if fi.Size() == 0 {
// zero length files are considered missing
lg.Debug("zero length file")
missing = append(missing, ff)
} else {
lg.Debug("file OK")
}
}
if len(missing) > 0 {
total += len(missing)
lg.Info("found missing files", "num_files", len(missing))
if err := fp.Files(ctx, ch, m, missing); err != nil {
return total, fmt.Errorf("error processing files: %w", err)
}
}
}
return total, nil
}
1 change: 1 addition & 0 deletions cmd/slackdump/internal/diag/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Tools command contains different tools, running which may be requested if you op
// cmdSearch,
cmdThread,
cmdHydrate,
cmdRedownload,
// cmdWizDebug,
},
}
11 changes: 11 additions & 0 deletions internal/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -117,6 +118,16 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(
ne *net.OpError // read tcp error: see #234
)
switch {
case errors.Is(cbErr, io.EOF):
// EOF is a transient error
delay := wait(attempt)
slog.WarnContext(ctx, "got EOF, sleeping", "error", cbErr, "delay", delay.String())
tracelogf(ctx, "info", "got EOF, sleeping %s (%s)", delay, cbErr)
if err := sleepCtx(ctx, delay); err != nil {
return err
}
slog.Debug("resuming after EOF")
continue
case errors.As(cbErr, &rle):
slog.InfoContext(ctx, "got rate limited, sleeping", "retry_after_sec", rle.RetryAfter, "error", cbErr)
tracelogf(ctx, "info", "got rate limited, sleeping %s (%s)", rle.RetryAfter, cbErr)
Expand Down
30 changes: 23 additions & 7 deletions internal/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -186,12 +187,10 @@ func TestWithRetry(t *testing.T) {
}
})
}
// setting fast wait function
t.Run("500 error handling", func(t *testing.T) {
t.Parallel()
setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond })
defer func() {
setWaitFunc(cubicWait)
}()
t.Cleanup(func() { setWaitFunc(cubicWait) })

codes := []int{500, 502, 503, 504, 598}
for _, code := range codes {
Expand Down Expand Up @@ -232,8 +231,6 @@ func TestWithRetry(t *testing.T) {
})
}
t.Run("404 error", func(t *testing.T) {
t.Parallel()

const (
testRetryCount = 1
)
Expand Down Expand Up @@ -265,8 +262,27 @@ func TestWithRetry(t *testing.T) {
}
})
})
t.Run("EOF error", func(t *testing.T) {
setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond })
t.Cleanup(func() { setWaitFunc(cubicWait) })

reterr := []error{io.EOF, io.EOF, nil}
var retries int

ctx := context.Background()
err := WithRetry(ctx, rate.NewLimiter(1, 1), 3, func() error {
err := reterr[retries]
if err != nil {
retries++
}
return err
})
assert.NoError(t, err)
assert.Equal(t, 2, retries)
})
t.Run("meaningful error message", func(t *testing.T) {
t.Parallel()
setWaitFunc(func(attempt int) time.Duration { return 50 * time.Millisecond })
t.Cleanup(func() { setWaitFunc(cubicWait) })
errFunc := func() error {
return slack.StatusCodeError{Code: 500, Status: "Internal Server Error"}
}
Expand Down
12 changes: 7 additions & 5 deletions stream/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"time"

"github.com/rusq/slack"
"golang.org/x/sync/errgroup"

"github.com/rusq/slackdump/v3/internal/network"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/processor"
"golang.org/x/sync/errgroup"
)

// SyncConversations fetches the conversations from the link which can be a
Expand Down Expand Up @@ -277,7 +278,7 @@ func (cs *Stream) thread(ctx context.Context, req request, callback func(mm []sl
func procChanMsg(ctx context.Context, proc processor.Conversations, threadC chan<- request, channel *slack.Channel, isLast bool, mm []slack.Message) (int, error) {
lg := slog.With("channel_id", channel.ID, "is_last", isLast, "msg_count", len(mm))

var trs = make([]request, 0, len(mm))
trs := make([]request, 0, len(mm))
for i := range mm {
// collecting threads to get their count. But we don't start
// processing them yet, before we send the messages with the number of
Expand Down Expand Up @@ -326,11 +327,12 @@ func procThreadMsg(ctx context.Context, proc processor.Conversations, channel *s
return err
}
if err := proc.ThreadMessages(ctx, channel.ID, head, threadOnly, isLast, rest); err != nil {
return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", head.Msg.Timestamp, threadTS, err)
return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", head.Timestamp, threadTS, err)
}
return nil
}

// procFiles proceses the files in slice of Messages msgs.
func procFiles(ctx context.Context, proc processor.Filer, channel *slack.Channel, msgs ...slack.Message) error {
if len(msgs) == 0 {
return nil
Expand Down Expand Up @@ -416,7 +418,7 @@ func (cs *Stream) procChannelUsers(ctx context.Context, proc processor.ChannelIn
func (cs *Stream) procChannelInfoWithUsers(ctx context.Context, proc processor.ChannelInformer, channelID, threadTS string) (*slack.Channel, error) {
var eg errgroup.Group

var chC = make(chan slack.Channel, 1)
chC := make(chan slack.Channel, 1)
eg.Go(func() error {
defer close(chC)
ch, err := cs.procChannelInfo(ctx, proc, channelID, threadTS)
Expand All @@ -427,7 +429,7 @@ func (cs *Stream) procChannelInfoWithUsers(ctx context.Context, proc processor.C
return nil
})

var uC = make(chan []string, 1)
uC := make(chan []string, 1)
eg.Go(func() error {
defer close(uC)
m, err := cs.procChannelUsers(ctx, proc, channelID, threadTS)
Expand Down
3 changes: 3 additions & 0 deletions utils/count_chunks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

gzcat $1.json.gz| jq '.t' | awk '{count[$1]++}END{for(t in count)print t,count[t]}'

0 comments on commit 0e02dcb

Please sign in to comment.