title | date | category | tags | |||||
---|---|---|---|---|---|---|---|---|
MIT6.5840(6.824) Lab3: 分布式KV数据库 3B |
2024-02-03 10:19:25 -0800 |
|
|
本文将介绍lab3B
部分的实现, lab3B
要求基于在lab3A
的基础上实现快照, 仅从lab
本省来讲其实很简单, 可以说是目前我做的所有lab
里面难度最小的一个模块了, 但实际上遇到了很多困难, 调试的时间比lab3A
还久, 核心原因就是自己的lab2
实现的raft
底层有一些问题(真真真真太折磨了, 即时过了所有单元测试, 还是时不时地在后续的lab
暴露出问题来, 并且这些问题还挺难定位, 得仔细分析海量的log
输出才可以)
Lab
文档见: http://nil.csail.mit.edu/6.5840/2023/labs/lab-kvraft.html
我的代码: https://github.com/GFX9/MIT6.5840/tree/lab3B
简单说, lab3B
就是要在底层raft
的log
过大时生成快照并截断日志, 从而节省内存空间, 并且快照会持久化存储到本地。因此, 原来的代码结构只需要在以下几个方面做出调整:
- 需要再某个地方定期地判断底层
raft
的日志大小, 决定是否要生成快照, 生成快照直接调用我们在lab2
中实现的接口Snapshot
即可 - 由于
follower
的底层raft
会出现无法从Leader
获取log
的情况, 这时Leader
会发送给follower
的raft
层一个快照,raft
层会将其上交给server
,server
通过快照改变自己的状态机 server
启动时需要判断是否有持久化的快照需要加载, 如果有就加载
快照首先应该包含的肯定是内存中的KV
数据库, 也就是自己维护的map
, 但是还应该包含对每个clerk
序列号的记录信息, 因为从快照恢复后的server
应该具备判断重复的客户端请求的能力, 同时也应该记录最近一次应用到状态机的日志索引, 凡是低于这个索引的日志都是包含在快照中
因此, server
结构体需要添加如下成员:
type KVServer struct {
...
persister *raft.Persister
lastApplied int
}
通过上述分析, 快照的加载和生成就很简单了,代码如下:
func (kv *KVServer) GenSnapShot() []byte {
// 调用时必须持有锁mu
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.db)
e.Encode(kv.historyMap)
serverState := w.Bytes()
return serverState
}
func (kv *KVServer) LoadSnapShot(snapShot []byte) {
// 调用时必须持有锁mu
if len(snapShot) == 0 || snapShot == nil {
ServerLog("server %v LoadSnapShot: 快照为空", kv.me)
return
}
r := bytes.NewBuffer(snapShot)
d := labgob.NewDecoder(r)
tmpDB := make(map[string]string)
tmpHistoryMap := make(map[int64]*Result)
if d.Decode(&tmpDB) != nil ||
d.Decode(&tmpHistoryMap) != nil {
ServerLog("server %v LoadSnapShot 加载快照失败\n", kv.me)
} else {
kv.db = tmpDB
kv.historyMap = tmpHistoryMap
ServerLog("server %v LoadSnapShot 加载快照成功\n", kv.me)
}
}
GenSnapShot
和LoadSnapShot
分别生成和加载快照, 唯一需要注意的就是这两个函数应当在持有锁时才能调用
由于ApplyHandler
协程会不断地读取raft commit
的通道, 所以每收到一个log
后进行判断即可:
func (kv *KVServer) ApplyHandler() {
for !kv.killed() {
log := <-kv.applyCh
if log.CommandValid {
...
// 如果在follower一侧, 可能这个log包含在快照中, 直接跳过
if log.CommandIndex <= kv.lastApplied {
kv.mu.Unlock()
continue
}
...
// 每收到一个log就检测是否需要生成快照
if kv.maxraftstate != -1 && kv.persister.RaftStateSize() >= kv.maxraftstate/100*95 {
// 当达到95%容量时需要生成快照
snapShot := kv.GenSnapShot()
kv.rf.Snapshot(log.CommandIndex, snapShot)
}
kv.mu.Unlock()
}
...
}
}
这里还需要进行之前提到的判断: 低于lastApplied
索引的日志都是包含在快照中, 在尽显lab3A
的操作之后, 再判断是否需要生成快照, 在我的实现中, 如果仅仅比较maxraftstate
和persister.RaftStateSize()
相等才生成快照的话, 无法通过测例, 因为可能快照RPC
存在一定延时, 所以我采用的手段是只要达到阈值的95%, 就生成快照
首先启动时需要判断是否需要加载快照, 然后就是ApplyHandler
从通道收到快照时需要判断加载, 都很简单:
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
...
kv.persister = persister
...
// 先在启动时检查是否有快照
kv.mu.Lock()
kv.LoadSnapShot(persister.ReadSnapshot())
kv.mu.Unlock()
go kv.ApplyHandler()
return kv
}
func (kv *KVServer) ApplyHandler() {
for !kv.killed() {
log := <-kv.applyCh
if log.CommandValid {
...
} else if log.SnapshotValid {
// 日志项是一个快照
kv.mu.Lock()
if log.SnapshotIndex >= kv.lastApplied {
kv.LoadSnapShot(log.Snapshot)
kv.lastApplied = log.SnapshotIndex
}
kv.mu.Unlock()
}
}
}
这里才是这个lab
的重头戏, 我在完成上述所有修改后, 会在TestSnapshotUnreliable3B
这个单元测试中大概率卡死, 一直会卡到go
默认的十分钟单元测试时间截止后才报错退出, 在反复检查了死锁和持锁接发通道消息等常见问题并确认无误后, 我再次观察超时报错的堆栈信息和日志输出, 得到结论就是:
raft
层因为无法承受测试的高并发程度而导致大量的RPC
请求失败, 从而导致clerk
无限重复发送请求RPC
(我的实现是RPC
请求失败)就重试
......陷入了沉思, 自己还是菜啊, 但代码还得慢慢修, 总不能把raft
推倒重来吧......
我最后分别从raft
层和server
层进行了优化
通过对日志的调试发现, AppendEntries RPC
数量太多了, 这是因为我在lab3A
中做了如下修改:
func (rf *Raft) Start(command interface{}) (int, int, bool) {
...
defer func() {
rf.ResetHeartTimer(1)
}()
return rf.VirtualLogIdx(len(rf.log) - 1), rf.currentTerm, true
}
也就是在接受一个请求并追加一个log
后立即发送AppendEntries RPC
, 但是如果在高并发的场景下, 新的请求绵绵不断地到来, 每到达一个请求都发一个RPC
, 并且每个RPC
可能只包含了长度为1的日志切片, 这是不太合理的设计, 过多的RPC
使得raft
无法及时处理而出现RPC
卡死的情况, 因此, 我手动修改了重置定时器的时间为15ms
, 这个值比心跳间隔小很多, 但又不是很小, 足以在满足响应速度的前提下摊销多个命令, 使一次AppendEntries RPC
包含多个新的日志项:
func (rf *Raft) Start(command interface{}) (int, int, bool) {
...
defer func() {
rf.ResetHeartTimer(15)
}()
return rf.VirtualLogIdx(len(rf.log) - 1), rf.currentTerm, true
}
至于为什么是15ms
..., 我自己也说不出理由, 随便设的, 比心跳小很多, 但又不太小就是了, 本质目的就是积攒多个AppendEntries RPC
后一次性发送, 避免AppendEntries RPC
数量过大
在我原来的设计中, InstallSnapshot RPC
的发送有2中情形:
handleAppendEntries
在处理AppendEntries RPC
回复时发现follower
需要的日志项背快照截断, 立即调用go rf.handleInstallSnapshot(serverTo)
协程发送快照- 心跳函数发送时发现
PrevLogIndex < rf.lastIncludedIndex
, 则发送快照
这和之前的情形类似, 在高并发的场景下,follower
和Leader
之间的日志复制也很频繁, 如果某一个日志触发了InstallSnapshot RPC
的发送, 接下来连续很多个日志也会触发InstallSnapshot RPC
的发送, 因为InstallSnapshot RPC
的发送时间消耗更大, 这样以来, 又加大了raft
的压力, 所以, 我对InstallSnapshot RPC
的发送做出修改:
handleAppendEntries
在处理AppendEntries RPC
回复时发现follower
需要的日志项背快照截断, 仅仅设置rf.nextIndex[serverTo] = rf.lastIncludedIndex
, 这将导致下一次心跳时调用go rf.handleInstallSnapshot(serverTo)
协程发送快照- 心跳函数发送时发现
PrevLogIndex < rf.lastIncludedIndex
, 则发送快照
代码如下:
func (rf *Raft) handleAppendEntries(serverTo int, args *AppendEntriesArgs) {
...
if reply.Term == rf.currentTerm && rf.role == Leader {
// term仍然相同, 且自己还是leader, 表名对应的follower在prevLogIndex位置没有与prevLogTerm匹配的项
// 快速回退的处理
if reply.XTerm == -1 {
// PrevLogIndex这个位置在Follower中不存在
DPrintf("leader %v 收到 server %v 的回退请求, 原因是log过短, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, serverTo, rf.nextIndex[serverTo], serverTo, reply.XLen)
if rf.lastIncludedIndex >= reply.XLen {
// 由于snapshot被截断
// 下一次心跳添加InstallSnapshot的处理
rf.nextIndex[serverTo] = rf.lastIncludedIndex
} else {
rf.nextIndex[serverTo] = reply.XLen
}
return
}
...
if i == rf.lastIncludedIndex && rf.log[rf.RealLogIdx(i)].Term > reply.XTerm {
// 要找的位置已经由于snapshot被截断
// 下一次心跳添加InstallSnapshot的处理
rf.nextIndex[serverTo] = rf.lastIncludedIndex
} else if rf.log[rf.RealLogIdx(i)].Term == reply.XTerm {
...
} else {
// 之前PrevLogIndex发生冲突位置时, Follower的Term自己没有
if reply.XIndex <= rf.lastIncludedIndex {
// XIndex位置也被截断了
// 添加InstallSnapshot的处理
rf.nextIndex[serverTo] = rf.lastIncludedIndex
} else {
rf.nextIndex[serverTo] = reply.XIndex
}
}
return
}
}
server
层应该尽量减小对raft
层的接口的调用, 因为大量的接口调用将获取raft
层的一把大锁, 从而阻碍RPC
的响应
之前的实现中, 无论是Put/Append
还是Get
, 都是封装成OP
结构体, 在HandleOp
中一股脑调用Start
扔给raft
层处理, 然后在ApplyHandler
处进行去重判断, 现在可以在调用raft
层的Start
之前就从historyMap
中判断是否有历史记录可以直接返回:
func (kv *KVServer) HandleOp(opArgs *Op) (res Result) {
// 先判断是否有历史记录
kv.mu.Lock()
if hisMap, exist := kv.historyMap[opArgs.Identifier]; exist && hisMap.LastSeq == opArgs.Seq {
kv.mu.Unlock()
ServerLog("leader %v HandleOp: identifier %v Seq %v 的请求: %s(%v, %v) 从历史记录返回\n", kv.me, opArgs.Identifier, opArgs.OpType, opArgs.Key, opArgs.Val)
return *hisMap
}
kv.mu.Unlock()
...
}
ratf
的GetState
也会获取锁, 从而阻碍RPC
的响应速度, 我原来的实现中, GetState
会在2个地方调用:
Get
和PutAppend
调用GetState
判断是否是leader
, 不是则返回错误ApplyHandler
在通过通道唤醒HandleOp
时, 需要判断当前节点是不是leader
, 不是leader
则不需要唤醒
以上2不操作看似合理, 但实际上是冗余的:
- 首先,
Get
和PutAppend
在后续的HandleOp
会调用Start
,Start
也会因为当前节点不是leader
而返回, 所以GetState
是冗余的, 反而阻碍RPC
响应速度 - 其次,
ApplyHandler
在通过通道唤醒HandleOp
时, 日志项本身有term
的记录,HandleOp
会调用Start
时也会获取那时的term
,HandleOp
只需要在被唤醒后比较前后的term
是否相同, 就可以判断出当前的节点是不是一个过时的leader
以上2处修改很简单, 由于是删代码而不是新增和修改, 就不贴代码了, 感兴趣可以看仓库
这个修改也很简单, 如果server
返回了需要重试类型的错误, clerk
先sleep
一会, 再重试, 代码如下:
func (ck *Clerk) Get(key string) string {
args := &GetArgs{Key: key, Seq: ck.GetSeq(), Identifier: ck.identifier}
for {
reply := &GetReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.Get", args, reply)
if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
if !ok {
reply.Err = ERRRPCFailed
}
if reply.Err != ErrNotLeader {
DPrintf("clerk %v Seq %v 重试Get(%v), Err=%s", args.Identifier, args.Key, args.Key, reply.Err)
}
ck.leaderId += 1
ck.leaderId %= len(ck.servers)
time.Sleep(RpcRetryInterval)
continue
}
switch reply.Err {
case ErrChanClose:
DPrintf("clerk %v Seq %v 重试Get(%v), Err=%s", args.Identifier, args.Key, args.Key, reply.Err)
time.Sleep(time.Microsecond * 5)
continue
case ErrHandleOpTimeOut:
DPrintf("clerk %v Seq %v 重试Get(%v), Err=%s", args.Identifier, args.Key, args.Key, reply.Err)
time.Sleep(RpcRetryInterval)
continue
case ErrKeyNotExist:
DPrintf("clerk %v Seq %v 成功: Get(%v)=%v, Err=%s", args.Identifier, args.Key, args.Key, reply.Value, reply.Err)
return reply.Value
}
DPrintf("clerk %v Seq %v 成功: Get(%v)=%v, Err=%s", args.Identifier, args.Key, args.Key, reply.Value, reply.Err)
return reply.Value
}
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
args := &PutAppendArgs{Key: key, Value: value, Op: op, Seq: ck.GetSeq(), Identifier: ck.identifier}
for {
reply := &PutAppendReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, reply)
if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
if !ok {
reply.Err = ERRRPCFailed
}
if reply.Err != ErrNotLeader {
DPrintf("clerk %v Seq %v 重试%s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
}
ck.leaderId += 1
ck.leaderId %= len(ck.servers)
time.Sleep(RpcRetryInterval)
continue
}
switch reply.Err {
case ErrChanClose:
DPrintf("clerk %v Seq %v 重试%s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
time.Sleep(RpcRetryInterval)
continue
case ErrHandleOpTimeOut:
DPrintf("clerk %v Seq %v 重试%s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
time.Sleep(RpcRetryInterval)
continue
}
DPrintf("clerk %v Seq %v 成功: %s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
return
}
}
- 执行测试命令测试
lab3B
go test -run 3B
结果如下:
可以看出, 取消立即发送心跳广播导致前3个测例满了不少, 但也就凑活吧(不想优化了, 麻木了)
该代码经过150次测试没有报错
- 执行测试命令测试整个
lab3
go test -run 3
- 修改后再次测试
lab2
cd ../raft/
go test -run 2
该代码经过150次测试没有报错