forked from twitchscience/kinsumer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
checkpoints_test.go
106 lines (87 loc) · 3.06 KB
/
checkpoints_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright (c) 2016 Twitch Interactive.
package kinsumer
import (
"testing"
"time"
"github.com/twitchscience/kinsumer/mocks"
)
// TestCheckpointer walks a single checkpointer through its lifecycle against
// the mock dynamo: capturing an unowned shard, updating and committing
// sequence numbers, rejecting a competing owner, and releasing the shard.
//
// Along the way it asserts when a request is (and is not) expected to reach
// the mock:
//   - update() alone never makes a request;
//   - commit() makes a request only when the sequence number has changed since
//     the last commit;
//   - release() makes a request.
func TestCheckpointer(t *testing.T) {
	table := "checkpoints"
	mock := mocks.NewMockDynamo([]string{table})
	stats := &NoopStatReceiver{}

	// The request-counting helpers need the concrete mock type; assert once
	// up front so a mismatch is reported as a test failure, not a panic.
	dynamo, ok := mock.(*mocks.MockDynamo)
	if !ok {
		t.Fatalf("mock has unexpected type %T", mock)
	}

	cp, err := capture("shard", table, mock, "ownerName", "ownerId", 3*time.Minute, stats)

	// Initially, we expect that there is no record, so our new record should have no sequence number.
	// These are preconditions for everything below: cp is dereferenced right
	// after, so stop immediately instead of panicking on a nil checkpointer.
	if err != nil {
		t.Fatalf("current 1 err=%q", err)
	}
	if cp == nil {
		t.Fatalf("Should always be able to capture the shard if there is no entry in dynamo")
	}
	if cp.sequenceNumber != "" {
		t.Errorf("sequence number should initially be an empty string")
	}

	// Update the sequence number. This shouldn't cause any external request.
	mocks.AssertNoRequestsMade(t, dynamo, "update(seq1)", func() {
		cp.update("seq1")
	})

	// Now actually commit.
	mocks.AssertRequestMade(t, dynamo, "commit(seq1)", func() {
		if _, err := cp.commit(); err != nil {
			t.Errorf("commit seq1 err=%q", err)
		}
	})

	// Call update, but keep the same sequence number
	cp.update("seq1")

	// Since the sequence number hasn't changed, committing shouldn't make a request.
	mocks.AssertNoRequestsMade(t, dynamo, "commit unchanged sequence number", func() {
		if _, err := cp.commit(); err != nil {
			t.Errorf("commit unchanged err=%q", err)
		}
	})

	// Call update again with a new value
	cp.update("seq2")

	// committing should trigger a request
	mocks.AssertRequestMade(t, dynamo, "commit(seq2)", func() {
		if _, err := cp.commit(); err != nil {
			t.Errorf("commit seq2 err=%q", err)
		}
	})

	// Call update with a new value twice in a row
	cp.update("seq3")
	cp.update("seq3")

	// This should still trigger an update
	mocks.AssertRequestMade(t, dynamo, "commit(seq3)", func() {
		if _, err := cp.commit(); err != nil {
			t.Errorf("commit seq3 err=%q", err)
		}
	})

	// Try to get another checkpointer for this shard, should not succeed but not error
	cp2, err := capture("shard", table, mock, "differentOwner", "differentOwnerId", 3*time.Minute, stats)
	if err != nil {
		t.Errorf("cp2 first attempt err=%q", err)
	}
	if cp2 != nil {
		t.Errorf("Should not be able to steal shard")
	}

	cp.update("lastseq")

	// release should trigger an update
	mocks.AssertRequestMade(t, dynamo, "cp.release()", func() {
		if err := cp.release(); err != nil {
			t.Errorf("release err=%q", err)
		}
	})

	//TODO: Test fails because dynamo mock does not handle replacing records in put, need to resolve that
	// NOTE(review): the disabled block below was rewritten to use the same
	// capture(...) call shape as the live code above; confirm before enabling.
	/*
		// Now that we have released the shard, we should be able to grab it
		cp2, err = capture("shard", table, mock, "differentOwner", "differentOwnerId", 3*time.Minute, stats)
		if err != nil {
			t.Fatalf("cp2 second attempt err=%q", err)
		}
		if cp2 == nil {
			t.Fatalf("The shard should be ours!")
		}
		if cp2.sequenceNumber != "lastseq" {
			t.Errorf("Release should have committed `lastseq` but new checkpointer got %s!", cp2.sequenceNumber)
		}
	*/
}