-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkvstore.go
128 lines (112 loc) · 2.74 KB
/
kvstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"bytes"
"encoding/gob"
"encoding/json"
"log"
"strings"
"sync"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
)
// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current applied key-value pairs
snapshotter *snap.Snapshotter
}
type kv struct {
Key string
Value string
}
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit,
errorC <-chan error) *kvstore {
s := &kvstore{
proposeC: proposeC,
kvStore: make(map[string]string),
snapshotter: snapshotter,
}
snapshot, err := s.loadSnapshot()
if err != nil {
log.Panic(err)
}
if snapshot != nil {
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}
go s.readCommits(commitC, errorC)
return s
}
func (s *kvstore) Lookup(key string) (string, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.kvStore[key]
return v, ok
}
func (s *kvstore) Propose(k string, v string) {
var buf strings.Builder
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
if commit == nil {
// signaled to load snapshot
snapshot, err := s.loadSnapshot()
if err != nil {
log.Panic(err)
}
if snapshot != nil {
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}
continue
}
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("kvstore: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Value
s.mu.Unlock()
}
close(commit.applyDoneC)
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
func (s *kvstore) getSnapshot() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return json.Marshal(s.kvStore)
}
func (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) {
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
return nil, nil
}
if err != nil {
return nil, err
}
return snapshot, nil
}
func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {
var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
return nil
}