Skip to content

Commit

Permalink
Honour rate limit error while streaming Channels
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Jan 8, 2025
1 parent f0af85a commit c11270d
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 8 deletions.
215 changes: 215 additions & 0 deletions stream/mock_stream/mock_stream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
)

// Slacker is the interface with some functions of slack.Client.
//
//go:generate mockgen -destination mock_stream/mock_stream.go . Slacker
type Slacker interface {
AuthTestContext(context.Context) (response *slack.AuthTestResponse, err error)
GetConversationHistoryContext(ctx context.Context, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error)
Expand Down Expand Up @@ -233,12 +235,12 @@ func (cs *Stream) ListChannels(ctx context.Context, proc processor.Channels, p *
var next string
for {
p.Cursor = next
var (
ch []slack.Channel
err error
)
ch, next, err = cs.client.GetConversationsContext(ctx, p)
if err != nil {
var ch []slack.Channel
if err := network.WithRetry(ctx, cs.limits.channels, cs.limits.tier.Tier3.Retries, func() error {
var err error
ch, next, err = cs.client.GetConversationsContext(ctx, p)
return err
}); err != nil {
return fmt.Errorf("API error: %w", err)
}

Expand Down
105 changes: 103 additions & 2 deletions stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (

"github.com/rusq/chttp"
"github.com/rusq/slack"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/rusq/slackdump/v3/auth"
"github.com/rusq/slackdump/v3/internal/cache"
"github.com/rusq/slackdump/v3/internal/chunk"
Expand All @@ -21,8 +24,7 @@ import (
"github.com/rusq/slackdump/v3/internal/network"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/mocks/mock_processor"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"github.com/rusq/slackdump/v3/stream/mock_stream"
)

const testConversation = "CO720D65C25A"
Expand Down Expand Up @@ -342,3 +344,102 @@ func TestStream_Users(t *testing.T) {
err := s.Users(ctx, m)
assert.Error(t, err)
}

func TestStream_ListChannels(t *testing.T) {
testlimits := rateLimits{
channels: network.NewLimiter(network.NoTier, 100, 100),
tier: &network.DefLimits,
}
type args struct {
ctx context.Context
// proc processor.Channels
p *slack.GetConversationsParameters
}
tests := []struct {
name string
cs *Stream
args args
expectFn func(ms *mock_stream.MockSlacker, mc *mock_processor.MockChannels)
wantErr bool
}{
{
name: "happy path",
cs: &Stream{limits: testlimits},
args: args{ctx: context.Background(), p: &slack.GetConversationsParameters{}},
expectFn: func(ms *mock_stream.MockSlacker, mc *mock_processor.MockChannels) {
ms.EXPECT().
GetConversationsContext(gomock.Any(), gomock.Any()).
Return(fixtures.Load[[]slack.Channel](fixtures.TestChannels), "", nil)
mc.EXPECT().
Channels(gomock.Any(), gomock.Any()).
Return(nil)
},
wantErr: false,
},
{
name: "No channels returned, processor not called",
cs: &Stream{limits: testlimits},
args: args{ctx: context.Background(), p: &slack.GetConversationsParameters{}},
expectFn: func(ms *mock_stream.MockSlacker, mc *mock_processor.MockChannels) {
ms.EXPECT().
GetConversationsContext(gomock.Any(), gomock.Any()).
Return([]slack.Channel{}, "", nil)
},
wantErr: false,
},
{
name: "next cursor causes another iteration",
cs: &Stream{limits: testlimits},
args: args{ctx: context.Background(), p: &slack.GetConversationsParameters{}},
expectFn: func(ms *mock_stream.MockSlacker, mc *mock_processor.MockChannels) {
ms.EXPECT().
GetConversationsContext(gomock.Any(), gomock.Any()).
Return(fixtures.Load[[]slack.Channel](fixtures.TestChannels), "next", nil)
ms.EXPECT().
GetConversationsContext(gomock.Any(), gomock.Any()).
Return([]slack.Channel{}, "", nil)
mc.EXPECT().
Channels(gomock.Any(), gomock.Any()).
Return(nil)
},
wantErr: false,
},
{
name: "rate limiting error causes retry",
cs: &Stream{limits: testlimits},
args: args{ctx: context.Background(), p: &slack.GetConversationsParameters{}},
expectFn: func(ms *mock_stream.MockSlacker, mc *mock_processor.MockChannels) {
call := ms.EXPECT().
GetConversationsContext(gomock.Any(), gomock.Any()).
Return([]slack.Channel{}, "", &slack.RateLimitedError{RetryAfter: 100 * time.Millisecond})
ms.EXPECT().
GetConversationsContext(gomock.Any(), gomock.Any()).
Return(fixtures.Load[[]slack.Channel](fixtures.TestChannels), "", nil).
After(call)

mc.EXPECT().
Channels(gomock.Any(), gomock.Any()).
Return(nil)
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
ms := mock_stream.NewMockSlacker(ctrl)
mc := mock_processor.NewMockChannels(ctrl)

cs := tt.cs
cs.client = ms
if tt.expectFn != nil {
tt.expectFn(ms, mc)
}

if err := cs.ListChannels(tt.args.ctx, mc, tt.args.p); (err != nil) != tt.wantErr {
t.Errorf("Stream.ListChannels() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

0 comments on commit c11270d

Please sign in to comment.