当前位置: 首页 > news >正文

lab4

架构

架构图:

img

简单说, 我们要建立的KV数据库是位于raft层之上的, 或者说我们的KV数据库使用了raft库。客户端(就是代码中的clerk)调用应用层(server)的RPC,应用层收到RPC之后,会调用Start函数,Start函数会立即返回,但是这时,应用层不会返回消息给客户端,因为它还没有执行客户端请求,它也不知道这个请求是否会被Raftcommit。只有在某一时刻,对应于这个客户端请求的消息在applyCh channel中出现, 应用层才会执行这个请求,并返回响应给客户端。

操作流程

image-20250921162355816

这是一个基于 Raft一致性算法 的、复制的状态机 (Replicated State Machine) 系统,具体实现为一个键值存储 (Key/Value Store)。其核心思想是:多个服务器通过 Raft 协议来同步操作日志,从而保证所有服务器上的状态机(即您的键值存储数据)最终保持一致。

代码被组织成三个主要部分,对应图中的三个模块:

  1. client.go: 客户端库,供应用程序调用,发起 GetPut请求。
  2. server.go (KVServer): 键值存储服务器。它接收客户端的 RPC 请求,并将其提交给 Raft 集群。
  3. rsm.go: 复制的状态机 (Replicated State Machine) 层。这是连接你的业务逻辑(键值存储)和底层一致性模块(Raft)的核心桥梁。它负责将操作提交给 Raft、从 Raft 应用已提交的日志条目到状态机,以及管理快照。

图片中描述的三个流程清晰地展示了这些模块如何协作

结合代码仔细看一下操作流程图就能明白这个试验要做什么

个人理解:

  • 一个raft节点,rsm(状态机)和server 同时存在一个物理主机上, 它们之间存在函数相互调用的关系, 而raft节点之间以及client和server之间通过RPC通信,

  • rsm就是状态机, 但是实际的状态指的是server中的k/v数据, 从图中也可看出实际上命令的执行(状态的改变)发生在server端, 状态机只是作为raft和server的中间层, 流程如下:

    • server端注册的RPC函数Get/Put, 收到来自于client的get/put请求
    • server先把RPC的Args传给状态机, 状态机再把命令发给raft层,使raft层节点数据保持一致
    • raft层提交一个log后, 状态机通过监听applyCh去再次获取命令, 然后调用server端的DoOp(命令)去执行客户端请求, 随后把执行结果(DoOp返回值) 传给rsm的submit, 最后submit再把结果以返回值形式给server, server再把结果给client…
    • 说白了rsm就是一个传信的…, 个人感觉不是很有必要搞这个中间层
  • 再次声明, 如果你想通过一次测试很简单, 读一下要求, 几个小时就搞定了, 但是你想在不确定性的网络以及随时会crash掉的节点的条件下, 通过成百上千上万….次测试, 也就是实现一个基本不出错的分布式系统, 绝非易事

lab4A

踩坑实录

这个4A怪不得难度是moderate/hard, 代码量少,但坑不少…

  1. rsm通过start发送给raft的和从applyCh中取回的内容要保持一致; submit()和DoOp()的参数保持一致

  2. Op的id就用raft的log的Index就行

  3. 在从applyCh中收到消息后要进行一些判断, 因为可能因为网络分区, 节点宕机等原因, 会发生leader改变, 重复选举等情况, 只有真正提交log的leader才能回复客户端OK, 注意情况如下:

    • 从applyCh中收到消息意味着有log提交了, 可以去应用了, 但是这个被应用的, 不一定是之前发送的即command改变了, 改变原因可能是leader收到了log, 但是还没完成提议阶段, leader挂了, 那么在并发场景下新上任的leader, 可能会应用其他log, 旧leader未提交的log被覆盖了, 因此需要比较command

    • 需要比较term, term的改变,可能意味着leader的改变, 那么这个旧leader的未提交log可能会被覆盖,

      • 注意: 比较的当前term不能用currentTerm, _ := rsm.rf.GetState()获取, 因为当发生网络分区时, 如果旧leader在小分区中, 仍然为自己是leader, 也就是当前状态机对应的raft节点还认为自己是leader, 那你通过currentTerm, _ := rsm.rf.GetState()获取, 结果就是true…, 会通不过网络分区test

      • 在raft的applyCh中加一个字段, 在applier时返回当前raft节点的term

        if opEntry.term != applyMsg.Term || !reflect.DeepEqual(opEntry.req, op.Req) {}
        
    • 不要根据 isLeader去判断, 因为有可能这个leader是提交了log后才退位的

    • 清理全部的剩余操作(要不然前leader中未被committed的log可能会被应用并submit返回成功), 如下:

      if opEntry.term != applyMsg.Term  || !reflect.DeepEqual(opEntry.Command, applyMsg.Command) {// 当你在reader()里发现某个index被apply时,发现term或command不一致(即被覆盖),// 说明这个index及其之后的所有pendingOps都不可能被commitfor i, op := range rsm.opMap {if i >= applyMsg.CommandIndex {op.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}// close(opEntry.ch)delete(rsm.opMap, i)}}
      }
      
  4. 当我多次执行TestRestartSubmit4A 这个测试, 发现还是大概有几十分之一的概率通不过测试, 报错是因为在raft层当kill()的时候关闭了applyCh, 而此时有可能applier中还在把提交的命令发给applyCh, 刚开始我只是在发给applyCh前加了判断:

    if rf.killed() {return
    }
    case applyCh <- *applyMsg
    

    但是因为没锁, 有可能在killed检查和applyCh <- *applyMsg 之间进行了关闭了applyCh, 为了避免向已关闭的applyCh发消息, 我使用了sync.waitGroup在kill()中阻塞close(rf.applyCh),直到applier中发完:

    func (rf *Raft) Kill() {atomic.StoreInt32(&rf.dead, 1) // 原子操作设置终止标志// Your code here, if desired.rf.applyCond.Broadcast() // 唤醒所有等待的goroutine// close(rf.done) // 关闭done通道,通知所有监听的goroutine退出rf.wg.Wait() // 等待所有goroutine退出close(rf.applyCh)
    }func (rf *Raft) applier(applyCh chan raftapi.ApplyMsg) {rf.wg.Add(1)defer rf.wg.Done()for !rf.killed() {rf.mu.Lock()// 等待直到有新的日志可以应用for rf.commitIndex <= rf.lastapplied || rf.snapShotLock {rf.applyCond.Wait() // 使用条件变量等待// 检查是否已被杀死if rf.killed() {rf.mu.Unlock()return}}// 检查是否已被杀死if rf.killed() {rf.mu.Unlock()return}applyMsgs := make([]*raftapi.ApplyMsg, 0)// 应用所有新的已提交日志tmpApplied := rf.lastappliedfor rf.commitIndex > tmpApplied {// rf.applyLock = truetmpApplied++// // DPrintf("applier---节点server %v, 在term: %v, 欲将日志添加到applyMsgs, 最新应用的日志索引为: %v\n", rf.me, rf.currentTerm, tmpApplied)if tmpApplied <= rf.globalLastIncludedIndex {// 该日志已经被快照覆盖,跳过continue}// 应用日志到状态机applyMsg := &raftapi.ApplyMsg{CommandValid: true,Command:      rf.log[rf.toLocalIndex(tmpApplied)].Command,CommandIndex: tmpApplied,}applyMsgs = append(applyMsgs, applyMsg)}rf.mu.Unlock()// 发送 ApplyMsg 到 applyCh 通道for _, applyMsg := range applyMsgs {// if rf.killed() {// 	return// }rf.mu.Lock()if applyMsg.CommandIndex != rf.lastapplied+1 || rf.snapShotLock {rf.mu.Unlock()continue}// DPrintf("applier---节点server %v, 在term: %v, 欲将日志应用到状态机, 最新应用的日志索引为: %v,  日志内容为: %v,rf.lastapplied更新到:%v, rf.commitIndex更新到:%v\n", rf.me, rf.currentTerm, applyMsg.CommandIndex, rf.log[rf.toLocalIndex(applyMsg.CommandIndex)].Command, rf.lastapplied, rf.commitIndex)rf.mu.Unlock()if rf.killed() {return}// // 使用 select 避免向已关闭的通道发送数据// select {// case applyCh <- *applyMsg:// 	// 发送成功// 	rf.mu.Lock()// 	if applyMsg.CommandIndex == rf.lastapplied+1 && !rf.snapShotLock {// 		rf.lastapplied = applyMsg.CommandIndex// 	}// 	rf.mu.Unlock()// case <-rf.done: // 使用 done 通道检测终止// 	return// }applyCh <- *applyMsgrf.mu.Lock()if applyMsg.CommandIndex != rf.lastapplied+1 || rf.snapShotLock {rf.mu.Unlock()continue}rf.lastapplied = applyMsg.CommandIndex// DPrintf("applier---节点server %v, 在term: %v, 已将日志应用到状态机, 最新应用的日志索引为: %v,  日志内容为: %v,rf.lastapplied更新到:%v, rf.commitIndex更新到:%v\n", rf.me, rf.currentTerm, applyMsg.CommandIndex, rf.log[rf.toLocalIndex(applyMsg.CommandIndex)].Command, rf.lastapplied, rf.commitIndex)rf.mu.Unlock()}}
    }
    

上代码

不再多言上代码

package rsmimport ("reflect""sync""time""6.5840/kvsrv1/rpc""6.5840/labrpc"raft "6.5840/raft1""6.5840/raftapi"tester "6.5840/tester1"
)var useRaftStateMachine bool// 传递操作命令
type Op struct {// 在这里定义你的内容。字段名必须以大写字母开头, 否则RPC将无法正常工作。// Op{Me: rsm.me, Id: id, Req: req}Me  intReq any
}// 一个想要自我复制的服务器(即,../server.go)调用MakeRSM,必须实现StateMachine接口。
// 该接口允许rsm包与服务器进行交互以执行服务器特定的操作:
// 服务器必须实现DoOp以执行一个操作(例如,Get或Put请求),以及Snapshot/Restore用于快照和恢复服务器的状态。
type StateMachine interface {DoOp(any) anySnapshot() []byteRestore([]byte)
}type OpResult struct {err    rpc.Errresult any
}type OpEntry struct {term intreq  anych   chan OpResult
}type RSM struct {mu           sync.Mutexme           intrf           raftapi.RaftapplyCh      chan raftapi.ApplyMsgmaxraftstate int // 如果日志增长到这么大,快照sm           StateMachine// 您的定义在这里。opMap map[int]*OpEntry// 标记 applyCh 是否已关闭// applyClosed bool
}// servers[] 包含将通过 Raft 协作以形成容错键/值服务的服务器集合的端口。
// me 是当前服务器在 servers[] 中的索引。
// k/v 服务器应该通过底层的 Raft 实现存储快照,该实现应该调用 persister.SaveStateAndSnapshot() 原子性地保存 Raft 状态和快照。
// 当 Raft 的保存状态超过 maxraftstate 字节时,RSM 应该进行快照,以允许 Raft 垃圾回收其日志。
// 如果 maxraftstate 是 -1,则不需要快照。
// MakeRSM() 必须快速返回,因此它应该为任何长期运行的工作启动 goroutine。
func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *tester.Persister, maxraftstate int, sm StateMachine) *RSM {rsm := &RSM{me:           me,maxraftstate: maxraftstate,applyCh:      make(chan raftapi.ApplyMsg),sm:           sm,opMap:        make(map[int]*OpEntry),// applyClosed:  false,}if !useRaftStateMachine {rsm.rf = raft.Make(servers, me, persister, rsm.applyCh)}go rsm.applyListen()return rsm
}// 在 rsm.go的 Submit方法中,将请求包装成 Op,调用 rf.Start(op)
func (rsm *RSM) Raft() raftapi.Raft {return rsm.rf
}func (rsm *RSM) applyListen() {for {applyMsg, ok := <-rsm.applyCh // 从通道中获取数据, 当通道关闭且无数据时自动跳出循环if !ok {// // // // log.Printf("RSM 通道关闭")rsm.mu.Lock()// rsm.applyClosed = true // 标记 applyCh 已关闭// 说明applyCh已经关闭,当applyCh关闭时,reader只是退出循环,但没有通知正在等待的Submit操作。这导致Submit操作永远等待,造成超时// 通知所有等待的操作,让他们返回ErrWrongLeaderfor _, opEntry := range rsm.opMap {opEntry.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}}// log.Printf("***********RSM--applyListen %d: applyCh关闭, 通知所有等待的Submit操作返回ErrWrongLeader, 清空opMap\n", rsm.me)rsm.opMap = make(map[int]*OpEntry)rsm.mu.Unlock()break}if applyMsg.CommandValid {// 从applyCh获取Op// 安全地类型断言op, ok := applyMsg.Command.(Op)if !ok {continue}// 调用server端DoOp()执行Op, 所有服务器都需要应用所有已提交的操作​​,无论操作是否由本服务器提交applyResult := rsm.sm.DoOp(op.Req)// log.Printf("***********RSM--applyListen %d: 执行完 Op: %v, 结果: %v\n", rsm.me, op, applyResult)rsm.mu.Lock()// 存在这个Opif opEntry, ok := rsm.opMap[applyMsg.CommandIndex]; ok {// 获取当前raft节点的状态// currentTerm, _ := rsm.rf.GetState()if opEntry.term != applyMsg.Term || !reflect.DeepEqual(opEntry.req, op.Req) {// opEntry.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}// delete(rsm.opMap, applyMsg.CommandIndex)// for i, entry := range rsm.opMap {// 	if i > applyMsg.CommandIndex && entry.term == opEntry.term {// 		entry.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}// 		delete(rsm.opMap, i)// 	}// }for i, op := range rsm.opMap {if i >= applyMsg.CommandIndex {op.ch <- OpResult{err: rpc.ErrWrongLeader, result: nil}delete(rsm.opMap, i)}}} else {// 匹配成功,发送结果// log.Printf("---RSM--applyListen %d: 操作匹配成功, 向submit发送前, index=%d, term=%d", rsm.me, applyMsg.CommandIndex, opEntry.term)// 发送结果并关闭通道opEntry.ch <- OpResult{err: rpc.OK, result: applyResult}// log.Printf("---RSM--applyListen %d: 向submit发送成功, index=%d, term=%d", rsm.me, applyMsg.CommandIndex, opEntry.term)delete(rsm.opMap, applyMsg.CommandIndex)}}rsm.mu.Unlock()// // 检查是否需要创建快照// if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() > rsm.maxraftstate {// 	snapshot := rsm.sm.Snapshot()// 	rsm.rf.Snapshot(applyMsg.CommandIndex, snapshot)// }}//  else if applyMsg.SnapshotValid {// 	// 处理快照// 	rsm.sm.Restore(applyMsg.Snapshot)// }}
}// 在 kvraft1/server.go中,当服务器收到客户端的 Put 或 Get RPC 请求时,它会调用 rsm.Submit()来将操作提交给 Raft 共识层
// 向 Raft 提交一个命令,并等待其被提交。如果客户端应该寻找新的领导者并重试,它应该返回 ErrWrongLeader.
func (rsm *RSM) Submit(req any) (rpc.Err, any) {// Submit创建一个操作结构来通过Raft运行命令;// 例如:op := Op{Me: rsm.me, Id: id, Req: req},其中req是Submit的参数,id是op的唯一标识符。// your code here// rsm.mu.Lock()// if rsm.applyClosed {// 	log.Printf("***********RSM %d: submit操作--start完--发现 applyCh 已关闭, 返回 ErrWrongLeader\n", rsm.me)// 	rsm.mu.Unlock()// 	return rpc.ErrWrongLeader, nil// }// rsm.mu.Unlock()op := Op{Me:  rsm.me,Req: req,}// log.Printf("***********RSM %d: submit操作把Op: %v 通过start传给raft层\n", rsm.me, op)index, term, isLeader := rsm.rf.Start(op)if !isLeader {return rpc.ErrWrongLeader, nil}// 向opResults中添加Oprsm.mu.Lock()resultCh := make(chan OpResult, 1)newEntry := &OpEntry{term: term,req:  req,ch:   resultCh,}// 再次检查 applyCh 是否已关闭(可能在 Start 调用期间关闭)// if rsm.applyClosed {// 	log.Printf("***********RSM %d: submit操作--start完--发现 applyCh 已关闭, 返回 ErrWrongLeader\n", rsm.me)// 	rsm.mu.Unlock()// 	return rpc.ErrWrongLeader, nil// }rsm.opMap[index] = newEntryrsm.mu.Unlock()// 监听, 获取结果select {case result := <-resultCh:return result.err, result.resultcase <-time.After(3 * time.Second): // 超时// 清理待处理操作rsm.mu.Lock()// 检查通道是否仍然有效if entry, exists := rsm.opMap[index]; exists && entry.ch == resultCh {// close(resultCh) // 关闭通道防止后续发送delete(rsm.opMap, index)// log.Printf("***********RSM %d: submit操作等待结果超时, index=%d, term=%d, 清理该操作\n", rsm.me, opID, term)}rsm.mu.Unlock()return rpc.ErrWrongLeader, nil}}

300次测试

老样子测试300次

===== 开始第 1 次测试 =====
=== RUN   TestBasic4A
Test RSM basic (reliable network)...... Passed --  time  1.9s #peers 3 #RPCs    44 #Ops    0
--- PASS: TestBasic4A (1.90s)
=== RUN   TestConcurrent4A
Test concurrent submit (reliable network)...... Passed --  time  0.8s #peers 3 #RPCs   106 #Ops    0
--- PASS: TestConcurrent4A (0.78s)
=== RUN   TestLeaderFailure4A
Test Leader Failure (reliable network)...... Passed --  time  1.8s #peers 3 #RPCs    38 #Ops    0
--- PASS: TestLeaderFailure4A (1.77s)
=== RUN   TestLeaderPartition4A
Test Leader Partition (reliable network)...... Passed --  time  2.7s #peers 3 #RPCs   276 #Ops    0
--- PASS: TestLeaderPartition4A (2.73s)
=== RUN   TestRestartReplay4A
Test Restart (reliable network)...... Passed --  time 13.6s #peers 3 #RPCs   432 #Ops    0
--- PASS: TestRestartReplay4A (13.61s)
=== RUN   TestShutdown4A
Test Shutdown (reliable network)...... Passed --  time 10.0s #peers 3 #RPCs     0 #Ops    0
--- PASS: TestShutdown4A (10.01s)
=== RUN   TestRestartSubmit4A
Test Restart and submit (reliable network)...... Passed --  time 14.5s #peers 3 #RPCs   638 #Ops    0
--- PASS: TestRestartSubmit4A (14.55s)
PASS
ok  	6.5840/kvraft1/rsm	45.353s
===== 结束第 1 次测试 ===== (耗时: 46秒)..............===== 开始第 300 次测试 =====
=== RUN   TestBasic4A
Test RSM basic (reliable network)...... Passed --  time  1.8s #peers 3 #RPCs    44 #Ops    0
--- PASS: TestBasic4A (1.80s)
=== RUN   TestConcurrent4A
Test concurrent submit (reliable network)...... Passed --  time  0.7s #peers 3 #RPCs   104 #Ops    0
--- PASS: TestConcurrent4A (0.73s)
=== RUN   TestLeaderFailure4A
Test Leader Failure (reliable network)...... Passed --  time  2.1s #peers 3 #RPCs    40 #Ops    0
--- PASS: TestLeaderFailure4A (2.13s)
=== RUN   TestLeaderPartition4A
Test Leader Partition (reliable network)...... Passed --  time  2.7s #peers 3 #RPCs   272 #Ops    0
--- PASS: TestLeaderPartition4A (2.72s)
=== RUN   TestRestartReplay4A
Test Restart (reliable network)...... Passed --  time 13.5s #peers 3 #RPCs   430 #Ops    0
--- PASS: TestRestartReplay4A (13.52s)
=== RUN   TestShutdown4A
Test Shutdown (reliable network)...... Passed --  time 10.0s #peers 3 #RPCs     0 #Ops    0
--- PASS: TestShutdown4A (10.04s)
=== RUN   TestRestartSubmit4A
Test Restart and submit (reliable network)...... Passed --  time 24.5s #peers 3 #RPCs   638 #Ops    0
--- PASS: TestRestartSubmit4A (24.45s)
PASS
ok  	6.5840/kvraft1/rsm	55.394s
===== 结束第 300 次测试 ===== (耗时: 55秒)===== 测试统计摘要 =====
总测试次数: 300
成功次数: 293
失败次数: 7
成功率: 97%
总耗时: 15583秒
平均每次测试耗时: 51秒
===== 测试结束 =====

lab4B

个人实现流程图

比官方的详细一点, 仅供参考

image-20250929004556400

踩坑实录

客户端和服务器的代码仿照着lab2写, 具体的代码逻辑我不再多说, 这里只提遇到的坑

  1. 通过打印日志发现,传给DoOp的req有的时候是*rpc.PutArgs(指针类型),有时是rpc.PutArgs(值类型)。这个问题需要去修改labgob注册的类型

    image-20250928235548323

  2. 超时时间不要设置太大, 有个速度测试

  3. 对于reply.Err == rpc.ErrWrongLeader 或者 RPC没有回复, 客户端做法都是

    ck.mu.Lock()
    ck.leader = (leader + 1) % len(ck.servers)
    ck.mu.Unlock()
    isFirstRequest = false
    
  4. 这个lab看似很简单,但是有个大坑, 那就是rpc.ErrMaybe这个玩意很容易误导人, 让你以为重复执行成不成功无所谓返回它就好了(起码我刚开始是这样认为的, 直到我看见了报错信息里说: history不满足线性一致性, 提醒了我…, 这点你要是没注意到,可能就会像我一样多次test有时pass, 有时FAIL, 陷入懵逼之中…)

    • 不要以为在Server端只用version区分重复命令就行了, 你还得保证线性一致性, 要不然对于重复命令: 第一次返回了OK(该Server crash), 第二次返回了ErrMaybe, 因为网络原因重发相同的命令(客户端换了通信的leader), 最终对于client来说相同的命令却得到了不一样的执行结果… ,这不满足线性一致性
    • 也就是在Server端对于命令, 你要记录其id和执行结果, 当遇到重复命令,把上次结果给client就行

上代码

client.go

package kvraftimport ("math/rand""sync""6.5840/kvsrv1/rpc"kvtest "6.5840/kvtest1"tester "6.5840/tester1"
)type Clerk struct {clnt    *tester.Clntservers []string// You will have to modify this struct.// leaderIdx intmu        sync.Mutexleader    intclientId  int64requestId int64
}func MakeClerk(clnt *tester.Clnt, servers []string) kvtest.IKVClerk {// ck := &Clerk{clnt: clnt, servers: servers, leaderIdx: 0}ck := &Clerk{clnt: clnt, servers: servers}// You'll have to add code here.// 客户端id用随机值ck.clientId = int64(rand.Int63())ck.requestId = 0ck.leader = 0// log.Printf("***********MakeClerk: 创建 Clerk, clientId: %d\n", ck.clientId)return ck
}// Get 方法获取一个键的当前值和版本。如果该键不存在,则返回 ErrNoKey。
// 它会在面对其他所有错误时不断尝试。
// 您可以使用以下代码向服务器 i 发送 RPC:ok := ck.clnt.Call(ck.servers[i], "KVServer.Get", &args, &reply)
// args 和 reply 的类型(包括它们是否为指针)必须与 RPC 处理程序函数的参数声明的类型匹配。此外,reply 必须作为指针传递。
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {// You will have to modify this function.args := rpc.GetArgs{Key: key}for {ck.mu.Lock()leader := ck.leaderck.mu.Unlock()// 先尝试leaderreply := rpc.GetReply{}ok := ck.clnt.Call(ck.servers[leader], "KVServer.Get", &args, &reply)if ok && reply.Err != rpc.ErrWrongLeader {if reply.Err != rpc.OK {return "", 0, reply.Err}return reply.Value, reply.Version, rpc.OK}ck.mu.Lock()ck.leader = (leader + 1) % len(ck.servers)ck.mu.Unlock()//如果RPC失败,等待9ms后重试// time.Sleep(9 * time.Millisecond)}
}// 仅在请求中的版本与服务器上的键的版本匹配时,才将更新的键和值放入。
// 如果版本号不匹配,服务器应返回ErrVersion。
// 如果Put在第一次RPC中收到ErrVersion,则Put应返回ErrVersion,因为Put肯定没有在服务器上执行。
// 如果服务器在重发RPC上返回ErrVersion,则Put必须向应用程序返回ErrMaybe,
// 因为其早期的RPC可能已经被服务器成功处理但响应丢失,并且Clerk不知道Put是否执行。
// 您可以使用如下代码向服务器i发送RPC:ok := ck.clnt.Call(ck.servers[i], "KVServer.Put", &args, &reply)
// args和reply的类型(包括是否为指针)必须与RPC处理函数参数的声明类型匹配。此外,reply必须作为指针传递。
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {// You will have to modify this function.args := rpc.PutArgs{Key: key, Value: value, Version: version}// 是否是第一次请求isFirstRequest := trueck.mu.Lock()ck.requestId++args.ClientId = ck.clientIdargs.ReqId = ck.requestIdck.mu.Unlock()// // //// log.Printf("***********Clerk 向 server: %v 发起Put请求: <key:%s, value:%s, version:%d>\n", ck.leader, args.Key, args.Value, args.Version)for {ck.mu.Lock()leader := ck.leaderck.mu.Unlock()reply := rpc.PutReply{}// log.Printf("***********Clerk 向 server: %v 发起Put请求: <key:%s, value:%s, version:%d>\n", ck.leader, args.Key, args.Value, args.Version)ok := ck.clnt.Call(ck.servers[leader], "KVServer.Put", &args, &reply)if ok && reply.Err != rpc.ErrWrongLeader {if reply.Err == rpc.OK {// log.Printf("***Clerk 从server: %v 收到Put结果: <key:%s, value:%s, version:%d>\n", ck.leader, args.Key, args.Value, args.Version)return rpc.OK}if reply.Err == rpc.ErrVersion && isFirstRequest {return rpc.ErrVersion}if reply.Err == rpc.ErrVersion && !isFirstRequest {return rpc.ErrMaybe}return reply.Err}ck.mu.Lock()ck.leader = (leader + 1) % len(ck.servers)ck.mu.Unlock()isFirstRequest = false//如果RPC失败,等待9ms后重试// time.Sleep(9 * time.Millisecond)}
}

server.go

package kvraftimport ("sync""sync/atomic""6.5840/kvraft1/rsm""6.5840/kvsrv1/rpc""6.5840/labgob""6.5840/labrpc"tester "6.5840/tester1"
)type ValueVersion struct {Value   stringVersion rpc.Tversion
}type ClientPutResult struct {ReqId  int64 // Unique ID for the clientResult *rpc.PutReply
}type KVServer struct {me   intdead int32 // set by Kill()rsm  *rsm.RSM// Your definitions here.mu               sync.MutexkvMap            map[string]*ValueVersionclientPutResults map[int64]ClientPutResult // clientId -> ClientPutReq
}func (kv *KVServer) DoGet(args *rpc.GetArgs) (reply *rpc.GetReply) {kv.mu.Lock()defer kv.mu.Unlock()vv, ok := kv.kvMap[args.Key]reply = &rpc.GetReply{} // 创建返回值if !ok {reply.Err = rpc.ErrNoKey// log.Printf("---server: %v---DoGet---reply.Err:%v\n", kv.me, reply.Err)return reply}reply.Value = vv.Valuereply.Version = vv.Versionreply.Err = rpc.OK// log.Printf("---server: %v---DoGet---成功,reply.Err:%v, 更新key:%v, 更新value:%v, 更新version:%v\n", kv.me, reply.Err, args.Key, vv.Value, vv.Version)return reply
}func (kv *KVServer) DoPut(args *rpc.PutArgs) (reply *rpc.PutReply) {kv.mu.Lock()defer kv.mu.Unlock()reply = &rpc.PutReply{} // 创建返回值// // // log.Printf("---server: %v---DoPut---收到请求: <key:%s, value:%s, version:%d>\n", kv.me, args.Key, args.Value, args.Version)// // // log.Printf("---server: %v---DoPut---当前kvMap长度: %d\n", kv.me, len(kv.kvMap))// 打印当前kvMap内容// for key, value := range kv.kvMap {// 	// // // log.Printf("---server: %v---DoPut---应用Put前kvMap内容: key:%s, value:%s, version:%d\n", kv.me, key, value.Value, value.Version)// }// key不存在if prev, ok := kv.clientPutResults[args.ClientId]; ok {if args.ReqId <= prev.ReqId {kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId:  args.ReqId,Result: prev.Result,}return prev.Result}}if _, ok := kv.kvMap[args.Key]; !ok {if args.Version != 0 {reply.Err = rpc.ErrNoKey// reply.Err = rpc.ErrVersion// log.Printf("---server: %v---DoPut---reply.Err:%v\n", kv.me, reply.Err)// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId:  args.ReqId,Result: reply,}return reply}// 创建新条目kv.kvMap[args.Key] = &ValueVersion{Value:   args.Value,Version: 1, // 新键版本从1开始}reply.Err = rpc.OK// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId:  args.ReqId,Result: reply,}// log.Printf("---server: %v---DoPut---成功,reply.Err:%v, 更新key:%v, 更新value:%v, 更新version:%v\n", kv.me, reply.Err, args.Key, args.Value, kv.kvMap[args.Key].Version)return reply}if args.Version != kv.kvMap[args.Key].Version {reply.Err = rpc.ErrVersion// // // log.Printf("---server: %v---DoPut---reply.Err:%v\n", kv.me, reply.Err)// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId:  args.ReqId,Result: reply,}return reply}kv.kvMap[args.Key].Value = args.Valuekv.kvMap[args.Key].Version++reply.Err = rpc.OK// 打印当前kvMap内容// for key, value := range kv.kvMap {// 	// // // log.Printf("---server: %v---DoPut---应用Put后kvMap内容: key:%s, value:%s, version:%d\n", kv.me, key, value.Value, value.Version)// }// log.Printf("---server: %v---DoPut---成功,reply.Err:%v, 更新key:%v, 更新value:%v, 更新version:%v\n", kv.me, reply.Err, args.Key, args.Value, kv.kvMap[args.Key].Version)// 记录该客户端的最新请求结果kv.clientPutResults[args.ClientId] = ClientPutResult{ReqId:  args.ReqId,Result: reply,}return reply
}// 要将req强制转换为正确的类型,可以查看下面的Go语言类型选择或类型断言:// https://go.dev/tour/methods/16
// https://go.dev/tour/methods/15
func (kv *KVServer) DoOp(req any) any {// Your code hereswitch req.(type) {case *rpc.GetArgs:// // // log.Printf("---server: %v---DoOp---收到rpc.GetArgs请求\n", kv.me)return kv.DoGet(req.(*rpc.GetArgs))case *rpc.PutArgs:// // // log.Printf("---server: %v---DoOp---收到rpc.PutArgs请求\n", kv.me)return kv.DoPut(req.(*rpc.PutArgs))}return nil
}func (kv *KVServer) Snapshot() []byte {// Your code herereturn nil
}func (kv *KVServer) Restore(data []byte) {// Your code here
}func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {// 在这里编写您的代码。使用 kv.rsm.Submit() 提交参数。您可以使用 Go 的类型转换将 Submit() 的返回值转换为 GetReply:rep.(rpc.GetReply)err, result := kv.rsm.Submit(args)if err == rpc.ErrWrongLeader || err == rpc.ErrTimeout {reply.Err = rpc.ErrWrongLeaderreturn}// 类型断言newReply := result.(*rpc.GetReply)reply.Value = newReply.Valuereply.Version = newReply.Versionreply.Err = newReply.Err
}func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {// 在这里编写您的代码。使用 kv.rsm.Submit() 提交参数。您可以使用 Go 的类型转换将 Submit() 的任何返回值转换为 PutReply:rep.(rpc.PutReply)err, result := kv.rsm.Submit(args)if err == rpc.ErrWrongLeader {reply.Err = errreturn}// 类型断言newReply := result.(*rpc.PutReply)reply.Err = newReply.Err
}// 当KVServer实例不再需要时,测试者调用Kill()。
// 为了方便您,我们提供了代码来设置rf.dead(无需加锁),
// 以及一个killed()方法用于在长时间运行的循环中测试rf.dead。
// 您也可以向Kill()添加自己的代码。您不需要对此做任何事情,但抑制被Kill()实例的调试输出(例如)可能会很方便。
func (kv *KVServer) Kill() {atomic.StoreInt32(&kv.dead, 1)// Your code here, if desired.
}func (kv *KVServer) killed() bool {z := atomic.LoadInt32(&kv.dead)return z == 1
}// StartKVServer() 和 MakeRSM() 必须快速返回,因此它们应该为任何长期运行的工作启动 goroutine。
func StartKVServer(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {// 在您想要的结构上调用 labgob.Register,以使 Go 的 RPC 库进行序列化/反序列化。labgob.Register(rsm.Op{})labgob.Register(&rpc.PutArgs{})labgob.Register(&rpc.GetArgs{})labgob.Register(ClientPutResult{})labgob.Register(map[int64]ClientPutResult{})kv := &KVServer{me:    me,kvMap: make(map[string]*ValueVersion),}kv.clientPutResults = make(map[int64]ClientPutResult)kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)// You may need initialization code here.return []tester.IService{kv, kv.rsm.Raft()}
}

300次测试

===== 开始第 1 次测试 =====
=== RUN   TestBasic4B
Test: one client (4B basic) (reliable network)...... Passed --  time  3.1s #peers 5 #RPCs  9550 #Ops  633
--- PASS: TestBasic4B (3.06s)
=== RUN   TestSpeed4B
Test: one client (4B speed) (reliable network)...... Passed --  time  4.9s #peers 3 #RPCs  8763 #Ops    0
--- PASS: TestSpeed4B (4.86s)
=== RUN   TestConcurrent4B
Test: many clients (4B many clients) (reliable network)...... Passed --  time  5.4s #peers 5 #RPCs 11190 #Ops  615
--- PASS: TestConcurrent4B (5.40s)
=== RUN   TestUnreliable4B
Test: many clients (4B many clients) (unreliable network)...... Passed --  time  3.8s #peers 5 #RPCs  2367 #Ops  291
--- PASS: TestUnreliable4B (3.80s)
=== RUN   TestOnePartition4B
Test: one client (4B progress in majority) (unreliable network)...... Passed --  time  1.5s #peers 5 #RPCs   140 #Ops    3
Test: no progress in minority (4B) (unreliable network)...... Passed --  time  1.3s #peers 5 #RPCs   141 #Ops    3
Test: completion after heal (4B) (unreliable network)...... Passed --  time  2.2s #peers 5 #RPCs   131 #Ops    4
--- PASS: TestOnePartition4B (4.99s)
=== RUN   TestManyPartitionsOneClient4B
Test: partitions, one client (4B partitions, one client) (reliable network)...... Passed --  time 14.7s #peers 5 #RPCs  9002 #Ops  569
--- PASS: TestManyPartitionsOneClient4B (14.71s)
=== RUN   TestManyPartitionsManyClients4B
Test: partitions, many clients (4B partitions, many clients (4B)) (reliable network)...... Passed --  time 11.5s #peers 5 #RPCs 12287 #Ops  639
--- PASS: TestManyPartitionsManyClients4B (11.50s)
=== RUN   TestPersistOneClient4B
Test: restarts, one client (4B restarts, one client 4B ) (reliable network)...... Passed --  time  6.0s #peers 5 #RPCs 19230 #Ops  367
--- PASS: TestPersistOneClient4B (6.05s)
=== RUN   TestPersistConcurrent4B
Test: restarts, many clients (4B restarts, many clients) (reliable network)...... Passed --  time  9.2s #peers 5 #RPCs 28350 #Ops  415
--- PASS: TestPersistConcurrent4B (9.23s)
=== RUN   TestPersistConcurrentUnreliable4B
Test: restarts, many clients (4B restarts, many clients ) (unreliable network)...... Passed --  time  6.7s #peers 5 #RPCs  1779 #Ops  135
--- PASS: TestPersistConcurrentUnreliable4B (6.75s)
=== RUN   TestPersistPartition4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (reliable network)...... Passed --  time 14.7s #peers 5 #RPCs 27239 #Ops  439
--- PASS: TestPersistPartition4B (14.70s)
=== RUN   TestPersistPartitionUnreliable4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (unreliable network)...... Passed --  time 12.4s #peers 5 #RPCs  1932 #Ops  103
--- PASS: TestPersistPartitionUnreliable4B (12.42s)
=== RUN   TestPersistPartitionUnreliableLinearizable4B
Test: restarts, partitions, random keys, many clients (4B restarts, partitions, random keys, many clients) (unreliable network)...... Passed --  time 12.3s #peers 7 #RPCs  5759 #Ops  226
--- PASS: TestPersistPartitionUnreliableLinearizable4B (12.33s)
PASS
ok  	6.5840/kvraft1	109.799s
===== 结束第 1 次测试 ===== (耗时: 111秒).......................................===== 开始第 300 次测试 =====
=== RUN   TestBasic4B
Test: one client (4B basic) (reliable network)...... Passed --  time  3.0s #peers 5 #RPCs 10009 #Ops  553
--- PASS: TestBasic4B (3.05s)
=== RUN   TestSpeed4B
Test: one client (4B speed) (reliable network)...... Passed --  time  7.5s #peers 3 #RPCs  9589 #Ops    0
--- PASS: TestSpeed4B (7.48s)
=== RUN   TestConcurrent4B
Test: many clients (4B many clients) (reliable network)...... Passed --  time  3.3s #peers 5 #RPCs 10824 #Ops  639
--- PASS: TestConcurrent4B (3.29s)
=== RUN   TestUnreliable4B
Test: many clients (4B many clients) (unreliable network)...... Passed --  time  3.9s #peers 5 #RPCs  2232 #Ops  267
--- PASS: TestUnreliable4B (3.90s)
=== RUN   TestOnePartition4B
Test: one client (4B progress in majority) (unreliable network)...... Passed --  time  2.0s #peers 5 #RPCs   191 #Ops    3
Test: no progress in minority (4B) (unreliable network)...... Passed --  time  1.1s #peers 5 #RPCs   117 #Ops    3
Test: completion after heal (4B) (unreliable network)...... Passed --  time  1.1s #peers 5 #RPCs    93 #Ops    4
--- PASS: TestOnePartition4B (4.27s)
=== RUN   TestManyPartitionsOneClient4B
Test: partitions, one client (4B partitions, one client) (reliable network)...... Passed --  time  9.3s #peers 5 #RPCs  9025 #Ops  581
--- PASS: TestManyPartitionsOneClient4B (9.28s)
=== RUN   TestManyPartitionsManyClients4B
Test: partitions, many clients (4B partitions, many clients (4B)) (reliable network)...... Passed --  time 27.2s #peers 5 #RPCs 12275 #Ops  633
--- PASS: TestManyPartitionsManyClients4B (27.25s)
=== RUN   TestPersistOneClient4B
Test: restarts, one client (4B restarts, one client 4B ) (reliable network)...... Passed --  time  6.0s #peers 5 #RPCs 20870 #Ops  341
--- PASS: TestPersistOneClient4B (6.04s)
=== RUN   TestPersistConcurrent4B
Test: restarts, many clients (4B restarts, many clients) (reliable network)...... Passed --  time  6.3s #peers 5 #RPCs 27714 #Ops  453
--- PASS: TestPersistConcurrent4B (6.29s)
=== RUN   TestPersistConcurrentUnreliable4B
Test: restarts, many clients (4B restarts, many clients ) (unreliable network)...... Passed --  time  7.1s #peers 5 #RPCs  1972 #Ops  181
--- PASS: TestPersistConcurrentUnreliable4B (7.11s)
=== RUN   TestPersistPartition4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (reliable network)...... Passed --  time 26.7s #peers 5 #RPCs 23099 #Ops  355
--- PASS: TestPersistPartition4B (26.65s)
=== RUN   TestPersistPartitionUnreliable4B
Test: restarts, partitions, many clients (4B restarts, partitions, many clients) (unreliable network)...... Passed --  time 12.1s #peers 5 #RPCs  1928 #Ops  129
--- PASS: TestPersistPartitionUnreliable4B (12.15s)
=== RUN   TestPersistPartitionUnreliableLinearizable4B
Test: restarts, partitions, random keys, many clients (4B restarts, partitions, random keys, many clients) (unreliable network)...... Passed --  time 12.4s #peers 7 #RPCs  5829 #Ops  246
--- PASS: TestPersistPartitionUnreliableLinearizable4B (12.44s)
PASS
ok  	6.5840/kvraft1	129.198s
===== 结束第 300 次测试 ===== (耗时: 129秒)===== 测试统计摘要 =====
总测试次数: 300
成功次数: 300
失败次数: 0
成功率: 100%
总耗时: 35445秒
平均每次测试耗时: 118秒
===== 测试结束 =====

lab4C

这个很简单, 按流程图写就行了

实现Server的snaopshot和restore

  • snapshot是对Server的kv数据做快照

  • restore是在raft层收到快照, 在Server中恢复数据

// 持久化一些东西
func (kv *KVServer) Snapshot() []byte {// Your code herekv.mu.Lock()defer kv.mu.Unlock()w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(kv.kvMap)// e.Encode(kv.clientPutResults)// // // log.Printf("---server: %v---Snapshot---快照长度: %d\n", kv.me, len(data))return w.Bytes()
}// 从快照恢复状态
func (kv *KVServer) Restore(data []byte) {// Your code hereif data == nil || len(data) < 1 { // bootstrap without any state?return}r := bytes.NewBuffer(data)d := labgob.NewDecoder(r)var kvMap map[string]*ValueVersion// var clientPutResults map[int64]ClientPutResult// if d.Decode(&kvMap) != nil || d.Decode(&clientPutResults) != nil {if d.Decode(&kvMap) != nil {// 解码错误log.Fatal("Failed to decode server persisted state")} else {kv.mu.Lock()kv.kvMap = kvMap// kv.clientPutResults = clientPutResultskv.mu.Unlock()// // // log.Printf("---server: %v---Restore---快照解码成功\n", kv.me)}
}

调用Snapshot和Restore的时机

  • 当snapshot的大小超过给定大小maxraftstate时,要去执行snapshot

  • lab4C的snapshot是对Server的kv数据做快照(raft的snapshot是按照状态机要求截断log, 别搞混了), 因此判断时机应该在DoOp后(log应用了之后)

    // 检查是否需要创建快照if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() > rsm.maxraftstate {snapshot := rsm.sm.Snapshot()rsm.rf.Snapshot(applyMsg.CommandIndex, snapshot)}
    
  • 而restore的应用时机:

    • 当RSM重启后发现snapshot大小不为0

      snapShot := persister.ReadSnapshot()if len(snapShot) > 0 {// 如果有快照,交给server恢复状态rsm.sm.Restore(snapShot)}
      
    • reader中接收到快照的时候就调用restore

      if applyMsg.SnapshotValid {// 处理快照rsm.sm.Restore(applyMsg.Snapshot)
      }
      

BUG修复

测试了300次,发现有十几次出错, 出错原因是raft 接收快照时时重复解锁, 还有向RSM发快照时忘记用sync.WaitGroup了, 原代码如下:

// 接收快照RPC
func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {flag := truerf.mu.Lock()defer rf.mu.Unlock()// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 准备安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)if args.Term < rf.currentTerm {reply.Term = rf.currentTermrf.persist()return}for flag {switch rf.state {case Leader:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Candidate:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else if args.Term == rf.currentTerm {reply.Term = rf.currentTerm // 回复者termIDrf.state = Followerrf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Follower:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1 // 重置投票人}rf.lastHeartbeatTime = time.Now() // 重置心跳检测// 如果快照比现有快照旧,则忽略它if args.LastIncludedIndex <= rf.globalLastIncludedIndex || args.LastIncludedIndex <= rf.lastapplied {reply.Term = rf.currentTermrf.persist()return}// 保存快照文件,丢弃任何索引更小的现有或部分 快照rf.snapShot = args.Data// 如果现有日志有与快照最后条目索引和任期相同的条目,保留该条目之后的日志并回复if args.LastIncludedIndex < rf.toGlobalIndex(len(rf.log)) && args.LastIncludedIndex >= rf.globalLastIncludedIndex && rf.log[rf.toLocalIndex(args.LastIncludedIndex)].TermId == args.LastIncludedTerm {// 保留该条目之后的日志rf.log = rf.log[rf.toLocalIndex(args.LastIncludedIndex):]} else {// 否则,丢弃整个日志rf.log = make([]*logEntity, 1)                                      // 初始化日志,保留一个占位符rf.log[0] = &logEntity{TermId: args.LastIncludedTerm, Command: nil} // 在0位置放一个占位符}// 通知服务应用快照applyMsg := &raftapi.ApplyMsg{SnapshotValid: true,Snapshot:      args.Data,SnapshotTerm:  args.LastIncludedTerm,SnapshotIndex: args.LastIncludedIndex,}reply.Term = rf.currentTermflag = false// 更新快照相关状态rf.snapShot = args.Datarf.globalLastIncludedIndex = args.LastIncludedIndexrf.globalLastIncludedTerm = args.LastIncludedTermif rf.commitIndex < args.LastIncludedIndex {rf.commitIndex = args.LastIncludedIndex}// if rf.lastapplied < args.LastIncludedIndex {// 	rf.lastapplied = args.LastIncludedIndex// }if args.LastIncludedIndex > rf.globalLastIncludedIndex || rf.lastapplied < args.LastIncludedIndex {rf.lastapplied = args.LastIncludedIndex}// rf.applyCh <- *applyMsg// 复制需要的数据,避免在无锁状态下访问 Raft 结构体snapshotData := make([]byte, len(applyMsg.Snapshot))copy(snapshotData, applyMsg.Snapshot)snapshotIndex := applyMsg.SnapshotIndexsnapshotTerm := applyMsg.SnapshotTerm// 释放锁后再发送消息rf.persist()rf.snapShotLock = truerf.mu.Unlock()               // 报错这里重复解锁了..., 莫名其妙, 暂时没看出什么问题rf.applyCh <- raftapi.ApplyMsg{SnapshotValid: true,Snapshot:      snapshotData,SnapshotTerm:  snapshotTerm,SnapshotIndex: snapshotIndex,}rf.mu.Lock() // 重新获取锁,因为 defer 中还需要解锁rf.snapShotLock = falseif rf.lastapplied < rf.commitIndex {// 唤醒applyrf.applyCond.Broadcast()}// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 已安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)}}}

修改后:

// 接收快照RPC
func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {rf.wg.Add(1)defer rf.wg.Done()// Your code here (3D).flag := truerf.mu.Lock()// defer rf.mu.Unlock()// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 准备安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)if args.Term < rf.currentTerm {reply.Term = rf.currentTermrf.persist()rf.mu.Unlock()return}for flag {switch rf.state {case Leader:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Candidate:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = Followerrf.votedFor = -1 // 重置投票人rf.persist()} else if args.Term == rf.currentTerm {reply.Term = rf.currentTerm // 回复者termIDrf.state = Followerrf.persist()} else {reply.Term = rf.currentTerm // 回复者termIDflag = falserf.persist()}case Follower:if args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1 // 重置投票人}rf.lastHeartbeatTime = time.Now() // 重置心跳检测// 如果快照比现有快照旧,则忽略它if args.LastIncludedIndex <= rf.globalLastIncludedIndex || args.LastIncludedIndex <= rf.lastapplied {reply.Term = rf.currentTermrf.persist()rf.mu.Unlock()return}// 保存快照文件,丢弃任何索引更小的现有或部分 快照rf.snapShot = args.Data// 如果现有日志有与快照最后条目索引和任期相同的条目,保留该条目之后的日志并回复if args.LastIncludedIndex < rf.toGlobalIndex(len(rf.log)) && args.LastIncludedIndex >= rf.globalLastIncludedIndex && rf.log[rf.toLocalIndex(args.LastIncludedIndex)].TermId == args.LastIncludedTerm {// 保留该条目之后的日志rf.log = rf.log[rf.toLocalIndex(args.LastIncludedIndex):]} else {// 否则,丢弃整个日志rf.log = make([]*logEntity, 1)                                      // 初始化日志,保留一个占位符rf.log[0] = &logEntity{TermId: args.LastIncludedTerm, Command: nil} // 在0位置放一个占位符}// 通知服务应用快照applyMsg := &raftapi.ApplyMsg{SnapshotValid: true,Snapshot:      args.Data,SnapshotTerm:  args.LastIncludedTerm,SnapshotIndex: args.LastIncludedIndex,}reply.Term = rf.currentTermflag = false// 更新快照相关状态rf.snapShot = args.Datarf.globalLastIncludedIndex = args.LastIncludedIndexrf.globalLastIncludedTerm = args.LastIncludedTermif rf.commitIndex < args.LastIncludedIndex {rf.commitIndex = args.LastIncludedIndex}// if rf.lastapplied < args.LastIncludedIndex {// 	rf.lastapplied = args.LastIncludedIndex// }if args.LastIncludedIndex > rf.globalLastIncludedIndex || rf.lastapplied < args.LastIncludedIndex {rf.lastapplied = args.LastIncludedIndex}// rf.applyCh <- *applyMsg// 复制需要的数据,避免在无锁状态下访问 Raft 结构体snapshotData := make([]byte, len(applyMsg.Snapshot))copy(snapshotData, applyMsg.Snapshot)snapshotIndex := applyMsg.SnapshotIndexsnapshotTerm := applyMsg.SnapshotTerm// 释放锁后再发送消息rf.persist()rf.snapShotLock = truerf.mu.Unlock()if rf.killed() {return}rf.applyCh <- raftapi.ApplyMsg{SnapshotValid: true,Snapshot:      snapshotData,SnapshotTerm:  snapshotTerm,SnapshotIndex: snapshotIndex,}rf.mu.Lock() // 重新获取锁,因为 defer 中还需要解锁rf.snapShotLock = falseif rf.lastapplied < rf.commitIndex {// 唤醒applyrf.applyCond.Broadcast()}// DPrintf("InstallSnapshot---节点server %v, 在term: %v, 收到 leader %v 的快照: %+v, 已安装快照, rf.lastapplied=%v\n", rf.me, rf.currentTerm, args.LeaderId, args, rf.lastapplied)}}rf.mu.Unlock()}

300次测试

===== 开始第 1 次测试 =====
=== RUN   TestSnapshotRPC4C
Test: snapshots, one client (4C SnapshotsRPC) (reliable network)...
Test: InstallSnapshot RPC (4C) (reliable network)...... Passed --  time  3.6s #peers 3 #RPCs  7032 #Ops   63
--- PASS: TestSnapshotRPC4C (3.56s)
=== RUN   TestSnapshotSize4C
Test: snapshots, one client (4C snapshot size is reasonable) (reliable network)...... Passed --  time  1.1s #peers 3 #RPCs  8620 #Ops  800
--- PASS: TestSnapshotSize4C (1.12s)
=== RUN   TestSpeed4C
Test: snapshots, one client (4C speed) (reliable network)...... Passed --  time  1.3s #peers 3 #RPCs  9672 #Ops    0
--- PASS: TestSpeed4C (1.35s)
=== RUN   TestSnapshotRecover4C
Test: restarts, snapshots, one client (4C restarts, snapshots, one client) (reliable network)...... Passed --  time  6.0s #peers 5 #RPCs 24163 #Ops  705
--- PASS: TestSnapshotRecover4C (6.02s)
=== RUN   TestSnapshotRecoverManyClients4C
Test: restarts, snapshots, many clients (4C restarts, snapshots, many clients ) (reliable network)...
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok... Passed --  time  9.6s #peers 5 #RPCs 31434 #Ops 1305
--- PASS: TestSnapshotRecoverManyClients4C (9.58s)
=== RUN   TestSnapshotUnreliable4C
Test: snapshots, many clients (4C unreliable net, snapshots, many clients) (unreliable network)...... Passed --  time  4.0s #peers 5 #RPCs  2334 #Ops  295
--- PASS: TestSnapshotUnreliable4C (3.98s)
=== RUN   TestSnapshotUnreliableRecover4C
Test: restarts, snapshots, many clients (4C unreliable net, restarts, snapshots, many clients) (unreliable network)...... Passed --  time  6.9s #peers 5 #RPCs  1893 #Ops  157
--- PASS: TestSnapshotUnreliableRecover4C (6.90s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartition4C
Test: restarts, partitions, snapshots, many clients (4C unreliable net, restarts, partitions, snapshots, many clients) (unreliable network)...... Passed --  time 12.5s #peers 5 #RPCs  2115 #Ops  107
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4C (12.47s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C
Test: restarts, partitions, snapshots, random keys, many clients (4C unreliable net, restarts, partitions, snapshots, random keys, many clients) (unreliable network)...... Passed --  time 15.6s #peers 7 #RPCs  6354 #Ops  360
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C (15.61s)
PASS
ok  	6.5840/kvraft1	60.601s
===== 结束第 1 次测试 ===== (耗时: 61秒)...............===== 开始第 300 次测试 =====
=== RUN   TestSnapshotRPC4C
Test: snapshots, one client (4C SnapshotsRPC) (reliable network)...
Test: InstallSnapshot RPC (4C) (reliable network)...... Passed --  time  2.5s #peers 3 #RPCs  7187 #Ops   63
--- PASS: TestSnapshotRPC4C (2.52s)
=== RUN   TestSnapshotSize4C
Test: snapshots, one client (4C snapshot size is reasonable) (reliable network)...... Passed --  time  1.1s #peers 3 #RPCs  9909 #Ops  800
--- PASS: TestSnapshotSize4C (1.15s)
=== RUN   TestSpeed4C
Test: snapshots, one client (4C speed) (reliable network)...... Passed --  time  1.7s #peers 3 #RPCs 11776 #Ops    0
--- PASS: TestSpeed4C (1.68s)
=== RUN   TestSnapshotRecover4C
Test: restarts, snapshots, one client (4C restarts, snapshots, one client) (reliable network)...... Passed --  time  6.0s #peers 5 #RPCs 24630 #Ops  941
--- PASS: TestSnapshotRecover4C (6.02s)
=== RUN   TestSnapshotRecoverManyClients4C
Test: restarts, snapshots, many clients (4C restarts, snapshots, many clients ) (reliable network)...
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok
info: linearizability check timed out, assuming history is ok... Passed --  time  9.6s #peers 5 #RPCs 34146 #Ops 1357
--- PASS: TestSnapshotRecoverManyClients4C (9.56s)
=== RUN   TestSnapshotUnreliable4C
Test: snapshots, many clients (4C unreliable net, snapshots, many clients) (unreliable network)...... Passed --  time  4.1s #peers 5 #RPCs  2316 #Ops  269
--- PASS: TestSnapshotUnreliable4C (4.13s)
=== RUN   TestSnapshotUnreliableRecover4C
Test: restarts, snapshots, many clients (4C unreliable net, restarts, snapshots, many clients) (unreliable network)...... Passed --  time  6.7s #peers 5 #RPCs  1709 #Ops  123
--- PASS: TestSnapshotUnreliableRecover4C (6.74s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartition4C
Test: restarts, partitions, snapshots, many clients (4C unreliable net, restarts, partitions, snapshots, many clients) (unreliable network)...... Passed --  time 15.5s #peers 5 #RPCs  2229 #Ops   83
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4C (15.54s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C
Test: restarts, partitions, snapshots, random keys, many clients (4C unreliable net, restarts, partitions, snapshots, random keys, many clients) (unreliable network)...... Passed --  time 21.4s #peers 7 #RPCs  7348 #Ops  270
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4C (21.39s)
PASS
ok  	6.5840/kvraft1	68.737s
===== 结束第 300 次测试 ===== (耗时: 69秒)===== 测试统计摘要 =====
总测试次数: 300
成功次数: 300
失败次数: 0
成功率: 100%
总耗时: 22437秒
平均每次测试耗时: 74秒
===== 测试结束 =====
http://www.hskmm.com/?act=detail&tid=25151

相关文章:

  • NumPy广播:12个技巧替代循环,让数组计算快40倍
  • 某中心2026年推出1111个技术实习岗位
  • 川土微变频器应用分享
  • POLIR-Society-Philosophy- Hegels 形而上学System Philosophy Dialectics 系统化哲学辩证法: 自由意志+封闭的绝对精神
  • 解决VLC 无法解码格式“h264” (H264 - MPEG-4 AVC (part 10))
  • CF2115 总结
  • luogu P8816 [CSP-J 2022] 上升点列 题解
  • CF558C Amr and Chemistry BFS解
  • Atbash密码和摩斯密码
  • Redis 中如何保证缓存与数据库的内容一致性?
  • Payload CMS:开发者优先的Next.js原生开源解决优秀的方案,重新定义无头内容管理
  • 第一次写博客
  • 07. 自定义组件
  • python语法记录
  • 2025 年储罐厂家推荐最新公司权威排行榜榜单发布,深度解析衬四氟储罐 / 硫酸储罐 / 盐酸储罐工厂选购指南
  • UnicodeEncodeError: locale codec cant encode character \u5e74 in position 2: encoding error
  • 2025 年生物除臭设备厂家最新推荐排行榜:覆盖污水处理厂 / 垃圾中转站等多场景,助力企业精准挑选优质设备
  • 2025 年球墨铸铁管公司:重庆南恩物资全品类管材供应与市政工程适配解决方案解析
  • 2025生物除臭设备厂家最新品牌企业推荐排行榜揭晓:印染厂污水,食品厂污水,污水处理厂,污水泵站,污水站,餐厨垃圾,屠宰场,厨余垃圾生物除臭设备公司推荐
  • 2025 工业加热器选型必看:六大加热器实力厂家深度推荐,覆盖多场景加热设备解决方案
  • YOLO模型部署
  • 从理念到沙盘:用悟空博弈模拟器点亮人机共治的曙光
  • 深入解析:Redis事务详解:原理、使用与注意事项
  • phone num
  • Perplexity发布搜索API,驱动下一代AI应用开发
  • PWN手的成长之路-09-SWPUCTF 2023 秋季新生赛Shellcode
  • 20251005 总结
  • OKR1
  • 2025 年装盒机制造厂 TOP 企业品牌推荐排行榜,自动化 / 喷胶 / 牙膏 / 手机壳 / 3C 数码 / 内外盒 / 面膜 / 电子产品 / 玩具 / 日用品装盒机推荐这十家公司!
  • 英语_阅读_Chinas Spring Festival_待读