这份笔记不包括最简单的Lab1,很久之前做的,细节已经忘记了。
花费20多天做完了Lab2-Lab4,稳定通过所有测试(含Lab4的两个Challenge),用例也有不全面的地方,下面会提到。遗憾的地方也有不少:
代码结构上不够美观;代码行数上,Lab4 server 1000行,Lab2 raft 2000行,也有不少的压缩空间。 
Lab3 Lab4实现的kvServer没有优化,据说get是可以不加锁的 
不少地方使用轮询,用wait/signal会更加高效(与优雅) 
 
难度上,我认同[2]作者的排序,Lab4 > Lab2 >> Lab3 > Lab1。所有 细节。2. 第一次编写调试分布式代码
推荐资料 TODO 补充这些资料的简略说明
作业自带的说明与Hint MIT6.824-2021  四个Lab的实现笔记,Lab2的Snapshot和Lab4的部分思路参考于这里Raft架构图 Instructors’ Guide to Raft Students’ Guide to Raft The Secret Lives of Data - Raft: Understandable Distributed Consensus  Raft算法可视化,初学时辅助理解CONSENSUS: BRIDGING THEORY AND PRACTICE  Raft的PhD论文go-test-many.sh  并行测试脚本,使用方法 ./go-test-many.sh 1024 32 TestUnreliableAgree2C,三个参数分别是测试次数、并行执行数、用例包含的字符串Raft does not Guarantee Liveness in the face of Network Faults TiKV 功能介绍 - Lease Read Performance In Search of an Understandable Consensus Algorithm Raft论文本身 
Raft成员变更的工程实践 etcd raft如何实现Linearizable Read  
相关资料 来不及看的一些文章,优先级递减
Linearizability 一致性验证 ARC: Analysis of Raft Consensus dist101 Debugging by Pretty Printing  
Lab 2 TODO:协程整理
Lab2分四次小作业
2A: 实现选举+心跳包 
2B: 实现Basic Raft 
2C: 持久化 
2D: 日志压缩 
 
整体上需要注意的点有:
在实现snapshot时会截断日志,因此一定不要使用Log数组的下标保存index,后面改起来会很麻烦 
RPC的发送需要手动管理超时,当发送给挂掉的server时,网络时延可能高达7s 
测试时确保任意时刻CPU不要打满。在测试Lab4的TestJoinLeave和TestChallenge2Unaffected时,用例中给了一个假定的时限用于迁移shard,随后shutdown那个group,如果server进展过慢则会fail。使用并行数为8进行测试时,5800x在某些时刻存在打满的情况,测试结果中会低概率(<1%)无法通过这两个测试,给两倍时间,或并行数修改为4,能够稳定通过所有测试。 
 
结构定义与初始化 结构定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 type  Raft struct  {int                  int32                int  int  int  string  chan  string  chan  string  chan  string  chan  ApplyMsg string  bool bool bool type  PersistentState struct  {int int type  RaftSnapshot struct  {int int 
常量
选举timeout 550ms-1100ms,心跳180ms,RPC超时100ms
1 2 3 4 5 6 ELECTION_TIMEOUT_MIN   = 550  550 180  100  15  
初始化,只列出必要代码(略去辅助代码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func  Make (peers []*labrpc.ClientEnd, me int , 	persister *Persister, applyCh chan  ApplyMsg) -1 0 -1 -1  make ([]int , len (rf.peers))make ([]int , len (rf.peers))for  i := 0 ; i < len (rf.peers); i++ {-1 make ([]bool , len (rf.peers))make (chan  string , 1 )make ([]chan  string , 0 )for  i := 0 ; i < len (rf.peers); i++ {append (rf.newDataNotifyChan, make (chan  string , 1 ))-1 -1 make ([]chan  string , 0 )for  i := 0 ; i < len (rf.peers); i++ {append (rf.installSnapshotChan, make (chan  string , 1 ))go  rf.ticker()go  rf.applier()return  rf
主循环 状态转移矩阵,横向为起始状态,纵向为目标状态。只有candidate可以转化为两种状态,其他角色的下一状态都是必然的。
leader 
follower 
candidate 
 
 
leader 
- 
发现更高term的server 
- 
 
follower 
- 
- 
超时 
 
candidate 
majority vote 
发现更高term的server 
选举超时,下一轮还是candidate 
 
实现复杂度上:leader > candidate > follower
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func  (rf *Raft) for  rf.killed() == false  {if  role == ROLE_FOLLOWER {else  if  role == ROLE_CANDIDATE {else  {
LoopFollower follower只有任期结束这一种角色转换的可能性,逻辑较为简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func  (rf *Raft) make (chan  string , 1 )go  func () defer  func () "x"  }()for  {if  time.Now().After(rf.electionDeadline) {return 
测试在kill之前会先断网,防止状态传播到其他server,然后将持久化状态拷贝一份用于将来重启server,最后调用Kill()杀死服务,persist提供的保存会持锁,与拷贝过程持锁冲突,这样提供了原子的持久化接口
LoopCandidate 包括loopLeader,采用的都是类似的模式,对每个out channel创建协程,加锁构造请求,避免在请求发送期间被修改;无锁发送请求;加锁处理请求,确保对自身状态修改的一致性
Candidate需要同步地响应三种中断:作为RPC receiver引发的角色切换、任期超时、requestVote RPC被接受
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 func  (rf *Raft) make (chan  string , len (rf.peers)) 1 func (term int ) func () bool  {return  func () bool  {return  rf.role != ROLE_CANDIDATE || rf.killed() || term != rf.Termfor  i := 0 ; i < len (rf.peers); i++ {if  i == rf.me {continue go  func (i int ) for  {if  returnFunc() { return nil , nil , req, &resp, nil , nil , 1 ) if  sendSuccess {if  resp.VoteGranted {"ok" else  {if  resp.Term > rf.Term {-1 select  {case  rf.stateChangeNotifyChan <- "x" :default :break make (chan  string , 1 )go  func () defer  func () "x"  }()for  {if  time.Now().After(rf.electionDeadline) {if  rf.role != ROLE_CANDIDATE {return return for  {select  {case  <-ackChan:if  ackNum == rf.getQuorumSize() {"got %d vote" , ackNum)if  rf.role != ROLE_CANDIDATE {return for  i := 0 ; i < len (rf.peers); i++ {1 return case  <-timeoutChan:return case  <-rf.stateChangeNotifyChan:return 
LoopLeader 存放了Raft的主要逻辑(另一个主要部分是RPC处理),整体逻辑与LoopCandidate相似,需要注意的问题有:
唯一需要响应的中断是来自更高Term RPC导致转换为follower 
上任后立即发送一个心跳,尽快向其他server宣告自己的地位,避免它们升到更高的term,导致系统再次进入无leader状态(也就不可用) 
论文给出的matchIndex变量我没有找到用途,我认为依赖nextIndex-1作为对方当前持有的日志位置是安全的。因为只要leader与其他server发生过success=true的AppendEntries RPC交互,上面的命题为真。对于未发生交互的情况,使用初始化的值rf.nextIndex[i] = rf.getLatestIndex() + 1作为对方持有的日志位置进行判断是否已经发给majority,即使超过majority,也会由于日志非当前term而无法提交,除非之后有当前term的位置检测到majority,而这个位置一定又是发生过AppendEntries RPC交互的情况。 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func  (rf *Raft) func (term int ) func () bool  {return  func () bool  {return  rf.role != ROLE_LEADER || rf.killed() || term != rf.Termfor  i := 0 ; i < len (rf.peers); i++ {if  i == rf.me {continue go  func (i int ) var  prevLogTerm int var  sendSuccess bool 0 )for  {select  { case  <-heartbeatTimeoutTimer.C:case  <-rf.newDataNotifyChan[i]:case  <-rf.installSnapshotChan[i]:
发送心跳 经典的加锁构造,无锁发送,加锁处理结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 var  req *AppendEntriesRequestif  returnCond() { return if  rf.nextIndex[i] <= rf.getLatestIndex() {select  {case  rf.newDataNotifyChan[i] <- "x" :default :continue if  rf.nextIndex[i] <= 0  {-1 else  if  rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex {select  {case  rf.installSnapshotChan[i] <- "x" :default :return else  if  rf.nextIndex[i] == rf.RaftSnapshot.LastIncludedIndex+1  {else  {1 ).Term nil ,1 ,if  !rf.retrySend(i, returnCond, req, &resp, nil , nil , nil , nil , 1 ) {continue if  returnCond() {return if  resp.Success {continue if  resp.Term > rf.Term {"x" -1 return else  {if  rf.adjustNextIndex(i, req.PrevLogIndex, resp.ConflictTerm, resp.ConflictIndex) {goto  HTBT else  {continue 
调整nextIndex 在测试TestFigure8Unreliable2C中,会产生要求大量回退的场景,且网络是unreliable的,意味着单次回退一个index太慢。这里使用[5]中提供的回退策略加速回退。
follower
如果follower在发送的prevIndex位置没有日志,那么(conflictTerm=-1, conflictIndex=indexOfLastLog+1) 
如果follower在prevIndex位置有日志,但冲突(term不同,一定是小于),conflictTerm=prevIndex位置日志entry的term,conflictIndex=第一条term为此term的index 
 
leader
如果conflictTerm=-1,回退到conflictIndex 
如果有term=conflictTerm的日志,回退到最后一条这样日志的下一条日志 
否则仍然回退到conflictIndex 
 
分析安全性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func  (rf *Raft) int , prevLogIndex int , conflictTerm int , conflictIndex int ) bool  {if  prevLogIndex <= rf.RaftSnapshot.LastIncludedIndex {"prevLogIndex %d not in log, impossible to go back, notify install snapshot for %d" , prevLogIndex, i)select  {case  rf.installSnapshotChan[i] <- "x" :default :return  false if  conflictTerm == -1  {else  {-1 for  j := rf.Log.getLocalIndex(prevLogIndex); j >= 1 ; j-- {if  rf.Log[j-1 ].Term == conflictTerm {break if  rf.Log[j-1 ].Term < conflictTerm {break if  conflictTermIndex == -1  {else  {select  {case  rf.newDataNotifyChan[i] <- "x" :default :return  true 
发送日志 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 var  req *AppendEntriesRequestif  returnCond() {return if  rf.nextIndex[i] > rf.getLatestIndex() {continue if  rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex {select  {case  rf.installSnapshotChan[i] <- "x" :default :continue if  rf.nextIndex[i] <= 0  {-1 else  if  rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex {select  {case  rf.installSnapshotChan[i] <- "x" :default :return else  if  rf.nextIndex[i] == rf.RaftSnapshot.LastIncludedIndex+1  {else  {1 ).Termmake ([]LogEntry, 0 )for  _, log := range  rf.Log {append (logCp, LogEntry{1 ,if  !rf.retrySend(i, returnCond, req, &resp, nil , nil , nil , nil , 1 ) {goto  MSG if  returnCond() {return if  resp.Success {len (req.Msgs)goto  MSG if  resp.Term > rf.Term {select  {case  rf.stateChangeNotifyChan <- "x" :default :return if  rf.adjustNextIndex(i, req.PrevLogIndex, resp.ConflictTerm, resp.ConflictIndex) {goto  MSGelse  {continue 
更新提交位置 从successStartIndex到当前日志位置检查是否可以提交,一旦有日志不符合提交条件,后续日志也无检查必要。commit只是逻辑上的标志,有专门的日志提交协程。
这里解释一下论文figure 8:不允许通过replicate仅含之前term的日志到majority来判定其committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func  (rf *Raft) int , successStartIndex int ) {for  current := successStartIndex; current < rf.nextIndex[i]; current++ {1 for  j := 0 ; j < len (rf.peers); j++ {if  j == rf.me {continue if  rf.nextIndex[j] > current {if  ackNum >= rf.getQuorumSize() && current > rf.RaftSnapshot.LastIncludedIndex && rf.Log.getByIndex(current).Term == rf.Term && rf.commitIndex < current { else  {break 
发送snapshot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 var  req *InstallSnapshotRequestif  returnCond() {return if  !rf.retrySend(i, returnCond, nil , nil , nil , nil , req, &resp, 1 ) {goto  SNAPSHOTif  returnCond() {return if  resp.Term > rf.Term {select  {case  rf.stateChangeNotifyChan <- "x" :default :return else  {if  req.LastIncludedIndex+1  > rf.nextIndex[i] {1 
RequestVote RPC 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func  (rf *Raft) defer  rf.mu.Unlock()defer  rf.persist()if  rf.Term > request.Term {return else  if  rf.Term < request.Term {-1  select  {case  rf.stateChangeNotifyChan <- "x" :default :if  rf.VotedFor != -1  && rf.VotedFor != request.CandidateId {"reject because self.votedFor %d != request.candidateId %d" ,return if  request.LastLogTerm > lastTerm || (request.LastLogTerm == lastTerm && request.LastLogIndex >= lastIndex) {return 
AppendEntries RPC 某条日志的commit最早可以发生在majority中最后一台server将此日志持久化后的那一刻,但显然leader直到这条(或者随后的RPC)消息返回才能够观测到这个事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 func  (rf *Raft) defer  rf.mu.Unlock()defer  rf.persist()if  !rf.checkTermAndRole(request.Term, request.LeaderId) {"%s because term" , textRed("reject" ))return if  request.Msgs != nil  {0 for  ; firstNonSnapshotIndexInMsg < len (request.Msgs); firstNonSnapshotIndexInMsg++ {if  request.Msgs[firstNonSnapshotIndexInMsg].Index > rf.RaftSnapshot.LastIncludedIndex {break if  firstNonSnapshotIndexInMsg != 0  {if  len (request.Msgs) == 0  {return if  lastIndex < request.PrevLogIndex {"reject entry because not containing Log at prevLogIndex %d, current Log max index %d" , request.PrevLogIndex, lastIndex)-1 1  return if  request.PrevLogTerm >= 0  && request.PrevLogIndex > rf.RaftSnapshot.LastIncludedIndex && logAtPrevIndex.Term != request.PrevLogTerm {for  {if  i == 0  || rf.Log[i-1 ].Term != response.ConflictTerm {break return 1  0 false for  ; i < len (rf.Log) && j < len (request.Msgs); i++ {if  rf.Log[i].Term != request.Msgs[j].Term { true "overwrite %d with %d" , rf.Log[i].Index, request.Msgs[j])if  j < len (request.Msgs) {append (rf.Log, request.Msgs[j:]...)else  if  hasConflict {if  request.LeaderCommit > rf.commitIndex {var  newCommitIndex int if  request.LeaderCommit > lastIndex {else  {if  newCommitIndex > oldCommitIndex {
异步提交、故障恢复、快照与持久化 
持久化:对任何持久变量的修改,如果修改后的状态是一致的,那么需要进行一次保存。 
快照:如果没有快照,commitIndex按照figure 2中初始化为0,日志是持久化的,那么每次故障恢复都会从第一条日志开始重现历史,因此需要有快照,通过快照获得初始状态,通过重现少量历史回到故障发生前的持久化状态。快照内容包含状态机状态与raft状态,状态机状态与应用相关,raft状态包含LastIncludedTerm和LastIncludedIndex两个变量,显然都需要是持久化的 
 
关于异步提交这块,applyCh改成有缓冲channel可以解决暂时的日志提交与日志消费速度不匹配的问题(仅仅是暂时),仍然不能避免加锁提交导致的死锁问题。下方仅对当前的无缓冲applyCh实现进行分析。
不要加锁提交:考虑状态机拿到一条日志执行后,判断需要进行snapshot,尝试获得锁(raft有多个持久化状态,为保证snapshot期间读到一致的状态,需要加锁),与此同时,raft server持有锁去提交日志,由于applyCh无缓冲(或者可以看成缓冲区满),产生了死锁状态
快照过程:由状态机触发,目的是基于某个commitIndex,收集一个raft与状态机间的一致状态。
同步/异步提交:同步提交的问题在于状态机消费与raft提交的速度不一定一致
发送快照:首先leader无论何时拥有最新日志,因此这个RPC的方向一定是从leader到其他角色。一旦leader发现其他server的日志比自己的snapshot还落后,由于snapshot会裁剪日志,此时已经无法用AppendEntries RPC来使其追赶,而是通过发送快照实现。需要同时发送raft与状态机的快照。可以看出,上层快照也需要传递给raft server,尽管不需要理解含义。快照可以是异步的(如果在我的实现中使用同步提交snapshot请求给状态机会导致死锁)。
接受快照(installSnapshot)过程:leader发送installSnapshot RPC到follower,follower异步发送installSnapshot消息给状态机,状态机调用CondInstallSnapshot确保发来的snapshot比当前状态更新(通过与lastApplied比较)后,raft apply此snapshot,状态机apply此snapshot
故障恢复:raft层的故障恢复通过每次持久化状态之间一致时进行持久化来实现,但commitIndex和lastApplied不是持久化的,在恢复后初始化为snapshot的位置,即会从snapshot开始重新向上层提交此后的日志。上层的持久化依赖snapshot时传给raft的状态实现。
什么时候一条日志可以看做提交:前面提到,commitIndex不是持久化的,所以commitIndex的移动不能视作提交。当当前term日志replicate到majority时,并且这些日志被majority server持久化后,尽管没有持久化的变量去标记,但是,由于这些日志不可能被覆盖(不含此日志的server无法成为更高term的leader),也不会丢失(已经在majority持久化),那么在同步(synchronous)或部分同步(partial synchronous)系统中(也许还加上crash-recover模型的条件?),这条日志会最终deliver给状态机,因此可以看做被提交。 
 
 
可以看到,我们需要一个不加锁、异步的日志提交。这块的代码参考了[2]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func  (rf *Raft) for  rf.killed() == false  {if  rf.lastApplied >= rf.commitIndex {20 )continue make ([]LogEntry, commitIndex-lastApplied)copy (entries, rf.Log[lastAppliedLogIndex+1 :commitLogIndex+1 ])for  _, entry := range  entries {true , 1 ,if  commitIndex > rf.lastApplied {
Snapshot相关 由状态机触发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func  (rf *Raft) int , snapshot []byte ) {defer  rf.mu.Unlock()if  index <= rf.RaftSnapshot.LastIncludedIndex {"end snapshot creating because outdated index %d, current snapshot %d" , index, rf.RaftSnapshot.LastIncludedIndex)return var  persistentRaftState *RaftSnapshotfor  _, log := range  rf.Log {if  log.Index == index {if  rf.LastIncludedIndex > rf.lastApplied {
InstallSnapshot RPC相关 与日志提交同理,installSnapshot向状态机的提交也需要是无锁、异步的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func  (rf *Raft) if  !rf.checkTermAndRole(request.Term, request.LeaderId) {return if  rf.LastIncludedIndex >= request.LastIncludedIndex { return go  func () true ,1 ,
CondInstallSnapshot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func  (rf *Raft) int , lastIncludedIndex int , snapshot []byte ) bool  {defer  rf.mu.Unlock()if  lastIncludedIndex <= rf.LastIncludedIndex {return  false if  lastIncludedIndex <= rf.lastApplied {else  if  lastIncludedIndex <= rf.commitIndex {else  {return  true 
其他 
日志:每个服务我都加了一个日志开关,设置了日志格式,提供加锁、不加锁的(以及临时无视开关的,更好的实践应该是日志等级)API,以raft为例: 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func  (rf *Raft) string , a ...interface {}) {if  !rf.enableLog {return defer  rf.mu.Unlock()func  (rf *Raft) string , a ...interface {}) {if  rf.enableLog {func  (rf *Raft) string , a ...interface {}) {"[%d %s Term: %d, Role: %s, cmt: %d, lastTerm/Idx:%d-%d, snapTerm/Idx:%d-%d LogL: %d, Next:%v, lA:%d Id: %d] " ,len (rf.Log), rf.nextIndex, rf.lastApplied, rf.me) + fmt.Sprintf(str, a...) + "\n" )
效果(此外一些RPC的返回success也加了个颜色,找起来方便一些)
1 2 3 4 5 6 7 8 9 10 // 每列分别是:物理时间、进程名称(Lab4用)、Term、Role、CommitIndex、日志位置、快照状态、日志长度、nextIndex数组、lastApplied、raft集群内id for  1 advance to 20, self lastLogIndex 20true ] vote[false ] snap[false ]log  append [20->20]timeout  to 606000000
后期为了排查偶发问题加了不少日志,最终的1024次测试居然打出了16G共10834W行的日志……
问题排查 
 
死锁:找到日志最先停止的协程,找到最后一条日志,一般会有些思路。另一种办法是重写一个mutex,加锁解锁打日志。 
活锁:找到最后一个非定时任务触发的日志,比死锁更难查 
断言:在各种地方加断言也是很有效的,不要让程序带着错误状态继续执行,会导致问题爆发的位置与状态开始偏离正常的位置相距很远,导致排查困难 
一致性检测:其实还是断言,对Lab4,我实现了类似Lab2中检测日志一致性的协程去check,这样能够知道问题来源于raft还是来源于状态机,缩小排查范围 
 
一些小错误 
 
Lab 3 Lab 4的shard controller基本是复用这里的代码,这里简单介绍一下Lab 3任务,实现细节放在Lab 4介绍。实现细节可以去看[2]
Lab3任务:基于lab2,实现一个in-memory的linearizable的kv storage
TODO raft不依赖消息顺序
Lab 4 Lab4要求实现Sharded Key/Value Service,分片是实现multi-raft的方式之一(说实话我还不了解其他方式),架构如下:
一个基于Raft的高可用的分布式配置中心,用于配置分片策略 
每个raft-group负责一个分片,raft-group就是一个raft集群 
raft-group定时获取配置中心的配置,并向配置的状态同步 
 
这样的multi-raft实现在相同服务器数量的情况下,以相对更低的可用性,换取了更低的响应访问、状态存储压力
对于分片迁移,[2]给出了主动拉数据的实现,我的实现使用主动推送作为方案
Lab 4A 由于跳过了Lab 3,这里详细介绍一下Lab 4A,本质是实现一个RSM,无论一致性协议上层是何种应用,都可以使用类似的实现
ClientID这里用的是随机数,最好还是使用一个分布式ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func  (ck *Clerk) map [int ][]string ) {defer  func () for  {var  reply QueryReply"ShardCtrler.Command" , args, &reply)if  !ok || reply.WrongLeader || reply.Err == ErrTimeout {1 ) % len (ck.servers) 25  * time.Millisecond) continue return 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 func  (sc *ShardCtrler) if  !ok {-1 , -1 , nil , make (chan  Response),if  !isLeader {true return select  {case  result := <-clientInfo.Ch:if  result.MoveReply.Err != ""  {else  if  result.LeaveReply.Err != ""  {else  if  result.JoinReply.Err != ""  {else  { if  result.ClientSeq < args.ClientSeq {goto  RETRYreturn case  <-time.After(CONSENSUS_TIMEOUT):return 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 func  (sc *ShardCtrler) for  applyMsg := range  sc.applyCh {func () defer  sc.mu.Unlock()if  !ok {-1 , -1 , nil ,make (chan  Response),if  applyMsg.CommandIndex <= sc.LastAppliedIndex {"discard outdated index %d" , applyMsg.CommandIndex)return if  op.ClientSeq <= client.RcvSeq {select  {case  client.Ch <- *client.LastResp:default :return else  { var  resp Responseif  op.OpType == OP_QUERY {else  if  op.OpType == OP_JOIN {append (sc.configs, newCfg)else  if  op.OpType == OP_MOVE {append (sc.configs, *newCfg)else  { append (sc.configs, newCfg)if  sc.Clients[op.ClientId].ReqSeq == op.ClientSeq {select  {case  client.Ch <- resp:default :
负载均衡算法 将shard平均分配到不同的group中,group可能在运行期间加入或离开,要求相对于变更以前的分配,最小化shard移动的数量,降低网络压力
建议参考[2]实现,核心思路如下:
Join:不断将shard从具有最多shard的节点分给具有最少shard的节点,直到最多与最少的差不大于1 
Leave:将Leave节点的shard一次一个地分配给具有最少shard的节点 
 
Lab 4B TODO 需要完善
服务端协程:
定期拉配置并提交配置更新的日志 
定期轮询将自身数据分片向自身配置靠近,同时做GC 
主协程,处理commit的日志 
轮询新term提交空日志 
 
Client为什么也要持久化 
Shard Server自身也是一个client 
如何推送,推送是两段的,是否会有问题 
 
思考&优化 如何设计测试 来自6.824 补充用例 单client串行写入,必须读到上次写入值后才写入下次值,每次写入后随机shutdown server。用来fail那些上任后没有提交空日志的实现
性能测试 在majority节点correct的前提下,raft已经确保了可用性,这里关注性能
此部分参考[11]中etcd的性能测试
性能可以从两方面评价:时延和吞吐量。举个例子,时延长而吞吐量大:服务可以同时响应大量请求(吞吐量),但单看每个请求从发送到响应的时间却比较长(时延)。[11]指出最短可能的时间是一个RTT加上fdatasync的时间,前者取决于data center之间的距离、网络环境,后者取决于磁盘类型(HDD、SSD),但由于会批量发送、刷盘,可以将代价均摊到batch中的每条日志
此外,还有一些其他影响性能的点:
etcd后端采用bblot(从boltdb fork得到)作为存储引擎,对于事务有MVCC的额外开销 
raft的snapshot需要偶尔刷盘 
inflight compaction,我理解也许是消息发送前后对消息的压缩解压? 
 
变量如下:
节点数量 
读或写(读的话,线性一致读还是顺序一致读,也就是走不走日志) 
K、V分别的大小 
KV总数 
Client数 
连接数 
是否只与leader交互,还是允许和任何人交互,follower将请求转发到leader [localref-1] 
 
评价指标如下:
平均读写QPS(可能有多个clinet,和latency不同) 
单个request的平均latency 
平均服务RSS,即ps aux中的RSS或top中的RES,即物理内存占用 
 
我也做了简单的性能测试,但与etcd的也许无法直接比较,为什么
使用线程模拟,无网络传输开销 
内存kv数据库,无磁盘写入开销 
在单机进行模拟,受CPU限制无法做高并发测试,仅将GOMAXPROCS设为8 
etcd的get有优化,我的没有 
没有做锁优化(至少手动层面没有做) 
 
我的测试(client总是等于连接数,GOMAXPROCS设为8,默认不分片,关闭snapshot功能):
1 client 3节点,kv层read write交互,各1000条读写,共2000条
耗时60s,QPS=33, latency=30ms 
 
 
3节点,raft层不停write,最后一条write提交视为截止 
100 client 3节点,每个client写入1000条,kv层共10W条写入,其实更好的做法是限制时间,而不是限制数量 ,注意不要加race,会慢很多(大概7倍时间)
不加race,耗时52921ms,QPS=1889.6(10W/552921),lateny=52.9ms,相比于1,吞吐量x57,时延x1.77 
 
 
3 Group * 3节点,100 client,每个client 1000条写入
麻了,是3的三倍时间,不知道怎么解释…… 
 
 
 
local ref
How does etcd propagate writes to non-leader members? Question: What does “Average Server RSS” mean?  
Raft与CAP的关系 英文解释来源https://www.the-paper-trail.org/page/cap-faq/ 
Availability - will a request made to the data store always eventually complete?
什么是available
A data store is available if and only if all get and set requests eventually return a response that’s part of their specification. This does not permit error responses, since a system could be trivially available by always returning an error.
 
TODO
TODO 有人说Redis主备就是AP的,MySQL主备呢https://zhuanlan.zhihu.com/p/152105666 
Prevote & CheckQuorum TL;DR
(cloudflare因为这个问题挂过六个小时)
考虑这样的节点拓扑[9]
leader可以在123内产生,但由于45未连接到leader,会不断超时自增term,term传递到2或3,使其成为follower,随后超时成为candidate,这一term最终会传递到当前leader,使得leader失去身份,这一情况会一直持续。也就是说,普通的raft实际上是不满足liveness条件的
[7] 9.6节给出了叫做Pre-Vote 算法的解决方案:对方有最新的日志,并且自身自从上次收到leader的消息超出baseline election timeout (我的理解是最小可能的超时时间),那么grant vote。Pre-Vote避免了不可能成为leader的节点通过不断提升term导致当前leader退位的问题 
Pre-Vote也引入了新的问题,考虑如下拓扑[9]:
假设从全连通状态立即跳转到此状态,4是leader,13会随后超时,但由于2与4连通,不能从2获得Pre-Vote的grant,13无法发起新的选举。对于这种情况,[7] 6.2节指出,如果leader长时间没有成功与majority保持心跳 ,那么leader主动退位。这一修改也叫做CheckQuorum ,使得失效leader能够主动退位 。
通过补充Pre-Vote RPC与leader主动退位,我们确保了算法的liveness
DeadLock 我一直很怀疑partial connection到底是不是一个实际的问题
当网络拓扑成为树状时,如A-B, A-C,A-D, A-E当A的日志不是最新时,A无法成为leader,系统死锁
5节点全连通,除了A与B,是否会出现A、B无限轮流称为leader导致违反liveness条件的情况? 问题来自 https://www.zhihu.com/question/54997169 
不会,之前提到过,为了保证liveness条件,leader上任后需要commit一条no-op后才可提供服务(包括响应读请求),假设A commit完成后,B就不是拥有最新数据的节点了,因此不会出现这一问题
Raft是否存在脑裂问题 存在,5节点全连通,除了A与B,A当选leader后、发出任何请求前,B超时,term自增,完成选举,也成为leader
但并不会造成什么问题,并且,可以设置允许其他节点转发请求,其实是等价于网络仍然是全连通的https://www.zhihu.com/question/54997169/answer/192987776 
TODO
通过client向raft提交op,raft提供什么样的语义 at-most-once
业务上可以利用at-most-once语义进行组合得到exactly-once语义
读优化 走日志的读称为 Log Read,是安全的,但我们可以做得更好
TL;DR(总结)
推荐的两篇文章https://zhuanlan.zhihu.com/p/78164196 https://zhuanlan.zhihu.com/p/463140808 
ReadIndex 本小节完全参考[14]
当leader接收到读请求时,将当前commit index记录下来,记作read index,在返回结果给客户端之前,leader需要先确定自己到底还是不是真的leader,确定的方法就是给其他所有peers发送一次心跳,如果收到了多数派的响应,说明至少这个读请求到达这个节点时,这个节点仍然是leader,这时只需要等到commit index被apply到状态机后,即可返回结果。
 
总结:
leader check自己是否在当前term commit过entry 
leader记录下当前commit index,然后leader给所有peers发心跳广播 
收到多数派响应代表读请求到达时还是leader,然后等待apply index大于等于commit index 
返回结果 
 
etcd不仅实现了leader上的read only query,同时也实现了follower上的read only query,原理是一样的,只不过读请求到达follower时,commit index是需要向leader去要的,leader返回commit index给follower之前,同样,需要走上面的ReadIndex流程,因为leader同样需要check自己到底还是不是leader
 
Lease Read 本小节完全参考 TiDB 文档[10]
(TiDB 把这个优化当做比 ReadIndex 更高级的优化来介绍,具体来说,ReadIndex 还需要额外发一轮心跳,收到 quorum ack 后,才能返回)
我们希望能直接读leader本地的数据而不向其他人发任何请求,这要求我们确保leader的已提交数据总是最新的,可以通过确保majority在一定时间内不选择其他leader,使得这个时间段内最多只有自己一个leader(或者自己挂掉),不会有更新term的leader提交日志,使得当前leader的已提交数据总是最新的
在 Raft 论文里面,提到了一种通过 clock + heartbeat 的 lease read 优化方法。也就是 leader 发送 heartbeat 的时候,会首先记录一个时间点 start,当系统大部分节点都回复了 heartbeat response,那么我们就可以认为 leader 的 lease 有效期可以到 start + election timeout / (clock drift bound per second * election timeout)这个时间点
 
我的理解:leader lease 的有效期到 “不再满足 quorum 节点心跳未过期,减去心跳过期时间内的时钟偏移上限”
TODO 如果消息传输已经用了比lease还久的时间,那么leader怎么做,leader在lease过期后自己就step down吗,好像不需要,因为写入操作实际上也是心跳,之后的写入等于续约,或者,如果follower已经没有lease了,等于签一个新的lease
Membership 参考[12]
论文中介绍了两阶段Membership变更算法,介绍如下:
外部触发membership变更请求,leader将其转化为 $C_{old,new}$ 日志并广播。任何收到membership变更日志后的节点,立即按照新配置运行(无论其是否commit) 
自 $C_{old,new}$ 日志起,任何日志需要同时达成旧配置与新配置中的quorum ack才视为提交 
当 $C_{old,new}$ 提交后,原有leader构造一条 $C_{new}$ 日志并广播,若leader不在新配置中,当 $C_{new}$ commit后可以退位 
 
Etcd实际上使用的是单步成员变更,优势之一在于只需要单阶段就可完成单节点的membership变更[13]
Learner角色 [7]4.2.1节引入此角色,目的是当membership变化时,新加入的成员在需要在日志同步完成后再加入集群,否则根据[6]中给出的membership变更算法,变更期间的日志提交需要得到both old conf和new conf中majority节点的ack才视为提交,未完成日志同步的节点会阻塞整个集群的日志提交
Raft的一些重要时刻 这个是我自己提出的问题(因此问题可能不是很有价值,而且也许是错的……)
一条日志什么时候算作提交:当第majority个node将日志持久化时 
写入日志的Linearization Point在哪里:Leader将commitIndex指向它的时候 
以kv store为例,上层应用的Linearization Point在哪里:在apply日志到状态机的那一刻 
 
附录 测试结果 测试结果太长了,放在最后
Lab2 2048次测试通过
Lab2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 [root@e647a1bc04a2 raft]if  too many followers disconnect ...'t too high ...   ... Passed --   2.8  3   28    8866   12 Test (2C): basic persistence ...   ... Passed --   5.6  3   69   18640    6 Test (2C): more persistence ...   ... Passed --  19.9  5 1167  220492   16 Test (2C): partitioned leader and one follower crash, leader restarts ...   ... Passed --   2.8  3   37    9225    4 Test (2C): Figure 8 ...   ... Passed --  34.5  5  594  134594   38 Test (2C): unreliable agreement ...   ... Passed --   3.8  5  237   89256  246 Test (2C): Figure 8 (unreliable) ... delay time: 2325   ... Passed --  35.2  5 3544 5190733  350 Test (2C): churn ...   ... Passed --  16.3  5 3479 3369043 1469 Test (2C): unreliable churn ...   ... Passed --  16.4  5 1106  548290  291 Test (2D): snapshots basic ... time used: 7296   ... Passed --   7.3  3  144   56814  251 Test (2D): install snapshots (disconnect) ... time used: 53277   ... Passed --  53.3  3  871  256572  367 Test (2D): install snapshots (disconnect+unreliable) ... time used: 67236   ... Passed --  67.2  3 1049  288318  379 Test (2D): install snapshots (crash) ... time used: 41029   ... Passed --  41.0  3  592  180220  355 Test (2D): install snapshots (unreliable+crash) ... time used: 49101   ... Passed --  49.1  3  701  199176  366 PASS ok      6.824/raft      422.568s real    7m3.771s user    0m9.621s sys     0m7.843s 
Lab3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 [root@9bd769928c32 kvraft]in  majority (3A) ...in  minority (3A) ...
Lab4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 [root@9bd769928c32 shardkv]join  then  leave ...join , and leave ...
补充测试 
GOMAXPROCS 
表头 
ss 
 
 
4 
单元格 
 
单元格 
单元格 
 
其他 runtime: marked free object in span 遇到的一个Golang的奇怪Bug,由于代码改动后来没有复现