-
Notifications
You must be signed in to change notification settings - Fork 3
/
redis_dlm.go
154 lines (122 loc) · 2.83 KB
/
redis_dlm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package dlm
import (
"fmt"
"sync"
"time"
"gopkg.in/redis.v4"
)
// redisReleaseScript is the Lua script evaluated by redisLock.Unlock. It
// deletes KEYS[1] only when the value currently stored there equals ARGV[1]
// and returns the result of DEL; otherwise it leaves the key alone and
// returns 0. Unlock passes the namespaced lock key as KEYS[1] and the lock's
// random token as ARGV[1].
const redisReleaseScript = `
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end`
// RedisDLM is a DLM that uses Redis as a backend. The implementation is based on
// the single node algorithm described here: http://redis.io/topics/distlock
type RedisDLM struct {
// client is the Redis client shared by every lock created through NewLock.
client *redis.Client
// namespace is prepended (with no separator) to each lock key to build the
// key actually stored in Redis.
namespace string
}
// NewRedisDLM creates a new RedisDLM whose Redis client is configured with
// the given address.
//
// opts may be nil, in which case the namespace is empty. Only opts.Namespace
// is read here. The returned error is always nil in this implementation.
func NewRedisDLM(addr string, opts *Options) (DLM, error) {
	var namespace string
	if opts != nil {
		namespace = opts.Namespace
	}

	redisOpts := redis.Options{Addr: addr}
	return &RedisDLM{
		client:    redis.NewClient(&redisOpts),
		namespace: namespace,
	}, nil
}
// NewLock creates a lock for the given key. The returned lock is not held
// and must be acquired with a call to .Lock.
//
// opts may be nil; in either case the options are passed through
// WithDefaults before the TTL, wait time and retry time are read from them.
// Each lock receives its own random token, which Unlock later uses to make
// sure it only releases a key it still owns. An error is returned only when
// that token cannot be generated.
func (d *RedisDLM) NewLock(key string, opts *LockOptions) (Locker, error) {
	if opts == nil {
		opts = new(LockOptions)
	}
	cfg := opts.WithDefaults()

	token, err := randstr(32)
	if err != nil {
		return nil, fmt.Errorf("failed to generate random token: %v", err)
	}

	return &redisLock{
		client:    d.client,
		namespace: d.namespace,
		key:       key,
		token:     token,
		ttl:       cfg.TTL,
		waitTime:  cfg.WaitTime,
		retryTime: cfg.RetryTime,
	}, nil
}
// redisLock is the lock handle returned by RedisDLM.NewLock. It tracks
// whether this handle believes it holds the lock (isHeld) and carries
// everything Lock and Unlock need to talk to Redis.
type redisLock struct {
mutex sync.Mutex // Used while manipulating the internal state of the lock itself
// client is the Redis client inherited from the RedisDLM that created the lock.
client *redis.Client
// ttl is the expiration passed to SetNX when the key is written.
ttl time.Duration
// waitTime bounds how long Lock keeps retrying after the first failed attempt.
waitTime time.Duration
// retryTime is the interval between acquisition attempts while waiting.
retryTime time.Duration
// namespace is prepended to key to form the Redis key.
namespace string
// key is the caller-supplied lock name, without the namespace.
key string
token string // A random string used to safely release the lock
// isHeld is set by a successful Lock and cleared by a successful Unlock.
isHeld bool
}
// Key returns the key the lock was created with, without the namespace
// prefix.
func (l *redisLock) Key() string {
return l.key
}
// Namespace returns the namespace that is prepended to the lock's key when
// talking to Redis.
func (l *redisLock) Namespace() string {
return l.namespace
}
// Lock tries to acquire the lock by setting the namespaced key to this lock's
// token with SetNX, using the lock's TTL as the expiration.
//
// One attempt is made immediately. If the key is already taken, further
// attempts are made every retryTime until one succeeds or waitTime elapses.
//
// It returns ErrLockHeld if this handle is already marked as held,
// ErrCannotLock if the wait time runs out, and a wrapped error if any Redis
// call fails. The handle's mutex is held for the whole call, so concurrent
// calls on the same handle are serialized.
func (l *redisLock) Lock() error {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	if l.isHeld {
		return ErrLockHeld
	}

	redisKey := l.namespace + l.key

	// tryAcquire performs a single SetNX attempt and marks the handle as held
	// when it succeeds.
	tryAcquire := func() (bool, error) {
		acquired, err := l.client.SetNX(redisKey, l.token, l.ttl).Result()
		if err != nil {
			return false, fmt.Errorf("failed to acquire lock: %v", err)
		}
		if acquired {
			l.isHeld = true
		}
		return acquired, nil
	}

	// Fast path: done on success (err is nil) or on a Redis failure.
	if acquired, err := tryAcquire(); acquired || err != nil {
		return err
	}

	deadline := time.NewTimer(l.waitTime)
	defer deadline.Stop()
	ticker := time.NewTicker(l.retryTime)
	defer ticker.Stop()

	for {
		select {
		case <-deadline.C:
			return ErrCannotLock
		case <-ticker.C:
			if acquired, err := tryAcquire(); acquired || err != nil {
				return err
			}
		}
	}
}
// Unlock releases the lock by evaluating redisReleaseScript, which deletes
// the namespaced key only if it still stores this lock's token.
//
// It returns ErrLockNotHeld if the handle is not marked as held, or if the
// script reports that nothing was deleted (the key expired or now belongs to
// another token). A wrapped error is returned if the Redis call fails or the
// script result has an unexpected type.
//
// Once the script has run, the handle is always marked as not held. If the
// key was not deleted, this handle does not own it any more, and leaving
// isHeld set would make every later Lock call fail with ErrLockHeld while
// every later Unlock call fails with ErrLockNotHeld. If the Redis call itself
// fails, isHeld is left untouched so the caller can retry Unlock.
func (l *redisLock) Unlock() error {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	if !l.isHeld {
		return ErrLockNotHeld
	}

	key := l.namespace + l.key
	res, err := l.client.Eval(redisReleaseScript, []string{key}, l.token).Result()
	if err != nil {
		return fmt.Errorf("failed to release lock: %v", err)
	}

	// The script ran: either the key was deleted or it was not ours. In both
	// cases this handle no longer holds the lock.
	l.isHeld = false

	// Checked assertion so an unexpected reply becomes an error, not a panic.
	n, ok := res.(int64)
	if !ok {
		return fmt.Errorf("failed to release lock: unexpected script result %v (%T)", res, res)
	}
	if n == 0 {
		// the lock has already expired
		return ErrLockNotHeld
	}
	return nil
}