Skip to content

Commit

Permalink
Search concurrency fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Dec 19, 2024
1 parent fd71bd3 commit d93fd7c
Show file tree
Hide file tree
Showing 17 changed files with 186 additions and 123 deletions.
12 changes: 6 additions & 6 deletions clienter_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 25 additions & 5 deletions cmd/slackdump/internal/archive/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"log/slog"
"strings"
"sync"

"github.com/rusq/fsadapter"
"github.com/schollz/progressbar/v3"

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/bootstrap"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
Expand All @@ -29,12 +32,14 @@ var CmdSearch = &base.Command{
},
}

// flagMask is the cfg flag mask shared by the search subcommands
// (cmdSearchMessages, cmdSearchFiles and cmdSearchAll), which all assign it
// to their FlagMask field.
const flagMask = cfg.OmitUserCacheFlag | cfg.OmitCacheDir | cfg.OmitTimeframeFlag

var cmdSearchMessages = &base.Command{
UsageLine: "slackdump search messages [flags] query terms",
Short: "records search results matching the given query",
Long: `Searches for messages matching criteria.`,
RequireAuth: true,
FlagMask: cfg.OmitUserCacheFlag | cfg.OmitCacheDir,
FlagMask: flagMask,
Run: runSearchMsg,
PrintFlags: true,
}
Expand All @@ -44,7 +49,7 @@ var cmdSearchFiles = &base.Command{
Short: "records search results matching the given query",
Long: `Searches for messages matching criteria.`,
RequireAuth: true,
FlagMask: cfg.OmitUserCacheFlag | cfg.OmitCacheDir,
FlagMask: flagMask,
Run: runSearchFiles,
PrintFlags: true,
}
Expand All @@ -54,7 +59,7 @@ var cmdSearchAll = &base.Command{
Short: "Searches for messages and files matching criteria. ",
Long: `Records search message and files results matching the given query`,
RequireAuth: true,
FlagMask: cfg.OmitUserCacheFlag | cfg.OmitCacheDir,
FlagMask: flagMask,
Run: runSearchAll,
PrintFlags: true,
}
Expand Down Expand Up @@ -135,15 +140,30 @@ func initController(ctx context.Context, args []string) (*control.Controller, fu
defer cd.Close()

lg := slog.Default()
dl, stop := fileproc.NewDownloader(
dl, dlstop := fileproc.NewDownloader(
ctx,
cfg.DownloadFiles,
sess.Client(),
fsadapter.NewDirectory(cd.Name()),
lg,
)

var sopts []stream.Option
pb := bootstrap.ProgressBar(ctx, lg, progressbar.OptionShowCount()) // progress bar
stop := func() {
dlstop()
pb.Finish()
}

var once sync.Once

sopts := []stream.Option{
stream.OptResultFn(func(sr stream.Result) error {
lg.DebugContext(ctx, "stream", "result", sr.String())
once.Do(func() { pb.Describe(sr.String()) })
pb.Add(sr.Count)
return nil
}),
}
if fastSearch {
sopts = append(sopts, stream.OptFastSearch())
}
Expand Down
30 changes: 30 additions & 0 deletions cmd/slackdump/internal/bootstrap/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package bootstrap

import (
"context"
"log/slog"

"github.com/schollz/progressbar/v3"
)

// ProgressBar creates a progress bar with the package defaults
// (clear-on-finish and spinner type 8), followed by any caller-supplied opts,
// so the caller's options are applied after the defaults.
//
// The bar is created with a maximum of -1 (NOTE(review): the progressbar
// package appears to treat this as "unknown length" - confirm against its
// documentation). The freshly built bar is then passed through
// newProgressBar together with the debug-enabled state of lg, which may
// substitute a different bar. The blank state of the resulting bar is
// rendered before it is returned.
func ProgressBar(ctx context.Context, lg *slog.Logger, opts ...progressbar.Option) *progressbar.ProgressBar {
	defaults := []progressbar.Option{
		progressbar.OptionClearOnFinish(),
		progressbar.OptionSpinnerType(8),
	}
	combined := make([]progressbar.Option, 0, len(defaults)+len(opts))
	combined = append(combined, defaults...)
	combined = append(combined, opts...)

	// Build the bar first and query the logger second, matching the
	// left-to-right argument evaluation of a single nested call.
	visible := progressbar.NewOptions(-1, combined...)
	debugEnabled := lg.Enabled(ctx, slog.LevelDebug)

	bar := newProgressBar(visible, debugEnabled)
	// The render error is deliberately discarded.
	_ = bar.RenderBlank()
	return bar
}

// newProgressBar selects the progress bar to use: pb itself when debug is
// false, otherwise a new bar obtained from progressbar.DefaultSilent(0).
// NOTE(review): presumably the silent bar keeps progress output from mixing
// with debug logging - confirm the intent with the author.
func newProgressBar(pb *progressbar.ProgressBar, debug bool) *progressbar.ProgressBar {
	if !debug {
		return pb
	}
	return progressbar.DefaultSilent(0)
}
16 changes: 2 additions & 14 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/rusq/slackdump/v3"

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/bootstrap"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/control"
Expand Down Expand Up @@ -58,13 +59,7 @@ func export(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, list
sdl, stop := fileproc.NewDownloader(ctx, dlEnabled, sess.Client(), fsa, lg)
defer stop()

pb := newProgressBar(progressbar.NewOptions(
-1,
progressbar.OptionClearOnFinish(),
progressbar.OptionSpinnerType(8)),
lg.Enabled(ctx, slog.LevelDebug),
)
_ = pb.RenderBlank()
pb := bootstrap.ProgressBar(ctx, lg, progressbar.OptionShowCount()) // progress bar

stream := sess.Stream(
stream.OptOldest(time.Time(cfg.Oldest)),
Expand Down Expand Up @@ -110,13 +105,6 @@ func export(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, list
return nil
}

func newProgressBar(pb *progressbar.ProgressBar, debug bool) progresser {
if debug {
return progressbar.DefaultSilent(0)
}
return pb
}

// progresser is an interface for progress bars.
type progresser interface {
RenderBlank() error
Expand Down
14 changes: 9 additions & 5 deletions cmd/slackdump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -37,7 +38,6 @@ import (
)

func init() {

base.Slackdump.Commands = []*base.Command{
workspace.CmdWorkspace,
archive.CmdArchive,
Expand Down Expand Up @@ -121,8 +121,12 @@ BigCmdLoop:
continue
}
if err := invoke(cmd, args); err != nil {
msg := fmt.Sprintf("%03[1]d (%[1]s): %[2]s.", base.ExitStatus(), err)
slog.Error(msg)
if errors.Is(err, context.Canceled) {
slog.Info("operation cancelled")
} else {
msg := fmt.Sprintf("%03[1]d (%[1]s): %[2]s.", base.ExitStatus(), err)
slog.Error(msg)
}
}
base.Exit()
return
Expand Down Expand Up @@ -250,15 +254,15 @@ func initLog(filename string, jsonHandler bool, verbose bool) (*slog.Logger, err
if verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
}
var opts = &slog.HandlerOptions{
opts := &slog.HandlerOptions{
Level: iftrue(verbose, slog.LevelDebug, slog.LevelInfo),
}
if jsonHandler {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, opts)))
}
if filename != "" {
slog.Debug("log messages will be written to file", "filename", filename)
lf, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
lf, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o666)
if err != nil {
return slog.Default(), fmt.Errorf("failed to create the log file: %w", err)
}
Expand Down
47 changes: 26 additions & 21 deletions downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (

"github.com/rusq/fsadapter"
"github.com/rusq/slack"
"github.com/rusq/slackdump/v3/internal/network"
"golang.org/x/time/rate"

"github.com/rusq/slackdump/v3/internal/network"
)

const (
Expand All @@ -35,7 +36,7 @@ var (
// in tests.
type Downloader interface {
// GetFile retrieves a given file from its private download URL
GetFile(downloadURL string, writer io.Writer) error
GetFileContext(ctx context.Context, downloadURL string, writer io.Writer) error
}

// Client is the instance of the downloader.
Expand Down Expand Up @@ -152,6 +153,11 @@ func (c *Client) Start(ctx context.Context) {

c.requests = req
c.wg = c.startWorkers(ctx, req)
go func() {
// if all workers die, we should stop the downloader
c.wg.Wait()
slog.Debug("all workers terminated, stopping downloader")
}()
c.started.Store(true)
}

Expand All @@ -164,7 +170,7 @@ func (c *Client) startWorkers(ctx context.Context, req <-chan Request) *sync.Wai
seen := fltSeen(req)
var wg sync.WaitGroup
// create workers
for i := 0; i < c.workers; i++ {
for i := range c.workers {
wg.Add(1)
go func(workerNum int) {
c.worker(ctx, seen)
Expand Down Expand Up @@ -195,6 +201,7 @@ func fltSeen(reqC <-chan Request) <-chan Request {
seen[h] = struct{}{}
filtered <- r
}
slog.Debug("all files processed")
}()
return filtered
}
Expand All @@ -210,22 +217,17 @@ func hash(s string) uint64 {
// worker receives requests from reqC and passes them to saveFile function.
// It will stop if either context is Done, or reqC is closed.
func (c *Client) worker(ctx context.Context, reqC <-chan Request) {
for {
select {
case <-ctx.Done():
trace.Log(ctx, "info", "worker context cancelled")
return
case req, moar := <-reqC:
if !moar {
return
}
lg := c.lg.With("filename", path.Base(req.URL), "destination", req.Fullpath)
lg.DebugContext(ctx, "saving file")
n, err := c.download(ctx, req.Fullpath, req.URL)
if err != nil {
for req := range reqC {
lg := c.lg.With("filename", path.Base(req.URL), "destination", req.Fullpath)
lg.DebugContext(ctx, "saving file")
n, err := c.download(ctx, req.Fullpath, req.URL)
if err != nil {
if errors.Is(err, context.Canceled) {
lg.DebugContext(ctx, "download cancelled")
} else {
lg.ErrorContext(ctx, "error saving file", "error", err)
break
}
} else {
lg.DebugContext(ctx, "file saved", "bytes_written", n)
}
}
Expand All @@ -250,7 +252,7 @@ func (c *Client) download(ctx context.Context, fullpath string, url string) (int
region := trace.StartRegion(ctx, "GetFile")
defer region.End()

if err := c.sc.GetFile(url, tf); err != nil {
if err := c.sc.GetFileContext(ctx, url, tf); err != nil {
if _, err := tf.Seek(0, io.SeekStart); err != nil {
c.lg.ErrorContext(ctx, "seek", "error", err)
}
Expand Down Expand Up @@ -283,14 +285,16 @@ func (c *Client) download(ctx context.Context, fullpath string, url string) (int

// Stop waits for all transfers to finish, and stops the downloader.
func (c *Client) Stop() {
c.mu.Lock()
defer c.mu.Unlock()

if !c.started.CompareAndSwap(true, false) {
return
}

c.mu.Lock()
defer c.mu.Unlock()

slog.Debug("mutex locked, stopping downloader")
close(c.requests)

c.lg.Debug("requests channel closed, waiting for all downloads to complete")
c.wg.Wait()
c.lg.Debug("wait complete: no more files to download")
Expand All @@ -309,6 +313,7 @@ func (c *Client) Download(fullpath string, url string) error {
}

c.requests <- Request{Fullpath: fullpath, URL: url}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions internal/chunk/control/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func userWorker(ctx context.Context, s Streamer, chunkdir *chunk.Directory, tf T
return fmt.Errorf("unable to proceed, no users found")
}
if err := tf.StartWithUsers(ctx, users); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("error starting the transformer: %w", err)
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/chunk/transform/fileproc/fileproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/rusq/fsadapter"
"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/downloader"
"github.com/rusq/slackdump/v3/internal/structures/files"
)
Expand Down Expand Up @@ -118,7 +119,7 @@ func (NoopDownloader) Download(fullpath string, url string) error {

type FileGetter interface {
// GetFile retrieves a given file from its private download URL
GetFile(downloadURL string, writer io.Writer) error
GetFileContext(ctx context.Context, downloadURL string, writer io.Writer) error
}

// NewDownloader initializes the downloader and returns it, along with a
Expand Down
6 changes: 4 additions & 2 deletions internal/edge/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ func (w *Wrapper) AuthTestContext(ctx context.Context) (response *slack.AuthTest
// GetConversationHistoryContext delegates to the method of the same name on
// the wrapped client (w.cl), passing ctx and params through unchanged and
// returning its response and error as-is.
func (w *Wrapper) GetConversationHistoryContext(ctx context.Context, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) {
	resp, err := w.cl.GetConversationHistoryContext(ctx, params)
	return resp, err
}

// GetConversationRepliesContext delegates to the method of the same name on
// the wrapped client (w.cl), passing ctx and params through unchanged. The
// messages, the has-more flag, the next cursor and the error are returned
// exactly as the wrapped client produced them.
func (w *Wrapper) GetConversationRepliesContext(ctx context.Context, params *slack.GetConversationRepliesParameters) (msgs []slack.Message, hasMore bool, nextCursor string, err error) {
	msgs, hasMore, nextCursor, err = w.cl.GetConversationRepliesContext(ctx, params)
	return msgs, hasMore, nextCursor, err
}

// GetUsersPaginated delegates to the method of the same name on the wrapped
// client (w.cl), forwarding all options, and returns the resulting
// slack.UserPagination value.
func (w *Wrapper) GetUsersPaginated(options ...slack.GetUsersOption) slack.UserPagination {
	pager := w.cl.GetUsersPaginated(options...)
	return pager
}
Expand All @@ -60,8 +62,8 @@ func (w *Wrapper) GetUsersInConversationContext(ctx context.Context, params *sla
return w.edge.GetUsersInConversationContext(ctx, params)
}

func (w *Wrapper) GetFile(downloadURL string, writer io.Writer) error {
return w.cl.GetFile(downloadURL, writer)
func (w *Wrapper) GetFileContext(ctx context.Context, downloadURL string, writer io.Writer) error {
return w.cl.GetFileContext(ctx, downloadURL, writer)
}

func (w *Wrapper) GetUsersContext(ctx context.Context, options ...slack.GetUsersOption) ([]slack.User, error) {
Expand Down
Loading

0 comments on commit d93fd7c

Please sign in to comment.