这是描述Raft分布式共识算法及其在Go中的完整实现系列文章的第2部分。
以下是该系列文章的目录:
第1部分:介绍
第2部分:选举 (本文)
第3部分:命令和日志复制
第4部分:持久化和优化
上一篇文章我们留下一个问题,为什么使用Go实现raft。原因主要包括以下几点:
- 并发:raft算法本身就含高并发场景。每个节点都执行多个操作,例如为定时事件运行定时器,而且必须异步响应其他节点和客户端的请求。
- 标准库:Go具有强大的工业标准库,无需导入和学习任何第三方库,就可以轻松编写复杂的网络服务器。特别是在Raft算法实现中,必须回答的第一个问题是“如何在节点之间发送消息?”许多人陷入了协议设计和一些序列化的细节中,或者使用大量的第三方库。Go有net/rpc,这对于此类任务来说是一个足够好的解决方案,配制非常快,不需要导入即可完成。
- 简单:在我们开始考虑实现语言之前,实现分布式共识机制就已经很复杂了。所以需要尽可能使用简单的语言。而Go天生追求简单化,在每一个层面上都反对复杂性。
下面我们开始今天的话题,本文将介绍实现raft的一些结构体并聚焦在选举上。这部分的代码包括一个功能完整的测试工具,您可以运行这些测试来对系统进行试验。但不接收客户端请求,也不维护日志。
代码结构
下面简单介绍一下Raft实现的结构体;这适用于本系列的所有部分。通常,Raft是作为一个对象实现的,你可以将其嵌入到某个服务中。因为我们这里实际上并不是在开发服务,而是在研究Raft本身,所以我创建了一个简单的Server类型,它封装了一个ConsensusModule类型简称(CM),这里尽可能地抽象出算法中的核心部分:
共识模块(CM)实现了Raft的核心,在raft.go文件中。它完全脱离了网络的细节以及与集群中其他节点的通信。在ConsensusModule中唯一和网络相关的字段是:
// id 共识模块中节点id.
id int
// peerIds 集群中对等节点的id列表.
peerIds []int
// server是运行共识模块(CM)的服务器定义,用于和其他节点发起rpc调用
server *Server
在实现中,每个raft节点称集群中其他节点为“peer”。集群中的每个peer都有一个唯一ID,和所有其他节点的ID列表。server字段是一个Server结构体指针,能够让共识模块发送消息到其他节点,将在后面看到。
这么设计的原因是将所有的网络细节都剥离出去,聚焦在Raft算法本身。通常,要将Raft论文搬到现实,只需实现共识对象及其方法即可。Server部分的代码直接用Go的网络编程脚手架即可实现,在这部分无需花太多的时间。
Raft节点状态
从宏观角度来看,共识模块其实就是一个包含3个状态的状态机:
上一篇文章中我们提高raft算法要实现的多副本状态机,是站在服务层面来说的。这里说的状态机是raft共识模块内部的一个状态机。
集群在正常的稳定状态情况下,只有一台服务器是领导者,而其他所有服务器都是跟随者。虽然我们希望事情能够永远这样维持下去,但Raft的目标是容错,所以我们将花费大部分时间讨论非正常的故障场景,比如一些服务器崩溃,另一些服务器网络断开等等。
如前所述,Raft使用强领导者模式。只有leader响应客户端请求,将新数据添加到日志中,并将它们复制给跟随者。每一个跟随者都时刻准备着在领导者失败或停止通信的情况下接管领导岗位。这就是图表中领导者心跳超时开始选举,follwer转为Candiate的过程。
Terms
和普通的选举类似,在Raft中也有选举。term指的是某一个节点是领导者的期数,可以理解为第几届领导的意思。一次新的选举term会增加,Raft算法确保在特定的任期内只有一个leader。
我们不应该过分类比,因为Raft选举与真正的选举不同。在Raft中,选举更具合作性;候选者的目标不是不惜一切代价赢得选举——所有候选者的目标都是让合适的服务器在任何给定的任期内赢得选举。我们稍后会详细讨论“合适”的含义。
选举计时器
Raft算法的一个关键组成部分是选举计时器。这是每一个跟随者需持续运行的计时器,每次接收到当前领导者的消息时就重新启动计时器。leader发送周期性的心跳,所以当心跳停止时,跟随者会假设leader已经故障或断开连接,并开始选举(切换到Candidate状态)。
问题:有没有可能所有的跟随者同时变成候选者?
答案:不会的,因为选举计时器是随机设定的时间,这是raft简单化的关键之一。Raft使用这种随机计时来降低多个追随者同时进行选举的可能。即使他们同时成为候选者,在一个任期内也只有一个节点会被选为领导者。在少数情况下,如果选票分裂,没有候选者获胜,就会进行新的选举(新的任期)。虽然从理论上讲,无限循环重新选举是可能的,但这种情况发生的几率随着每一轮选举的进行而大大降低。
问题:如果一个节点断链并从集群中分离?会因为无法接收到领导者心跳,而开始选举吗?
答案:这就是网络分区的潜在问题,因为跟随者无法区分谁被分区了。这将引发一场选举。但如果是一个跟随者断连了,发起选举将毫无结果——因为它不能联系其他节点,将得不到任何选票。它可能会在候选状态下继续旋转(每隔一段时间重新启动一次新的选举),直到它重新连接到集群。我们将在后面更详细地研究这个场景。
节点之间RPC
Raft节点之间会发送两种rpc调用。有关这些rpc的详细参数和规则,请参见本文中的图2。我将简要讨论他们的目标:
- RequestVotes(RV):只在候选者状态使用;候选者在选举中使用它请求其他节点的投票。应答中包含是否赞同投票。
- AppendEntries(AE):只在领导者状态使用;领导者使用这个RPC来复制日志给其他节点,同时发送心跳。即使没有要复制的新日志条目,也会定期将此RPC发送给每个follower。
细心的读者会从上面推断出跟随者不会发送任何rpc。这是正确的;跟随者不会向其他节点发起rpc,但他们有一个选举计时器运行在后台。如果这个计时器过期了还没有收到当前leader的消息,跟随者就会成为一个候选者并开始发送RV请求投票。
选举计时器实现
下面进入代码阶段,以下代码都来自于github。这里不详细列举ConsensusModule共识结构体的所有字段,可以查看源码。共识模块通过在一个单独的goroutine运行下面的计时器:
func (cm *ConsensusModule) runElectionTimer() {
timeoutDuration := cm.electionTimeout()
cm.mu.Lock()
termStarted := cm.currentTerm
cm.mu.Unlock()
cm.dlog("election timer started (%v), term=%d", timeoutDuration, termStarted)
// 这个循环会在以下两种情况下退出:
// - 我们发现不再需要选举计时器了,或者
// - 选举计时器到期,该节点成为候选者
// 在跟随者整个生命周期里,计时器会在后台持续运行
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
<-ticker.C
cm.mu.Lock()
if cm.state != Candidate && cm.state != Follower {
cm.dlog("in election timer state=%s, bailing out", cm.state)
cm.mu.Unlock()
return
}
if termStarted != cm.currentTerm {
cm.dlog("in election timer term changed from %d to %d, bailing out", termStarted, cm.currentTerm)
cm.mu.Unlock()
return
}
// 如果超时了没收到领导者心跳就开始选举,或者在选举期内没有选出领导者
if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration {
cm.startElection()
cm.mu.Unlock()
return
}
cm.mu.Unlock()
}
}
首先通过调用cm.electionTimeout,选择一个伪随机的选举超时时间。和论文中一样,选择的时间范围从150ms到300ms。与ConsensusModule的其他方法一样,runElectionTimer在访问字段时都上锁。这是必要的,因为要尽可能同步,这是Go的优势之一。这也意味着代码是顺序执行的,不能跨多个事件处理程序。rpc还是并发执行的,所以必须保护共享结构体,很快会讨论RPC处理程序。
上面的方法循环运行一个10ms的计时器。有更有效的等待事件的方法,但是这种习惯用法代码最简单。每过10ms循环就迭代一次。理论上讲,在整个选举超时期间内可能都休眠,它的响应速度较慢,并且在日志中调试/跟踪有些困难。会检查状态是否仍然在预期内和确认任期(term)没有改变。如果有任何问题,我们就终止选举计时器。
如果距离上次“选举重置事件”已经过去了足够长的时间,该节点开始选举并成为候选人。什么是选举重置事件?它是任何可以终止选举的东西——例如,收到一个有效的心跳,或给另一个候选人投票。我们很快就会看到这段代码。
成为候选者
我们已经看到,一旦足够长的时间过去了,追随者没有收到领导者或其他候选者的消息,选举就开始了。在看代码之前,让我们想想运行选举需要做些什么:
1、将状态切换为候选者并增加任期(term),因为这是算法对每次选举的要求。
2、发送RV RPC给其他节点请求投票。
3、等待这些rpc的回复,计算是否有足够的选票成为领导者。
在Go中所有这些逻辑都可以收集到一个函数中:
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
votesReceived := 1
// 并发发送RequestVote rpc到所有其他节点。
for _, peerId := range cm.peerIds {
go func(peerId int) {
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
}
var reply RequestVoteReply
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votesReceived++
if votesReceived*2 > len(cm.peerIds)+1 {
// 赢得选举
cm.dlog("wins election with %d votes", votesReceived)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// 再运行一个选举计时器,以防这次选举失败。
go cm.runElectionTimer()
}
候选者以给自己投票开始,初始化votesReceived为1 ,设置cm.voteFor = cm.id。然后并行发送RPCs给其他节点。每个RPC都在单独的goroutine中完成,因为RPC是异步的,它们会阻塞,直到收到响应,这可能需要一段时间。
刚好这里说明下RPC调用:
cm.server.Call(peer, "ConsensusModule.RequestVote", args, &reply)
我们使用cm共识对象里的Server指针,发起远程调用,并带上ConsensusModule.RequestVotes为远程调用方法名。这最终会调用第一个参数里peer节点的RequestVote方法。
如果rpc调用成功,一段时间也过去了,我们会检查状态该如何选择。如果状态不再是候选者,就退出。这个是怎么发生的?比如,我们可能赢了选举,因为在其他RPC调用中有足够的投票。或者收到一个term更大的RPC调用,因此就要切换到跟随者状态。重要的是要记住,在一个脆弱的网络中,RPC可能需要很长时间才能到达——当我们得到答复时,其余的代码可能已经继续运行,在这种情况下优雅地退出是很重要的。
如果rpc返回时节点仍然是候选者,我们检查回复的期限term,并将其与发送请求时的原始期限term进行比较。如果回复的字数更高,我们将恢复为跟随者状态。例如,当我们在收集选票时,如果另一个候选人赢得了选举,就会发生这种情况。
如果期限term和本节点发出去的一样,检查投票是否被允许。如果该节点拥有大多数选票(包括它自己投的选票),它就会成为领导者。
注意startElection方法没有阻塞。它更新一些状态,启动一堆goroutine并返回。因此,它还应该在goroutine中启动一个新的选举计数器——在最后一行中这么做了。这确保了如果这次选举没有产生任何有用的结果,超时后会启动新一轮选举。这也解释了runElectionTimer中的状态检查:如果这次选举将peer(其他节点)变成leader,并发的runElectionTimer将在观察到节点无需切换状态时退出。
成为领导者
我们看到在startElection中调用startLeader,当投票显示该节点赢得选举的话:
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)
go func() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
// 只要还是leader,就发送周期性的心跳
for {
cm.leaderSendHeartbeats()
<-ticker.C
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
cm.mu.Unlock()
}
}()
}
这实际上是一个相当简单的方法:它所做的就是运行心跳计时器——只要这个节点仍然是leader,启动一个goroutine每50毫秒调用leaderSendHeartbeats。下面是leaderSendHeartbeats的代码:
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
}
go func(peerId int) {
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, 0, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
}
}(peerId)
}
}
有点类似于startElection,从某种意义上说,它为每个peer启动一个goroutine来发送一个RPC。这一次RPC是不带日志内容的AppendEntries (AE),它在Raft中扮演心跳的角色。
与处理RV(请求投票)回复类似,如果RPC返回的term比我们自己的高,该节点将切换为一个跟随者。下面看一下becomeFollower方法了:
func (cm *ConsensusModule) becomeFollower(term int) {
cm.dlog("becomes Follower with term=%d; log=%v", term, cm.log)
cm.state = Follower
cm.currentTerm = term
cm.votedFor = -1
cm.electionResetEvent = time.Now()
go cm.runElectionTimer()
}
它将CM的状态设置为follower,并重置其term和其他重要的状态字段。它还会启动一个新的选举计时器,因为这是一个追随者应该一直在后台运行的。
RPC应答
到目前为止我们看了主动发起RPC请求、计时器和状态转移的代码。在我们看到服务器方法之前,我们看下请求投票函数代码:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
注意“dead”状态的检查。我们后面再说。
类似逻辑,从检查term是否过时和成为一个跟随者开始。如果它已经是一个追随者,状态不会改变,但其他状态字段将重置。
否则,如果调用者的term与该节点的一致,而我们还没有投票给其他的候选者,我们将批准投票。我们从来不给旧任期的RPCs投票。
AppendEntries函数:
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
reply.Success = true
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
这里的逻辑还应与本文中图2中的选举部分保持一致。需要理解的复杂情况是:
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
问题:如果该节点是领导者-为啥要转为跟随者。
答案:Raft保证了在任期内只有一位领导者。如果仔细遵循RequestVote的逻辑和startElection中发送RV的代码,您将看到集群中不能存在两个具有相同term的leader。这一条件对于发现另一节点赢得了这一任期的候选者来说非常重要。
状态和Goroutine
有必要回顾一下CM可能处于的所有状态,以及其中运行的不同goroutine:
Follower:当CM被初始化为一个追随者时,并且在每次调用becomeFollower时,一个新的goroutine开始运行runElectionTimer。这个goroutine是跟随者必须一直执行的。请注意,可以有多个同时在短时间内运行。假设一个跟随者从一个term更大的候选者那里得到了一RV;这将触发becomeFollower调用,并启动一个新的计时器goroutine。但是,一旦发现term发生了变化,旧的term就会什么都不做地退出。
候选者:候选者会运行选举goroutine,但除此之外,它还包括一些goroutine发送rpc。它和一个跟随者一样,在新的选举开始时,有同样的保障措施来阻止“旧的”选举。一定要记得,RPC goroutines可能需要很长时间才能完成,因此,如果它们在RPC调用返回时注意到term已经过时了,就会悄悄地退出。
Leader:领导者是不运行选举goroutine的,但需要运行心跳goroutine。
代码中有一个额外的状态“Dead”。这纯粹是为了CM的有序关闭。调用Stop将状态设置为Dead,并且所有goroutine一旦观察到这个状态就会小心退出。
维护这些goroutine运行可能会令人担忧——如果它们中的一些残留在后台呢?或者更糟的是,它们不断泄漏,数量无限增长?这就是goroutine泄漏检查的目的,一些测试启用了泄漏检查。
故障服务器和增加任期Term
为了总结本文,让我们研究一个可能发生的棘手场景,以及Raft是如何处理的。我觉得这个例子很有趣,也很有启发意义。在这里,我试图将其作为一个故事来呈现,但您可能希望使用一张纸来跟踪不同服务器的状态。
假设一个集群有三个服务器:A、B和C。假设A是领导者,开始时任期term是1,集群正在愉快地运行。A每50毫秒向B和C发送心跳AE RPCs,并在几毫秒内得到快速响应;每个AE会重置B和C的electionResetEvent,使A、B维持跟随者状态。
在某些时候,由于网络路由器的临时故障,服务器B从A和C中分区。A仍然每50毫秒向它发送一次AEs,但是这些AEs要么立即出错,要么发生RPC超时。A对此无能为力,但也没什么大不了的。我们还没有讨论日志复制,但是由于三个服务器中有两个是活动的,集群能够完成仲裁来提交客户端请求。
那B呢?假设当它断开连接时,它的选举超时被设置为200毫秒。断开连接大约200毫秒后,B的runElectionTimer goroutine发现没有收到leader的心跳,发生选举超时;B没有办法区分哪个节点故障,所以它将成为一个候选人,并开始新的选举。
B的任期term增加到2(A和C还是1)。B将向A和C发送RV投票RPC请求;由于B自己故障很显然这些RPC将丢失。这里代码不用panic!B的startElection启动另一个runElectionTimer协程,假设协程等待250ms(记住我们的超时范围是150ms-300ms)看看上次选举是否有结果。对B没有作用,因为它仍然是完全孤立的。因此,runElectionTimer开始另一个新的选举,将任期增加到3。
如此反复;B的路由器持续了几秒钟才能重新启动并重新上线。与此同时,B的连任选举也时有发生,其任期term已经达到8。此时,网络分区是没有变,B重新连接到A和C。
不久之后,B收到A的一个心跳AE RPC。A一直每50毫秒发送一次,即使B有一段时间没有回复。B的AppendEntries被调用并返回一个term=8的回复。A在leaderSendHeartbeats中得到这个回复,检查回复的term并注意到它比自己的高。它将自己的term更新到8,并成为一个追随者。集群暂时没有leader了。
根据时间的不同,现在可能发生多种情况。B是一个候选,但它可能在网络恢复之前就已经把自己的选举投票RVs rpc送出去了;C是一个跟随者,但在它自己的选举超时时间内,它将成为一个候选者,因为它停止接收来自A的心跳AEs。A成为了一个跟随者,并且在它的选举超时时间内也将变成一个候选者。
所以这三个服务器中的任何一个都可能赢得下次选举。注意,这只是因为我们还没有复制任何日志。正如我们在下一部分将看到的,在实际的场景中,A和C可能会在B不在的时候添加一些新的客户端请求,因此它们的日志会更全。因此,新的选举发生时B不能成为新的领导人,将由A或C赢得;在下一部分中,我们将再次讨论这个场景。
假设在B断开连接后没有添加新的请求,那么由于重新连接而发生leader更改是绝对没问题的。
如果这看起来效率低下,那是事实。这里领导者没必要更换,因为A在整个过程中都是非常健康的。但是,在特殊情况下以牺牲一些效率来保持不变量的简单性是Raft的设计选择之一。保持一般情况下的高效率是很重要的,因为集群99.9%的时间都是正常的。