架构
架构图:
简单说, 我们要建立的 KV 数据库是位于 raft 层之上的, 或者说我们的 KV 数据库使用了 raft 库。客户端(就是代码中的 clerk)调用应用层(server)的 RPC, 应用层收到 RPC 之后会调用 Start 函数。Start 函数会立即返回, 但此时应用层不会返回消息给客户端, 因为它还没有执行客户端请求, 也不知道这个请求是否会被 Raft 层 commit。只有在某一时刻, 对应于这个客户端请求的消息在 applyCh channel 中出现, 应用层才会执行这个请求, 并返回响应给客户端。
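用一段极简的 Go 草图表示这条调用路径(仅为示意, 省略了错误分支; KVServer/Submit 等名字以后文的实际代码为准):

```go
// 示意: 应用层 RPC handler 把请求交给 rsm, rsm 内部调用 rf.Start(op),
// 然后阻塞等待该请求从 applyCh 中出现并被 DoOp 执行后, 才回复客户端
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
	err, result := kv.rsm.Submit(args) // Start 会立即返回, Submit 在这里等 apply 的结果
	if err != rpc.OK {
		reply.Err = rpc.ErrWrongLeader // 让客户端去别的节点找新 leader 重试
		return
	}
	*reply = *result.(*rpc.PutReply) // DoOp 的返回值
}
```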
操作流程
这是一个基于 Raft 一致性算法的复制状态机 (Replicated State Machine) 系统, 具体实现为一个键值存储 (Key/Value Store)。其核心思想是: 多个服务器通过 Raft 协议来同步操作日志, 从而保证所有服务器上的状态机(即键值存储数据)最终保持一致。
代码被组织成三个主要部分,对应图中的三个模块:
- client.go: 客户端库, 供应用程序调用, 发起 Get 和 Put 请求。
- server.go (KVServer): 键值存储服务器。它接收客户端的 RPC 请求, 并将其提交给 Raft 集群。
- rsm.go: 复制状态机 (Replicated State Machine) 层。这是连接你的业务逻辑(键值存储)和底层一致性模块(Raft)的核心桥梁。它负责将操作提交给 Raft、从 Raft 应用已提交的日志条目到状态机, 以及管理快照。
图中描述的三个流程清晰地展示了这些模块如何协作。结合代码仔细看一下操作流程图, 就能明白这个实验要做什么。
个人理解:
- 一个 raft 节点、rsm(状态机)和 server 同时存在于同一台物理主机上, 它们之间是函数相互调用的关系; 而 raft 节点之间以及 client 和 server 之间通过 RPC 通信。
- rsm 就是状态机, 但实际的状态指的是 server 中的 k/v 数据。从图中也可看出, 命令的执行(状态的改变)实际上发生在 server 端, 状态机只是作为 raft 和 server 的中间层。流程如下(示意草图见本节末尾):
  - server 端注册的 RPC 函数 Get/Put 收到来自 client 的 get/put 请求
  - server 先把 RPC 的 Args 传给状态机, 状态机再把命令发给 raft 层, 使 raft 层节点数据保持一致
  - raft 层提交一个 log 后, 状态机通过监听 applyCh 再次获取命令, 然后调用 server 端的 DoOp(命令) 去执行客户端请求, 随后把执行结果(DoOp 返回值)传给 rsm 的 Submit, 最后 Submit 再把结果以返回值形式交给 server, server 再把结果给 client…
  - 说白了 rsm 就是一个传信的…, 个人感觉不是很有必要搞这个中间层
- 再次声明, 如果你只想通过一次测试很简单, 读一下要求, 几个小时就搞定了; 但要想在不确定的网络以及随时会 crash 的节点的条件下, 通过成百上千上万……次测试, 也就是实现一个基本不出错的分布式系统, 绝非易事。
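按上面的流程, rsm 监听 applyCh 的循环大致像下面这个草图(仅为示意, 省略了 term/command 比较和清理逻辑, 完整实现见后文「上代码」):

```go
// 示意草图: rsm 从 applyCh 取出已提交的 log, 调用 server 的 DoOp 执行,
// 再把结果通过 channel 交还给正在等待的 Submit
func (rsm *RSM) reader() {
	for msg := range rsm.applyCh {
		if !msg.CommandValid {
			continue
		}
		op := msg.Command.(Op)        // Start 时提交的就是这个 Op
		result := rsm.sm.DoOp(op.Req) // 真正的状态改变发生在 server 端
		rsm.mu.Lock()
		if entry, ok := rsm.opMap[msg.CommandIndex]; ok {
			entry.ch <- OpResult{err: rpc.OK, result: result} // 唤醒等待中的 Submit
			delete(rsm.opMap, msg.CommandIndex)
		}
		rsm.mu.Unlock()
	}
}
```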
lab4A
踩坑实录
这个4A怪不得难度是moderate/hard, 代码量少,但坑不少…
- rsm 通过 Start 发送给 raft 的内容和从 applyCh 中取回的内容要保持一致; Submit() 和 DoOp() 的参数也要保持一致
- Op 的 id 就用 raft log 的 Index 就行
- 在从 applyCh 中收到消息后要进行一些判断: 因为网络分区、节点宕机等原因, 可能发生 leader 改变、重复选举等情况, 只有真正提交了该 log 的 leader 才能回复客户端 OK, 注意以下几种情况:
  - 从 applyCh 中收到消息意味着有 log 提交了, 可以去应用了, 但这条被应用的 log 不一定是之前发送的那条(即 command 变了)。原因可能是: leader 收到了 log, 但还没完成提议阶段就挂了, 在并发场景下新上任的 leader 可能会在同一个 index 上应用其他 log, 旧 leader 未提交的 log 被覆盖了, 因此需要比较 command
  - 需要比较 term: term 的改变可能意味着 leader 的改变, 那么这个旧 leader 的未提交 log 可能会被覆盖
  - 注意: 用来比较的当前 term 不能用 currentTerm, _ := rsm.rf.GetState() 来获取。因为当发生网络分区时, 如果旧 leader 在小分区中, 它仍然认为自己是 leader, 也就是当前状态机对应的 raft 节点还认为自己是 leader, 此时通过 GetState() 拿到的还是它自己那个过期的 term(isLeader 也还是 true), 会通不过网络分区 test
  - 我的做法是在 raft 的 ApplyMsg 中加一个字段, applier 发送时带上当前 raft 节点的 term, 然后在 rsm 里这样比较:
```go
if opEntry.term != applyMsg.Term || !reflect.DeepEqual(opEntry.req, op.Req) { ... }
```
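也就是给 ApplyMsg 加上一个 Term 字段(示意草图: Term 是我自己加的字段, 原始的 raftapi.ApplyMsg 里没有):

```go
// 示意: applier 发送时把该 raft 节点当时的 currentTerm 一并带上
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
	Term         int // 新增字段: 这条 log 被 apply 时节点的 currentTerm
	// SnapshotValid / Snapshot / SnapshotTerm / SnapshotIndex 等快照字段省略
}
```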
  - 不要根据 isLeader 去判断, 因为有可能这个 leader 是在提交了 log 之后才退位的
  - 清理全部的剩余操作(要不然前 leader 中未被 commit 的 log 可能会被应用, Submit 还会返回成功), 如下:
```go
if opEntry.term != applyMsg.Term || !reflect.DeepEqual(opEntry.req, op.Req) {
	// 当你在 reader() 里发现某个 index 被 apply 时 term 或 command 不一致(即被覆盖),
	// 说明这个 index 及其之后的所有 pendingOps 都不可能被 commit
	for i, op := range rsm.opMap {
		if i >= applyMsg.CommandIndex {
			op.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}
			// close(opEntry.ch)
			delete(rsm.opMap, i)
		}
	}
}
```
- 当我多次执行 TestRestartSubmit4A 这个测试, 发现还是大概有几十分之一的概率通不过, 报错原因是 raft 层在 Kill() 的时候关闭了 applyCh, 而此时 applier 可能还在把提交的命令发给 applyCh。刚开始我只是在发给 applyCh 前加了判断:
```go
if rf.killed() {
	return
}
applyCh <- *applyMsg
```
但是因为没加锁, 有可能在 killed 检查和 applyCh <- *applyMsg 之间 applyCh 被关闭了。为了避免向已关闭的 applyCh 发消息, 我使用 sync.WaitGroup 在 Kill() 中阻塞 close(rf.applyCh), 直到 applier 发完:
func (rf *Raft) Kill() {atomic.StoreInt32(&rf.dead, 1) // 原子操作设置终止标志// Your code here, if desired.rf.applyCond.Broadcast() // 唤醒所有等待的goroutine// close(rf.done) // 关闭done通道,通知所有监听的goroutine退出rf.wg.Wait() // 等待所有goroutine退出close(rf.applyCh) }func (rf *Raft) applier(applyCh chan raftapi.ApplyMsg) {rf.wg.Add(1)defer rf.wg.Done()for !rf.killed() {rf.mu.Lock()// 等待直到有新的日志可以应用for rf.commitIndex <= rf.lastapplied || rf.snapShotLock {rf.applyCond.Wait() // 使用条件变量等待// 检查是否已被杀死if rf.killed() {rf.mu.Unlock()return}}// 检查是否已被杀死if rf.killed() {rf.mu.Unlock()return}applyMsgs := make([]*raftapi.ApplyMsg, 0)// 应用所有新的已提交日志tmpApplied := rf.lastappliedfor rf.commitIndex > tmpApplied {// rf.applyLock = truetmpApplied++// // DPrintf("applier---节点server %v, 在term: %v, 欲将日志添加到applyMsgs, 最新应用的日志索引为: %v\n", rf.me, rf.currentTerm, tmpApplied)if tmpApplied <= rf.globalLastIncludedIndex {// 该日志已经被快照覆盖,跳过continue}// 应用日志到状态机applyMsg := &raftapi.ApplyMsg{CommandValid: true,Command: rf.log[rf.toLocalIndex(tmpApplied)].Command,CommandIndex: tmpApplied,}applyMsgs = append(applyMsgs, applyMsg)}rf.mu.Unlock()// 发送 ApplyMsg 到 applyCh 通道for _, applyMsg := range applyMsgs {// if rf.killed() {// return// }rf.mu.Lock()if applyMsg.CommandIndex != rf.lastapplied+1 || rf.snapShotLock {rf.mu.Unlock()continue}// DPrintf("applier---节点server %v, 在term: %v, 欲将日志应用到状态机, 最新应用的日志索引为: %v, 日志内容为: %v,rf.lastapplied更新到:%v, rf.commitIndex更新到:%v\n", rf.me, rf.currentTerm, applyMsg.CommandIndex, rf.log[rf.toLocalIndex(applyMsg.CommandIndex)].Command, rf.lastapplied, rf.commitIndex)rf.mu.Unlock()if rf.killed() {return}// // 使用 select 避免向已关闭的通道发送数据// select {// case applyCh <- *applyMsg:// // 发送成功// rf.mu.Lock()// if applyMsg.CommandIndex == rf.lastapplied+1 && !rf.snapShotLock {// rf.lastapplied = applyMsg.CommandIndex// }// rf.mu.Unlock()// case <-rf.done: // 使用 done 通道检测终止// return// }applyCh <- *applyMsgrf.mu.Lock()if applyMsg.CommandIndex != rf.lastapplied+1 || rf.snapShotLock {rf.mu.Unlock()continue}rf.lastapplied = applyMsg.CommandIndex// DPrintf("applier---节点server %v, 在term: %v, 已将日志应用到状态机, 最新应用的日志索引为: %v, 日志内容为: %v,rf.lastapplied更新到:%v, rf.commitIndex更新到:%v\n", rf.me, rf.currentTerm, applyMsg.CommandIndex, rf.log[rf.toLocalIndex(applyMsg.CommandIndex)].Command, rf.lastapplied, rf.commitIndex)rf.mu.Unlock()}} }
上代码
不再多言上代码
package rsmimport ("reflect""sync""time""6.5840/kvsrv1/rpc""6.5840/labrpc"raft "6.5840/raft1""6.5840/raftapi"tester "6.5840/tester1"
)var useRaftStateMachine bool// 传递操作命令
type Op struct {// 在这里定义你的内容。字段名必须以大写字母开头, 否则RPC将无法正常工作。// Op{Me: rsm.me, Id: id, Req: req}Me intReq any
}// 一个想要自我复制的服务器(即,../server.go)调用MakeRSM,必须实现StateMachine接口。
// 该接口允许rsm包与服务器进行交互以执行服务器特定的操作:
// 服务器必须实现DoOp以执行一个操作(例如,Get或Put请求),以及Snapshot/Restore用于快照和恢复服务器的状态。
type StateMachine interface {DoOp(any) anySnapshot() []byteRestore([]byte)
}type OpResult struct {err rpc.Errresult any
}type OpEntry struct {term intreq anych chan OpResult
}type RSM struct {mu sync.Mutexme intrf raftapi.RaftapplyCh chan raftapi.ApplyMsgmaxraftstate int // 如果日志增长到这么大,快照sm StateMachine// 您的定义在这里。opMap map[int]*OpEntry// 标记 applyCh 是否已关闭// applyClosed bool
}// servers[] 包含将通过 Raft 协作以形成容错键/值服务的服务器集合的端口。
// me 是当前服务器在 servers[] 中的索引。
// k/v 服务器应该通过底层的 Raft 实现存储快照,该实现应该调用 persister.SaveStateAndSnapshot() 原子性地保存 Raft 状态和快照。
// 当 Raft 的保存状态超过 maxraftstate 字节时,RSM 应该进行快照,以允许 Raft 垃圾回收其日志。
// 如果 maxraftstate 是 -1,则不需要快照。
// MakeRSM() 必须快速返回,因此它应该为任何长期运行的工作启动 goroutine。
func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *tester.Persister, maxraftstate int, sm StateMachine) *RSM {rsm := &RSM{me: me,maxraftstate: maxraftstate,applyCh: make(chan raftapi.ApplyMsg),sm: sm,opMap: make(map[int]*OpEntry),// applyClosed: false,}if !useRaftStateMachine {rsm.rf = raft.Make(servers, me, persister, rsm.applyCh)}go rsm.applyListen()return rsm
}// 在 rsm.go的 Submit方法中,将请求包装成 Op,调用 rf.Start(op)
func (rsm *RSM) Raft() raftapi.Raft {return rsm.rf
}func (rsm *RSM) applyListen() {for {applyMsg, ok := <-rsm.applyCh // 从通道中获取数据, 当通道关闭且无数据时自动跳出循环if !ok {// // // // log.Printf("RSM 通道关闭")rsm.mu.Lock()// rsm.applyClosed = true // 标记 applyCh 已关闭// 说明applyCh已经关闭,当applyCh关闭时,reader只是退出循环,但没有通知正在等待的Submit操作。这导致Submit操作永远等待,造成超时// 通知所有等待的操作,让他们返回ErrWrongLeaderfor _, opEntry := range rsm.opMap {opEntry.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}}// log.Printf("***********RSM--applyListen %d: applyCh关闭, 通知所有等待的Submit操作返回ErrWrongLeader, 清空opMap\n", rsm.me)rsm.opMap = make(map[int]*OpEntry)rsm.mu.Unlock()break}if applyMsg.CommandValid {// 从applyCh获取Op// 安全地类型断言op, ok := applyMsg.Command.(Op)if !ok {continue}// 调用server端DoOp()执行Op, 所有服务器都需要应用所有已提交的操作,无论操作是否由本服务器提交applyResult := rsm.sm.DoOp(op.Req)// log.Printf("***********RSM--applyListen %d: 执行完 Op: %v, 结果: %v\n", rsm.me, op, applyResult)rsm.mu.Lock()// 存在这个Opif opEntry, ok := rsm.opMap[applyMsg.CommandIndex]; ok {// 获取当前raft节点的状态// currentTerm, _ := rsm.rf.GetState()if opEntry.term != applyMsg.Term || !reflect.DeepEqual(opEntry.req, op.Req) {// opEntry.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}// delete(rsm.opMap, applyMsg.CommandIndex)// for i, entry := range rsm.opMap {// if i > applyMsg.CommandIndex && entry.term == opEntry.term {// entry.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}// delete(rsm.opMap, i)// }// }for i, op := range rsm.opMap {if i >= applyMsg.CommandIndex {op.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}delete(rsm.opMap, i)}}} else {// 匹配成功,发送结果// log.Printf("---RSM--applyListen %d: 操作匹配成功, 向submit发送前, index=%d, term=%d", rsm.me, applyMsg.CommandIndex, opEntry.term)// 发送结果并关闭通道opEntry.ch <- OpResult{err: rpc.OK, result: applyResult}// log.Printf("---RSM--applyListen %d: 向submit发送成功, index=%d, term=%d", rsm.me, applyMsg.CommandIndex, opEntry.term)delete(rsm.opMap, applyMsg.CommandIndex)}}rsm.mu.Unlock()// // 检查是否需要创建快照// if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() > rsm.maxraftstate {// snapshot := rsm.sm.Snapshot()// rsm.rf.Snapshot(applyMsg.CommandIndex, snapshot)// }}// else if applyMsg.SnapshotValid {// // 处理快照// rsm.sm.Restore(applyMsg.Snapshot)// }}
}// 在 kvraft1/server.go中,当服务器收到客户端的 Put 或 Get RPC 请求时,它会调用 rsm.Submit()来将操作提交给 Raft 共识层
// 向 Raft 提交一个命令,并等待其被提交。如果客户端应该寻找新的领导者并重试,它应该返回 ErrWrongLeader.
func (rsm *RSM) Submit(req any) (rpc.Err, any) {// Submit创建一个操作结构来通过Raft运行命令;// 例如:op := Op{Me: rsm.me, Id: id, Req: req},其中req是Submit的参数,id是op的唯一标识符。// your code here// rsm.mu.Lock()// if rsm.applyClosed {// log.Printf("***********RSM %d: submit操作--start完--发现 applyCh 已关闭, 返回 ErrWrongLeader\n", rsm.me)// rsm.mu.Unlock()// return rpc.ErrWrongLeader, nil// }// rsm.mu.Unlock()op := Op{Me: rsm.me,Req: req,}// log.Printf("***********RSM %d: submit操作把Op: %v 通过start传给raft层\n", rsm.me, op)index, term, isLeader := rsm.rf.Start(op)if !isLeader {return rpc.ErrWrongLeader, nil}// 向opResults中添加Oprsm.mu.Lock()resultCh := make(chan OpResult, 1)newEntry := &OpEntry{term: term,req: req,ch: resultCh,}// 再次检查 applyCh 是否已关闭(可能在 Start 调用期间关闭)// if rsm.applyClosed {// log.Printf("***********RSM %d: submit操作--start完--发现 applyCh 已关闭, 返回 ErrWrongLeader\n", rsm.me)// rsm.mu.Unlock()// return rpc.ErrWrongLeader, nil// }rsm.opMap[index] = newEntryrsm.mu.Unlock()// 监听, 获取结果select {case result := <-resultCh:return result.err, result.resultcase <-time.After(3 * time.Second): // 超时// 清理待处理操作rsm.mu.Lock()// 检查通道是否仍然有效if entry, exists := rsm.opMap[index]; exists && entry.ch == resultCh {// close(resultCh) // 关闭通道防止后续发送delete(rsm.opMap, index)// log.Printf("***********RSM %d: submit操作等待结果超时, index=%d, term=%d, 清理该操作\n", rsm.me, opID, term)}rsm.mu.Unlock()return rpc.ErrWrongLeader, nil}}
300次测试
老样子测试300次
===== 开始第 1 次测试 =====
=== RUN TestBasic4A
Test RSM basic (reliable network)...... Passed -- time 1.9s #peers 3 #RPCs 44 #Ops 0
--- PASS: TestBasic4A (1.90s)
=== RUN TestConcurrent4A
Test concurrent submit (reliable network)...... Passed -- time 0.8s #peers 3 #RPCs 106 #Ops 0
--- PASS: TestConcurrent4A (0.78s)
=== RUN TestLeaderFailure4A
Test Leader Failure (reliable network)...... Passed -- time 1.8s #peers 3 #RPCs 38 #Ops 0
--- PASS: TestLeaderFailure4A (1.77s)
=== RUN TestLeaderPartition4A
Test Leader Partition (reliable network)...... Passed -- time 2.7s #peers 3 #RPCs 276 #Ops 0
--- PASS: TestLeaderPartition4A (2.73s)
=== RUN TestRestartReplay4A
Test Restart (reliable network)...... Passed -- time 13.6s #peers 3 #RPCs 432 #Ops 0
--- PASS: TestRestartReplay4A (13.61s)
=== RUN TestShutdown4A
Test Shutdown (reliable network)...... Passed -- time 10.0s #peers 3 #RPCs 0 #Ops 0
--- PASS: TestShutdown4A (10.01s)
=== RUN TestRestartSubmit4A
Test Restart and submit (reliable network)...... Passed -- time 14.5s #peers 3 #RPCs 638 #Ops 0
--- PASS: TestRestartSubmit4A (14.55s)
PASS
ok 6.5840/kvraft1/rsm 45.353s
===== 结束第 1 次测试 ===== (耗时: 46秒)..............===== 开始第 300 次测试 =====
=== RUN TestBasic4A
Test RSM basic (reliable network)...... Passed -- time 1.8s #peers 3 #RPCs 44 #Ops 0
--- PASS: TestBasic4A (1.80s)
=== RUN TestConcurrent4A
Test concurrent submit (reliable network)...... Passed -- time 0.7s #peers 3 #RPCs 104 #Ops 0
--- PASS: TestConcurrent4A (0.73s)
=== RUN TestLeaderFailure4A
Test Leader Failure (reliable network)...... Passed -- time 2.1s #peers 3 #RPCs 40 #Ops 0
--- PASS: TestLeaderFailure4A (2.13s)
=== RUN TestLeaderPartition4A
Test Leader Partition (reliable network)...... Passed -- time 2.7s #peers 3 #RPCs 272 #Ops 0
--- PASS: TestLeaderPartition4A (2.72s)
=== RUN TestRestartReplay4A
Test Restart (reliable network)...... Passed -- time 13.5s #peers 3 #RPCs 430 #Ops 0
--- PASS: TestRestartReplay4A (13.52s)
=== RUN TestShutdown4A
Test Shutdown (reliable network)...... Passed -- time 10.0s #peers 3 #RPCs 0 #Ops 0
--- PASS: TestShutdown4A (10.04s)
=== RUN TestRestartSubmit4A
Test Restart and submit (reliable network)...... Passed -- time 24.5s #peers 3 #RPCs 638 #Ops 0
--- PASS: TestRestartSubmit4A (24.45s)
PASS
ok 6.5840/kvraft1/rsm 55.394s
===== 结束第 300 次测试 ===== (耗时: 55秒)===== 测试统计摘要 =====
总测试次数: 300
成功次数: 293
失败次数: 7
成功率: 97%
总耗时: 15583秒
平均每次测试耗时: 51秒
===== 测试结束 =====
lab4B
个人实现流程图
比官方的详细一点, 仅供参考
踩坑实录
客户端和服务器的代码仿照着lab2写, 具体的代码逻辑我不再多说, 这里只提遇到的坑
- 通过打印日志发现, 传给 DoOp 的 req 有的时候是 *rpc.PutArgs(指针类型), 有时是 rpc.PutArgs(值类型)。这个问题需要去修改 labgob 注册的类型:
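一个最小示意(与后文 StartKVServer 中的注册一致): Submit 传进来的是指针, 注册时就也用指针类型, 这样经过 labgob 编解码后 DoOp 里的类型断言才稳定:

```go
// 统一注册指针类型, raft 日志重放/网络传输后解码出来的仍是 *rpc.PutArgs / *rpc.GetArgs
labgob.Register(rsm.Op{})
labgob.Register(&rpc.PutArgs{})
labgob.Register(&rpc.GetArgs{})
```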
- 超时时间不要设置太大, 有个速度测试(TestSpeed4B)
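等待超时的写法可以参考 4A 里 Submit 的做法(节选示意, 略有简化, 完整代码见前文 rsm.go):

```go
// Submit 中等待 reader 把结果送回来, 超时则清理挂起的操作并让客户端重试
select {
case result := <-resultCh:
	return result.err, result.result
case <-time.After(3 * time.Second):
	rsm.mu.Lock()
	delete(rsm.opMap, index) // 防止 opMap 泄漏
	rsm.mu.Unlock()
	return rpc.ErrWrongLeader, nil
}
```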
- 对于 reply.Err == rpc.ErrWrongLeader 或者 RPC 没有回复, 客户端的做法都是:
```go
ck.mu.Lock()
ck.leader = (leader + 1) % len(ck.servers)
ck.mu.Unlock()
isFirstRequest = false
```
- 这个 lab 看似很简单, 但是有个大坑: rpc.ErrMaybe 这个玩意很容易误导人, 让你以为重复执行成不成功无所谓, 返回它就好了(起码我刚开始是这样认为的, 直到我看见报错信息里说 history 不满足线性一致性才反应过来…)。这点你要是没注意到, 可能就会像我一样多次 test 有时 pass 有时 FAIL, 陷入懵逼之中…
  - 不要以为在 Server 端只用 version 区分重复命令就行了, 你还得保证线性一致性。要不然对于重复命令: 第一次返回了 OK(随后该 Server crash), 客户端因网络原因换了通信的 leader 并重发相同的命令, 第二次却返回了 ErrMaybe, 最终对于 client 来说相同的命令得到了不一样的执行结果…, 这不满足线性一致性
  - 也就是在 Server 端要为每个 client 记录其最近一次命令的 id 和执行结果, 遇到重复命令时, 把上次的结果返回给 client 就行
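一个最小示意(字段名取自后文 server.go 的 clientPutResults): 在 DoPut 开头先查去重表, 命中就把上次的结果原样返回:

```go
// 去重示意: 每个 client 只保留最近一次 Put 的 ReqId 和执行结果
func (kv *KVServer) DoPut(args *rpc.PutArgs) *rpc.PutReply {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	reply := &rpc.PutReply{}
	if prev, ok := kv.clientPutResults[args.ClientId]; ok && args.ReqId <= prev.ReqId {
		return prev.Result // 重复请求: 直接返回上次的执行结果, 两次答复保持一致
	}
	// ……正常执行 Put(省略), 然后记录本次结果……
	kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId: args.ReqId, Result: reply}
	return reply
}
```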
上代码
client.go
package kvraftimport ("math/rand""sync""6.5840/kvsrv1/rpc"kvtest "6.5840/kvtest1"tester "6.5840/tester1"
)type Clerk struct {clnt *tester.Clntservers []string// You will have to modify this struct.// leaderIdx intmu sync.Mutexleader intclientId int64requestId int64
}func MakeClerk(clnt *tester.Clnt, servers []string) kvtest.IKVClerk {// ck := &Clerk{clnt: clnt, servers: servers, leaderIdx: 0}ck := &Clerk{clnt: clnt, servers: servers}// You'll have to add code here.// 客户端id用随机值ck.clientId = int64(rand.Int63())ck.requestId = 0ck.leader = 0// log.Printf("***********MakeClerk: 创建 Clerk, clientId: %d\n", ck.clientId)return ck
}// Get 方法获取一个键的当前值和版本。如果该键不存在,则返回 ErrNoKey。
// 它会在面对其他所有错误时不断尝试。
// 您可以使用以下代码向服务器 i 发送 RPC:ok := ck.clnt.Call(ck.servers[i], "KVServer.Get", &args, &reply)
// args 和 reply 的类型(包括它们是否为指针)必须与 RPC 处理程序函数的参数声明的类型匹配。此外,reply 必须作为指针传递。
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {// You will have to modify this function.args := rpc.GetArgs{Key: key}for {ck.mu.Lock()leader := ck.leaderck.mu.Unlock()// 先尝试leaderreply := rpc.GetReply{}ok := ck.clnt.Call(ck.servers[leader], "KVServer.Get", &args, &reply)if ok && reply.Err != rpc.ErrWrongLeader {if reply.Err != rpc.OK {return "", 0, reply.Err}return reply.Value, reply.Version, rpc.OK}ck.mu.Lock()ck.leader = (leader + 1) % len(ck.servers)ck.mu.Unlock()//如果RPC失败,等待9ms后重试// time.Sleep(9 * time.Millisecond)}
}// 仅在请求中的版本与服务器上的键的版本匹配时,才将更新的键和值放入。
// 如果版本号不匹配,服务器应返回ErrVersion。
// 如果Put在第一次RPC中收到ErrVersion,则Put应返回ErrVersion,因为Put肯定没有在服务器上执行。
// 如果服务器在重发RPC上返回ErrVersion,则Put必须向应用程序返回ErrMaybe,
// 因为其早期的RPC可能已经被服务器成功处理但响应丢失,并且Clerk不知道Put是否执行。
// 您可以使用如下代码向服务器i发送RPC:ok := ck.clnt.Call(ck.servers[i], "KVServer.Put", &args, &reply)
// args和reply的类型(包括是否为指针)必须与RPC处理函数参数的声明类型匹配。此外,reply必须作为指针传递。
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {// You will have to modify this function.args := rpc.PutArgs{Key: key, Value: value, Version: version}// 是否是第一次请求isFirstRequest := trueck.mu.Lock()ck.requestId++args.ClientId = ck.clientIdargs.ReqId = ck.requestIdck.mu.Unlock()// // //// log.Printf("***********Clerk 向 server: %v 发起Put请求: <key:%s, value:%s, version:%d>\n", ck.leader, args.Key, args.Value, args.Version)for {ck.mu.Lock()leader := ck.leaderck.mu.Unlock()reply := rpc.PutReply{}// log.Printf("***********Clerk 向 server: %v 发起Put请求: <key:%s, value:%s, version:%d>\n", ck.leader, args.Key, args.Value, args.Version)ok := ck.clnt.Call(ck.servers[leader], "KVServer.Put", &args, &reply)if ok && reply.Err != rpc.ErrWrongLeader {if reply.Err == rpc.OK {// log.Printf("***Clerk 从server: %v 收到Put结果: <key:%s, value:%s, version:%d>\n", ck.leader, args.Key, args.Value, args.Version)return rpc.OK}if reply.Err == rpc.ErrVersion && isFirstRequest {return rpc.ErrVersion}if reply.Err == rpc.ErrVersion && !isFirstRequest {return rpc.ErrMaybe}return reply.Err}ck.mu.Lock()ck.leader = (leader + 1) % len(ck.servers)ck.mu.Unlock()isFirstRequest = false//如果RPC失败,等待9ms后重试// time.Sleep(9 * time.Millisecond)}
}
server.go
package kvraftimport ("sync""sync/atomic""6.5840/kvraft1/rsm""6.5840/kvsrv1/rpc""6.5840/labgob""6.5840/labrpc"tester "6.5840/tester1"
)type ValueVersion struct {Value stringVersion rpc.Tversion
}type ClientPutResult struct {ReqId int64 // Unique ID for the clientResult *rpc.PutReply
}type KVServer struct {me intdead int32 // set by Kill()rsm *rsm.RSM// Your definitions here.mu sync.MutexkvMap map[string]*ValueVersionclientPutResults map[int64]ClientPutResult // clientId -> ClientPutReq
}func (kv *KVServer) DoGet(args *rpc.GetArgs) (reply *rpc.GetReply) {kv.mu.Lock()defer kv.mu.Unlock()vv, ok := kv.kvMap[args.Key]reply = &rpc.GetReply{} // 创建返回值if !ok {reply.Err = rpc.ErrNoKey// log.Printf("---server: %v---DoGet---reply.Err:%v\n", kv.me, reply.Err)return reply}reply.Value = vv.Valuereply.Version = vv.Versionreply.Err = rpc.OK// log.Printf("---server: %v---DoGet---成功,reply.Err:%v, 更新key:%v, 更新value:%v, 更新version:%v\n", kv.me, reply.Err, args.Key, vv.Value, vv.Version)return reply
}func (kv *KVServer) DoPut(args *rpc.PutArgs) (reply *rpc.PutReply) {kv.mu.Lock()defer kv.mu.Unlock()reply = &rpc.PutReply{} // 创建返回值// // // log.Printf("---server: %v---DoPut---收到请求: <key:%s, value:%s, version:%d>\n", kv.me, args.Key, args.Value, args.Version)// // // log.Printf("---server: %v---DoPut---当前kvMap长度: %d\n", kv.me, len(kv.kvMap))// 打印当前kvMap内容// for key, value := range kv.kvMap {// // // // log.Printf("---server: %v---DoPut---应用Put前kvMap内容: key:%s, value:%s, version:%d\n", kv.me, key, value.Value, value.Version)// }// key不存在if prev, ok := kv.clientPutResults[args.ClientId]; ok {if args.ReqId <= prev.ReqId {kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId: args.ReqId,Result: prev.Result,}return prev.Result}}if _, ok := kv.kvMap[args.Key]; !ok {if args.Version != 0 {reply.Err = rpc.ErrNoKey// reply.Err = rpc.ErrVersion// log.Printf("---server: %v---DoPut---reply.Err:%v\n", kv.me, reply.Err)// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId: args.ReqId,Result: reply,}return reply}// 创建新条目kv.kvMap[args.Key] = &ValueVersion{Value: args.Value,Version: 1, // 新键版本从1开始}reply.Err = rpc.OK// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId: args.ReqId,Result: reply,}// log.Printf("---server: %v---DoPut---成功,reply.Err:%v, 更新key:%v, 更新value:%v, 更新version:%v\n", kv.me, reply.Err, args.Key, args.Value, kv.kvMap[args.Key].Version)return reply}if args.Version != kv.kvMap[args.Key].Version {reply.Err = rpc.ErrVersion// // // log.Printf("---server: %v---DoPut---reply.Err:%v\n", kv.me, reply.Err)// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId: args.ReqId,Result: reply,}return reply}kv.kvMap[args.Key].Value = args.Valuekv.kvMap[args.Key].Version++reply.Err = rpc.OK// 打印当前kvMap内容// for key, value := range kv.kvMap {// // // // log.Printf("---server: %v---DoPut---应用Put后kvMap内容: key:%s, value:%s, version:%d\n", kv.me, key, value.Value, value.Version)// }// log.Printf("---server: %v---DoPut---成功,reply.Err:%v, 更新key:%v, 更新value:%v, 更新version:%v\n", kv.me, reply.Err, args.Key, args.Value, kv.kvMap[args.Key].Version)// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId: args.ReqId,Result: reply,}return reply
}// 要将req强制转换为正确的类型,可以查看下面的Go语言类型选择或类型断言:// https://go.dev/tour/methods/16
// https://go.dev/tour/methods/15
func (kv *KVServer) DoOp(req any) any {// Your code hereswitch req.(type) {case *rpc.GetArgs:// // // log.Printf("---server: %v---DoOp---收到rpc.GetArgs请求\n", kv.me)return kv.DoGet(req.(*rpc.GetArgs))case *rpc.PutArgs:// // // log.Printf("---server: %v---DoOp---收到rpc.PutArgs请求\n", kv.me)return kv.DoPut(req.(*rpc.PutArgs))}return nil
}func (kv *KVServer) Snapshot() []byte {// Your code herereturn nil
}func (kv *KVServer) Restore(data []byte) {// Your code here
}func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {// 在这里编写您的代码。使用 kv.rsm.Submit() 提交参数。您可以使用 Go 的类型转换将 Submit() 的返回值转换为 GetReply:rep.(rpc.GetReply)err, result := kv.rsm.Submit(args)if err == rpc.ErrWrongLeader || err == rpc.ErrTimeout {reply.Err = rpc.ErrWrongLeaderreturn}// 类型断言newReply := result.(*rpc.GetReply)reply.Value = newReply.Valuereply.Version = newReply.Versionreply.Err = newReply.Err
}func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {// 在这里编写您的代码。使用 kv.rsm.Submit() 提交参数。您可以使用 Go 的类型转换将 Submit() 的任何返回值转换为 PutReply:rep.(rpc.PutReply)err, result := kv.rsm.Submit(args)if err == rpc.ErrWrongLeader {reply.Err = errreturn}// 类型断言newReply := result.(*rpc.PutReply)reply.Err = newReply.Err
}// 当KVServer实例不再需要时,测试者调用Kill()。
// 为了方便您,我们提供了代码来设置rf.dead(无需加锁),
// 以及一个killed()方法用于在长时间运行的循环中测试rf.dead。
// 您也可以向Kill()添加自己的代码。您不需要对此做任何事情,但抑制被Kill()实例的调试输出(例如)可能会很方便。
func (kv *KVServer) Kill() {atomic.StoreInt32(&kv.dead, 1)// Your code here, if desired.
}func (kv *KVServer) killed() bool {z := atomic.LoadInt32(&kv.dead)return z == 1
}// StartKVServer() 和 MakeRSM() 必须快速返回,因此它们应该为任何长期运行的工作启动 goroutine。
func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {// 在您想要的结构上调用 labgob.Register,以使 Go 的 RPC 库进行序列化/反序列化。labgob.Register(rsm.Op{})labgob.Register(&rpc.PutArgs{})labgob.Register(&rpc.GetArgs{})labgob.Register(ClientPutResult{})labgob.Register(map[int64]ClientPutResult{})kv := &KVServer{me: me,kvMap: make(map[string]*ValueVersion),}kv.clientPutResults = make(map[int64]ClientPutResult)kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)// You may need initialization code here.return []tester.IService{kv, kv.rsm.Raft()}
}
300次测试
===== 开始第 1 次测试 =====
=== RUN TestBasic4B
Test: one client (4B basic) (reliable network)...... Passed -- time 3.1s #peers 5 #RPCs 9550 #Ops 633
--- PASS: TestBasic4B (3.06s)
=== RUN TestSpeed4B
Test: one client (4B speed) (reliable network)...... Passed -- time 4.9s #peers 3 #RPCs 8763 #Ops 0
--- PASS: TestSpeed4B (4.86s)
=== RUN TestConcurrent4B
Test: many clients (4B many clients) (reliable network)...... Passed -- time 5.4s #peers 5 #RPCs 11190 #Ops 615
--- PASS: TestConcurrent4B (5.40s)
=== RUN TestUnreliable4B
Test: many clients (4B many clients) (unreliable network)...... Passed -- time 3.8s #peers 5 #RPCs 2367 #Ops 291
--- PASS: TestUnreliable4B (3.80s)
=== RUN TestOnePartition4B
Test: one client (4B progress in majority) (unreliable network)...... Passed -- time 1.5s #peers 5 #RPCs 140 #Ops 3
Test: no progress in minority (4B) (unreliable network)...... Passed -- time 1.3s #peers 5 #RPCs 141 #Ops 3
Test: completion after heal (4B) (unreliable network)...... Passed -- time 2.2s #peers 5 #RPCs 131 #Ops 4
--- PASS: TestOnePartition4B (4.99s)
=== RUN TestManyPartitionsOneClient4B
Test: partitions, one client (4B partitions, one client) (reliable network)...... Passed -- time 14.7s #peers 5 #RPCs 9002 #Ops 569
--- PASS: TestManyPartitionsOneClient4B (14.71s)
=== RUN TestManyPartitionsManyClients4B
Test: partitions, many clients (4B partitions, many clients (4B)) (reliable network)...... Passed -- time 11.5s #peers 5 #RPCs 12287 #Ops 639
--- PASS: TestManyPartitionsManyClients4B (11.50s)
=== RUN TestPersistOneClient4B
Test: restarts, one client (4B restarts, one client 4B ) (reliable network)...... Passed -- time 6.0s #peers 5 #RPCs 19230 #Ops 367
--- PASS: TestPersistOneClient4B (6.05s)
=== RUN TestPersistConcurrent4B
Test: restarts, many clients (4B restarts, many clients) (reliable network)...... Passed -- time 9.2s #peers 5 #RPCs 28350 #Ops 415
--- PASS: TestPersistConcurrent4B (9.23s)
=== RUN TestPersistConcurrentUnreliable4B
Test: restarts, many clients (4B restarts, many clients ) (unreliable network)...... Passed -- time 6.7s #peers 5 #RPCs 1779 #Ops 135
--- PASS: TestPersistConcurrentUnreliable4B (6.75s)
=== RUN TestPersistPartition4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (reliable network)...... Passed -- time 14.7s #peers 5 #RPCs 27239 #Ops 439
--- PASS: TestPersistPartition4B (14.70s)
=== RUN TestPersistPartitionUnreliable4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (unreliable network)...... Passed -- time 12.4s #peers 5 #RPCs 1932 #Ops 103
--- PASS: TestPersistPartitionUnreliable4B (12.42s)
=== RUN TestPersistPartitionUnreliableLinearizable4B
Test: restarts, partitions, random keys, many clients (4B restarts, partitions, random keys, many clients) (unreliable network)...... Passed -- time 12.3s #peers 7 #RPCs 5759 #Ops 226
--- PASS: TestPersistPartitionUnreliableLinearizable4B (12.33s)
PASS
ok 6.5840/kvraft1 109.799s
===== 结束第 1 次测试 ===== (耗时: 111秒).......................................===== 开始第 300 次测试 =====
=== RUN TestBasic4B
Test: one client (4B basic) (reliable network)...... Passed -- time 3.0s #peers 5 #RPCs 10009 #Ops 553
--- PASS: TestBasic4B (3.05s)
=== RUN TestSpeed4B
Test: one client (4B speed) (reliable network)...... Passed -- time 7.5s #peers 3 #RPCs 9589 #Ops 0
--- PASS: TestSpeed4B (7.48s)
=== RUN TestConcurrent4B
Test: many clients (4B many clients) (reliable network)...... Passed -- time 3.3s #peers 5 #RPCs 10824 #Ops 639
--- PASS: TestConcurrent4B (3.29s)
=== RUN TestUnreliable4B
Test: many clients (4B many clients) (unreliable network)...... Passed -- time 3.9s #peers 5 #RPCs 2232 #Ops 267
--- PASS: TestUnreliable4B (3.90s)
=== RUN TestOnePartition4B
Test: one client (4B progress in majority) (unreliable network)...... Passed -- time 2.0s #peers 5 #RPCs 191 #Ops 3
Test: no progress in minority (4B) (unreliable network)...... Passed -- time 1.1s #peers 5 #RPCs 117 #Ops 3
Test: completion after heal (4B) (unreliable network)...... Passed -- time 1.1s #peers 5 #RPCs 93 #Ops 4
--- PASS: TestOnePartition4B (4.27s)
=== RUN TestManyPartitionsOneClient4B
Test: partitions, one client (4B partitions, one client) (reliable network)...... Passed -- time 9.3s #peers 5 #RPCs 9025 #Ops 581
--- PASS: TestManyPartitionsOneClient4B (9.28s)
=== RUN TestManyPartitionsManyClients4B
Test: partitions, many clients (4B partitions, many clients (4B)) (reliable network)...... Passed -- time 27.2s #peers 5 #RPCs 12275 #Ops 633
--- PASS: TestManyPartitionsManyClients4B (27.25s)
=== RUN TestPersistOneClient4B
Test: restarts, one client (4B restarts, one client 4B ) (reliable network)...... Passed -- time 6.0s #peers 5 #RPCs 20870 #Ops 341
--- PASS: TestPersistOneClient4B (6.04s)
=== RUN TestPersistConcurrent4B
Test: restarts, many clients (4B restarts, many clients) (reliable network)...... Passed -- time 6.3s #peers 5 #RPCs 27714 #Ops 453
--- PASS: TestPersistConcurrent4B (6.29s)
=== RUN TestPersistConcurrentUnreliable4B
Test: restarts, many clients (4B restarts, many clients ) (unreliable network)...... Passed -- time 7.1s #peers 5 #RPCs 1972 #Ops 181
--- PASS: TestPersistConcurrentUnreliable4B (7.11s)
=== RUN TestPersistPartition4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (reliable network)...... Passed -- time 26.7s #peers 5 #RPCs 23099 #Ops 355
--- PASS: TestPersistPartition4B (26.65s)
=== RUN TestPersistPartitionUnreliable4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (unreliable network)...... Passed -- time 12.1s #peers 5 #RPCs 1928 #Ops 129
--- PASS: TestPersistPartitionUnreliable4B (12.15s)
=== RUN TestPersistPartitionUnreliableLinearizable4B
Test: restarts, partitions, random keys, many clients (4B restarts, partitions, random keys, many clients) (unreliable network)...... Passed -- time 12.4s #peers 7 #RPCs 5829 #Ops 246
--- PASS: TestPersistPartitionUnreliableLinearizable4B (12.44s)
PASS
ok 6.5840/kvraft1 129.198s
===== 结束第 300 次测试 ===== (耗时: 129秒)===== 测试统计摘要 =====
总测试次数: 300
成功次数: 300
失败次数: 0
成功率: 100%
总耗时: 35445秒
平均每次测试耗时: 118秒
===== 测试结束 =====
lab4C
这个很简单, 按流程图写就行了
实现 Server 的 Snapshot 和 Restore
- Snapshot 是对 Server 的 kv 数据做快照
- Restore 是在 raft 层收到快照后, 在 Server 中恢复数据
```go
// 持久化一些东西
func (kv *KVServer) Snapshot() []byte {
	// Your code here
	kv.mu.Lock()
	defer kv.mu.Unlock()
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.kvMap)
	// e.Encode(kv.clientPutResults)
	return w.Bytes()
}

// 从快照恢复状态
func (kv *KVServer) Restore(data []byte) {
	// Your code here
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	var kvMap map[string]*ValueVersion
	// var clientPutResults map[int64]ClientPutResult
	// if d.Decode(&kvMap) != nil || d.Decode(&clientPutResults) != nil {
	if d.Decode(&kvMap) != nil {
		// 解码错误
		log.Fatal("Failed to decode server persisted state")
	} else {
		kv.mu.Lock()
		kv.kvMap = kvMap
		// kv.clientPutResults = clientPutResults
		kv.mu.Unlock()
	}
}
```
调用Snapshot和Restore的时机
- 当 Raft 持久化状态的大小超过给定的 maxraftstate 时, 要去执行 Snapshot
- lab4C 的 Snapshot 是对 Server 的 kv 数据做快照(raft 的 snapshot 是按照状态机要求截断 log, 别搞混了), 因此判断时机应该在 DoOp 之后(log 应用了之后):
```go
// 检查是否需要创建快照
if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() > rsm.maxraftstate {
	snapshot := rsm.sm.Snapshot()
	rsm.rf.Snapshot(applyMsg.CommandIndex, snapshot)
}
```
- 而 Restore 的应用时机:
  - 当 RSM 重启后发现 snapshot 大小不为 0:
```go
snapShot := persister.ReadSnapshot()
if len(snapShot) > 0 {
	// 如果有快照, 交给 server 恢复状态
	rsm.sm.Restore(snapShot)
}
```
  - reader 中接收到快照的时候就调用 Restore:
```go
if applyMsg.SnapshotValid {
	// 处理快照
	rsm.sm.Restore(applyMsg.Snapshot)
}
```
BUG修复
测试了 300 次, 发现有十几次出错, 出错原因是 raft 接收快照时重复解锁, 还有向 RSM 发快照时忘记用 sync.WaitGroup 了, 原代码如下:
// 接收快照RPC
func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {flag := truerf.mu.Lock()defer rf.mu.Unlock()// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 准备安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)if args.Term < rf.currentTerm {reply.Term = rf.currentTermrf.persist()return}for flag {switch rf.state {case Leader:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Candidate:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else if args.Term == rf.currentTerm {reply.Term = rf.currentTerm // 回复者termIDrf.state = Followerrf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Follower:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1 // 重置投票人}rf.lastHeartbeatTime = time.Now() // 重置心跳检测// 如果快照比现有快照旧,则忽略它if args.LastIncludedIndex <= rf.globalLastIncludedIndex || args.LastIncludedIndex <= rf.lastapplied {reply.Term = rf.currentTermrf.persist()return}// 保存快照文件,丢弃任何索引更小的现有或部分 快照rf.snapShot = args.Data// 如果现有日志有与快照最后条目索引和任期相同的条目,保留该条目之后的日志并回复if args.LastIncludedIndex < rf.toGlobalIndex(len(rf.log)) && args.LastIncludedIndex >= rf.globalLastIncludedIndex && rf.log[rf.toLocalIndex(args.LastIncludedIndex)].TermId == args.LastIncludedTerm {// 保留该条目之后的日志rf.log = rf.log[rf.toLocalIndex(args.LastIncludedIndex):]} else {// 否则,丢弃整个日志rf.log = make([]*logEntity, 1) // 初始化日志,保留一个占位符rf.log[0] = &logEntity{TermId: args.LastIncludedTerm, Command: nil} // 在0位置放一个占位符}// 通知服务应用快照applyMsg := &raftapi.ApplyMsg{SnapshotValid: true,Snapshot: args.Data,SnapshotTerm: args.LastIncludedTerm,SnapshotIndex: args.LastIncludedIndex,}reply.Term = rf.currentTermflag = false// 更新快照相关状态rf.snapShot = args.Datarf.globalLastIncludedIndex = args.LastIncludedIndexrf.globalLastIncludedTerm = args.LastIncludedTermif rf.commitIndex < args.LastIncludedIndex {rf.commitIndex = args.LastIncludedIndex}// if rf.lastapplied < args.LastIncludedIndex {// rf.lastapplied = args.LastIncludedIndex// }if args.LastIncludedIndex > rf.globalLastIncludedIndex || rf.lastapplied < args.LastIncludedIndex {rf.lastapplied = args.LastIncludedIndex}// rf.applyCh <- *applyMsg// 复制需要的数据,避免在无锁状态下访问 Raft 结构体snapshotData := make([]byte, len(applyMsg.Snapshot))copy(snapshotData, applyMsg.Snapshot)snapshotIndex := applyMsg.SnapshotIndexsnapshotTerm := applyMsg.SnapshotTerm// 释放锁后再发送消息rf.persist()rf.snapShotLock = truerf.mu.Unlock() // 报错这里重复解锁了..., 莫名其妙, 暂时没看出什么问题rf.applyCh <- raftapi.ApplyMsg{SnapshotValid: true,Snapshot: snapshotData,SnapshotTerm: snapshotTerm,SnapshotIndex: snapshotIndex,}rf.mu.Lock() // 重新获取锁,因为 defer 中还需要解锁rf.snapShotLock = falseif rf.lastapplied < rf.commitIndex {// 唤醒applyrf.applyCond.Broadcast()}// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 已安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)}}}
修改后:
// 接收快照RPC
func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {rf.wg.Add(1)defer rf.wg.Done()// Your code here (3D).flag := truerf.mu.Lock()// defer rf.mu.Unlock()// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 准备安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)if args.Term < rf.currentTerm {reply.Term = rf.currentTermrf.persist()rf.mu.Unlock()return}for flag {switch rf.state {case Leader:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Candidate:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else if args.Term == rf.currentTerm {reply.Term = rf.currentTerm // 回复者termIDrf.state = Followerrf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Follower:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1 // 重置投票人}rf.lastHeartbeatTime = time.Now() // 重置心跳检测// 如果快照比现有快照旧,则忽略它if args.LastIncludedIndex <= rf.globalLastIncludedIndex || args.LastIncludedIndex <= rf.lastapplied {reply.Term = rf.currentTermrf.persist()rf.mu.Unlock()return}// 保存快照文件,丢弃任何索引更小的现有或部分 快照rf.snapShot = args.Data// 如果现有日志有与快照最后条目索引和任期相同的条目,保留该条目之后的日志并回复if args.LastIncludedIndex < rf.toGlobalIndex(len(rf.log)) && args.LastIncludedIndex >= rf.globalLastIncludedIndex && rf.log[rf.toLocalIndex(args.LastIncludedIndex)].TermId == args.LastIncludedTerm {// 保留该条目之后的日志rf.log = rf.log[rf.toLocalIndex(args.LastIncludedIndex):]} else {// 否则,丢弃整个日志rf.log = make([]*logEntity, 1) // 初始化日志,保留一个占位符rf.log[0] = &logEntity{TermId: args.LastIncludedTerm, Command: nil} // 在0位置放一个占位符}// 通知服务应用快照applyMsg := &raftapi.ApplyMsg{SnapshotValid: true,Snapshot: args.Data,SnapshotTerm: args.LastIncludedTerm,SnapshotIndex: args.LastIncludedIndex,}reply.Term = rf.currentTermflag = false// 更新快照相关状态rf.snapShot = args.Datarf.globalLastIncludedIndex = args.LastIncludedIndexrf.globalLastIncludedTerm = args.LastIncludedTermif rf.commitIndex < args.LastIncludedIndex {rf.commitIndex = args.LastIncludedIndex}// if rf.lastapplied < args.LastIncludedIndex {// rf.lastapplied = args.LastIncludedIndex// }if args.LastIncludedIndex > rf.globalLastIncludedIndex || rf.lastapplied < args.LastIncludedIndex {rf.lastapplied = args.LastIncludedIndex}// rf.applyCh <- *applyMsg// 复制需要的数据,避免在无锁状态下访问 Raft 结构体snapshotData := make([]byte, len(applyMsg.Snapshot))copy(snapshotData, applyMsg.Snapshot)snapshotIndex := applyMsg.SnapshotIndexsnapshotTerm := applyMsg.SnapshotTerm// 释放锁后再发送消息rf.persist()rf.snapShotLock = truerf.mu.Unlock()if rf.killed() {return}rf.applyCh <- raftapi.ApplyMsg{SnapshotValid: true,Snapshot: snapshotData,SnapshotTerm: snapshotTerm,SnapshotIndex: snapshotIndex,}rf.mu.Lock() // 重新获取锁,因为 defer 中还需要解锁rf.snapShotLock = falseif rf.lastapplied < rf.commitIndex {// 唤醒applyrf.applyCond.Broadcast()}// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 已安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)}}rf.mu.Unlock()}
300次测试
===== 开始第 1 次测试 =====
=== RUN TestSnapshotRPC4C
Test: snapshots, one client (4C SnapshotsRPC) (reliable network)...
Test: InstallSnapshot RPC (4C) (reliable network)...... Passed -- time 3.6s #peers 3 #RPCs 7032 #Ops 63
--- PASS: TestSnapshotRPC4C (3.56s)
=== RUN TestSnapshotSize4C
Test: snapshots, one client (4C snapshot size is reasonable) (reliable network)...... Passed -- time 1.1s #peers 3 #RPCs 8620 #Ops 800
--- PASS: TestSnapshotSize4C (1.12s)
=== RUN TestSpeed4C
Test: snapshots, one client (4C speed) (reliable network)...... Passed -- time 1.3s #peers 3 #RPCs 9672 #Ops 0
--- PASS: TestSpeed4C (1.35s)
=== RUN TestSnapshotRecover4C
Test: restarts, snapshots, one client (4C restarts, snapshots, one client) (reliable network)...... Passed -- time 6.0s #peers 5 #RPCs 24163 #Ops 705
--- PASS: TestSnapshotRecover4C (6.02s)
=== RUN TestSnapshotRecoverManyClients4C
Test: restarts, snapshots, many clients (4C restarts, snapshots, many clients ) (reliable network)...
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok... Passed -- time 9.6s #peers 5 #RPCs 31434 #Ops 1305
--- PASS: TestSnapshotRecoverManyClients4C (9.58s)
=== RUN TestSnapshotUnreliable4C
Test: snapshots, many clients (4C unreliable net, snapshots, many clients) (unreliable network)...... Passed -- time 4.0s #peers 5 #RPCs 2334 #Ops 295
--- PASS: TestSnapshotUnreliable4C (3.98s)
=== RUN TestSnapshotUnreliableRecover4C
Test: restarts, snapshots, many clients (4C unreliable net, restarts, snapshots, many clients) (unreliable network)...... Passed -- time 6.9s #peers 5 #RPCs 1893 #Ops 157
--- PASS: TestSnapshotUnreliableRecover4C (6.90s)
=== RUN TestSnapshotUnreliableRecoverConcurrentPartition4C
Test: restarts, partitions, snapshots, many clients (4C unreliable net, restarts, partitions, snapshots, many clients) (unreliable network)...... Passed -- time 12.5s #peers 5 #RPCs 2115 #Ops 107
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4C (12.47s)
=== RUN TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C
Test: restarts, partitions, snapshots, random keys, many clients (4C unreliable net, restarts, partitions, snapshots, random keys, many clients) (unreliable network)...... Passed -- time 15.6s #peers 7 #RPCs 6354 #Ops 360
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C (15.61s)
PASS
ok 6.5840/kvraft1 60.601s
===== 结束第 1 次测试 ===== (耗时: 61秒)...............===== 开始第 300 次测试 =====
=== RUN TestSnapshotRPC4C
Test: snapshots, one client (4C SnapshotsRPC) (reliable network)...
Test: InstallSnapshot RPC (4C) (reliable network)...... Passed -- time 2.5s #peers 3 #RPCs 7187 #Ops 63
--- PASS: TestSnapshotRPC4C (2.52s)
=== RUN TestSnapshotSize4C
Test: snapshots, one client (4C snapshot size is reasonable) (reliable network)...... Passed -- time 1.1s #peers 3 #RPCs 9909 #Ops 800
--- PASS: TestSnapshotSize4C (1.15s)
=== RUN TestSpeed4C
Test: snapshots, one client (4C speed) (reliable network)...... Passed -- time 1.7s #peers 3 #RPCs 11776 #Ops 0
--- PASS: TestSpeed4C (1.68s)
=== RUN TestSnapshotRecover4C
Test: restarts, snapshots, one client (4C restarts, snapshots, one client) (reliable network)...... Passed -- time 6.0s #peers 5 #RPCs 24630 #Ops 941
--- PASS: TestSnapshotRecover4C (6.02s)
=== RUN TestSnapshotRecoverManyClients4C
Test: restarts, snapshots, many clients (4C restarts, snapshots, many clients ) (reliable network)...
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok... Passed -- time 9.6s #peers 5 #RPCs 34146 #Ops 1357
--- PASS: TestSnapshotRecoverManyClients4C (9.56s)
=== RUN TestSnapshotUnreliable4C
Test: snapshots, many clients (4C unreliable net, snapshots, many clients) (unreliable network)...... Passed -- time 4.1s #peers 5 #RPCs 2316 #Ops 269
--- PASS: TestSnapshotUnreliable4C (4.13s)
=== RUN TestSnapshotUnreliableRecover4C
Test: restarts, snapshots, many clients (4C unreliable net, restarts, snapshots, many clients) (unreliable network)...... Passed -- time 6.7s #peers 5 #RPCs 1709 #Ops 123
--- PASS: TestSnapshotUnreliableRecover4C (6.74s)
=== RUN TestSnapshotUnreliableRecoverConcurrentPartition4C
Test: restarts, partitions, snapshots, many clients (4C unreliable net, restarts, partitions, snapshots, many clients) (unreliable network)...... Passed -- time 15.5s #peers 5 #RPCs 2229 #Ops 83
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4C (15.54s)
=== RUN TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C
Test: restarts, partitions, snapshots, random keys, many clients (4C unreliable net, restarts, partitions, snapshots, random keys, many clients) (unreliable network)...... Passed -- time 21.4s #peers 7 #RPCs 7348 #Ops 270
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C (21.39s)
PASS
ok 6.5840/kvraft1 68.737s
===== 结束第 300 次测试 ===== (耗时: 69秒)===== 测试统计摘要 =====
总测试次数: 300
成功次数: 300
失败次数: 0
成功率: 100%
总耗时: 22437秒
平均每次测试耗时: 74秒
===== 测试结束 =====