MIT6.824 Lab2 Lab3 Lab4

这份笔记不包括最简单的Lab1,很久之前做的,细节已经忘记了。

花费20多天做完了Lab2-Lab4,稳定通过所有测试(含Lab4的两个Challenge),用例也有不全面的地方,下面会提到。遗憾的地方也有不少:

  1. 代码结构上不够美观;代码行数上,Lab4 server 1000行,Lab2 raft 2000行,也有不少的压缩空间。
  2. Lab3 Lab4实现的kvServer没有优化,据说get是可以不加锁的
  3. 不少地方使用轮询,用wait/signal会更加高效(与优雅)

难度上,我认同[2]作者的排序,Lab4 > Lab2 >> Lab3 > Lab1。
Lab2的难度在于:1. 理解Raft协议,仔细实现论文Figure2中的所有细节。2. 第一次编写调试分布式代码
Lab4的难度在于:1. 发现在Lab2对Raft的实现是有错误的。2.复杂度高,结构上:搭Raft上的状态机,数量上:12个Raft Server,3个ShardKV Server,很多客户端),调试难度上升

推荐资料

TODO 补充这些资料的简略说明

  1. 作业自带的说明与Hint
  2. MIT6.824-2021 四个Lab的实现笔记,Lab2的Snapshot和Lab4的部分思路参考于这里
  3. Raft架构图
  4. Instructors’ Guide to Raft
  5. Students’ Guide to Raft
  6. The Secret Lives of Data - Raft: Understandable Distributed Consensus Raft算法可视化,初学时辅助理解
  7. CONSENSUS: BRIDGING THEORY AND PRACTICE Raft的PhD论文
  8. go-test-many.sh 并行测试脚本,使用方法 ./go-test-many.sh 1024 32 TestUnreliableAgree2C,三个参数分别是测试次数、并行执行数、用例包含的字符串
  9. Raft does not Guarantee Liveness in the face of Network Faults
  10. TiKV 功能介绍 - Lease Read
  11. Performance
  12. In Search of an Understandable Consensus Algorithm Raft论文本身
  13. Raft成员变更的工程实践
  14. etcd raft如何实现Linearizable Read

相关资料

来不及看的一些文章,优先级递减

  1. Linearizability 一致性验证
    PingCAP的文章
  2. ARC: Analysis of Raft Consensus
  3. dist101
  4. Debugging by Pretty Printing

Lab 2

TODO:协程整理

Lab2分四次小作业

  • 2A: 实现选举+心跳包
  • 2B: 实现Basic Raft
  • 2C: 持久化
  • 2D: 日志压缩

整体上需要注意的点有:

  1. 在实现snapshot时会截断日志,因此一定不要使用Log数组的下标保存index,后面改起来会很麻烦
  2. RPC的发送需要手动管理超时,当发送给挂掉的server时,网络时延可能高达7s
  3. 测试时确保任意时刻CPU不要打满。在测试Lab4的TestJoinLeave和TestChallenge2Unaffected时,用例中给了一个假定的时限用于迁移shard,随后shutdown那个group,如果server进展过慢则会fail。使用并行数为8进行测试时,5800x在某些时刻存在打满的情况,测试结果中会低概率(<1%)无法通过这两个测试,给两倍时间,或并行数修改为4,能够稳定通过所有测试。

结构定义与初始化

结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
type Raft struct {
mu DebugLock // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's Index into peers[]
dead int32 // set by Kill()

commitIndex int // index of the highest entry known to be committed
lastApplied int // highest entry applied to the state machine

nextIndex []int // next Log entry to send to that server, init len(Log)
// matchIndex []int // highest Log entry known to be replicated on server, init -1

PersistentState // 见下方

// ====== 以上是Figure 2中的所有变量 ======

role string // leader、follower or candidate

newDataNotifyChan []chan string // 通过此channel触发向其他raft server的日志发送逻辑
installSnapshotChan []chan string // 通过此channel触发向其他raft server install snapshot的逻辑
stateChangeNotifyChan chan string // 通过此channel通知主循环当前节点角色变化事件

muSnapshot sync.Mutex // 避免执行CondInstallSnapshot时的lastAppliedIndex与实际不一致,之后详细介绍
electionDeadline time.Time // follwer与candidate的任期超时时间

applyCh chan ApplyMsg // 提交给状态机的Channel

// debug用
name string // 用于区分不同的raft server
enableLog bool
omitLogCommand bool
enablePProf bool

//logsToSendChan chan *LogEntry // note: reinitialize when restart
// TODO: 用lock似乎无法同时响应多个中断,使用select
}

// 需要持久化的raft状态
type PersistentState struct {
// Figure 2指出的几个需要持久化的变量
Term int
VotedFor int
Log Logs

RaftSnapshot // 快照状态
}

type RaftSnapshot struct {
LastIncludedTerm int
LastIncludedIndex int
}

常量

选举timeout 550ms-1100ms,心跳180ms,RPC超时100ms

1
2
3
4
5
6
ELECTION_TIMEOUT_MIN   = 550 // follower/candidate timeout = (550 + rand.Intn(550)) ms
ELECTION_TIMEOUT_DELTA = 550

HEARTBEAT_TIMEOUT = 180 // 心跳包间隔
RPC_TIMEOUT = 100 // RPC超时
DEADLINE_CHECK_INTERVAL = 15 // check election timeout的间隔

初始化,只列出必要代码(略去辅助代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).
rf.VotedFor = -1
rf.Term = 0
rf.role = ROLE_FOLLOWER
rf.commitIndex = -1
rf.lastApplied = -1 // 这里使用0开始的日志index,test中使用的是1开始的,提交给applyCh转换即可
// TODO: golang race怎么检测冲突的
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.peers); i++ {
rf.matchIndex[i] = -1
}
rf.nextIndexValid = make([]bool, len(rf.peers))
rf.applyCh = applyCh
rf.stateChangeNotifyChan = make(chan string, 1)
rf.newDataNotifyChan = make([]chan string, 0)
for i := 0; i < len(rf.peers); i++ {
rf.newDataNotifyChan = append(rf.newDataNotifyChan, make(chan string, 1))
}

// snapshot
rf.RaftSnapshot.LastIncludedTerm = -1
rf.RaftSnapshot.LastIncludedIndex = -1
rf.installSnapshotChan = make([]chan string, 0)
for i := 0; i < len(rf.peers); i++ {
rf.installSnapshotChan = append(rf.installSnapshotChan, make(chan string, 1))
}

// initialize from state persisted after a crash
rf.readPersist(persister.ReadRaftState())

// 主循环
go rf.ticker()
// 异步日志提交
go rf.applier()

return rf
}

主循环

状态转移矩阵,横向为起始状态,纵向为目标状态。只有candidate可以转化为两种状态,其他角色的下一状态都是必然的。

leader follower candidate
leader - 发现更高term的server -
follower - - 超时
candidate majority vote 发现更高term的server 选举超时,下一轮还是candidate

实现复杂度上:leader > candidate > follower

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (rf *Raft) ticker() {
for rf.killed() == false {
rf.mu.Lock()
role := rf.role // 即使在锁外role发生多次变化,使用term作为逻辑时钟,raft可以避免落后的server发挥作用
rf.mu.Unlock()

if role == ROLE_FOLLOWER {
rf.loopFollower()
} else if role == ROLE_CANDIDATE {
rf.loopCandidate()
} else {
rf.loopLeader()
}
}
}

LoopFollower

follower只有任期结束这一种角色转换的可能性,逻辑较为简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (rf *Raft) loopFollower() {
rf.mu.Lock()
rf.resetElectionTimeout()
rf.mu.Unlock()

timeoutChan := make(chan string, 1)
go func() {
defer func() { timeoutChan <- "x" }()
for {
rf.mu.Lock() // 加锁确保检查时间的时候没有vote正在进行
if time.Now().After(rf.electionDeadline) {
rf.mu.Unlock()
return
}
rf.mu.Unlock()
time.Sleep(time.Millisecond * DEADLINE_CHECK_INTERVAL) // golang的timer很容易错误使用,这里轮询
}
}()

<-timeoutChan // 只有timeout导致角色变化一种可能性

rf.mu.Lock()
rf.Term++
rf.role = ROLE_CANDIDATE // role变更与相关的初始化在同一次加锁内完成
rf.VotedFor = rf.me
rf.persist() // 一致修改persistent变量后记得做原子保存

rf.mu.Unlock()
}

测试在kill之前会先断网,防止状态传播到其他server,然后将持久化状态拷贝一份用于将来重启server,最后调用Kill()杀死服务,persist提供的保存会持锁,与拷贝过程持锁冲突,这样提供了原子的持久化接口

LoopCandidate

包括loopLeader,采用的都是类似的模式,对每个out channel创建协程,加锁构造请求,避免在请求发送期间被修改;无锁发送请求;加锁处理请求,确保对自身状态修改的一致性

Candidate需要同步地响应三种中断:作为RPC receiver引发的角色切换、任期超时、requestVote RPC被接受

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
func (rf *Raft) loopCandidate() {
ackChan := make(chan string, len(rf.peers)) // 存放requestVote的ack
ackNum := 1

rf.mu.Lock()

returnFunc := func(term int) func() bool {
return func() bool {
return rf.role != ROLE_CANDIDATE || rf.killed() || term != rf.Term
}
}(rf.Term)

rf.resetElectionTimeout()
rf.persist()

for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}

go func(i int) {
for {
rf.mu.Lock()

if returnFunc() { // 在合适的时机释放协程
rf.mu.Unlock()
return
}

lastTerm, lastIndex := rf.getLatestTermAndIndex()
req := &RequestVoteArgs{
Term: rf.Term,
CandidateId: rf.me,
LastLogIndex: lastIndex,
LastLogTerm: lastTerm,
}
rf.mu.Unlock()
resp := RequestVoteReply{}

sendSuccess := rf.retrySend(i, returnFunc, nil, nil, req, &resp, nil, nil, 1) // TODO: returnFunc

if sendSuccess {
if resp.VoteGranted {
ackChan <- "ok"
} else {
rf.mu.Lock()
if resp.Term > rf.Term {
rf.Term = resp.Term
rf.VotedFor = -1
rf.persist()
rf.role = ROLE_FOLLOWER
select {
case rf.stateChangeNotifyChan <- "x":
default:
}
}
rf.mu.Unlock()
}
break
} // else continue next round
}

}(i)
}
rf.mu.Unlock()

timeoutChan := make(chan string, 1)
go func() {
defer func() { timeoutChan <- "x" }()
for {
rf.mu.Lock() // 加锁确保检查时间的时候没有vote正在进行
if time.Now().After(rf.electionDeadline) {
if rf.role != ROLE_CANDIDATE {
rf.mu.Unlock()
return
}
rf.Term++
rf.persist()
rf.mu.Unlock()
return
}
rf.mu.Unlock()
time.Sleep(time.Millisecond * DEADLINE_CHECK_INTERVAL)
}
}()

// 响应不同类型的中断
for {
select {
case <-ackChan:
ackNum++
if ackNum == rf.getQuorumSize() {
rf.mu.Lock()
rf.logNoLock("got %d vote", ackNum)
if rf.role != ROLE_CANDIDATE {
rf.mu.Unlock()
return
}
rf.role = ROLE_LEADER // 直接进入下一个cycle,不需要响应stateChange
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = rf.getLatestIndex() + 1
}
rf.mu.Unlock()
return
}
case <-timeoutChan:
return
case <-rf.stateChangeNotifyChan:
return
}
}
}

LoopLeader

存放了Raft的主要逻辑(另一个主要部分是RPC处理),整体逻辑与LoopCandidate相似,需要注意的问题有:

  1. 唯一需要响应的中断是来自更高Term RPC导致转换为follower
  2. 上任后立即发送一个心跳,尽快向其他server宣告自己的地位,避免它们升到更高的term,导致系统再次进入无leader状态(也就不可用)
    实际上应当发送一个空日志而非心跳,考虑某个当选的leader在当选时有commitIndex=5,lastLogIndex=6,而其他server均为commitIndex=lastLogIndex=6,上层状态机仅当被deliver index=6的消息后,才会向raft server提交新的日志。在这种情况下,根据论文Figure 8,来自上一个term的消息通过counting majority提交时不安全的,此时这条消息永远不会deliver,系统无法前进,即不满足liveness property。发送当前任期的空日志可以解决这一问题。然而,在用例中提交自己创造的日志会引发错误,说明给出的测试用例是有缺陷的。这一问题最常在TestBasicAgreeUnreliable中触发,概率约0.2%。Lab4中在状态机级别会主动提交空日志解决此问题。
  3. 论文给出的matchIndex变量我没有找到用途,我认为依赖nextIndex-1作为对方当前持有的日志位置是安全的。因为只要leader与其他server发生过success=true的AppendEntries RPC交互,上面的命题为真。对于未发生交互的情况,使用初始化的值rf.nextIndex[i] = rf.getLatestIndex() + 1作为对方持有的日志位置进行判断是否已经发给majority,即使超过majority,也会由于日志非当前term而无法提交,除非之后有当前term的位置检测到majority,而这个位置一定又是发生过AppendEntries RPC交互的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (rf *Raft) loopLeader() {
rf.mu.Lock()

returnCond := func(term int) func() bool {
return func() bool {
return rf.role != ROLE_LEADER || rf.killed() || term != rf.Term
}
}(rf.Term)

for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}

go func(i int) {
var prevLogTerm int
var sendSuccess bool

// 立即发送心跳
heartbeatTimeoutTimer := time.NewTimer(0)

for {
select { // 注意select的不同case间是没有优先级的
case <-heartbeatTimeoutTimer.C:
HTBT:
// 发送心跳的逻辑,下方介绍
case <-rf.newDataNotifyChan[i]:
MSG:
// 发送日志的逻辑,下方介绍
case <-rf.installSnapshotChan[i]:
SNAPSHOT:
// 向其他server发送snapshot的逻辑,下方介绍
}
}
}(i)
}
rf.mu.Unlock()

<-rf.stateChangeNotifyChan
}
发送心跳

经典的加锁构造,无锁发送,加锁处理结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
var req *AppendEntriesRequest
resp := AppendEntriesResponse{}
heartbeatTimeoutTimer.Reset(rf.getHeartbeatTimeout())
rf.mu.Lock()
if returnCond() { // note: 只靠ctx.Done()好像还不够……麻了
rf.mu.Unlock()
return
}
// 如果有新数据,优先触发发送日志的handler
if rf.nextIndex[i] <= rf.getLatestIndex() {
select {
case rf.newDataNotifyChan[i] <- "x":
default:
}
rf.mu.Unlock()
continue
}

if rf.nextIndex[i] <= 0 {
prevLogTerm = -1
} else if rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex {
// 如果对方的nextIndex小于等于我的snapshot位置,优先触发installSnapshot的handler
select {
case rf.installSnapshotChan[i] <- "x":
default:
}
rf.mu.Unlock()
return
} else if rf.nextIndex[i] == rf.RaftSnapshot.LastIncludedIndex+1 {
prevLogTerm = rf.RaftSnapshot.LastIncludedTerm
} else {
prevLogTerm = rf.getLogByIndex(rf.nextIndex[i] - 1).Term
}

req = &AppendEntriesRequest{
Msgs: nil,
Term: rf.Term,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[i] - 1,
PrevLogTerm: prevLogTerm,
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()

// 把所有的rpc发送与接收实现在一个函数中了,最初的设计是支持重试,也就是最后一个参数,但loopLeader的循环本身就是重试,于是重试次数都填1
if !rf.retrySend(i, returnCond, req, &resp, nil, nil, nil, nil, 1) {
continue
}

rf.mu.Lock()
if returnCond() {
rf.mu.Unlock()
return
}

if resp.Success {
rf.mu.Unlock()
continue
}

if resp.Term > rf.Term {
rf.Term = resp.Term
rf.role = ROLE_FOLLOWER
rf.stateChangeNotifyChan <- "x"
rf.VotedFor = -1
rf.persist()
rf.mu.Unlock()
return
} else {
// nextIndex估计错误,回退
if rf.adjustNextIndex(i, req.PrevLogIndex, resp.ConflictTerm, resp.ConflictIndex) {
rf.mu.Unlock()
goto HTBT // 不等待,立即发送下一次心跳,继续调整nextIndex直到正确
} else {
rf.mu.Unlock()
continue
}
}
调整nextIndex

在测试TestFigure8Unreliable2C中,会产生要求大量回退的场景,且网络是unreliable的,意味着单次回退一个index太慢。这里使用[5]中提供的回退策略加速回退。

follower

  1. 如果follower在发送的prevIndex位置没有日志,那么(conflictTerm=-1, conflictIndex=indexOfLastLog+1)
  2. 如果follower在prevIndex位置有日志,但冲突(term不同,一定是小于),conflictTerm=prevIndex位置日志entry的term,conflictIndex=第一条term为此term的index

leader

  1. 如果conflictTerm=-1,回退到conflictIndex
  2. 如果有term=conflictTerm的日志,回退到最后一条这样日志的下一条日志
  3. 否则仍然回退到conflictIndex

分析安全性:
回退不足会导致效率下降(在网络环境差时加剧),回退过度要么导致发送installSnapshot,要么导致发送一些冗余日志(由于appendEntries是批量发送,RPC次数是一样的),不会有安全性问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (rf *Raft) adjustNextIndex(i int, prevLogIndex int, conflictTerm int, conflictIndex int) bool {
// 发送请求期间自身进行过日志裁剪,导致发送的prevIndex处的日志已经不在了,触发installSnapshot handler
if prevLogIndex <= rf.RaftSnapshot.LastIncludedIndex {
rf.logNoLock("prevLogIndex %d not in log, impossible to go back, notify install snapshot for %d", prevLogIndex, i)
select {
case rf.installSnapshotChan[i] <- "x":
default:
}
return false
}

if conflictTerm == -1 {
rf.nextIndex[i] = conflictIndex
} else {
conflictTermIndex := -1

for j := rf.Log.getLocalIndex(prevLogIndex); j >= 1; j-- {
// note: 考虑日志leader 1122333,follower 112222,发送最后两个33,返回conflictTerm=2,回到最后一个2的后一个位置作为nextIndex发送
if rf.Log[j-1].Term == conflictTerm {
conflictTermIndex = j
break
}
if rf.Log[j-1].Term < conflictTerm {
break
}
}
if conflictTermIndex == -1 {
// note: 考虑日志leader 113333,follower 1122,发送最后两个33,conflictTerm=2 conflictIndex=无法在leader日志找到,那么按照conflictIndex=2来
// follower视角:冲突点在第二个2,找到第一个2作为conflictIndex返回,即leader视角的第一个3位置
rf.nextIndex[i] = conflictIndex
} else {
rf.nextIndex[i] = rf.Log[conflictTermIndex].Index
}
}

// 触发发送日志handler
select {
case rf.newDataNotifyChan[i] <- "x":
default:
}

return true
}
发送日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
var req *AppendEntriesRequest
resp := AppendEntriesResponse{}

heartbeatTimeoutTimer.Reset(rf.getHeartbeatTimeout())

rf.mu.Lock()
if returnCond() {
rf.mu.Unlock()
return
}

if rf.nextIndex[i] > rf.getLatestIndex() {
rf.mu.Unlock()
continue
}

if rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex {
select {
case rf.installSnapshotChan[i] <- "x":
default:
}
rf.mu.Unlock()
continue
}

if rf.nextIndex[i] <= 0 {
prevLogTerm = -1
} else if rf.nextIndex[i] <= rf.RaftSnapshot.LastIncludedIndex {
select {
case rf.installSnapshotChan[i] <- "x":
default:
}
rf.mu.Unlock()
return
} else if rf.nextIndex[i] == rf.RaftSnapshot.LastIncludedIndex+1 {
prevLogTerm = rf.RaftSnapshot.LastIncludedTerm
} else {
prevLogTerm = rf.Log.getByIndex(rf.nextIndex[i] - 1).Term
}

// 考虑发送过程中接收到更高term的appendEntries请求,对日志进行了覆盖,那么请求参数会被改变,因此需要复制一份
logCp := make([]LogEntry, 0)
for _, log := range rf.Log {
logCp = append(logCp, LogEntry{
Log: log.Log,
Term: log.Term,
Index: log.Index,
})
}
req = &AppendEntriesRequest{
Msgs: logCp[rf.Log.getLocalIndex(rf.nextIndex[i]):],
Term: rf.Term,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[i] - 1,
PrevLogTerm: prevLogTerm,
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()

if !rf.retrySend(i, returnCond, req, &resp, nil, nil, nil, nil, 1) {
goto MSG // 重试
}

rf.mu.Lock()
if returnCond() {
rf.mu.Unlock()
return
}
if resp.Success {
forward := len(req.Msgs)
rf.nextIndex[i] += forward
rf.tryCommit(i, rf.nextIndex[i]-forward) // 从nextIndex-消息长度的位置开始检查commit条件是否满足
rf.mu.Unlock()
goto MSG // 确认是否有更多消息,没有消息才等待心跳包超时
}

if resp.Term > rf.Term {
rf.Term = resp.Term
rf.persist()
rf.role = ROLE_FOLLOWER
select {
case rf.stateChangeNotifyChan <- "x":
default:
}
rf.mu.Unlock()
return
}

if rf.adjustNextIndex(i, req.PrevLogIndex, resp.ConflictTerm, resp.ConflictIndex) {
rf.mu.Unlock()
goto MSG
} else {
rf.mu.Unlock()
continue
}
更新提交位置

从successStartIndex到当前日志位置检查是否可以提交,一旦有日志不符合提交条件,后续日志也无检查必要。commit只是逻辑上的标志,有专门的日志提交协程。

这里解释一下论文figure 8:不允许通过replicate仅含之前term的日志到majority来判定其committed
考虑5台server的raft cluster
term 2:server 1是leader,replicate msg2到server 1(leader), server2(follower)后挂掉。
term 3:server 5当选为leader(因为有三台没有收到msg2的server),自己append一条日志后挂掉。
term 4:server1复活,选举为leader,server1根据心跳获知server 3没有msg2这条日志,通过AppendEntires发送,根据心跳获知server1有这条日志,此时根据AppendEntries的成功返回(success=true),term 3 leader获知已经replicate到majority,但不能commit,下面会看到msg2如何被覆盖。假设server1在term4仅仅发送一条msg2给server3后就挂掉
term 5:server5当选leader(根据论文中RequestVote RPC中的as up-to-date比较log新旧顺序,server5比server2/3/4更新,因此能够收到来自majority的投票),收到心跳包回复,回退日志,发送AppendEntries RPC,会对之前的msg2进行覆盖,这就是不允许通过replicate仅含之前term的日志到majority来判定其committed的原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rf *Raft) tryCommit(i int, successStartIndex int) {
for current := successStartIndex; current < rf.nextIndex[i]; current++ {
ackNum := 1
for j := 0; j < len(rf.peers); j++ {
if j == rf.me {
continue
}

if rf.nextIndex[j] > current {
ackNum++
}
}

// 最后的rf.commitIndex < current属于assert性质,没有来得及验证是否可以删除
if ackNum >= rf.getQuorumSize() && current > rf.RaftSnapshot.LastIncludedIndex && rf.Log.getByIndex(current).Term == rf.Term && rf.commitIndex < current {
rf.commitIndex = current
} else {
break
}
}
}
发送snapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
var req *InstallSnapshotRequest
resp := InstallSnapshotResponse{}

rf.mu.Lock()
if returnCond() {
rf.mu.Unlock()
return
}

// note: 不应该发最新的,而是应该发自己的snapshot,snapshot确保commit
req = &InstallSnapshotRequest{
Term: rf.Term,
LeaderId: rf.me,
RaftSnapshot: rf.RaftSnapshot,
StateMachineState: rf.persister.ReadSnapshot(),
}
rf.mu.Unlock()

if !rf.retrySend(i, returnCond, nil, nil, nil, nil, req, &resp, 1) {
goto SNAPSHOT
}

rf.mu.Lock()
if returnCond() {
rf.mu.Unlock()
return
}

if resp.Term > rf.Term {
rf.Term = resp.Term
rf.persist()
rf.role = ROLE_FOLLOWER
select {
case rf.stateChangeNotifyChan <- "x":
default:
}
rf.mu.Unlock()
return
} else {
// 如果nextIndex有进展的话
if req.LastIncludedIndex+1 > rf.nextIndex[i] {
oldNextId := rf.nextIndex[i]
rf.nextIndex[i] = req.LastIncludedIndex + 1
rf.tryCommit(i, oldNextId)
}
}

rf.mu.Unlock()

RequestVote RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (rf *Raft) RequestVote(request *RequestVoteArgs, response *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()

if rf.Term > request.Term {
rf.rejectVote(request, response)
return
} else if rf.Term < request.Term {
rf.Term = request.Term
rf.VotedFor = -1 // 转为follower不意味着一定投票给对方,可能自己的log更up to date
rf.role = ROLE_FOLLOWER
select {
case rf.stateChangeNotifyChan <- "x":
default:
}
}

// 只有没有投过票,或者已经投给过request.Candidate的server可能会accept
if rf.VotedFor != -1 && rf.VotedFor != request.CandidateId {
rf.logNoLock("reject because self.votedFor %d != request.candidateId %d",
rf.VotedFor, request.CandidateId)
rf.rejectVote(request, response)
return
}

// vote only if peer is at least as up to date as self
lastTerm, lastIndex := rf.getLatestTermAndIndex()
if request.LastLogTerm > lastTerm || (request.LastLogTerm == lastTerm && request.LastLogIndex >= lastIndex) {
rf.VotedFor = request.CandidateId
rf.resetElectionTimeout() // Figure 2中follower的两种刷新timeout条件之一
rf.acceptVote(request, response)
return
}

rf.rejectVote(request, response)
}

AppendEntries RPC

某条日志的commit最早可以发生在majority中最后一台server将此日志持久化后的那一刻,但显然leader直到这条(或者随后的RPC)消息返回才能够观测到这个事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
func (rf *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()

// 检查Term与Role
if !rf.checkTermAndRole(request.Term, request.LeaderId) {
rf.logNoLock("%s because term", textRed("reject"))
rf.rejectAppendEntries(request, response)
return
}

// 切除append msg已经在当前server snapshot中的部分,如果Msg不为nil(不是心跳包)且发送的消息全部在snapshot中,立即返回
if request.Msgs != nil {
firstNonSnapshotIndexInMsg := 0
for ; firstNonSnapshotIndexInMsg < len(request.Msgs); firstNonSnapshotIndexInMsg++ {
if request.Msgs[firstNonSnapshotIndexInMsg].Index > rf.RaftSnapshot.LastIncludedIndex {
break
}
}
request.Msgs = request.Msgs[firstNonSnapshotIndexInMsg:]

// 如果长度变短了,那么一定有小于等于snapshot的部分,修改request.PrevLog[Term/Index]使得prevIndex一定与Msg[0].Index能够连起来
if firstNonSnapshotIndexInMsg != 0 {
request.PrevLogTerm = rf.RaftSnapshot.LastIncludedTerm
request.PrevLogIndex = rf.RaftSnapshot.LastIncludedIndex
}

if len(request.Msgs) == 0 {
rf.acceptAppendEntries(request, response)
return
}
}

// 确保PrevLogIndex不新于当前最新日志,即不存在日志Index不连续的情况,否则根据前文介绍的回退规则回退
_, lastIndex := rf.getLatestTermAndIndex()
if lastIndex < request.PrevLogIndex {
rf.logNoLock("reject entry because not containing Log at prevLogIndex %d, current Log max index %d", request.PrevLogIndex, lastIndex)
response.ConflictTerm = -1
response.ConflictIndex = lastIndex + 1 // 回退到自己持有的最新日志位置
rf.rejectAppendEntries(request, response)
return
}

localIndexForLogAtPrevIndex, logAtPrevIndex := rf.getLogAndLocalIndexByIndex(request.PrevLogIndex)

// PrevLogIndex在当前状态中已经存在,但Term不一致,回退。Append被accept后,需要保证与leader在Append的最后一个日志entry以及之前都是一致的
if request.PrevLogTerm >= 0 && request.PrevLogIndex > rf.RaftSnapshot.LastIncludedIndex && logAtPrevIndex.Term != request.PrevLogTerm {
// 回退到自身日志中,term等于在prevIndex处自身log的term的第一条日志的位置,这样的位置一定存在
response.ConflictTerm = logAtPrevIndex.Term
i := localIndexForLogAtPrevIndex
for {
if i == 0 || rf.Log[i-1].Term != response.ConflictTerm {
response.ConflictIndex = i // first index whose entry has term equal to conflictTerm.
break
}
i--
}
rf.rejectAppendEntries(request, response)
return
}

// 所有检查通过,必定accept
rf.acceptAppendEntries(request, response)

i := localIndexForLogAtPrevIndex + 1 // prev后的第一个local位置,local index即自身Log数组中log entry的下标
j := 0
hasConflict := false
for ; i < len(rf.Log) && j < len(request.Msgs); i++ {
if rf.Log[i].Term != request.Msgs[j].Term { // figure 2中conflict的定义
hasConflict = true
}
// 用Msg中的日志覆盖本地日志
rf.logNoLock("overwrite %d with %d", rf.Log[i].Index, request.Msgs[j])
rf.Log[i] = request.Msgs[j]
j++
}

// 如果来自覆盖本地日志后还有多,那么还需要append超出的部分;否则,如果发生了覆盖(存在conflict,定义见figure 2),本地日志需要裁剪至request.Msg中最后一条日志的位置,如果未conflict,那么什么都不需要做
if j < len(request.Msgs) {
rf.Log = append(rf.Log, request.Msgs[j:]...)
} else if hasConflict {
rf.Log = rf.Log[:i]
}

// 根据leader的commit位置尝试更新自身commit位置,在leader会过滤figure 8的情况(不允许通过replicate仅含之前term的日志到majority来判定其committed),这里只是follower leader的commit位置,不需要判断它
_, lastIndex = rf.getLatestTermAndIndex()
if request.LeaderCommit > rf.commitIndex {
oldCommitIndex := rf.commitIndex
var newCommitIndex int
if request.LeaderCommit > lastIndex {
newCommitIndex = lastIndex
} else {
newCommitIndex = request.LeaderCommit
}

if newCommitIndex > oldCommitIndex {
rf.commitIndex = newCommitIndex
}
}
}

异步提交、故障恢复、快照与持久化

  • 持久化:对任何持久变量的修改,如果修改后的状态是一致的,那么需要进行一次保存。
  • 快照:如果没有快照,commitIndex按照figure 2中初始化为0,日志是持久化的,那么每次故障恢复都会从第一条日志开始重现历史,因此需要有快照,通过快照获得初始状态,通过重现少量历史回到故障发生前的持久化状态。快照内容包含状态机状态与raft状态,状态机状态与应用相关,raft状态包含LastIncludedTermLastIncludedIndex两个变量,显然都需要是持久化的

关于异步提交这块,applyCh改成有缓冲channel可以解决暂时的日志提交与日志消费速度不匹配的问题(仅仅是暂时),仍然不能避免加锁提交导致的死锁问题。下方仅对当前的无缓冲applyCh实现进行分析。

  • 不要加锁提交:考虑状态机拿到一条日志执行后,判断需要进行snapshot,尝试获得锁(raft有多个持久化状态,为保证snapshot期间读到一致的状态,需要加锁),与此同时,raft server持有锁去提交日志,由于applyCh无缓冲(或者可以看成缓冲区满),产生了死锁状态
    TODO:如果只有一个持久化状态,就可以不加锁了吗?

  • 快照过程:由状态机触发,目的是基于某个commitIndex,收集一个raft与状态机间的一致状态。
    实际上对于raft的持久化状态,term和voteFor都可能根据新term的产生而改变,似乎只要log,或者snapshot位置(LastIncludedTerm、LastIncludedIndex)不低于状态机传来的commitIndex,就算是一致的

  • 同步/异步提交:同步提交的问题在于状态机消费与raft提交的速度不一定一致

  • 发送快照:首先leader无论何时拥有最新日志,因此这个RPC的方向一定是从leader到其他角色。一旦leader发现其他server的日志比自己的snapshot还落后,由于snapshot会裁剪日志,此时已经无法用AppendEntries RPC来使其追赶,而是通过发送快照实现。需要同时发送raft与状态机的快照。可以看出,上层快照也需要传递给raft server,尽管不需要理解含义。快照可以是异步的(如果在我的实现中使用同步提交snapshot请求给状态机会导致死锁)。

  • 接受快照(installSnapshot)过程:leader发送installSnapshot RPC到follower,follower异步发送installSnapshot消息给状态机,状态机调用CondInstallSnapshot确保发来的snapshot比当前状态更新(通过与lastApplied比较)后,raft apply此snapshot,状态机apply此snapshot

  • 故障恢复:raft层的故障恢复通过每次持久化状态之间一致时进行持久化来实现,但commitIndex和lastApplied不是持久化的,在恢复后初始化为snapshot的位置,即会从snapshot开始重新向上层提交此后的日志。上层的持久化依赖snapshot时传给raft的状态实现。
    总结一下:状态机的状态会因故障回退,raft的日志在确认提交后不会回退,因此可以通过重放还原故障前的状态机状态

    • 什么时候一条日志可以看做提交:前面提到,commitIndex不是持久化的,所以commitIndex的移动不能视作提交。当当前term日志replicate到majority时,并且这些日志被majority server持久化后,尽管没有持久化的变量去标记,但是,由于这些日志不可能被覆盖(不含此日志的server无法成为更高term的leader),也不会丢失(已经在majority持久化),那么在同步(synchronous)或部分同步(partial synchronous)系统中(也许还加上crash-recover模型的条件?),这条日志会最终deliver给状态机,因此可以看做被提交。

可以看到,我们需要一个不加锁、异步的日志提交。这块的代码参考了[2]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (rf *Raft) applier() {
for rf.killed() == false {
rf.mu.Lock()
// if there is no need to apply entries, just release CPU and wait other goroutine's signal if they commit new entries
if rf.lastApplied >= rf.commitIndex {
rf.mu.Unlock()
time.Sleep(time.Millisecond * 20)
continue
}
lastAppliedLogIndex, _ := rf.getLogAndLocalIndexByIndex(rf.lastApplied)
commitLogIndex, _ := rf.getLogAndLocalIndexByIndex(rf.commitIndex)

commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
entries := make([]LogEntry, commitIndex-lastApplied)
copy(entries, rf.Log[lastAppliedLogIndex+1:commitLogIndex+1])
rf.mu.Unlock()

for _, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: true, // 这里提上去调的是Snapshot
Command: entry.Log,
CommandIndex: entry.Index + 1,
}
} // apply后callback

rf.mu.Lock()
// 注意rf.commitIndex可能会变化,使用提交前加锁记录的commitIndex
// 加锁进入前可能CondInstall被调用导致lastApplied前进,这里如果不判断更新会导致此变量后退
if commitIndex > rf.lastApplied {
rf.lastApplied = commitIndex
}
rf.mu.Unlock()
}
}

Snapshot相关

由状态机触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rf *Raft) Snapshot(index int, snapshot []byte) {
rf.mu.Lock()
defer rf.mu.Unlock()

index--

// note: 因为还可能从其他人那里install
if index <= rf.RaftSnapshot.LastIncludedIndex {
rf.logNoLock("end snapshot creating because outdated index %d, current snapshot %d", index, rf.RaftSnapshot.LastIncludedIndex)
return
}

// index大于超过快照,这条日志一定存在
var persistentRaftState *RaftSnapshot
for _, log := range rf.Log {
if log.Index == index {
persistentRaftState = &RaftSnapshot{
LastIncludedTerm: log.Term,
LastIncludedIndex: log.Index,
}
}
}

rf.RaftSnapshot = *persistentRaftState
if rf.LastIncludedIndex > rf.lastApplied {
rf.lastApplied = rf.LastIncludedIndex // applier deliver日志与更新lastApplied间不是原子的(未加锁),此外唯一可能修改的位置就是这里
}

rf.trimLog(index) // 清理日志数组,直到其至少从index+1的位置开始
rf.persister.SaveStateAndSnapshot(rf.serialize(), snapshot)
}

InstallSnapshot RPC相关

与日志提交同理,installSnapshot向状态机的提交也需要是无锁、异步的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, resposne *InstallSnapshotResponse) {
rf.mu.Lock()

resposne.Term = rf.Term

// note: 里面有个重置election timeout,我认为installSnapshot也要做
if !rf.checkTermAndRole(request.Term, request.LeaderId) {
rf.mu.Unlock()
return
}

// note: 如果发来的snapshot确定不改变状态机状态,那么在这里先过滤一下
if rf.LastIncludedIndex >= request.LastIncludedIndex { // note: 关系是 LastIncludedIndex <= lastApplied <= committed
rf.persist() // 因为前面调用了checkTermAndRole,所以需要persist
rf.mu.Unlock()
return
}

rf.mu.Unlock()

go func() {
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: request.StateMachineState,
SnapshotTerm: request.LastIncludedTerm,
SnapshotIndex: request.LastIncludedIndex + 1,
}
}()
}
CondInstallSnapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
rf.mu.Lock()
defer rf.mu.Unlock()

lastIncludedIndex--

// note: LastIncludedIndex决定重放日志的开始位置,如果lastIncludedIndex前进,发来的状态机状态就需要与raft层状态一起原子保存。否则拒绝apply这个snapshot
if lastIncludedIndex <= rf.LastIncludedIndex {
return false
}

rf.trimLog(lastIncludedIndex)
// 下面分别是:只更新snapshot位置、更新snapshot/lastApplied位置、更新snapshot/lastApplied/commit位置
// 三者的关系必定有 lastIncludedIndex <= lastApplied <= commitIndex
if lastIncludedIndex <= rf.lastApplied {
rf.RaftSnapshot = RaftSnapshot{LastIncludedTerm: lastIncludedTerm, LastIncludedIndex: lastIncludedIndex}
// note: [2]中的实现在这里也修改了lastApplied,这会导致部分日志重复提交,但只要状态机过滤了也ok
} else if lastIncludedIndex <= rf.commitIndex {
rf.RaftSnapshot = RaftSnapshot{LastIncludedTerm: lastIncludedTerm, LastIncludedIndex: lastIncludedIndex}
rf.lastApplied = lastIncludedIndex
} else {
rf.RaftSnapshot = RaftSnapshot{LastIncludedTerm: lastIncludedTerm, LastIncludedIndex: lastIncludedIndex}
rf.lastApplied = lastIncludedIndex
rf.commitIndex = lastIncludedIndex
}

rf.persister.SaveStateAndSnapshot(rf.serialize(), snapshot)

return true
}

其他

  1. 日志:每个服务我都加了一个日志开关,设置了日志格式,提供加锁、不加锁的(以及临时无视开关的,更好的实践应该是日志等级)API,以raft为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rf *Raft) logWithLock(str string, a ...interface{}) {
if !rf.enableLog {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
rf.logNoLock(str, a...)
}

func (rf *Raft) logNoLock(str string, a ...interface{}) {
if rf.enableLog {
rf.logPrivilege(str, a...)
}
}

func (rf *Raft) logPrivilege(str string, a ...interface{}) {
lastTerm, lastIndex := rf.getLatestTermAndIndex()
fmt.Printf(fmt.Sprintf("[%d %s Term: %d, Role: %s, cmt: %d, lastTerm/Idx:%d-%d, snapTerm/Idx:%d-%d LogL: %d, Next:%v, lA:%d Id: %d] ",
realTimeMilli(), rf.name, rf.Term, rf.role, rf.commitIndex, lastTerm, lastIndex, rf.LastIncludedTerm, rf.LastIncludedIndex,
len(rf.Log), rf.nextIndex, rf.lastApplied, rf.me) + fmt.Sprintf(str, a...) + "\n")
}

效果(此外一些RPC的返回success也加了个颜色,找起来方便一些)

1
2
3
4
5
6
7
8
9
10
// 每列分别是:物理时间、进程名称(Lab4用)、Term、Role、CommitIndex、日志位置、快照状态、日志长度、nextIndex数组、lastApplied、raft集群内id
[124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 19 0], lA:19 Id: 2] [reqId 22206 msgTiUsd 12] after sending to 1 success:^[[0;32mtrue^[[0mrespSuccess:^[[0;32mtrue^[[0m
[124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] nextIndex for 1 advance to 20, self lastLogIndex 20
[124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] [reqId 47175 realTime 124142] before sending append entries to 1
[124142 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] >>> sending req[38427] append[true] vote[false] snap[false]
[124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-19, snapTerm/Idx:1-17 LogL: 2, Next:[0 0 0], lA:17 Id: 1] recv log append [20->20]
[124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-19, snapTerm/Idx:1-17 LogL: 2, Next:[0 0 0], lA:17 Id: 1] reset election timeout to 606000000
[124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-19, snapTerm/Idx:1-17 LogL: 2, Next:[0 0 0], lA:17 Id: 1] accept append entries from 2
[124148 101-1 Term: 1, Role: follower, cmt: 18, lastTerm/Idx:1-20, snapTerm/Idx:1-17 LogL: 3, Next:[0 0 0], lA:17 Id: 1] leaderCommit 19 self.lastIndex: 20
[124148 101-2 Term: 1, Role: leader, cmt: 19, lastTerm/Idx:1-20, snapTerm/Idx:1-18 LogL: 2, Next:[20 20 0], lA:19 Id: 2] [reqId 47175 msgTiUsd 6] after sending to 1 success:^[[0;32mtrue^[[0m respSuccess:^[[0;32mtrue^[[0m

后期为了排查偶发问题加了不少日志,最终的1024次测试居然打出了16G共10834W行的日志……

  1. 问题排查
  • 死锁:找到日志最先停止的协程,找到最后一条日志,一般会有些思路。另一种办法是重写一个mutex,加锁解锁打日志。
  • 活锁:找到最后一个非定时任务触发的日志,比死锁更难查
  • 断言:在各种地方加断言也是很有效的,不要让程序带着错误状态继续执行,会导致问题爆发的位置与状态开始偏离正常的位置相距很远,导致排查困难
  • 一致性检测:其实还是断言,对Lab4,我实现了类似Lab2中检测日志一致性的协程去check,这样能够知道问题来源于raft还是来源于状态机,缩小排查范围
  1. 一些小错误
    之前的一版实现,每次修改持久化变量都存一次档,效率低是其次,问题在于有些时候只改一个变量会导致不一致状态被持久化

Lab 3

Lab 4的shard controller基本是复用这里的代码,这里简单介绍一下Lab 3任务,实现细节放在Lab 4介绍。实现细节可以去看[2]

Lab3任务:基于lab2,实现一个in-memory的linearizable的kv storage

TODO raft不依赖消息顺序
TODO Client UUID

Lab 4

Lab4要求实现Sharded Key/Value Service,分片是实现multi-raft的方式之一(说实话我还不了解其他方式),架构如下:

  • 一个基于Raft的高可用的分布式配置中心,用于配置分片策略
  • 每个raft-group负责一个分片,raft-group就是一个raft集群
  • raft-group定时获取配置中心的配置,并向配置的状态同步

这样的multi-raft实现在相同服务器数量的情况下,以相对更低的可用性,换取了更低的响应访问、状态存储压力

对于分片迁移,[2]给出了主动拉数据的实现,我的实现使用主动推送作为方案

Lab 4A

由于跳过了Lab 3,这里详细介绍一下Lab 4A,本质是实现一个RSM,无论一致性协议上层是何种应用,都可以使用类似的实现

ClientID这里用的是随机数,最好还是使用一个分布式ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// client实现,看一种请求就够了,不需要关心这个请求是什么意思
func (ck *Clerk) Join(servers map[int][]string) {
defer func() { ck.seq++ }()

args := &CommandArgs{}
// Your code here.
args.ClientInfo = ck.getClientInfo()
args.OpType = OP_JOIN
args.Servers = servers

for {
var reply QueryReply
ok := ck.servers[ck.leaderId].Call("ShardCtrler.Command", args, &reply)
if !ok || reply.WrongLeader || reply.Err == ErrTimeout {
ck.leaderId = (ck.leaderId + 1) % len(ck.servers) // 缓存上次正确的leaderId
time.Sleep(25 * time.Millisecond) // 永续尝试所有可能server
continue
}
return
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 处理Client请求
func (sc *ShardCtrler) Command(args *CommandArgs, reply *CommandReply) {
sc.mu.Lock() // 加锁修改Client map
_, ok := sc.Clients[args.ClientId]
if !ok {
// 注册client信息
sc.Clients[args.ClientId] = &Client{
ReqSeq: -1, // client seq id
RcvSeq: -1, // 状态机处理完毕的seq id
LastResp: nil, // RcvSeq对应的Resp(如果有的话)
Ch: make(chan Response),
}
}
sc.Clients[args.ClientId].ReqSeq = args.ClientSeq

clientInfo := sc.Clients[args.ClientId]
sc.mu.Unlock()

// 提交日志
_, _, isLeader := sc.rf.Start(Op{
CommandArgs: *args,
})
if !isLeader {
reply.WrongLeader = true
return
}

// 等待从client ch拿到这次请求的响应
RETRY:
select {
case result := <-clientInfo.Ch:
reply.Config = result.QueryReply.Config
if result.MoveReply.Err != "" {
reply.Err = result.MoveReply.Err
} else if result.LeaveReply.Err != "" {
reply.Err = result.MoveReply.Err
} else if result.JoinReply.Err != "" {
reply.Err = result.JoinReply.Err
} else { // query reqply
reply.Err = result.QueryReply.Err
}

// 考虑Term1,request[seq=1]日志log1被成功持久化,返回结果,client seq提高到2
// 切换到Term2,leader切换,client转而请求新leader,新的leader此时才提交log1,那么会从这个channel传来旧request的response,此时重试,继续等待更高request seq的响应
if result.ClientSeq < args.ClientSeq {
// 另外需要考虑RETRY开始前,第二个args.ClientSeq的commit经由channel到来,而我们还没带的及读它,在下方代码可以看到,写此channel是非阻塞的。这一情况是允许的,超时后重试仍然能返回结果
goto RETRY
}

return
case <-time.After(CONSENSUS_TIMEOUT):
reply.Err = ErrTimeout
return
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (sc *ShardCtrler) getFromApplied() {
for applyMsg := range sc.applyCh {
func() {
sc.mu.Lock()
defer sc.mu.Unlock()

op := applyMsg.Command.(Op)

_, ok := sc.Clients[op.ClientId]
if !ok {
// 对于follower需要在这里初始化client,因为clinet的请求最先不发给它
sc.Clients[op.ClientId] = &Client{
ReqSeq: -1,
RcvSeq: -1,
LastResp: nil,
Ch: make(chan Response),
}
}
client := sc.Clients[op.ClientId]

// 128次全量测试出现28次
// TODO 不记得原因了
if applyMsg.CommandIndex <= sc.LastAppliedIndex {
sc.log("discard outdated index %d", applyMsg.CommandIndex)
return
}
sc.LastAppliedIndex = applyMsg.CommandIndex

// client重试unknown状态的请求(如没有返回,因此不知道是否执行)导致
if op.ClientSeq <= client.RcvSeq {
select {
case client.Ch <- *client.LastResp:
default:
}
return
} else { // 如果op.ClientSeq > client.RcvSeq,说明当前server空洞的seq在其他server已经处理过,当前server依然需要apply这个状态,但不返回response
var resp Response

if op.OpType == OP_QUERY {
resp = Response{QueryReply: QueryReply{
Config: sc.getConfigNoLock(op.Num),
}}
} else if op.OpType == OP_JOIN {
newCfg := sc.reBalanceCfg(op)
sc.configs = append(sc.configs, newCfg)
resp = Response{JoinReply: JoinReply{}}
} else if op.OpType == OP_MOVE {
newCfg := sc.latestConfig().Copy()
newCfg.Shards[op.Shard] = op.GID
sc.configs = append(sc.configs, *newCfg)
resp = Response{MoveReply: MoveReply{}}
} else { // OP_LEAVE
newCfg := sc.reBalanceCfg(op)
sc.configs = append(sc.configs, newCfg)
resp = Response{LeaveReply: LeaveReply{}}
}

resp.ClientId = op.ClientId
resp.ClientSeq = op.ClientSeq

client.RcvSeq = op.ClientSeq
client.LastResp = &resp
sc.LastAppliedIndex = applyMsg.CommandIndex

if sc.Clients[op.ClientId].ReqSeq == op.ClientSeq {
select {
case client.Ch <- resp:
default:
}
}
}
}()
}
}

负载均衡算法

将shard平均分配到不同的group中,group可能在运行期间加入或离开,要求相对于变更以前的分配,最小化shard移动的数量,降低网络压力

建议参考[2]实现,核心思路如下:

  • Join:不断将shard从具有最多shard的节点分给具有最少shard的节点,直到最多与最少的差不大于1
  • Leave:将Leave节点的shard一次一个地分配给具有最少shard的节点

Lab 4B

TODO 需要完善

服务端协程:

  • 定期拉配置并提交配置更新的日志
  • 定期轮询将自身数据分片向自身配置靠近,同时做GC
  • 主协程,处理commit的日志
  • 轮询新term提交空日志
  1. Client为什么也要持久化
  2. Shard Server自身也是一个client
  3. 如何推送,推送是两段的,是否会有问题

思考&优化

如何设计测试

来自6.824

补充用例

单client串行写入,必须读到上次写入值后才写入下次值,每次写入后随机shutdown server。用来fail那些上任后没有提交空日志的实现

性能测试

在majority节点correct的前提下,raft已经确保了可用性,这里关注性能

此部分参考[11]中etcd的性能测试

性能可以从两方面评价:时延和吞吐量。举个例子,时延长而吞吐量大:服务可以同时响应大量请求(吞吐量),但单看每个请求从发送到响应的时间却比较长(时延)。[11]指出最短可能的时间是一个RTT加上fdatasync的时间,前者取决于data center之间的距离、网络环境,后者取决于磁盘类型(HDD、SSD),但由于会批量发送、刷盘,可以将代价均摊到batch中的每条日志

此外,还有一些其他影响性能的点:

  1. etcd后端采用bblot(从boltdb fork得到)作为存储引擎,对于事务有MVCC的额外开销
  2. raft的snapshot需要偶尔刷盘
  3. inflight compaction,我理解也许是消息发送前后对消息的压缩解压?

变量如下:

  • 节点数量
  • 读或写(读的话,线性一致读还是顺序一致读,也就是走不走日志)
  • K、V分别的大小
  • KV总数
  • Client数
  • 连接数
  • 是否只与leader交互,还是允许和任何人交互,follower将请求转发到leader [localref-1]

评价指标如下:

  • 平均读写QPS(可能有多个clinet,和latency不同)
  • 单个request的平均latency
  • 平均服务RSS,即ps aux中的RSS或top中的RES,即物理内存占用

我也做了简单的性能测试,但与etcd的也许无法直接比较,为什么

  1. 使用线程模拟,无网络传输开销
  2. 内存kv数据库,无磁盘写入开销
  3. 在单机进行模拟,受CPU限制无法做高并发测试,仅将GOMAXPROCS设为8
  4. etcd的get有优化,我的没有
  5. 没有做锁优化(至少手动层面没有做)

我的测试(client总是等于连接数,GOMAXPROCS设为8,默认不分片,关闭snapshot功能):

  1. 1 client 3节点,kv层read write交互,各1000条读写,共2000条
    • 耗时60s,QPS=33, latency=30ms
  2. 3节点,raft层不停write,最后一条write提交视为截止
  3. 100 client 3节点,每个client写入1000条,kv层共10W条写入,其实更好的做法是限制时间,而不是限制数量,注意不要加race,会慢很多(大概7倍时间)
    • 不加race,耗时52921ms,QPS=1889.6(10W/552921),lateny=52.9ms,相比于1,吞吐量x57,时延x1.77
  4. 3 Group * 3节点,100 client,每个client 1000条写入
    • 麻了,是3的三倍时间,不知道怎么解释……
      (就说QPS是三千多,CPU打满了,看不出来)

local ref

  1. How does etcd propagate writes to non-leader members?
  2. Question: What does “Average Server RSS” mean?

Raft与CAP的关系

英文解释来源
https://www.the-paper-trail.org/page/cap-faq/

Availability - will a request made to the data store always eventually complete?
Consistency - will all executions of reads and writes seen by all nodes be atomic or linearizably consistent?
Partition tolerance - the network is allowed to drop any messages.

什么是available

A data store is available if and only if all get and set requests eventually return a response that’s part of their specification. This does not permit error responses, since a system could be trivially available by always returning an error.

TODO
这块还要看下,目前的理解是raft在保证quorum写入才算commit的同时,意味着放弃了可用性:在分区的情况下,无法保证lower(n/2)节点的容错,如果分区大小为quorum+1个节点挂掉,系统hang住,意味着丧失了可用性。即Raft是CP的

TODO 有人说Redis主备就是AP的,MySQL主备呢
https://zhuanlan.zhihu.com/p/152105666

Prevote & CheckQuorum

TL;DR
PreVote是对非QCS(Quotum Connected Server) Candidate的限制,CheckQuorum是对非QCS Leader的限制

(cloudflare因为这个问题挂过六个小时)

考虑这样的节点拓扑[9]

leader可以在123内产生,但由于45未连接到leader,会不断超时自增term,term传递到2或3,使其成为follower,随后超时成为candidate,这一term最终会传递到当前leader,使得leader失去身份,这一情况会一直持续。也就是说,普通的raft实际上是不满足liveness条件的

[7] 9.6节给出了叫做Pre-Vote算法的解决方案:
follower在超时成为candidate并自增term前,需要通过RPC收到一个majority的确认,RPC handler中,如果判断对方有最新的日志,并且自身自从上次收到leader的消息超出baseline election timeout(我的理解是最小可能的超时时间),那么grant vote。Pre-Vote避免了不可能成为leader的节点通过不断提升term导致当前leader退位的问题

Pre-Vote也引入了新的问题,考虑如下拓扑[9]:

假设从全连通状态立即跳转到此状态,4是leader,13会随后超时,但由于2与4连通,不能从2获得Pre-Vote的grant,13无法发起新的选举。对于这种情况,[7] 6.2节指出,如果leader长时间没有成功与majority保持心跳,那么leader主动退位。这一修改也叫做CheckQuorum使得失效leader能够主动退位

通过补充Pre-Vote RPC与leader主动退位,我们确保了算法的liveness

DeadLock

我一直很怀疑partial connection到底是不是一个实际的问题

当网络拓扑成为树状时,如A-B, A-C,A-D, A-E当A的日志不是最新时,A无法成为leader,系统死锁

5节点全连通,除了A与B,是否会出现A、B无限轮流称为leader导致违反liveness条件的情况?

问题来自 https://www.zhihu.com/question/54997169

不会,之前提到过,为了保证liveness条件,leader上任后需要commit一条no-op后才可提供服务(包括响应读请求),假设A commit完成后,B就不是拥有最新数据的节点了,因此不会出现这一问题

Raft是否存在脑裂问题

存在,5节点全连通,除了A与B,A当选leader后、发出任何请求前,B超时,term自增,完成选举,也成为leader

但并不会造成什么问题,并且,可以设置允许其他节点转发请求,其实是等价于网络仍然是全连通的
ref: raft协议应用方面的疑问? - 我做分布式数据库的回答 https://www.zhihu.com/question/54997169/answer/192987776

TODO

通过client向raft提交op,raft提供什么样的语义

at-most-once
举例:如果提交请求未返回,那么要么op没有被执行,要么执行了一次

业务上可以利用at-most-once语义进行组合得到exactly-once语义
举例:提交请求A未返回,可以再提交一次,就像LAB3 LAB4中的client,raft上层可以基于op的seqNum决定是否apply这个op

读优化

走日志的读称为 Log Read,是安全的,但我们可以做得更好

TL;DR(总结)
Read Index:读走 Leader 但不走日志,Leader 发送一轮心跳,收到 quorum ack 证明自己还是 leader,可以安全返回请求到来时 commit index 对应的状态机状态(或者更新的状态,只要保证后续返回的状态不比这轮返回的状态旧)
Lease Read:基于 Read Index,但实际上,leader 如果确信自己的心跳在 quorum 内还未过期(注意考虑时钟偏移),那么可以安全地直接向 client 返回读结果

推荐的两篇文章
https://zhuanlan.zhihu.com/p/78164196
https://zhuanlan.zhihu.com/p/463140808

ReadIndex

本小节完全参考[14]

当leader接收到读请求时,将当前commit index记录下来,记作read index,在返回结果给客户端之前,leader需要先确定自己到底还是不是真的leader,确定的方法就是给其他所有peers发送一次心跳,如果收到了多数派的响应,说明至少这个读请求到达这个节点时,这个节点仍然是leader,这时只需要等到commit index被apply到状态机后,即可返回结果。

总结:

  1. leader check自己是否在当前term commit过entry
  2. leader记录下当前commit index,然后leader给所有peers发心跳广播
  3. 收到多数派响应代表读请求到达时还是leader,然后等待apply index大于等于commit index
  4. 返回结果

etcd不仅实现了leader上的read only query,同时也实现了follower上的read only query,原理是一样的,只不过读请求到达follower时,commit index是需要向leader去要的,leader返回commit index给follower之前,同样,需要走上面的ReadIndex流程,因为leader同样需要check自己到底还是不是leader
(我的理解:本质还是 leader 的 Read Index 读,只是 follower 充当与 client 之间的一层 proxy)

Lease Read

本小节完全参考 TiDB 文档[10]

(TiDB 把这个优化当做比 ReadIndex 更高级的优化来介绍,具体来说,ReadIndex 还需要额外发一轮心跳,收到 quorum ack 后,才能返回)

我们希望能直接读leader本地的数据而不向其他人发任何请求,这要求我们确保leader的已提交数据总是最新的,可以通过确保majority在一定时间内不选择其他leader,使得这个时间段内最多只有自己一个leader(或者自己挂掉),不会有更新term的leader提交日志,使得当前leader的已提交数据总是最新的

在 Raft 论文里面,提到了一种通过 clock + heartbeat 的 lease read 优化方法。也就是 leader 发送 heartbeat 的时候,会首先记录一个时间点 start,当系统大部分节点都回复了 heartbeat response,那么我们就可以认为 leader 的 lease 有效期可以到 start + election timeout / (clock drift bound per second * election timeout)这个时间点

我的理解:leader lease 的有效期到 “不再满足 quorum 节点心跳未过期,减去心跳过期时间内的时钟偏移上限”

TODO 如果消息传输已经用了比lease还久的时间,那么leader怎么做,leader在lease过期后自己就step down吗,好像不需要,因为写入操作实际上也是心跳,之后的写入等于续约,或者,如果follower已经没有lease了,等于签一个新的lease

Membership

参考[12]

论文中介绍了两阶段Membership变更算法,介绍如下:

  1. 外部触发membership变更请求,leader将其转化为 $C_{old,new}$ 日志并广播。任何收到membership变更日志后的节点,立即按照新配置运行(无论其是否commit)
  2. 自 $C_{old,new}$ 日志起,任何日志需要同时达成旧配置与新配置中的quorum ack才视为提交
  3. 当 $C_{old,new}$ 提交后,原有leader构造一条 $C_{new}$ 日志并广播,若leader不在新配置中,当 $C_{new}$ commit后可以退位

Etcd实际上使用的是单步成员变更,优势之一在于只需要单阶段就可完成单节点的membership变更[13]

Learner角色

[7]4.2.1节引入此角色,目的是当membership变化时,新加入的成员在需要在日志同步完成后再加入集群,否则根据[6]中给出的membership变更算法,变更期间的日志提交需要得到both old conf和new conf中majority节点的ack才视为提交,未完成日志同步的节点会阻塞整个集群的日志提交

Raft的一些重要时刻

这个是我自己提出的问题(因此问题可能不是很有价值,而且也许是错的……)

  • 一条日志什么时候算作提交:当第majority个node将日志持久化时
  • 写入日志的Linearization Point在哪里:Leader将commitIndex指向它的时候
  • 以kv store为例,上层应用的Linearization Point在哪里:在apply日志到状态机的那一刻

附录

测试结果

测试结果太长了,放在最后

Lab2 2048次测试通过
Lab3 512次测试通过
Lab4 1024次测试通过

Lab2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
[root@e647a1bc04a2 raft]# time go test -run Test
Test (2A): initial election ...
... Passed -- 3.5 3 34 10312 0
Test (2A): election after network failure ...
... Passed -- 5.5 3 127 23366 0
Test (2A): multiple elections ...
... Passed -- 7.4 7 838 134958 0
Test (2B): basic agreement ...
... Passed -- 1.2 3 16 4750 3
Test (2B): basic agreement ...
... Passed -- 1.6 3 16 4420 3
Test (2B): RPC byte count ...
... Passed -- 2.9 3 48 115098 11
Test (2B): agreement despite follower disconnection ...
... Passed -- 6.9 3 112 26633 8
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 4.1 5 299 49880 3
Test (2B): concurrent Start()s ...
... Passed -- 1.3 3 10 3002 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 5.9 3 129 32434 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 27.1 5 2710 1836997 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.8 3 28 8866 12
Test (2C): basic persistence ...
... Passed -- 5.6 3 69 18640 6
Test (2C): more persistence ...
... Passed -- 19.9 5 1167 220492 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 2.8 3 37 9225 4
Test (2C): Figure 8 ...
... Passed -- 34.5 5 594 134594 38
Test (2C): unreliable agreement ...
... Passed -- 3.8 5 237 89256 246
Test (2C): Figure 8 (unreliable) ...
delay time: 2325
... Passed -- 35.2 5 3544 5190733 350
Test (2C): churn ...
... Passed -- 16.3 5 3479 3369043 1469
Test (2C): unreliable churn ...
... Passed -- 16.4 5 1106 548290 291
Test (2D): snapshots basic ...
time used: 7296
... Passed -- 7.3 3 144 56814 251
Test (2D): install snapshots (disconnect) ...
time used: 53277
... Passed -- 53.3 3 871 256572 367
Test (2D): install snapshots (disconnect+unreliable) ...
time used: 67236
... Passed -- 67.2 3 1049 288318 379
Test (2D): install snapshots (crash) ...
time used: 41029
... Passed -- 41.0 3 592 180220 355
Test (2D): install snapshots (unreliable+crash) ...
time used: 49101
... Passed -- 49.1 3 701 199176 366
PASS
ok 6.824/raft 422.568s

real 7m3.771s
user 0m9.621s
sys 0m7.843s

Lab3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
[root@9bd769928c32 kvraft]# time go test -run Test
Test: one client (3A) ...
... Passed -- 15.1 5 3495 689
Test: ops complete fast enough (3A) ...
... Passed -- 21.4 3 3040 0
Test: many Clients (3A) ...
... Passed -- 15.4 5 7223 3310
Test: unreliable net, many Clients (3A) ...
... Passed -- 16.9 5 3133 608
Test: concurrent append to same key, unreliable (3A) ...
... Passed -- 1.8 3 156 52
Test: progress in majority (3A) ...
... Passed -- 0.6 5 41 2
Test: no progress in minority (3A) ...
... Passed -- 1.1 5 104 3
Test: completion after heal (3A) ...
... Passed -- 1.1 5 40 3
Test: partitions, one client (3A) ...
... Passed -- 22.8 5 2734 401
Test: partitions, many Clients (3A) ...
... Passed -- 22.5 5 6356 2272
Test: restarts, one client (3A) ...
... Passed -- 21.2 5 3723 691
Test: restarts, many Clients (3A) ...
... Passed -- 21.4 5 8116 3317
Test: unreliable net, restarts, many Clients (3A) ...
... Passed -- 22.3 5 3771 612
Test: restarts, partitions, many Clients (3A) ...
... Passed -- 27.9 5 6817 2564
Test: unreliable net, restarts, partitions, many Clients (3A) ...
... Passed -- 29.3 5 3566 313
Test: unreliable net, restarts, partitions, random keys, many Clients (3A) ...
... Passed -- 33.6 7 9159 856
Test: InstallSnapshot RPC (3B) ...
... Passed -- 5.9 3 337 63
Test: snapshot size is reasonable (3B) ...
... Passed -- 17.1 3 2434 800
Test: ops complete fast enough (3B) ...
... Passed -- 21.6 3 3054 0
Test: restarts, snapshots, one client (3B) ...
... Passed -- 21.1 5 3723 692
Test: restarts, snapshots, many Clients (3B) ...
... Passed -- 21.5 5 88544 28600
Test: unreliable net, snapshots, many Clients (3B) ...
... Passed -- 16.4 5 2864 537
Test: unreliable net, restarts, snapshots, many Clients (3B) ...
... Passed -- 23.1 5 3801 608
Test: unreliable net, restarts, partitions, snapshots, many Clients (3B) ...
... Passed -- 28.2 5 3368 332
Test: unreliable net, restarts, partitions, snapshots, random keys, many Clients (3B) ...
... Passed -- 32.0 7 8083 877
PASS
ok 6.824/kvraft 462.060s

real 7m42.459s
user 1m42.312s
sys 0m10.487s

Lab4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[root@9bd769928c32 shardkv]# time go test -run Test
Test: static shards ...
... Passed
Test: join then leave ...
... Passed
Test: snapshots, join, and leave ...
... Passed
Test: servers miss configuration changes...
... Passed
Test: concurrent puts and configuration changes...
... Passed
Test: more concurrent puts and configuration changes...
... Passed
Test: concurrent configuration change and restart...
... Passed
Test: unreliable 1...
... Passed
Test: unreliable 2...
... Passed
Test: unreliable 3...
... Passed
Test: shard deletion (challenge 1) ...
... Passed
Test: unaffected shard access (challenge 2) ...
... Passed
Test: partial migration shard access (challenge 2) ...
... Passed
PASS
ok 6.824/shardkv 131.002s

real 2m12.658s
user 7m36.001s
sys 0m6.838s

补充测试

GOMAXPROCS 表头 ss
4 单元格
单元格 单元格

其他

runtime: marked free object in span遇到的一个Golang的奇怪Bug,由于代码改动后来没有复现


MIT6.824 Lab2 Lab3 Lab4
https://vicety.github.io/2021/12/04/mit6.824 Labs/
作者
vicety
发布于
2021年12月4日
许可协议