title | date | category | tags | |||||
---|---|---|---|---|---|---|---|---|
MIT6.5840(6.824) Lab3: 分布式KV数据库 3A |
2024-01-30 12:21:22 -0800 |
|
|
本文将介绍lab3A
部分的实现, lab3A
要求基于raft
实现一个容错的分布式KV
数据库, 但不要求实现快照, 难度还是不小
Lab
文档见: http://nil.csail.mit.edu/6.5840/2023/labs/lab-kvraft.html
我的代码: https://github.com/GFX9/MIT6.5840/tree/lab3A
首先又是万恶的老旧代码bug
修复
在lab3A
中, kv
数据库的命令要求命令能够尽快被commit
, 且要求比一个心跳间隔更快, 但我在lab2
的实现中, 无论是否调用了Start
, 都不影响心跳的发送频率, 因此自然commit
速度很慢, 过不了测试。因此需要修改lab2
中Start
, 使其立即唤醒一次心跳
由于需要在发送心跳的携程函数SendHeartBeats
外控制心跳发送, 因此可以简单地修改SendHeartBeats
通过事件触发心跳发送, 而不是发送后简单地Sleep
,
- 首先设置一个心跳定时器
type Raft struct {
...
heartTimer *time.Timer
...
}
SendHeartBeats
通过事件触发心跳发送
func (rf *Raft) SendHeartBeats() {
for !rf.killed() {
<-rf.heartTimer.C
...
rf.ResetHeartTimer(HeartBeatTimeOut)
}
}
func (rf *Raft) ResetHeartTimer(timeStamp int) {
rf.heartTimer.Reset(time.Duration(timeStamp) * time.Millisecond)
}
Start
函数理解触发心跳
func (rf *Raft) Start(command interface{}) (int, int, bool) {
...
defer func() {
rf.ResetHeartTimer(1)
}()
return rf.VirtualLogIdx(len(rf.log) - 1), rf.currentTerm, true
}
其余重设定时器的地方就不在赘述了
简单说就是某节点2轮选举撞在了一起, 首先先回顾选举相关的结构体成员:
type Raft struct {
...
muVote sync.Mutex // 保护投票数据
voteCount int // 票数
...
}
这个bug
原来本来是没有的, 因为之前Start
并不会立即发送心跳,所以不容易出现如RPC
重复, RPC
乱序等问题, 但修改了Start
后, 并发场景更复杂, 因此出现了如下的场景:
- 某一时刻
Follower 2
进行选举 Follower 2
选举还没结束时, 又收到了新的Leader
的心跳, 证明选举结束了, 但由于选举的某个携程的RPC
响应很慢, 其还没有进行选举是否结束(自身变为了Follower
)的判断- 选举超时又被触发,
Follower 2
进行新一轮选举, 由于票数是以结构体成员voteCount
保存的, 因此voteCount
可能与之前的选票发生冲突
既然结构体成员会发生冲突, 那不如为每轮选票临时创建一个成员和投票锁:
func (rf *Raft) Elect() {
// 特别注意, 要先对muVote加锁, 再对mu加锁, 这是为了统一获取锁的顺序以避免死锁
...
voteCount := 1 // 自己有一票
var muVote sync.Mutex // 临时的投票锁
...
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go rf.collectVote(i, args, &muVote, &voteCount)
}
}
collectVote
函数就不展示了, 将原来的结构体成员muVote
和voteCount
换为临时创建的变量即可
首先先贴出官方提供的架构图:
简单说, 我们要建立的KV
数据库是位于raft
层之上的, 或者说我们的KV
数据库使用了raft
库。客户端(就是代码中的clerk
)调用应用层(server
)的RPC
,应用层收到RPC
之后,会调用Start
函数,Start
函数会立即返回,但是这时,应用层不会返回消息给客户端,因为它还没有执行客户端请求,它也不知道这个请求是否会被Raft
层commit
。只有在某一时刻,对应于这个客户端请求的消息在applyCh channel
中出现, 应用层才会执行这个请求,并返回响应给客户端。
对于上述过程, 可参考我在课堂笔记中画的图:
clerk
和真正的客户端交互并管理RPC
, 而server
收到请求后需要将请求传递给raft
层进行集群复制, 然后收到raft
的commit
, 在应用到状态机并返回给客户端。
但问题在于需要确保以上操作的线性一致性, 那什么时候会出现线形不一致的情况呢?就是重复的请求。因为网络问题,clerk
可能认为之前的请求丢包了, 所以会再次发送请求。而raft
层是无法判断这个请求是否重复的, 如果server
层没有特殊处理, 有的请可能在客户端看来只执行了一次, 但在server
执行了多次, 并且如果这是Put
等改变状态机的请求, 执行了多次将导致逻辑错误。
首先,server
需要判断某一个请求是否重复,最简单的方法就是让clerk
携带一个全局递增的序列号,并且server
需要在第一次将这个请求应用到状态机时记录这个序列号, 用以判断后续的请求是否重复。由于clerk
不是并发的, 所以server
只需要记录某个clerk
序列号最高的一个请求即可, 序列号更低的请求不会出现, 只需要考虑请求重复的场景。
除了记录某个clerk
请求的序列号外, 还需要记录器执行结果,因为如果是一个重复的Get
请求, 其返回的结果应该与其第一次发送请求时一致, 否则将导致线性不一致。如果是重复的Put
等改变状态机的请求,就不应该被执行
总结下来, 思路就是:
- 重复的
Put/Append
请求只在第一次出现时应用到状态机 - 记录每次应用到状态机的请求结果和序列号
先贴代码:
type Clerk struct {
servers []*labrpc.ClientEnd
seq uint64
identifier int64
leaderId int
}
identifier
用于标识clerk
, seq
是单调递增的序列号, 标记请求, identifier
和seq
一起标记了唯一的请求, leaderId
记录领导者
RPC
请求只需要额外携带identifier
和seq
, RPC
回复则需要携带结果和错误信息:
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
Seq uint64
Identifier int64
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
Seq uint64
Identifier int64
}
type GetReply struct {
Err Err
Value string
}
这2个函数很简单 ,不断轮询server
即可, 但是需要注意, 如果对方返回了超时错误和通道关闭错误等意料之外的错误, 需要重试
func (ck *Clerk) Get(key string) string {
args := &GetArgs{Key: key, Seq: ck.GetSeq(), Identifier: ck.identifier}
for {
reply := &GetReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.Get", args, reply)
if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
ck.leaderId += 1
ck.leaderId %= len(ck.servers)
continue
}
switch reply.Err {
case ErrChanClose:
continue
case ErrHandleOpTimeOut:
continue
case ErrKeyNotExist:
return reply.Value
}
return reply.Value
}
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
args := &PutAppendArgs{Key: key, Value: value, Op: op, Seq: ck.GetSeq(), Identifier: ck.identifier}
for {
reply := &PutAppendReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, reply)
if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
ck.leaderId += 1
ck.leaderId %= len(ck.servers)
continue
}
switch reply.Err {
case ErrChanClose:
continue
case ErrHandleOpTimeOut:
continue
}
return
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
重试
RPC
时, 需要新建reply
结构体, 重复使用同一个结构体将导致labgob
报错
根据前文分析可知, RPC handler
(就是Get/Put handler
)只会在raft
层的commit
信息到达后才能回复, 因此其逻辑顺序就是
- 将请求封装后通过接口
Start
交付给raft
层- 如果
raft
层节点不是Leader
, 返回相应错误 - 否则继续
- 如果
- 等待
commit
信息- 信息到达, 根据
commit
信息处理回复(具体是什么样的信息回复后面会说) - 超时, 返回相应错误
- 信息到达, 根据
分析到这里可知, 必然有一个协程在不断地接收raft
层的commit
日志(此后称为ApplyHandler
协程), 那上述提到的重复RPC
判别和处理是在ApplyHandler
中进行, 还是在RPC handler
中进行呢?
我的处理方式是在ApplyHandler
中进行, 因为ApplyHandler
是绝对串行的, 在其中处理这些日志是最安全的, 否则通过通道发送给RPC handler
货条件变量唤醒RPC handler
, 都存在一些并发同步的问题, 因此, ApplyHandler
需要进行重复RPC
判别和处理(可能需要存储), 并将这个请求(commit log
就对应一个请求)的结果返回给RPC handler
因此, 通过上述分析, server
结构体如下:
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
waiCh map[int]*chan result // 映射 startIndex->ch
historyMap map[int64]*result // 映射 Identifier->*result
maxraftstate int // snapshot if log grows this big
maxMapLen int
db map[string]string
}
type result struct {
LastSeq uint64
Err Err
Value string
ResTerm int
}
其中:
historyMap
记录某clerk
的最高序列号的请求的序列号和结果result
result
结构体存储一个请求的序列号和结果, 以及ResTerm
记录commit
被apply
时的term
, 因为其可能与Start
相比发生了变化, 需要将这一信息返回给客户端waiCh
纪录等待commit
信息的RPC handler
的通道
RPC handler
设计较为简单,只需要调用Start
, 等待commit
信息即可, 不过还需要考虑超时的错误处理
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// 先判断是不是leader
_, isLeader := kv.rf.GetState()
if !isLeader {
reply.Err = ErrNotLeader
return
}
opArgs := &Op{OpType: OPGet, Seq: args.Seq, Key: args.Key, Identifier: args.Identifier}
res := kv.HandleOp(opArgs)
reply.Err = res.Err
reply.Value = res.Value
}
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
// 先判断是不是leader
_, isLeader := kv.rf.GetState()
if !isLeader {
reply.Err = ErrNotLeader
return
}
opArgs := &Op{Seq: args.Seq, Key: args.Key, Val: args.Value, Identifier: args.Identifier}
if args.Op == "Put" {
opArgs.OpType = OPPut
} else {
opArgs.OpType = OPAppend
}
res := kv.HandleOp(opArgs)
reply.Err = res.Err
}
Get
和PutAppend
都将请求封装成Op
结构体, 统一给HandleOp
处理, HandleOp
处理ApplyHandler
发过来的commit
信息并生成回复, 这里我采用的通信方式是管道, 每一个请求会将自己创建的管道存储在waiCh
中, 并在函数离开时清理管道和waiCh
:
func (kv *KVServer) HandleOp(opArgs *Op) (res result) {
startIndex, startTerm, isLeader := kv.rf.Start(*opArgs)
if !isLeader {
return result{Err: ErrNotLeader, Value: ""}
}
kv.mu.Lock()
// 直接覆盖之前记录的chan
newCh := make(chan result)
kv.waiCh[startIndex] = &newCh
DPrintf("leader %v identifier %v Seq %v 的请求: 新建管道: %p\n", kv.me, opArgs.Identifier, opArgs.Seq, &newCh)
kv.mu.Unlock() // Start函数耗时较长, 先解锁
defer func() {
kv.mu.Lock()
delete(kv.waiCh, startIndex)
close(newCh)
kv.mu.Unlock()
}()
// 等待消息到达或超时
select {
case <-time.After(HandleOpTimeOut):
res.Err = ErrHandleOpTimeOut
DPrintf("server %v identifier %v Seq %v: 超时", kv.me, opArgs.Identifier, opArgs.Seq)
return
case msg, success := <-newCh:
if success && msg.ResTerm == startTerm {
res = msg
return
} else if !success {
// 通道已经关闭, 有另一个协程收到了消息 或 通道被更新的RPC覆盖
// TODO: 是否需要判断消息到达时自己已经不是leader了?
DPrintf("server %v identifier %v Seq %v: 通道已经关闭, 有另一个协程收到了消息 或 更新的RPC覆盖, args.OpType=%v, args.Key=%+v", kv.me, opArgs.Identifier, opArgs.Seq, opArgs.OpType, opArgs.Key)
res.Err = ErrChanClose
return
} else {
// term与一开始不匹配, 说明这个Leader可能过期了
DPrintf("server %v identifier %v Seq %v: term与一开始不匹配, 说明这个Leader可能过期了, res.ResTerm=%v, startTerm=%+v", kv.me, opArgs.Identifier, opArgs.Seq, res.ResTerm, startTerm)
res.Err = ErrLeaderOutDated
res.Value = ""
return
}
}
}
这里需要额外注意错误处理:
- 超时错误
- 通道关闭错误
Leader
可能过期的错误(term
不匹配)- 不是
Leader
的错误
同时这里还有一个难点, 就是如果出现了重复的RPC
, 他们都在等待commit
信息, 那么他们的管道存储在waiCh
中的key
是什么呢? 如果使用Identifier
或Seq
, 那么必然后来的RPC
会覆盖之前的管道, 可能造成错误, 因为两个重复RPC
的Identifier
或Seq
是一样的。 这里可以巧妙地利用Start
函数的第一个返回值, 其代表如果commit
成功, 其日志项的索引号, 由于raft
层不区分重复RPC
的log
, 因此这个索引号肯定是不同的, 不会相互覆盖
ApplyHandler
是3A
的最核心的部分, 其思路是:
- 先判断
log
请求的Identifier
和Seq
是否在历史记录historyMap
中是否存在, 如果存在就直接返回历史记录 - 不存在就需要应用到状态机, 并更新历史记录
historyMap
- 如果
log
请求的CommandIndex
对应的key
在waiCh
中存在, 表面当前节点可能是一个Leader
, 需要将结果发送给RPC handler
func (kv *KVServer) ApplyHandler() {
for !kv.killed() {
log := <-kv.applyCh
if log.CommandValid {
op := log.Command.(Op)
kv.mu.Lock()
// 需要判断这个log是否需要被再次应用
var res result
needApply := false
if hisMap, exist := kv.historyMap[op.Identifier]; exist {
if hisMap.LastSeq == op.Seq {
// 历史记录存在且Seq相同, 直接套用历史记录
res = *hisMap
} else if hisMap.LastSeq < op.Seq {
// 否则新建
needApply = true
}
} else {
// 历史记录不存在
needApply = true
}
_, isLeader := kv.rf.GetState()
if needApply {
// 执行log
res = kv.DBExecute(&op, isLeader)
res.ResTerm = log.SnapshotTerm
// 更新历史记录
kv.historyMap[op.Identifier] = &res
}
if !isLeader {
// 不是leader则继续检查下一个log
kv.mu.Unlock()
continue
}
// Leader还需要额外通知handler处理clerk回复
ch, exist := kv.waiCh[log.CommandIndex]
if !exist {
// 接收端的通道已经被删除了并且当前节点是 leader, 说明这是重复的请求, 但这种情况不应该出现, 所以panic
DPrintf("leader %v ApplyHandler 发现 identifier %v Seq %v 的管道不存在, 应该是超时被关闭了", kv.me, op.Identifier, op.Seq)
kv.mu.Unlock()
continue
}
kv.mu.Unlock()
// 发送消息
func() {
defer func() {
if recover() != nil {
// 如果这里有 panic,是因为通道关闭
DPrintf("leader %v ApplyHandler 发现 identifier %v Seq %v 的管道不存在, 应该是超时被关闭了", kv.me, op.Identifier, op.Seq)
}
}()
res.ResTerm = log.SnapshotTerm
*ch <- res
}()
}
}
}
这里有几大易错点:
- 需要额外传递
Term
以供RPC handler
判断与调用Start
时相比,term
是否变化, 如果变化, 可能是Leader
过期, 需要告知clerk
- 发送消息到通道时, 需要解锁
- 因为发送消息到通道时解锁, 所以通道可能被关闭, 因此需要单独在一个函数中使用
recover
处理发送消息到不存在的通道时的错误 - 这个
ApplyHandler
是leader
和follower
都存在的协程, 只不过follower
到应用到状态机和判重那里就结束了,leader
多出来告知RPC handler
结果的部分
DBExecute
就是将日志项应用到状态机, 逻辑很简单:
func (kv *KVServer) DBExecute(op *Op, isLeader bool) (res result) {
// 调用该函数需要持有锁
res.LastSeq = op.Seq
switch op.OpType {
case OPGet:
val, exist := kv.db[op.Key]
if exist {
kv.LogInfoDBExecute(op, "", val, isLeader)
res.Value = val
return
} else {
res.Err = ErrKeyNotExist
res.Value = ""
kv.LogInfoDBExecute(op, "", ErrKeyNotExist, isLeader)
return
}
case OPPut:
kv.db[op.Key] = op.Val
kv.LogInfoDBExecute(op, "", kv.db[op.Key], isLeader)
return
case OPAppend:
val, exist := kv.db[op.Key]
if exist {
kv.db[op.Key] = val + op.Val
kv.LogInfoDBExecute(op, "", kv.db[op.Key], isLeader)
return
} else {
kv.db[op.Key] = op.Val
kv.LogInfoDBExecute(op, "", kv.db[op.Key], isLeader)
return
}
}
return
}
执行测试命令
go test -v -run 3A
该代码经过150次测试没有报错