forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_group_test.go
95 lines (85 loc) · 2.72 KB
/
consumer_group_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package sarama
import (
"context"
"sync"
"testing"
)
type handler struct {
*testing.T
cancel context.CancelFunc
}
func (h *handler) Setup(s ConsumerGroupSession) error { return nil }
func (h *handler) Cleanup(s ConsumerGroupSession) error { return nil }
func (h *handler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
for msg := range claim.Messages() {
sess.MarkMessage(msg, "")
h.Logf("consumed msg %v", msg)
h.cancel()
break
}
return nil
}
// TestConsumerGroupNewSessionDuringOffsetLoad ensures that the consumer group
// will retry Join and Sync group operations, if it receives a temporary
// OffsetsLoadInProgress error response, in the same way as it would for a
// RebalanceInProgress.
func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
config := NewTestConfig()
config.ClientID = t.Name()
config.Version = V2_0_0_0
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Retry.Max = 2
config.Consumer.Offsets.AutoCommit.Enable = false
broker0 := NewMockBroker(t, 0)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my-topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my-topic", 0, OffsetOldest, 0).
SetOffset("my-topic", 0, OffsetNewest, 1),
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorGroup, "my-group", broker0),
"HeartbeatRequest": NewMockHeartbeatResponse(t),
"JoinGroupRequest": NewMockSequence(
NewMockJoinGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
NewMockJoinGroupResponse(t),
),
"SyncGroupRequest": NewMockSequence(
NewMockSyncGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
NewMockSyncGroupResponse(t).SetMemberAssignment(
&ConsumerGroupMemberAssignment{
Version: 0,
Topics: map[string][]int32{
"my-topic": {0},
},
}),
),
"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(
"my-group", "my-topic", 0, 0, "", ErrNoError,
).SetError(ErrNoError),
"FetchRequest": NewMockSequence(
NewMockFetchResponse(t, 1).
SetMessage("my-topic", 0, 0, StringEncoder("foo")).
SetMessage("my-topic", 0, 1, StringEncoder("bar")),
NewMockFetchResponse(t, 1),
),
})
group, err := NewConsumerGroup([]string{broker0.Addr()}, "my-group", config)
if err != nil {
t.Fatal(err)
}
defer func() { _ = group.Close() }()
ctx, cancel := context.WithCancel(context.Background())
h := &handler{t, cancel}
var wg sync.WaitGroup
wg.Add(1)
go func() {
topics := []string{"my-topic"}
if err := group.Consume(ctx, topics, h); err != nil {
t.Error(err)
}
wg.Done()
}()
wg.Wait()
}