-
Notifications
You must be signed in to change notification settings - Fork 5
/
hashring.go
131 lines (107 loc) · 2.72 KB
/
hashring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package hashring
import (
"fmt"
"hash"
"hash/fnv"
"sort"
"sync"
)
// nodeIdx type implementing Sort Interface
type nodeIdx []uint32
// Len returns the size of nodeIdx
func (idx nodeIdx) Len() int {
return len(idx)
}
// Swap swaps the ith with jth
func (idx nodeIdx) Swap(i, j int) {
idx[i], idx[j] = idx[j], idx[i]
}
// Less returns true if ith <= jth else false
func (idx nodeIdx) Less(i, j int) bool {
return idx[i] <= idx[j]
}
// HashRing to hold the nodes and indexes
type HashRing struct {
nodes map[uint32]string // map to idx -> node
idx nodeIdx // sorted indexes
replicaCount int // replicas to be inserted
hash hash.Hash32
mu sync.RWMutex // to protect above fields
}
// New returns a Hash ring with provided virtual node count and hash
// If hash is nil, fvn32a is used instead
func New(replicaCount int, hash hash.Hash32) *HashRing {
if hash == nil {
hash = fnv.New32a()
}
return &HashRing{
nodes: make(map[uint32]string),
replicaCount: replicaCount,
hash: hash,
}
}
// getHash returns uint32 hash
func getHash(hash hash.Hash32, key []byte) (uint32, error) {
hash.Reset()
_, err := hash.Write(key)
if err != nil {
return 0, err
}
return hash.Sum32(), nil
}
// Add adds a node to Hash ring
func (hr *HashRing) Add(node string) error {
hr.mu.Lock()
defer hr.mu.Unlock()
for i := 0; i < hr.replicaCount; i++ {
key := fmt.Sprintf("%s:%d", node, i)
hkey, err := getHash(hr.hash, []byte(key))
if err != nil {
return fmt.Errorf("failed to add node: %v", err)
}
hr.idx = append(hr.idx, hkey)
hr.nodes[hkey] = node
}
sort.Sort(hr.idx)
return nil
}
// getKeys returns the keys of map m
func getKeys(m map[uint32]string) (idx nodeIdx) {
for k := range m {
idx = append(idx, k)
}
return idx
}
// Delete deletes the nodes from hash ring
func (hr *HashRing) Delete(node string) error {
hr.mu.Lock()
defer hr.mu.Unlock()
for i := 0; i < hr.replicaCount; i++ {
key := fmt.Sprintf("%s:%d", node, i)
hkey, err := getHash(hr.hash, []byte(key))
if err != nil {
return fmt.Errorf("failed to delete node: %v", err)
}
delete(hr.nodes, hkey)
}
hr.idx = getKeys(hr.nodes)
sort.Sort(hr.idx)
return nil
}
// Locate returns the node for a given key
func (hr *HashRing) Locate(key string) (node string, err error) {
hr.mu.RLock()
defer hr.mu.RUnlock()
if len(hr.idx) < 1 {
return node, fmt.Errorf("no available nodes")
}
hkey, err := getHash(hr.hash, []byte(key))
if err != nil {
return node, fmt.Errorf("failed to fetch node: %v", err)
}
pos := sort.Search(len(hr.idx), func(i int) bool { return hr.idx[i] >= hkey })
if pos == len(hr.idx) {
return hr.nodes[hr.idx[0]], nil
}
return hr.nodes[hr.idx[pos]], nil
}