警告
    
    
      这里的实现有很多的 Bug,不能跑通 2B 和 2C 的 Test。
     
   
    
      三个函数
    
    
      相比起 Lab1 的作业,Lab2A 作业量差不多。Lab2A 主要是围绕论文中 Figure2 的选举部分进行实现即可。
首先是有三个控制 Raft 服务器状态的函数。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 | func (rf *Raft) BecomeFollower(term int) {
	rf.currentTerm = term
	rf.Role = 0
	rf.votedFor = -1
	rf.GotHeatBeat = false
	DPrintf("term %d server %d become Follower\n", term, rf.me)
}
func (rf *Raft) BecomeCandidate() {
	rf.votedFor = rf.me
	rf.currentTerm++
	rf.Role = 1
	DPrintf("term %d server %d become Candidate", rf.currentTerm, rf.me)
}
func (rf *Raft) BecomeLeader() {
	rf.votedFor = rf.me
	rf.Role = 2
	for i := 0; i < len(rf.peers); i++ {
		rf.MatchIndex[i] = 0
		rf.NextIndex[i] = len(rf.log)
	}
	DPrintf("term %d server %d become Leader", rf.currentTerm, rf.me)
}
 | 
 
三个函数负责 server 状态的切换。注意这三个函数都是没有加锁的,需要在调用的地方加锁。
 
   
接着是对 Figure2 中需要保存的状态变量进行定义,这里就不放出详细代码了比较简单。最后就是实现 RequestVote 和 AppendEntries。注意需要详细结合 Figure2 里面的策略,否则会出现意想不到的 bug (通常难以解释或者难以自己想出正确的策略)。
    
      选举相关策略
    
    
      对所有的 server:
- 若任何通信中发现了 term > currentTerm的情况,都将所在的 server 设置为currentTerm = term的 follower。无论当前 server 是什么角色。
对 followers:
- 回应所有 RPC请求
- 如果选举定时器超时,仍然没有受到 AppendEntries,则转换为 candidate 开始举行选举投票。
对 candidates:
- 变成 candidate 后开始选举
- 如果收到大部分的选票支持,则成为 Leader。
- 如果收到一个 AppendEntries, 则变成 follower。
- 如果选举超时,则重新开始选举。
对 leader:
- 定时发送 AppendEntries到所有 server, 防止他们的选举定时器超时。
 
   
在我的实现中,大部分的锁都是函数级别的,没有把粒度分得很细。定时器除了使用 for 和 sleep 来实现以外,两个 RPC 的接收也使用了定时器来防止超时,这里的定时器使用 channel 和 select 来实现,可以和 RPC 的接收一起实现,更加简洁。这里给出最核心的 ticker 函数。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
 | func (rf *Raft) ticker() {
	for !rf.killed() {
		randomNum := 100 + rand.Intn(100)
		time.Sleep(time.Millisecond * time.Duration(randomNum))
		rf.mu.Lock()
		if !rf.GotHeatBeat && rf.Role == 0 {
			rf.BecomeCandidate()
			agrees := 1
			ch := make(chan RequestVoteReply)
			args := RequestVoteArgs{rf.CurrentTerm, rf.me, rf.CommitIndex, rf.CurrentTerm - 1}
			for i := 0; i < len(rf.peers); i++ {
				if i == rf.me {
					continue
				}
				go rf.sendRequestVote(i, &args, ch)
			}
			waiting := true
			for i := 0; i < len(rf.peers) && waiting; i++ {
				if rf.Role != 1 {
					break
				}
				select {
				case reply := <-ch:
					if reply.VoteGranted {
						agrees++
					}
					if reply.Term > rf.CurrentTerm {
						rf.BecomeFollower(reply.Term)
						waiting = false
					}
				case <-time.After(time.Millisecond * 300):
					DPrintf("term %d server %d election timeout\n", rf.CurrentTerm, rf.me)
					waiting = false
				}
				if agrees*2 > len(rf.peers) {
					break
				}
			}
			if agrees*2 > len(rf.peers) {
				rf.BecomeLeader()
			} else {
				rf.BecomeFollower(rf.CurrentTerm - 1)
			}
		}
		// DPrintf("server %d end first stage\n", rf.me)
		rf.GotHeatBeat = false
		rf.mu.Unlock()
		term, isLeader := rf.GetState()
		for isLeader {
			ch := make(chan AppendEntriesReply)
			args := AppendEntriesArgs{term, rf.me, 0, 0, rf.log, rf.CommitIndex}
			for i := 0; i < len(rf.peers); i++ {
				go rf.sendAppliedEntries(i, &args, ch)
			}
			waiting := true
			for i := 0; i < len(rf.peers) && waiting; i++ {
					// DPrintf("server %d waiting AppendEntriesReply", rf.me)
					select{
					case reply := <-ch:
						// DPrintf("server %d got AppendEntriesReply, term: %d", rf.me, reply.Term)
						rf.mu.Lock()
						if reply.Term > rf.CurrentTerm {
							rf.BecomeFollower(reply.Term)
							waiting = false
						}
						rf.mu.Unlock()
					case <- time.After(time.Millisecond * 500):
						waiting = false
					}
			}
			time.Sleep(time.Millisecond * 100)
			term, isLeader = rf.GetState()
		}
	}
}
 | 
 
    
      结果:
    
    
      运行 go test -run 2A
| 1
2
3
4
5
6
7
8
 | Test (2A): initial election ...
  ... Passed --   3.1  3   87   21200    0
Test (2A): election after network failure ...
  ... Passed --   4.5  3   90   17216    0
Test (2A): multiple elections ...
  ... Passed --   7.5  7  625  100826    0
PASS
ok      6.824/raft      15.106s
 | 
 
测试通过。