These notes do not cover Lab 1, the simplest one; I did it long ago and have forgotten the details.
It took a little over 20 days to finish Lab 2 through Lab 4, and they now pass all tests reliably (including the two Lab 4 challenges); the provided test cases themselves have gaps, discussed below. There are still quite a few regrets:
The code structure is not very clean, and in line count (about 1,000 lines for the Lab 4 server, about 2,000 for the Lab 2 Raft) there is plenty of room to compress.
The kvServer in Lab 3 / Lab 4 is not optimized; reportedly Get can be served without taking the lock.
Quite a few places use polling; wait/signal (condition variables) would be more efficient (and more elegant).
On difficulty, I agree with the ordering in [2]: Lab 4 > Lab 2 >> Lab 3 > Lab 1. Lab 2 is hard because (1) you must understand the Raft protocol and carefully implement every detail of Figure 2 in the paper, and (2) for most people it is the first time writing and debugging distributed code. Lab 4 is hard because (1) it exposes bugs in your Lab 2 Raft implementation, and (2) the complexity is much higher: structurally you build a state machine on top of Raft, and in scale there are 12 Raft servers, 3 ShardKV servers, and many clients, so debugging gets considerably harder.
Recommended resources (TODO: add brief descriptions of each)
The lab handouts' own instructions and hints
MIT 6.824-2021 implementation notes for all four labs; my Lab 2 snapshot handling and parts of Lab 4 draw on this
Raft architecture diagram
Instructors’ Guide to Raft
Students’ Guide to Raft
The Secret Lives of Data - Raft: Understandable Distributed Consensus (a Raft visualization, helpful when first learning the algorithm)
CONSENSUS: BRIDGING THEORY AND PRACTICE (the Raft PhD dissertation)
go-test-many.sh: parallel test runner; usage: ./go-test-many.sh 1024 32 TestUnreliableAgree2C, where the three arguments are the number of runs, the degree of parallelism, and a substring of the test name
Raft does not Guarantee Liveness in the face of Network Faults
TiKV 功能介绍 - Lease Read
Performance
In Search of an Understandable Consensus Algorithm (the Raft paper itself)
Raft成员变更的工程实践
etcd raft如何实现Linearizable Read
Related resources: articles I did not get to read, in decreasing priority
Linearizability 一致性验证 (an article from PingCAP)
ARC: Analysis of Raft Consensus
dist101
Debugging by Pretty Printing
Lab 2 (TODO: tidy up the goroutine structure)
Lab 2 is split into four parts:
2A: leader election + heartbeats
2B: basic Raft (log replication)
2C: persistence
2D: log compaction
Overall points to watch out for:
Snapshotting truncates the log, so never use the Log slice's array offset as the log index; changing that later is very painful.
RPC sends need manually managed timeouts: when the target server is down, the simulated network delay can be as high as 7 s (a sketch of such a timeout wrapper follows this list).
Make sure the CPU is never fully saturated while testing. In Lab 4's TestJoinLeave and TestChallenge2Unaffected, the test assumes a fixed time budget for migrating shards and then shuts down that group; if the servers progress too slowly the test fails. Running 8 test processes in parallel, my 5800X was occasionally saturated and these two tests failed with low probability (<1%). Doubling the time limit, or dropping the parallelism to 4, makes all tests pass reliably.
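Where the list above mentions manual RPC timeouts, here is a minimal sketch of the kind of wrapper meant. It assumes the labrpc Call API and the RPC_TIMEOUT constant defined below; the helper name and signature are only illustrative, not part of the required lab interface.

```go
// Wrap a labrpc call with a manual timeout so a dead peer cannot stall the
// caller for the full simulated network delay (which can reach ~7 s).
func (rf *Raft) callWithTimeout(i int, svcMeth string, args interface{}, reply interface{}) bool {
	done := make(chan bool, 1) // buffered: the sender never blocks if we time out first
	go func() {
		done <- rf.peers[i].Call(svcMeth, args, reply)
	}()
	select {
	case ok := <-done:
		return ok
	case <-time.After(RPC_TIMEOUT * time.Millisecond):
		return false // treat as a failed send; the caller rebuilds the request and retries
	}
}
```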
Struct definitions and initialization. The struct definitions:
```go
type Raft struct {
	mu        DebugLock
	peers     []*labrpc.ClientEnd
	persister *Persister
	me        int
	dead      int32

	commitIndex int
	lastApplied int
	nextIndex   []int

	PersistentState

	role string

	newDataNotifyChan     []chan string
	installSnapshotChan   []chan string
	stateChangeNotifyChan chan string

	muSnapshot       sync.Mutex
	electionDeadline time.Time
	applyCh          chan ApplyMsg

	name           string
	enableLog      bool
	omitLogCommand bool
	enablePProf    bool
}

type PersistentState struct {
	Term     int
	VotedFor int
	Log      Logs
	RaftSnapshot
}

type RaftSnapshot struct {
	LastIncludedTerm  int
	LastIncludedIndex int
}
```
Constants
Election timeout 550–1100 ms, heartbeat interval 180 ms, RPC timeout 100 ms:
```go
const (
	ELECTION_TIMEOUT_MIN    = 550
	ELECTION_TIMEOUT_DELTA  = 550
	HEARTBEAT_TIMEOUT       = 180
	RPC_TIMEOUT             = 100
	DEADLINE_CHECK_INTERVAL = 15
)
```
Initialization, showing only the essential code (helpers omitted):
```go
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	rf.VotedFor = -1
	rf.Term = 0
	rf.role = ROLE_FOLLOWER
	rf.commitIndex = -1
	rf.lastApplied = -1
	rf.nextIndex = make([]int, len(rf.peers))
	rf.matchIndex = make([]int, len(rf.peers))
	for i := 0; i < len(rf.peers); i++ {
		rf.matchIndex[i] = -1
	}
	rf.nextIndexValid = make([]bool, len(rf.peers))
	rf.applyCh = applyCh
	rf.stateChangeNotifyChan = make(chan string, 1)
	rf.newDataNotifyChan = make([]chan string, 0)
	for i := 0; i < len(rf.peers); i++ {
		rf.newDataNotifyChan = append(rf.newDataNotifyChan, make(chan string, 1))
	}
	rf.RaftSnapshot.LastIncludedTerm = -1
	rf.RaftSnapshot.LastIncludedIndex = -1
	rf.installSnapshotChan = make([]chan string, 0)
	for i := 0; i < len(rf.peers); i++ {
		rf.installSnapshotChan = append(rf.installSnapshotChan, make(chan string, 1))
	}
	rf.readPersist(persister.ReadRaftState())
	go rf.ticker()
	go rf.applier()
	return rf
}
```
Main loop. The table below is the role-transition matrix: rows are the starting role, columns the target role. Only a candidate can transition to two different roles; for the other roles the next state is fully determined.
| from \ to | leader | follower | candidate |
| --- | --- | --- | --- |
| leader | - | sees a higher term | - |
| follower | - | - | election timeout |
| candidate | wins a majority vote | sees a higher term | election timeout; stays candidate for the next round |
Implementation complexity: leader > candidate > follower.
```go
func (rf *Raft) ticker() {
	for rf.killed() == false {
		rf.mu.Lock()
		role := rf.role
		rf.mu.Unlock()
		if role == ROLE_FOLLOWER {
			rf.loopFollower()
		} else if role == ROLE_CANDIDATE {
			rf.loopCandidate()
		} else {
			rf.loopLeader()
		}
	}
}
```
LoopFollower. A follower has only one possible role transition (its election timer expiring), so the logic is simple.
```go
func (rf *Raft) loopFollower() {
	rf.mu.Lock()
	rf.resetElectionTimeout()
	rf.mu.Unlock()
	timeoutChan := make(chan string, 1)
	go func() {
		defer func() { timeoutChan <- "x" }()
		for {
			rf.mu.Lock()
			if time.Now().After(rf.electionDeadline) {
				rf.mu.Unlock()
				return
			}
			rf.mu.Unlock()
			time.Sleep(time.Millisecond * DEADLINE_CHECK_INTERVAL)
		}
	}()
	<-timeoutChan
	rf.mu.Lock()
	rf.Term++
	rf.role = ROLE_CANDIDATE
	rf.VotedFor = rf.me
	rf.persist()
	rf.mu.Unlock()
}
```
Before killing a server, the tester first disconnects it from the network so its state cannot propagate to other servers, then copies its persisted state for use when the server is later restarted, and finally calls Kill() to stop the service. The save done by persist() takes a lock that the copy also takes, so the two are serialized, which exposes persistence as an atomic interface.
LoopCandidate. Like loopLeader, it follows the same pattern: spawn a goroutine per outgoing peer; build the request under the lock so it cannot change while it is being sent; send the RPC without the lock; handle the reply under the lock so that modifications to our own state stay consistent.
A candidate must respond synchronously to three kinds of interruption: a role change triggered while acting as an RPC receiver, an election timeout, and its RequestVote RPCs being granted.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 func (rf *Raft) loopCandidate() { ackChan := make (chan string , len (rf.peers)) ackNum := 1 rf.mu.Lock() returnFunc := func (term int ) func () bool { return func () bool { return rf.role != ROLE_CANDIDATE || rf.killed() || term != rf.Term } }(rf.Term) rf.resetElectionTimeout() rf.persist() for i := 0 ; i < len (rf.peers); i++ { if i == rf.me { continue } go func (i int ) { for { rf.mu.Lock() if returnFunc() { rf.mu.Unlock() return } lastTerm, lastIndex := rf.getLatestTermAndIndex() req := &RequestVoteArgs{ Term: rf.Term, CandidateId: rf.me, LastLogIndex: lastIndex, LastLogTerm: lastTerm, } rf.mu.Unlock() resp := RequestVoteReply{} sendSuccess := rf.retrySend(i, returnFunc, nil , nil , req, &resp, nil , nil , 1 ) if sendSuccess { if resp.VoteGranted { ackChan <- "ok" } else { rf.mu.Lock() if resp.Term > rf.Term { rf.Term = resp.Term rf.VotedFor = -1 rf.persist() rf.role = ROLE_FOLLOWER select { case rf.stateChangeNotifyChan <- "x" : default : } } rf.mu.Unlock() } break } } }(i) } rf.mu.Unlock() timeoutChan := make (chan string , 1 ) go func () { defer func () { timeoutChan <- "x" }() for { rf.mu.Lock() if time.Now().After(rf.electionDeadline) { if rf.role != ROLE_CANDIDATE { rf.mu.Unlock() return } rf.Term++ rf.persist() rf.mu.Unlock() return } rf.mu.Unlock() time.Sleep(time.Millisecond * DEADLINE_CHECK_INTERVAL) } }() for { select { case <-ackChan: ackNum++ if ackNum == rf.getQuorumSize() { rf.mu.Lock() rf.logNoLock("got %d vote" , ackNum) if rf.role != ROLE_CANDIDATE { rf.mu.Unlock() return } rf.role = ROLE_LEADER for i := 0 ; i < len (rf.peers); i++ { rf.nextIndex[i] = rf.getLatestIndex() + 1 } rf.mu.Unlock() return } case <-timeoutChan: return case <-rf.stateChangeNotifyChan: return } } }
LoopLeader holds the bulk of the Raft logic (the other major part is the RPC handlers). Its overall structure mirrors LoopCandidate; points to note:
The only interruption it must react to is an RPC carrying a higher term, which demotes it to follower.
Send a heartbeat immediately after taking office, to announce leadership as early as possible and keep other servers from climbing to a higher term and throwing the system back into a leaderless (and hence unavailable) state. Strictly, the new leader should append an empty (no-op) entry rather than only a heartbeat: consider a newly elected leader with commitIndex=5 and lastLogIndex=6 while every other server has commitIndex=lastLogIndex=6, and suppose the upper-layer state machine only submits new entries to Raft after the entry at index 6 has been delivered. By Figure 8 of the paper, an entry from a previous term cannot safely be committed by counting a majority, so that entry is never delivered and the system cannot advance, violating liveness. Appending an empty entry in the current term fixes this. However, the tests complain when a server commits entries it created by itself, so the provided test cases are arguably flawed here; the issue shows up most often in TestBasicAgreeUnreliable, with probability around 0.2%. In Lab 4 the state-machine level proactively submits an empty entry to solve it (see the sketch after this list).
I never found a use for the matchIndex variable from the paper. I believe it is safe to treat nextIndex-1 as the position of the log the peer currently holds: whenever the leader has completed a success=true AppendEntries exchange with a peer, that proposition is true. For peers with no such exchange yet, the initial value rf.nextIndex[i] = rf.getLatestIndex() + 1
is used as the peer's assumed log position when checking whether an entry has reached a majority. Even if the count reaches a majority only thanks to those initial values, the entry still cannot be committed, because it is not from the current term; it only becomes committed once some current-term index is also observed at a majority, and that index must in turn correspond to peers that really did complete an AppendEntries exchange.
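As promised above, here is a minimal sketch of the Lab 4-style workaround: the service layer submits an empty entry whenever it observes a new term, so entries left over from earlier terms get committed underneath a current-term entry. The type and field names (EmptyOp, lastAppliedTerm, kv.mu) are my own illustration rather than code from my server; only rf.GetState and rf.Start are the standard lab APIs.

```go
// Periodically submit a no-op so a fresh leader's term contains a committed entry.
func (kv *ShardKV) noopTicker() {
	for !kv.killed() {
		if term, isLeader := kv.rf.GetState(); isLeader {
			kv.mu.Lock()
			if kv.lastAppliedTerm < term {
				// Start an op that the apply loop treats as a no-op.
				kv.rf.Start(Op{Type: EmptyOp})
			}
			kv.mu.Unlock()
		}
		time.Sleep(100 * time.Millisecond)
	}
}
```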
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (rf *Raft) loopLeader() { rf.mu.Lock() returnCond := func (term int ) func () bool { return func () bool { return rf.role != ROLE_LEADER || rf.killed() || term != rf.Term } }(rf.Term) for i := 0 ; i < len (rf.peers); i++ { if i == rf.me { continue } go func (i int ) { var prevLogTerm int var sendSuccess bool heartbeatTimeoutTimer := time.NewTimer(0 ) for { select { case <-heartbeatTimeoutTimer.C: HTBT: case <-rf.newDataNotifyChan[i]: MSG: case <-rf.installSnapshotChan[i]: SNAPSHOT: } } }(i) } rf.mu.Unlock() <-rf.stateChangeNotifyChan }
Sending heartbeats: the classic build-under-lock, send-without-lock, handle-under-lock structure.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 var req *AppendEntriesRequest resp := AppendEntriesResponse{} heartbeatTimeoutTimer.Reset(rf.getHeartbeatTimeout()) rf.mu.Lock()if returnCond() { rf.mu.Unlock() return }if rf.nextIndex[i] <= rf.getLatestIndex() { select { case rf.newDataNotifyChan[i] <- "x" : default : } rf.mu.Unlock() continue }if rf.nextIndex[i] <= 0 { prevLogTerm = -1 } else if rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex { select { case rf.installSnapshotChan[i] <- "x" : default : } rf.mu.Unlock() return } else if rf.nextIndex[i] == rf.RaftSnapshot.LastIncludedIndex+1 { prevLogTerm = rf.RaftSnapshot.LastIncludedTerm } else { prevLogTerm = rf.getLogByIndex(rf.nextIndex[i] - 1 ).Term } req = &AppendEntriesRequest{ Msgs: nil , Term: rf.Term, LeaderId: rf.me, PrevLogIndex: rf.nextIndex[i] - 1 , PrevLogTerm: prevLogTerm, LeaderCommit: rf.commitIndex, } rf.mu.Unlock()if !rf.retrySend(i, returnCond, req, &resp, nil , nil , nil , nil , 1 ) { continue } rf.mu.Lock()if returnCond() { rf.mu.Unlock() return }if resp.Success { rf.mu.Unlock() continue }if resp.Term > rf.Term { rf.Term = resp.Term rf.role = ROLE_FOLLOWER rf.stateChangeNotifyChan <- "x" rf.VotedFor = -1 rf.persist() rf.mu.Unlock() return } else { if rf.adjustNextIndex(i, req.PrevLogIndex, resp.ConflictTerm, resp.ConflictIndex) { rf.mu.Unlock() goto HTBT } else { rf.mu.Unlock() continue } }
Adjusting nextIndex. TestFigure8Unreliable2C produces scenarios that require backing nextIndex up a long way over an unreliable network, where backing up one index per round trip is far too slow. I use the fast-backup strategy from [5] to accelerate it.
On the follower side:
If the follower has no entry at the sent prevIndex, it replies (conflictTerm = -1, conflictIndex = indexOfLastLog + 1).
If the follower has an entry at prevIndex but it conflicts (a different, necessarily smaller, term), it replies conflictTerm = the term of the entry at prevIndex and conflictIndex = the index of the first entry with that term.
On the leader side:
If conflictTerm = -1, back up to conflictIndex.
If the leader has entries with term = conflictTerm, back up to the entry just after the last such entry.
Otherwise, back up to conflictIndex as well.
Safety analysis: backing up too little only costs efficiency (worse on a bad network); backing up too far at worst triggers an InstallSnapshot or resends some redundant entries (AppendEntries are batched, so the RPC count stays the same). Neither direction threatens safety.
```go
func (rf *Raft) adjustNextIndex(i int, prevLogIndex int, conflictTerm int, conflictIndex int) bool {
	if prevLogIndex <= rf.RaftSnapshot.LastIncludedIndex {
		rf.logNoLock("prevLogIndex %d not in log, impossible to go back, notify install snapshot for %d", prevLogIndex, i)
		select {
		case rf.installSnapshotChan[i] <- "x":
		default:
		}
		return false
	}
	if conflictTerm == -1 {
		rf.nextIndex[i] = conflictIndex
	} else {
		conflictTermIndex := -1
		for j := rf.Log.getLocalIndex(prevLogIndex); j >= 1; j-- {
			if rf.Log[j-1].Term == conflictTerm {
				conflictTermIndex = j
				break
			}
			if rf.Log[j-1].Term < conflictTerm {
				break
			}
		}
		if conflictTermIndex == -1 {
			rf.nextIndex[i] = conflictIndex
		} else {
			rf.nextIndex[i] = rf.Log[conflictTermIndex].Index
		}
	}
	select {
	case rf.newDataNotifyChan[i] <- "x":
	default:
	}
	return true
}
```
发送日志 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 var req *AppendEntriesRequest resp := AppendEntriesResponse{} heartbeatTimeoutTimer.Reset(rf.getHeartbeatTimeout()) rf.mu.Lock()if returnCond() { rf.mu.Unlock() return }if rf.nextIndex[i] > rf.getLatestIndex() { rf.mu.Unlock() continue }if rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex { select { case rf.installSnapshotChan[i] <- "x" : default : } rf.mu.Unlock() continue }if rf.nextIndex[i] <= 0 { prevLogTerm = -1 } else if rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex { select { case rf.installSnapshotChan[i] <- "x" : default : } rf.mu.Unlock() return } else if rf.nextIndex[i] == rf.RaftSnapshot.LastIncludedIndex+1 { prevLogTerm = rf.RaftSnapshot.LastIncludedTerm } else { prevLogTerm = rf.Log.getByIndex(rf.nextIndex[i] - 1 ).Term } logCp := make ([]LogEntry, 0 )for _, log := range rf.Log { logCp = append (logCp, LogEntry{ Log: log.Log, Term: log.Term, Index: log.Index, }) } req = &AppendEntriesRequest{ Msgs: logCp[rf.Log.getLocalIndex(rf.nextIndex[i]):], Term: rf.Term, LeaderId: rf.me, PrevLogIndex: rf.nextIndex[i] - 1 , PrevLogTerm: prevLogTerm, LeaderCommit: rf.commitIndex, } rf.mu.Unlock()if !rf.retrySend(i, returnCond, req, &resp, nil , nil , nil , nil , 1 ) { goto MSG } rf.mu.Lock()if returnCond() { rf.mu.Unlock() return }if resp.Success { forward := len (req.Msgs) rf.nextIndex[i] += forward rf.tryCommit(i, rf.nextIndex[i]-forward) rf.mu.Unlock() goto MSG }if resp.Term > rf.Term { rf.Term = resp.Term rf.persist() rf.role = ROLE_FOLLOWER select { case rf.stateChangeNotifyChan <- "x" : default : } rf.mu.Unlock() return }if rf.adjustNextIndex(i, req.PrevLogIndex, resp.ConflictTerm, resp.ConflictIndex) { rf.mu.Unlock() goto MSG } else { rf.mu.Unlock() continue }
Advancing the commit index: check every index from successStartIndex up to the current log position for commitability; as soon as one entry fails the commit condition, later entries need not be checked. Commit is only a logical marker; a dedicated goroutine actually delivers the entries.
A note on Figure 8 of the paper: why an entry must not be declared committed merely because a majority holds it, when it belongs only to a previous term. Consider a 5-server cluster. Term 2: server 1 is leader, replicates msg2 to server 1 (itself) and server 2, then crashes. Term 3: server 5 becomes leader (three servers never received msg2), appends one entry of its own, then crashes. Term 4: server 1 recovers and is elected leader; from heartbeats it learns that server 3 lacks msg2 and sends it via AppendEntries (it already knows servers 1 and 2 hold it). When that RPC returns success=true, the term-4 leader knows msg2 now sits on a majority, but it must not commit it; suppose server 1 crashes right after sending msg2 to server 3. Term 5: server 5 becomes leader (by the RequestVote up-to-date rule its log is newer than those of servers 2/3/4, so it can gather a majority of votes); on the heartbeat replies it backs the followers' logs up and its AppendEntries overwrite msg2. That overwrite is exactly why replicating an entry from a previous term to a majority does not make it committed.
```go
func (rf *Raft) tryCommit(i int, successStartIndex int) {
	for current := successStartIndex; current < rf.nextIndex[i]; current++ {
		ackNum := 1
		for j := 0; j < len(rf.peers); j++ {
			if j == rf.me {
				continue
			}
			if rf.nextIndex[j] > current {
				ackNum++
			}
		}
		if ackNum >= rf.getQuorumSize() && current > rf.RaftSnapshot.LastIncludedIndex &&
			rf.Log.getByIndex(current).Term == rf.Term && rf.commitIndex < current {
			rf.commitIndex = current
		} else {
			break
		}
	}
}
```
发送snapshot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 var req *InstallSnapshotRequest resp := InstallSnapshotResponse{} rf.mu.Lock()if returnCond() { rf.mu.Unlock() return } req = &InstallSnapshotRequest{ Term: rf.Term, LeaderId: rf.me, RaftSnapshot: rf.RaftSnapshot, StateMachineState: rf.persister.ReadSnapshot(), } rf.mu.Unlock()if !rf.retrySend(i, returnCond, nil , nil , nil , nil , req, &resp, 1 ) { goto SNAPSHOT } rf.mu.Lock()if returnCond() { rf.mu.Unlock() return }if resp.Term > rf.Term { rf.Term = resp.Term rf.persist() rf.role = ROLE_FOLLOWER select { case rf.stateChangeNotifyChan <- "x" : default : } rf.mu.Unlock() return } else { if req.LastIncludedIndex+1 > rf.nextIndex[i] { oldNextId := rf.nextIndex[i] rf.nextIndex[i] = req.LastIncludedIndex + 1 rf.tryCommit(i, oldNextId) } } rf.mu.Unlock()
RequestVote RPC 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (rf *Raft) RequestVote(request *RequestVoteArgs, response *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() if rf.Term > request.Term { rf.rejectVote(request, response) return } else if rf.Term < request.Term { rf.Term = request.Term rf.VotedFor = -1 rf.role = ROLE_FOLLOWER select { case rf.stateChangeNotifyChan <- "x" : default : } } if rf.VotedFor != -1 && rf.VotedFor != request.CandidateId { rf.logNoLock("reject because self.votedFor %d != request.candidateId %d" , rf.VotedFor, request.CandidateId) rf.rejectVote(request, response) return } lastTerm, lastIndex := rf.getLatestTermAndIndex() if request.LastLogTerm > lastTerm || (request.LastLogTerm == lastTerm && request.LastLogIndex >= lastIndex) { rf.VotedFor = request.CandidateId rf.resetElectionTimeout() rf.acceptVote(request, response) return } rf.rejectVote(request, response) }
AppendEntries RPC. The earliest moment an entry can count as committed is the moment the last server needed to complete a majority persists it, but obviously the leader cannot observe that event until this (or some later) RPC returns.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 func (rf *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() if !rf.checkTermAndRole(request.Term, request.LeaderId) { rf.logNoLock("%s because term" , textRed("reject" )) rf.rejectAppendEntries(request, response) return } if request.Msgs != nil { firstNonSnapshotIndexInMsg := 0 for ; firstNonSnapshotIndexInMsg < len (request.Msgs); firstNonSnapshotIndexInMsg++ { if request.Msgs[firstNonSnapshotIndexInMsg].Index > rf.RaftSnapshot.LastIncludedIndex { break } } request.Msgs = request.Msgs[firstNonSnapshotIndexInMsg:] if firstNonSnapshotIndexInMsg != 0 { request.PrevLogTerm = rf.RaftSnapshot.LastIncludedTerm request.PrevLogIndex = rf.RaftSnapshot.LastIncludedIndex } if len (request.Msgs) == 0 { rf.acceptAppendEntries(request, response) return } } _, lastIndex := rf.getLatestTermAndIndex() if lastIndex < request.PrevLogIndex { rf.logNoLock("reject entry because not containing Log at prevLogIndex %d, current Log max index %d" , request.PrevLogIndex, lastIndex) response.ConflictTerm = -1 response.ConflictIndex = lastIndex + 1 rf.rejectAppendEntries(request, response) return } localIndexForLogAtPrevIndex, logAtPrevIndex := rf.getLogAndLocalIndexByIndex(request.PrevLogIndex) if request.PrevLogTerm >= 0 && request.PrevLogIndex > rf.RaftSnapshot.LastIncludedIndex && logAtPrevIndex.Term != request.PrevLogTerm { response.ConflictTerm = logAtPrevIndex.Term i := localIndexForLogAtPrevIndex for { if i == 0 || rf.Log[i-1 ].Term != response.ConflictTerm { response.ConflictIndex = i break } i-- } rf.rejectAppendEntries(request, response) return } rf.acceptAppendEntries(request, response) i := localIndexForLogAtPrevIndex + 1 j := 0 hasConflict := false for ; i < len (rf.Log) && j < len (request.Msgs); i++ { if rf.Log[i].Term != request.Msgs[j].Term { hasConflict = true } rf.logNoLock("overwrite %d with %d" , rf.Log[i].Index, request.Msgs[j]) rf.Log[i] = request.Msgs[j] j++ } if j < len (request.Msgs) { rf.Log = append (rf.Log, request.Msgs[j:]...) } else if hasConflict { rf.Log = rf.Log[:i] } _, lastIndex = rf.getLatestTermAndIndex() if request.LeaderCommit > rf.commitIndex { oldCommitIndex := rf.commitIndex var newCommitIndex int if request.LeaderCommit > lastIndex { newCommitIndex = lastIndex } else { newCommitIndex = request.LeaderCommit } if newCommitIndex > oldCommitIndex { rf.commitIndex = newCommitIndex } } }
Asynchronous apply, crash recovery, snapshots, and persistence
Persistence: after any modification of the persistent variables, save whenever the resulting state is consistent.
Snapshots: without them, commitIndex is initialized to 0 per Figure 2 and, since the log is persistent, every crash recovery would replay history from the very first entry. A snapshot gives the initial state, and replaying only a small amount of history restores the persisted state at the time of the crash. A snapshot contains both state-machine state (application specific) and Raft state, namely the two variables LastIncludedTerm and LastIncludedIndex, all of which obviously must be persistent.
On asynchronous apply: making applyCh a buffered channel can absorb a temporary mismatch between how fast entries are committed and how fast they are consumed (only temporarily), but it still cannot prevent the deadlock caused by applying while holding the lock. The analysis below assumes the current unbuffered applyCh.
Do not apply while holding the lock: suppose the state machine executes an entry, decides it needs a snapshot, and tries to acquire the Raft lock (Raft has several persistent fields, and reading a consistent view during the snapshot requires the lock); meanwhile the Raft server holds that lock while pushing an entry into the unbuffered applyCh (equivalently, a full buffer). Deadlock. TODO: if there were only a single persistent field, could the lock be skipped?
Snapshotting: triggered by the state machine, with the goal of capturing, for some commitIndex, a state that is consistent between Raft and the state machine. In practice, among Raft's persistent state Term and VotedFor may change whenever a new term appears; it seems sufficient for consistency that the log, or the snapshot position (LastIncludedTerm, LastIncludedIndex), is not behind the commitIndex handed over by the state machine.
Synchronous vs. asynchronous apply: the problem with synchronous apply is that the state machine's consumption rate need not match Raft's commit rate.
Sending snapshots: the leader always holds the most up-to-date log, so this RPC always flows from the leader to the other roles. Once the leader finds a peer whose log is even further behind than the leader's own snapshot, AppendEntries can no longer catch it up (the snapshot has trimmed those entries), so the snapshot is sent instead. Both the Raft snapshot and the state-machine snapshot must be sent; this is why the upper layer's snapshot also has to be handed to the Raft server, even though Raft need not understand its contents. Snapshot delivery can be asynchronous (in my implementation, handing the snapshot to the state machine synchronously would deadlock).
Receiving a snapshot (InstallSnapshot): the leader sends an InstallSnapshot RPC to a follower; the follower asynchronously forwards an InstallSnapshot message to the state machine; the state machine calls CondInstallSnapshot, which checks that the snapshot is newer than the current state (by comparing against lastApplied); only then does Raft apply the snapshot and the state machine apply it as well.
Crash recovery: at the Raft layer, recovery works because state is persisted whenever the persistent fields are mutually consistent. commitIndex and lastApplied are not persisted; after recovery they are initialized to the snapshot position, so entries after the snapshot are re-delivered to the upper layer. The upper layer's own durability relies on the state it handed to Raft at snapshot time. In short: the state machine's state may roll back after a crash, but Raft's log never rolls back once committed, so replaying it restores the pre-crash state-machine state.
When can an entry be regarded as committed: as noted above, commitIndex is not persistent, so advancing commitIndex is not itself the commit. Once entries from the current term have been replicated to, and persisted by, a majority, then even though no persistent variable marks the fact, those entries can neither be overwritten (a server missing them cannot become leader in a higher term) nor lost (they are persisted on a majority); so in a synchronous or partially synchronous system (perhaps also assuming the crash-recover failure model) they will eventually be delivered to the state machine, and can therefore be regarded as committed.
So what we need is a lock-free, asynchronous apply loop. The code here borrows from [2].
```go
func (rf *Raft) applier() {
	for rf.killed() == false {
		rf.mu.Lock()
		if rf.lastApplied >= rf.commitIndex {
			rf.mu.Unlock()
			time.Sleep(time.Millisecond * 20)
			continue
		}
		lastAppliedLogIndex, _ := rf.getLogAndLocalIndexByIndex(rf.lastApplied)
		commitLogIndex, _ := rf.getLogAndLocalIndexByIndex(rf.commitIndex)
		commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
		entries := make([]LogEntry, commitIndex-lastApplied)
		copy(entries, rf.Log[lastAppliedLogIndex+1:commitLogIndex+1])
		rf.mu.Unlock()
		for _, entry := range entries {
			rf.applyCh <- ApplyMsg{
				CommandValid: true,
				Command:      entry.Log,
				CommandIndex: entry.Index + 1,
			}
		}
		rf.mu.Lock()
		if commitIndex > rf.lastApplied {
			rf.lastApplied = commitIndex
		}
		rf.mu.Unlock()
	}
}
```
Snapshot, triggered by the state machine:
```go
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	index--
	if index <= rf.RaftSnapshot.LastIncludedIndex {
		rf.logNoLock("end snapshot creating because outdated index %d, current snapshot %d", index, rf.RaftSnapshot.LastIncludedIndex)
		return
	}
	var persistentRaftState *RaftSnapshot
	for _, log := range rf.Log {
		if log.Index == index {
			persistentRaftState = &RaftSnapshot{
				LastIncludedTerm:  log.Term,
				LastIncludedIndex: log.Index,
			}
		}
	}
	rf.RaftSnapshot = *persistentRaftState
	if rf.LastIncludedIndex > rf.lastApplied {
		rf.lastApplied = rf.LastIncludedIndex
	}
	rf.trimLog(index)
	rf.persister.SaveStateAndSnapshot(rf.serialize(), snapshot)
}
```
InstallSnapshot RPC. Same reasoning as log apply: handing the InstallSnapshot message to the state machine must also be lock-free and asynchronous.
```go
func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, resposne *InstallSnapshotResponse) {
	rf.mu.Lock()
	resposne.Term = rf.Term
	if !rf.checkTermAndRole(request.Term, request.LeaderId) {
		rf.mu.Unlock()
		return
	}
	if rf.LastIncludedIndex >= request.LastIncludedIndex {
		rf.persist()
		rf.mu.Unlock()
		return
	}
	rf.mu.Unlock()
	go func() {
		rf.applyCh <- ApplyMsg{
			SnapshotValid: true,
			Snapshot:      request.StateMachineState,
			SnapshotTerm:  request.LastIncludedTerm,
			SnapshotIndex: request.LastIncludedIndex + 1,
		}
	}()
}
```
CondInstallSnapshot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int , lastIncludedIndex int , snapshot []byte ) bool { rf.mu.Lock() defer rf.mu.Unlock() lastIncludedIndex-- if lastIncludedIndex <= rf.LastIncludedIndex { return false } rf.trimLog(lastIncludedIndex) if lastIncludedIndex <= rf.lastApplied { rf.RaftSnapshot = RaftSnapshot{LastIncludedTerm: lastIncludedTerm, LastIncludedIndex: lastIncludedIndex} } else if lastIncludedIndex <= rf.commitIndex { rf.RaftSnapshot = RaftSnapshot{LastIncludedTerm: lastIncludedTerm, LastIncludedIndex: lastIncludedIndex} rf.lastApplied = lastIncludedIndex } else { rf.RaftSnapshot = RaftSnapshot{LastIncludedTerm: lastIncludedTerm, LastIncludedIndex: lastIncludedIndex} rf.lastApplied = lastIncludedIndex rf.commitIndex = lastIncludedIndex } rf.persister.SaveStateAndSnapshot(rf.serialize(), snapshot) return true }
Miscellaneous
Logging: every service has a logging switch and a fixed log format, with locked and lock-free variants of the logging API (plus one that temporarily ignores the switch; log levels would be the better practice). Taking Raft as the example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (rf *Raft) logWithLock(str string , a ...interface {}) { if !rf.enableLog { return } rf.mu.Lock() defer rf.mu.Unlock() rf.logNoLock(str, a...) }func (rf *Raft) logNoLock(str string , a ...interface {}) { if rf.enableLog { rf.logPrivilege(str, a...) } }func (rf *Raft) logPrivilege(str string , a ...interface {}) { lastTerm, lastIndex := rf.getLatestTermAndIndex() fmt.Printf(fmt.Sprintf("[%d %s Term: %d, Role: %s, cmt: %d, lastTerm/Idx:%d-%d, snapTerm/Idx:%d-%d LogL: %d, Next:%v, lA:%d Id: %d] " , realTimeMilli(), rf.name, rf.Term, rf.role, rf.commitIndex, lastTerm, lastIndex, rf.LastIncludedTerm, rf.LastIncludedIndex, len (rf.Log), rf.nextIndex, rf.lastApplied, rf.me) + fmt.Sprintf(str, a...) + "\n" ) }
Sample output (the success fields of some RPC replies are also colorized, which makes them easier to spot):
1 2 3 4 5 6 7 8 9 10 // 每列分别是:物理时间、进程名称(Lab4用)、Term、Role、CommitIndex、日志位置、快照状态、日志长度、nextIndex数组、lastApplied、raft集群内id [124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 19 0], lA:19 Id: 2] [reqId 22206 msgTiUsd 12] after sending to 1 success:^[[0;32mtrue^[[0mrespSuccess:^[[0;32mtrue^[[0m [124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] nextIndex for 1 advance to 20, self lastLogIndex 20 [124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] [reqId 47175 realTime 124142] before sending append entries to 1 [124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] >>> sending req[38427] append[true ] vote[false ] snap[false ] [124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-19, snapTerm/Idx:1-17 LogL: 2, Next:[0 0 0], lA:17 Id: 1] recv log append [20->20] [124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-19, snapTerm/Idx:1-17 LogL: 2, Next:[0 0 0], lA:17 Id: 1] reset election timeout to 606000000 [124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-19, snapTerm/Idx:1-17 LogL: 2, Next:[0 0 0], lA:17 Id: 1] accept append entries from 2 [124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-20, snapTerm/Idx:1-17 LogL: 3, Next:[0 0 0], lA:17 Id: 1] leaderCommit 19 self.lastIndex: 20 [124148 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] [reqId 47175 msgTiUsd 6] after sending to 1 success:^[[0;32mtrue^[[0m respSuccess:^[[0;32mtrue^[[0m
Toward the end I added a lot of logging to chase rare failures; the final 1024-run batch produced 16 GB of logs, roughly 108 million lines…
Troubleshooting:
Deadlock: find the goroutine whose log stops first and look at its last line; that usually gives a lead. Another option is to wrap the mutex so every lock/unlock is logged (a sketch follows this list).
Livelock: find the last log line that was not produced by a periodic task; harder to track down than a deadlock.
Assertions: sprinkling assertions everywhere is very effective. Do not let the program keep running in a corrupted state, or the place where the problem finally erupts will be far from where the state first went wrong, making diagnosis much harder.
Consistency checks: still assertions, really. For Lab 4 I wrote a checker goroutine similar to Lab 2's log-consistency check, which tells me whether a problem comes from Raft or from the state machine and narrows the search.
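A minimal sketch of such a logging mutex wrapper. The Raft struct above declares `mu DebugLock`, but this particular body is my own illustration, not the original implementation.

```go
// DebugLock wraps sync.Mutex and logs every lock/unlock with its caller,
// which makes it much easier to spot the goroutine that never released the lock.
type DebugLock struct {
	mu    sync.Mutex
	owner string // last caller that acquired the lock
}

func callerName() string {
	pc, _, line, _ := runtime.Caller(2) // 0 = callerName, 1 = Lock/Unlock, 2 = user code
	return fmt.Sprintf("%s:%d", runtime.FuncForPC(pc).Name(), line)
}

func (l *DebugLock) Lock() {
	caller := callerName()
	log.Printf("[lock] %s waiting", caller)
	l.mu.Lock()
	l.owner = caller
	log.Printf("[lock] %s acquired", caller)
}

func (l *DebugLock) Unlock() {
	log.Printf("[lock] %s released (held by %s)", callerName(), l.owner)
	l.owner = ""
	l.mu.Unlock()
}
```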
Minor mistakes: an earlier version persisted after every single modification of a persistent variable. Inefficiency aside, the real problem is that modifying just one variable sometimes persists an inconsistent state.
Lab 3. The Lab 4 shard controller largely reuses this code, so here I only outline the Lab 3 task and leave the implementation details to the Lab 4 section; see also [2].
Lab 3 task: on top of Lab 2, implement an in-memory, linearizable KV store.
TODO: Raft does not depend on message ordering. TODO: client UUIDs.
Lab 4 asks for a Sharded Key/Value Service. Sharding is one way to implement multi-raft (honestly, I do not know the other ways yet). The architecture:
A Raft-based, highly available configuration service that stores the sharding policy.
Each raft group (a raft group is simply one Raft cluster) is responsible for a shard.
Each raft group periodically fetches the configuration from the configuration service and converges toward the configured state.
With the same number of servers, such a multi-raft design trades somewhat lower availability for lower access latency and lower state-storage pressure per group.
For shard migration, [2] pulls data from the source group; my implementation pushes instead.
Lab 4A. Since Lab 3 was skipped above, Lab 4A is described in detail here. It is essentially a replicated state machine (RSM): whatever application sits on top of the consensus protocol, a similar implementation works.
The ClientID here is just a random number; using a proper distributed ID would be better.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (ck *Clerk) Join(servers map [int ][]string ) { defer func () { ck.seq++ }() args := &CommandArgs{} args.ClientInfo = ck.getClientInfo() args.OpType = OP_JOIN args.Servers = servers for { var reply QueryReply ok := ck.servers[ck.leaderId].Call("ShardCtrler.Command" , args, &reply) if !ok || reply.WrongLeader || reply.Err == ErrTimeout { ck.leaderId = (ck.leaderId + 1 ) % len (ck.servers) time.Sleep(25 * time.Millisecond) continue } return } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 func (sc *ShardCtrler) Command(args *CommandArgs, reply *CommandReply) { sc.mu.Lock() _, ok := sc.Clients[args.ClientId] if !ok { sc.Clients[args.ClientId] = &Client{ ReqSeq: -1 , RcvSeq: -1 , LastResp: nil , Ch: make (chan Response), } } sc.Clients[args.ClientId].ReqSeq = args.ClientSeq clientInfo := sc.Clients[args.ClientId] sc.mu.Unlock() _, _, isLeader := sc.rf.Start(Op{ CommandArgs: *args, }) if !isLeader { reply.WrongLeader = true return } RETRY: select { case result := <-clientInfo.Ch: reply.Config = result.QueryReply.Config if result.MoveReply.Err != "" { reply.Err = result.MoveReply.Err } else if result.LeaveReply.Err != "" { reply.Err = result.MoveReply.Err } else if result.JoinReply.Err != "" { reply.Err = result.JoinReply.Err } else { reply.Err = result.QueryReply.Err } if result.ClientSeq < args.ClientSeq { goto RETRY } return case <-time.After(CONSENSUS_TIMEOUT): reply.Err = ErrTimeout return } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 func (sc *ShardCtrler) getFromApplied() { for applyMsg := range sc.applyCh { func () { sc.mu.Lock() defer sc.mu.Unlock() op := applyMsg.Command.(Op) _, ok := sc.Clients[op.ClientId] if !ok { sc.Clients[op.ClientId] = &Client{ ReqSeq: -1 , RcvSeq: -1 , LastResp: nil , Ch: make (chan Response), } } client := sc.Clients[op.ClientId] if applyMsg.CommandIndex <= sc.LastAppliedIndex { sc.log("discard outdated index %d" , applyMsg.CommandIndex) return } sc.LastAppliedIndex = applyMsg.CommandIndex if op.ClientSeq <= client.RcvSeq { select { case client.Ch <- *client.LastResp: default : } return } else { var resp Response if op.OpType == OP_QUERY { resp = Response{QueryReply: QueryReply{ Config: sc.getConfigNoLock(op.Num), }} } else if op.OpType == OP_JOIN { newCfg := sc.reBalanceCfg(op) sc.configs = append (sc.configs, newCfg) resp = Response{JoinReply: JoinReply{}} } else if op.OpType == OP_MOVE { newCfg := sc.latestConfig().Copy() newCfg.Shards[op.Shard] = op.GID sc.configs = append (sc.configs, *newCfg) resp = Response{MoveReply: MoveReply{}} } else { newCfg := sc.reBalanceCfg(op) sc.configs = append (sc.configs, newCfg) resp = Response{LeaveReply: LeaveReply{}} } resp.ClientId = op.ClientId resp.ClientSeq = op.ClientSeq client.RcvSeq = op.ClientSeq client.LastResp = &resp sc.LastAppliedIndex = applyMsg.CommandIndex if sc.Clients[op.ClientId].ReqSeq == op.ClientSeq { select { case client.Ch <- resp: default : } } } }() } }
Shard rebalancing: distribute the shards evenly across groups. Groups can join or leave at runtime, and relative to the previous assignment the number of shards moved should be minimized, to reduce network pressure.
I suggest following [2]; the core idea is as follows (a sketch comes after this list):
Join: repeatedly move a shard from the group holding the most shards to the group holding the fewest, until the difference between the largest and smallest counts is at most 1.
Leave: hand the leaving group's shards, one at a time, to the group currently holding the fewest shards.
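A hedged sketch of the rebalancing idea above; the shard/group representation is illustrative rather than my actual ShardCtrler code. One design point worth stressing: the iteration order over groups must be deterministic (e.g. sort the GIDs), because Go's map iteration order is random and replicas applying the same op would otherwise diverge.

```go
// rebalance reassigns shards so every group in gids owns either ⌊N/G⌋ or ⌈N/G⌉
// shards, moving as few shards as possible. shards[i] holds the GID owning shard i;
// GID 0 means "unassigned".
func rebalance(shards []int, gids []int) {
	if len(gids) == 0 {
		return
	}
	sort.Ints(gids) // deterministic order across replicas
	count := map[int][]int{}
	for _, gid := range gids {
		count[gid] = nil
	}
	for shard, gid := range shards {
		if _, ok := count[gid]; ok {
			count[gid] = append(count[gid], shard)
		} else {
			count[0] = append(count[0], shard) // owner left the cluster (or never assigned)
		}
	}
	maxMin := func() (maxG, minG int) {
		maxG, minG = gids[0], gids[0]
		for _, g := range gids {
			if len(count[g]) > len(count[maxG]) {
				maxG = g
			}
			if len(count[g]) < len(count[minG]) {
				minG = g
			}
		}
		return
	}
	// Hand unassigned shards to the smallest group first (the Leave case),
	// then move shards from the largest group to the smallest (the Join case).
	for _, shard := range count[0] {
		_, minG := maxMin()
		shards[shard] = minG
		count[minG] = append(count[minG], shard)
	}
	for {
		maxG, minG := maxMin()
		if len(count[maxG])-len(count[minG]) <= 1 {
			break
		}
		shard := count[maxG][len(count[maxG])-1]
		count[maxG] = count[maxG][:len(count[maxG])-1]
		count[minG] = append(count[minG], shard)
		shards[shard] = minG
	}
}
```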
Lab 4B (TODO: needs fleshing out)
Server-side goroutines:
Periodically fetch the configuration and submit a config-update log entry.
Periodically push the server's own shards toward what its current configuration demands, doing GC along the way.
The main goroutine, which processes committed log entries.
Poll for a new term and submit an empty entry when one appears.
Why client (dedup) state must also be persisted.
A shard server is itself also a client.
How pushing works: the push is two-phase; can that cause problems?
Thoughts & optimizations. How to design tests: beyond those shipped with 6.824, one supplementary case: a single client writes serially and only issues the next write after reading back the previous value, with a random server shutdown after every write. This fails implementations whose new leaders do not commit an empty entry after taking office.
Performance testing. As long as a majority of nodes are correct, Raft already guarantees availability; here we look at performance.
This part follows etcd's performance documentation [11].
Performance has two dimensions: latency and throughput. For example, high latency with high throughput means the service can answer a large number of requests at once (throughput), while each individual request still takes a long time from send to response (latency). [11] points out that the minimum possible latency is one RTT plus one fdatasync: the former depends on the distance between data centers and the network, the latter on the disk type (HDD vs. SSD). Because entries are sent and flushed in batches, that cost is amortized over every entry in the batch.
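To make the amortization concrete with made-up numbers: if one RTT costs 1 ms and one fdatasync costs 2 ms, a single write cannot commit in much under 3 ms; but if 100 writes share one AppendEntries batch and one fsync, the batch still pays roughly those same 3 ms, i.e. about 30 µs of replication cost per entry.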
A few other factors that affect performance:
etcd's backend uses bbolt (a fork of boltdb) as the storage engine, and transactions carry extra MVCC overhead.
Raft snapshots occasionally have to be flushed to disk.
Inflight compaction; my guess is this refers to compressing and decompressing messages around sending?
The test variables:
Number of nodes
Read or write (for reads: linearizable or serializable, i.e. whether they go through the log)
Size of the key and of the value
Total number of key/value pairs
Number of clients
Number of connections
Whether clients talk only to the leader, or to any server, with followers forwarding requests to the leader [localref-1]
Metrics:
Average read/write QPS (there may be multiple clients, so this is not the same thing as latency)
Average latency of a single request
Average server RSS, i.e. the RSS column in ps aux or RES in top: physical memory usage
I also ran a simple performance test, though it may not be directly comparable with etcd's. Why:
Servers are simulated with threads, so there is no network transfer cost.
The KV store is in memory, so there is no disk write cost.
Everything is simulated on one machine; limited by the CPU I could not test high concurrency, and GOMAXPROCS was set to just 8.
etcd's Get is optimized; mine is not.
No lock optimization (at least none done by hand).
My tests (the number of clients always equals the number of connections, GOMAXPROCS = 8, no sharding by default, snapshotting disabled):
1 client, 3 nodes, alternating reads and writes at the KV layer, 1,000 of each, 2,000 ops in total.
Took 60 s: QPS = 33, latency = 30 ms.
3 nodes, continuous writes at the Raft layer, timing stops when the last write commits.
100 clients, 3 nodes, each client writes 1,000 entries, 100,000 KV-layer writes in total. (A better methodology would be to bound the time rather than the count.) Note: do not enable the race detector, which slows things down a lot (roughly 7x).
Without the race detector: 52,921 ms, QPS = 1889.6 (100,000 / 52.921 s), latency = 52.9 ms; compared with case 1, throughput is about 57x and latency about 1.77x.
3 groups x 3 nodes, 100 clients, each client writes 1,000 entries.
Frustratingly, this took three times as long as the previous case and I cannot explain it… (QPS was a bit over three thousand, but the CPU was saturated, so the numbers are not conclusive.)
local ref
How does etcd propagate writes to non-leader members?
Question: What does “Average Server RSS” mean?
Raft and CAP. The English definitions below come from https://www.the-paper-trail.org/page/cap-faq/
Availability - will a request made to the data store always eventually complete? Consistency - will all executions of reads and writes seen by all nodes be atomic or linearizably consistent? Partition tolerance - the network is allowed to drop any messages.
What counts as available?
A data store is available if and only if all get and set requests eventually return a response that’s part of their specification. This does not permit error responses, since a system could be trivially available by always returning an error.
TODO: I still need to read more here. My current understanding: by only counting an entry as committed after a quorum write, Raft gives up availability. Under a partition it can tolerate at most ⌊n/2⌋ unreachable nodes; if enough nodes are cut off or down that no quorum remains, the system hangs, i.e. availability is lost. In other words, Raft is CP.
TODO: some say Redis primary/replica is AP; what about MySQL primary/replica? https://zhuanlan.zhihu.com/p/152105666
PreVote & CheckQuorum. TL;DR: PreVote is a restriction on candidates that are not quorum-connected servers (QCS); CheckQuorum is a restriction on leaders that are not QCS.
(Cloudflare once had a six-hour outage caused by this class of problem.)
Consider the following node topology [9]:
A leader can be elected among nodes 1, 2, and 3, but since nodes 4 and 5 cannot reach the leader they keep timing out and incrementing their term. That term eventually reaches node 2 or 3, turning it into a follower and then, after a timeout, a candidate; the higher term finally reaches the current leader and deposes it, and the whole cycle repeats. In other words, plain Raft does not actually satisfy the liveness property.
Section 9.6 of [7] gives the Pre-Vote algorithm as the fix: before a follower times out, becomes a candidate, and increments its term, it must first obtain confirmation from a majority via RPC. In the handler, a peer grants the pre-vote only if the requester's log is up to date and the peer itself has not heard from a leader for at least the baseline election timeout (my understanding: the minimum possible timeout). Pre-Vote keeps nodes that could never become leader from deposing the current leader by endlessly bumping the term.
Pre-Vote introduces a new problem of its own; consider the following topology [9]:
Suppose the cluster jumps from full connectivity straight into this state, with node 4 as leader. Nodes 1 and 3 will time out, but because node 2 can still reach node 4 it will not grant them a pre-vote, so nodes 1 and 3 can never start a new election. For this case, Section 6.2 of [7] says that a leader that has failed to maintain heartbeats with a majority for a long time should step down on its own. This modification is called CheckQuorum; it lets a defunct leader abdicate proactively.
With the Pre-Vote RPC plus voluntary leader step-down, the algorithm's liveness is ensured (a sketch of the pre-vote check follows).
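A hedged sketch of the pre-vote check described above, written in the style of this lab's code. The labs do not require Pre-Vote; the rf.lastHeardFromLeader field is an assumption, and for brevity the sketch reuses the existing RequestVote message types.

```go
// PreVote grants a "trial" vote without changing any persistent state: the real
// term is never incremented and VotedFor is never set by a pre-vote.
func (rf *Raft) PreVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	lastTerm, lastIndex := rf.getLatestTermAndIndex()
	upToDate := args.LastLogTerm > lastTerm ||
		(args.LastLogTerm == lastTerm && args.LastLogIndex >= lastIndex)
	// Grant only if the requester's log is current AND we ourselves have not heard
	// from a live leader for at least the baseline election timeout.
	quietLongEnough := time.Since(rf.lastHeardFromLeader) >= ELECTION_TIMEOUT_MIN*time.Millisecond
	reply.Term = rf.Term
	reply.VoteGranted = args.Term >= rf.Term && upToDate && quietLongEnough
}
```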
Deadlock. I have long doubted whether partial connectivity is really a practical problem.
When the topology degenerates into a tree, e.g. A-B, A-C, A-D, A-E: if A's log is not the most up to date, A cannot become leader and the system deadlocks.
With 5 fully connected nodes except for a missing A-B link, can A and B keep deposing each other as leader forever and violate liveness? (Question from https://www.zhihu.com/question/54997169)
No. As mentioned earlier, to preserve liveness a new leader must commit a no-op before serving (including reads); once A has committed that no-op, B no longer holds the most up-to-date log, so the ping-pong cannot continue.
Does Raft have a split-brain problem? Yes: with 5 fully connected nodes except for the A-B link, after A is elected leader but before it sends anything, B can time out, bump its term, win an election, and also become leader.
But this causes no real harm, and you can additionally let other nodes forward requests, which is effectively equivalent to the network still being fully connected. (ref: raft协议应用方面的疑问? - 我做分布式数据库的回答, https://www.zhihu.com/question/54997169/answer/192987776)
TODO
What semantics does Raft give a client submitting an op? At-most-once. Example: if the submit request never returns, the op was executed either zero times or exactly once.
The application can compose at-most-once into exactly-once semantics. Example: if request A never returns, simply submit it again, like the Lab 3 / Lab 4 clients do; the layer above Raft then uses the op's seqNum to decide whether to apply it (a sketch follows).
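A minimal sketch of the seqNum-based dedup that turns at-most-once submission into exactly-once application. The field names (ClientId, Seq, lastSeq, lastReply, execute) are illustrative rather than copied from my server code.

```go
// applyOp is called from the apply loop for every committed op.
func (kv *KVServer) applyOp(op Op) Reply {
	if last, ok := kv.lastSeq[op.ClientId]; ok && op.Seq <= last {
		// Duplicate of an op that was already applied: return the cached reply
		// instead of executing it a second time.
		return kv.lastReply[op.ClientId]
	}
	reply := kv.execute(op) // actually mutate the state machine
	kv.lastSeq[op.ClientId] = op.Seq
	kv.lastReply[op.ClientId] = reply
	return reply
}
```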
Read optimizations. A read that goes through the log is called a Log Read; it is safe, but we can do better.
TL;DR. ReadIndex: the read goes to the leader but not through the log; the leader sends one round of heartbeats, and a quorum of acks proves it is still leader, so it can safely return the state-machine state at the commit index recorded when the request arrived (or any newer state, as long as later reads never return anything older than this one). Lease Read: built on ReadIndex, except that if the leader is confident its heartbeats have not yet expired on a quorum (minding clock drift), it can answer the read directly without that extra round.
Two recommended articles: https://zhuanlan.zhihu.com/p/78164196 and https://zhuanlan.zhihu.com/p/463140808
ReadIndex (this subsection follows [14] entirely)
When the leader receives a read request, it records the current commit index as the read index. Before returning a result to the client, the leader must confirm it really is still the leader, which it does by sending one round of heartbeats to all peers; if a majority responds, then at least at the moment the read arrived this node was still the leader, and it only has to wait until that commit index has been applied to the state machine before returning the result.
In summary (a sketch follows this list):
The leader checks that it has committed an entry in its current term.
The leader records the current commit index, then broadcasts a heartbeat to all peers.
A majority of responses means it was still leader when the read arrived; it then waits until the apply index is at least that commit index.
Return the result.
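A hedged sketch of the ReadIndex flow summarized above, in the style of this lab's code; committedInCurrentTerm and broadcastHeartbeatAndWaitQuorum are assumed helpers, not functions that exist in my implementation.

```go
// ReadIndex returns an index such that serving the read from local state after
// lastApplied >= index is linearizable, or false if this node cannot prove leadership.
func (rf *Raft) ReadIndex() (int, bool) {
	rf.mu.Lock()
	if rf.role != ROLE_LEADER || !rf.committedInCurrentTerm() {
		rf.mu.Unlock()
		return -1, false // caller should retry or fall back to a Log Read
	}
	readIndex := rf.commitIndex
	rf.mu.Unlock()

	// One round of heartbeats; a quorum of acks proves we were leader when the read arrived.
	if !rf.broadcastHeartbeatAndWaitQuorum() {
		return -1, false
	}

	// The caller (state machine) must wait until lastApplied >= readIndex,
	// then it may serve the read from its local state.
	return readIndex, true
}
```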
etcd implements read-only queries not only on the leader but also on followers. The principle is the same, except that when the read arrives at a follower, the follower must ask the leader for the commit index; before returning it, the leader again runs the ReadIndex procedure above, because it still needs to check whether it really is the leader. (My understanding: this is essentially still the leader's ReadIndex read, with the follower acting as a proxy layer between the client and the leader.)
Lease Read (this subsection follows the TiDB documentation [10] entirely)
(TiDB presents this as a further optimization over ReadIndex: ReadIndex still has to send an extra round of heartbeats and wait for a quorum of acks before it can answer.)
We would like to read the leader's local data directly, without sending anyone anything. That requires the leader's committed data to always be the latest, which we can guarantee by making sure a majority will not elect another leader within some time window: during that window there is at most this one leader (unless it crashes), no leader with a newer term can commit entries, and the current leader's committed data therefore stays the latest.
The Raft thesis mentions a lease-read optimization based on clock + heartbeat: when the leader sends heartbeats it first records a timestamp start, and once most nodes have replied, the leader may treat its lease as valid until roughly start + election timeout / clock drift bound.
My understanding: the leader's lease lasts until the earliest point at which a quorum's heartbeats might have expired, minus the maximum clock drift that can accumulate over that interval.
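A hedged sketch of the lease bookkeeping described above; the leaseDeadline field, clockDriftBound constant, and the exact formula are illustrative assumptions rather than the thesis's or TiKV's code.

```go
// Assumed upper bound on relative clock drift (0.05 = clocks diverge by at most 5%).
const clockDriftBound = 0.05

func (rf *Raft) onQuorumHeartbeatAck(start time.Time) {
	// The lease is measured from when the heartbeats were SENT, not acknowledged, and is
	// shortened by the drift bound so a fast follower clock cannot start an election
	// before the lease expires on the leader.
	lease := time.Duration(float64(ELECTION_TIMEOUT_MIN) / (1 + clockDriftBound) * float64(time.Millisecond))
	rf.leaseDeadline = start.Add(lease)
}

func (rf *Raft) canServeLocalRead() bool {
	return rf.role == ROLE_LEADER && time.Now().Before(rf.leaseDeadline)
}
```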
TODO: what if transmitting the messages already took longer than the lease; does the leader step down by itself once its lease expires? Probably not necessary, since writes also act as heartbeats, so later writes renew the lease, and if the followers' leases have already lapsed it simply amounts to signing a new one.
Membership changes (see [12])
The paper describes a two-phase membership-change algorithm:
An external request triggers the change; the leader turns it into a $C_{old,new}$ log entry and broadcasts it. Any node that receives a membership-change entry immediately operates under the new configuration, whether or not the entry is committed.
From the $C_{old,new}$ entry onward, an entry counts as committed only when it has been acked by a quorum of the old configuration and a quorum of the new configuration.
Once $C_{old,new}$ is committed, the current leader creates and broadcasts a $C_{new}$ entry; if the leader is not part of the new configuration, it may step down after $C_{new}$ commits (a sketch of the joint-quorum check follows).
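A small sketch of the joint-consensus commit rule stated above: while $C_{old,new}$ is in effect, an entry commits only if acked by a majority of both configurations. The acked set and configuration slices are illustrative.

```go
// committedInJointConfig reports whether the set of servers that acked an entry
// forms a majority of the old configuration AND a majority of the new one.
func committedInJointConfig(acked map[int]bool, oldCfg, newCfg []int) bool {
	majorityOf := func(cfg []int) bool {
		count := 0
		for _, id := range cfg {
			if acked[id] {
				count++
			}
		}
		return count*2 > len(cfg)
	}
	return majorityOf(oldCfg) && majorityOf(newCfg)
}
```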
etcd actually uses single-step membership changes; one advantage is that a single-node membership change completes in just one phase [13].
The learner role: introduced in Section 4.2.1 of [7]. When membership changes, a newly added member should only join the voting configuration after its log has caught up; otherwise, under the membership-change algorithm of [6], commits during the change need acks from majorities of both the old and the new configuration, and a node that has not caught up would stall commits for the whole cluster.
Some important moments in Raft (questions I posed to myself, so they may not be very valuable, and the answers may be wrong…)
When does an entry count as committed: at the moment the node that completes a majority persists it.
Where is the linearization point of a write: when the leader advances commitIndex past it.
Taking a KV store as the example, where is the upper-layer application's linearization point: at the moment the entry is applied to the state machine.
Appendix: test results. They are long, so they go last.
Lab 2: 2048 runs passed. Lab 3: 512 runs passed. Lab 4: 1024 runs passed.
Lab2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 [root@e647a1bc04a2 raft] Test (2A): initial election ... ... Passed -- 3.5 3 34 10312 0 Test (2A): election after network failure ... ... Passed -- 5.5 3 127 23366 0 Test (2A): multiple elections ... ... Passed -- 7.4 7 838 134958 0 Test (2B): basic agreement ... ... Passed -- 1.2 3 16 4750 3 Test (2B): basic agreement ... ... Passed -- 1.6 3 16 4420 3 Test (2B): RPC byte count ... ... Passed -- 2.9 3 48 115098 11 Test (2B): agreement despite follower disconnection ... ... Passed -- 6.9 3 112 26633 8 Test (2B): no agreement if too many followers disconnect ... ... Passed -- 4.1 5 299 49880 3 Test (2B): concurrent Start()s ... ... Passed -- 1.3 3 10 3002 6 Test (2B): rejoin of partitioned leader ... ... Passed -- 5.9 3 129 32434 4 Test (2B): leader backs up quickly over incorrect follower logs ... ... Passed -- 27.1 5 2710 1836997 102 Test (2B): RPC counts aren't too high ... ... Passed -- 2.8 3 28 8866 12 Test (2C): basic persistence ... ... Passed -- 5.6 3 69 18640 6 Test (2C): more persistence ... ... Passed -- 19.9 5 1167 220492 16 Test (2C): partitioned leader and one follower crash, leader restarts ... ... Passed -- 2.8 3 37 9225 4 Test (2C): Figure 8 ... ... Passed -- 34.5 5 594 134594 38 Test (2C): unreliable agreement ... ... Passed -- 3.8 5 237 89256 246 Test (2C): Figure 8 (unreliable) ... delay time: 2325 ... Passed -- 35.2 5 3544 5190733 350 Test (2C): churn ... ... Passed -- 16.3 5 3479 3369043 1469 Test (2C): unreliable churn ... ... Passed -- 16.4 5 1106 548290 291 Test (2D): snapshots basic ... time used: 7296 ... Passed -- 7.3 3 144 56814 251 Test (2D): install snapshots (disconnect) ... time used: 53277 ... Passed -- 53.3 3 871 256572 367 Test (2D): install snapshots (disconnect+unreliable) ... time used: 67236 ... Passed -- 67.2 3 1049 288318 379 Test (2D): install snapshots (crash) ... time used: 41029 ... Passed -- 41.0 3 592 180220 355 Test (2D): install snapshots (unreliable+crash) ... time used: 49101 ... Passed -- 49.1 3 701 199176 366 PASS ok 6.824/raft 422.568s real 7m3.771s user 0m9.621s sys 0m7.843s
Lab3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 [root@9bd769928c32 kvraft] Test: one client (3A) ... ... Passed -- 15.1 5 3495 689 Test: ops complete fast enough (3A) ... ... Passed -- 21.4 3 3040 0 Test: many Clients (3A) ... ... Passed -- 15.4 5 7223 3310 Test: unreliable net, many Clients (3A) ... ... Passed -- 16.9 5 3133 608 Test: concurrent append to same key, unreliable (3A) ... ... Passed -- 1.8 3 156 52 Test: progress in majority (3A) ... ... Passed -- 0.6 5 41 2 Test: no progress in minority (3A) ... ... Passed -- 1.1 5 104 3 Test: completion after heal (3A) ... ... Passed -- 1.1 5 40 3 Test: partitions, one client (3A) ... ... Passed -- 22.8 5 2734 401 Test: partitions, many Clients (3A) ... ... Passed -- 22.5 5 6356 2272 Test: restarts, one client (3A) ... ... Passed -- 21.2 5 3723 691 Test: restarts, many Clients (3A) ... ... Passed -- 21.4 5 8116 3317 Test: unreliable net, restarts, many Clients (3A) ... ... Passed -- 22.3 5 3771 612 Test: restarts, partitions, many Clients (3A) ... ... Passed -- 27.9 5 6817 2564 Test: unreliable net, restarts, partitions, many Clients (3A) ... ... Passed -- 29.3 5 3566 313 Test: unreliable net, restarts, partitions, random keys, many Clients (3A) ... ... Passed -- 33.6 7 9159 856 Test: InstallSnapshot RPC (3B) ... ... Passed -- 5.9 3 337 63 Test: snapshot size is reasonable (3B) ... ... Passed -- 17.1 3 2434 800 Test: ops complete fast enough (3B) ... ... Passed -- 21.6 3 3054 0 Test: restarts, snapshots, one client (3B) ... ... Passed -- 21.1 5 3723 692 Test: restarts, snapshots, many Clients (3B) ... ... Passed -- 21.5 5 88544 28600 Test: unreliable net, snapshots, many Clients (3B) ... ... Passed -- 16.4 5 2864 537 Test: unreliable net, restarts, snapshots, many Clients (3B) ... ... Passed -- 23.1 5 3801 608 Test: unreliable net, restarts, partitions, snapshots, many Clients (3B) ... ... Passed -- 28.2 5 3368 332 Test: unreliable net, restarts, partitions, snapshots, random keys, many Clients (3B) ... ... Passed -- 32.0 7 8083 877 PASS ok 6.824/kvraft 462.060s real 7m42.459s user 1m42.312s sys 0m10.487s
Lab4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 [root@9bd769928c32 shardkv] Test: static shards ... ... Passed Test: join then leave ... ... Passed Test: snapshots, join , and leave ... ... Passed Test: servers miss configuration changes... ... Passed Test: concurrent puts and configuration changes... ... Passed Test: more concurrent puts and configuration changes... ... Passed Test: concurrent configuration change and restart... ... Passed Test: unreliable 1... ... Passed Test: unreliable 2... ... Passed Test: unreliable 3... ... Passed Test: shard deletion (challenge 1) ... ... Passed Test: unaffected shard access (challenge 2) ... ... Passed Test: partial migration shard access (challenge 2) ... ... Passed PASS ok 6.824/shardkv 131.002s real 2m12.658s user 7m36.001s sys 0m6.838s
Supplementary tests: GOMAXPROCS comparison (TODO).
Miscellaneous: a strange Golang bug I ran into, "runtime: marked free object in span"; it never reproduced after later code changes.