这是关于raft分布式共识算法及其Go实现系列文章的第三部分。下面是文章目录:
第1部分:介绍
第2部分:选举
第3部分:指令和日志复制(本文)
第4部分:持久化和优化
本文我们聚焦在处理客户端提交的命令,并在Raft集群中复制指令,进一步完善Raft实现。代码结构与第2部分相同。有一些新的结构体和函数以及对现有代码的一些更新——我将在稍后介绍并解释所有这些内容。
本文所有代码链接在这里。
客户端交互
在第一部分已经介绍了一点关于客户端交互。我强烈建议你回头重新阅读这部分内容。这里我们不关注客户端如何找到领导者节点,我们将讨论找到领导者节点后会发生哪些操作。
先对一些术语进行说明:和前面讨论的一样,客户端使用Raft复制一系列指令,可以将其视为通用状态机的输入。就Raft实现而言,这些请求是任意类型的,我们用Go的interface接口类型来实现。在Raft共识中,请求指令会经历以下过程:
1、首先指令从客户端发送到领导者节点。在Raft集群中,请求通常只提交给单个节点。
2、领导者节点将指令复制到跟随者。
3、最后一旦领导者接收到大多数跟随者(超过一半)节点返回的复制成功信息,指令就被提交并通知客户端。
注意领导者接收到指令并复制到跟随者节点这个过程,以及领导者判断指令提交是否成功是很关键的,我们将很快讨论这些。
回忆下第一部分中的图,状态机代表任意使用Raft算法服务;例如可能是一个KV数据库。提交指令改变服务状态(比如添加一个key/value到数据库中)。
我们从raft共识模块角度来讨论客户端,因为请求是来源于客户端发起的。换句话说,上面图中箭头就是代表客户端向服务端发送请求。
commit channel(提交通道实现)
在代码实现中,当consensusModule共识对象被创建,会接收一个commit channel,用于向调用者发送提交指令:commitChan chan<- CommitEntry。定义如下:
//CommitEntry是raft发送到通道中的数据。
type CommitEntry struct {
// 客户端发送的请求指令
Command interface{}
//是客户端请求提交的日志索引。
Index int
// Term 提交请求时raft的任期
Term int
}
使用channel是一个设计上的选择,不是唯一的解决方案。我们可以使用回调代替。当创建ConsensusModule调用者可以注册一个回调函数,有请求提交就触发回调。
我们将很快看到在通道上发送条目的代码;首先,我们必须讨论Raft服务器如何复制命令并决定是否提交命令。
Raft日志
Raft日志已经在本系列文章中多次提到,但我们还没有过多地介绍它。日志指的是会应用到状态机上的一系列请求指令;如果需要,状态机可以根据日志恢复原来的状态。正常运行时,所有Raft节点的日志是相同的;当leader获得一个新请求指令时,它将其保存在自己的日志中,然后将其复制给跟随者。跟随者将命令添加在日志中,并向leader确认,leader将记录成功复制到集群中的大多数服务器上的最新日志索引的计数。日志内容可以用下图说明:
每一个框代表一条日志;每条日志包含索引和Term任期数以及请求数据。同一颜色表示任期相同。如果这个日志在一个空的key-value存储中生效,结果是x=4,y=7。日志类型定义:
type LogEntry struct {
Command interface{}
Term int
}
每个ConsensusModule的日志使用[]LogEntry切片存储。客户端通常不关心任期term;然而,term对Raft的正确性至关重要,因此在阅读代码时务必记住。
提交一个请求指令
让我们从Submit方法开始,它允许客户端提交命令:
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.dlog("... log=%v", cm.log)
return true
}
return false
}
非常直接,如果CM状态是领导者,新的请求指令会添加到日志当中并返回true。否则忽略请求返回false。
问题:Submit返回的true值是否足以表明客户端已经向leader提交了命令?
答案: 很不幸不行。在极少数情况下,一个领导者可能会与其他Raft服务器分区,后者将在一段时间后继续选举一个新的领导者。然而,客户可能仍在与原来的领导者联系。客户端应该等待一段合理的时间,让它提交的命令出现在提交通道上;如果没有,这意味着它联系了错误的领导者,它应该选择另一个领导者重试。
复制日志
我们刚刚看到请求指令提交到leader并添加到日志末尾。跟随者如何收到这条请求呢?是通过leaderSendHeartbeats方法实现的,日志在心跳中一起发送给跟随者:
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
go func(peerId int) {
cm.mu.Lock()
ni := cm.nextIndex[peerId]
prevLogIndex := ni - 1
prevLogTerm := -1
if prevLogIndex >= 0 {
prevLogTerm = cm.log[prevLogIndex].Term
}
entries := cm.log[ni:]
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: cm.commitIndex,
}
cm.mu.Unlock()
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
if cm.state == Leader && savedCurrentTerm == reply.Term {
if reply.Success {
cm.nextIndex[peerId] = ni + len(entries)
cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)
savedCommitIndex := cm.commitIndex
for i := cm.commitIndex + 1; i < len(cm.log); i++ {
if cm.log[i].Term == cm.currentTerm {
matchCount := 1
for _, peerId := range cm.peerIds {
if cm.matchIndex[peerId] >= i {
matchCount++
}
}
if matchCount*2 > len(cm.peerIds)+1 {
cm.commitIndex = i
}
}
}
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
} else {
cm.nextIndex[peerId] = ni - 1
cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
}
}
}
}(peerId)
}
}
这部分代码比较复杂,但它还是按照raft算法来实现的。代码说明:
- 心跳AE RPC调用,创建AppendEntriesArgs对象。
- AE响应中有一个success字段,跟随者向领导者反馈preLogIndex和preLogTerm信息。根据这些字段领导者为跟随者更新nextIndex。
- commitIndex:日志提交索引,如果收到跟随者返回成功提交的数量超过一半,就增加1。
这部分代码与我们之前关于客户端交互的讨论特别重要:
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
newCommitReadyChan该通道是consensusModule内部使用的,用于通知客户端请求数据已经发送,由以下函数接收该通道数据,并进行逻辑处理:
func (cm *ConsensusModule) commitChanSender() {
for range cm.newCommitReadyChan {
// Find which entries we have to apply.
cm.mu.Lock()
savedTerm := cm.currentTerm
savedLastApplied := cm.lastApplied
var entries []LogEntry
if cm.commitIndex > cm.lastApplied {
entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
cm.lastApplied = cm.commitIndex
}
cm.mu.Unlock()
cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)
for i, entry := range entries {
cm.commitChan <- CommitEntry{
Command: entry.Command,
Index: savedLastApplied + i + 1,
Term: savedTerm,
}
}
}
cm.dlog("commitChanSender done")
}
lastApplied存储的是最新的日志索引,因此需要更新当前值。并将需要返回给客户端的响应信息写入commitChan中。
follower更新日志
已经看到了leader如何处理新的日志条目。下面看下跟随者怎么处理新的日志信息,确切地说,AppendEntries RPC如何调用。
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
// 查看PreLogIndex是否匹配
//注意PrevLogIndex == -1是特殊情况
if args.PrevLogIndex == -1 ||
(args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
reply.Success = true
// 找到日志插入位置
logInsertIndex := args.PrevLogIndex + 1
newEntriesIndex := 0
for {
if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
break
}
if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
break
}
logInsertIndex++
newEntriesIndex++
}
if newEntriesIndex < len(args.Entries) {
cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
cm.dlog("... log is now: %v", cm.log)
}
// 设置commitIndex
if args.LeaderCommit > cm.commitIndex {
cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
cm.dlog("... setting commitIndex=%d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
}
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
当代码发现领导者的LeaderCommit比跟随者的cm.commitIndex更大时候,这是追随者得知领导考虑提交额外的日志,需向ch.newCommitReadyChan通道发送通知。
当leader使用AE发送新的日志条目时,会发生以下情况:
- follower将新条目添加到其日志中,并向leader回复success=true。
- leader会更新这个跟随者的matchIndex。当足够数量的follower在下一个索引中拥有他们的matchIndex时,leader更新commitIndex并在下一个AE中发送给所有的跟随者(在leaderCommit字段中)。
问题:提交一个新的请求指令需要发送多少RPC调用?
答案:两类RPC,第一类是领导者节点发送下一条日志给跟随者,然后跟随者确认收到日志信息。当领导者处理AE返回,会根据响应信息更新提交索引(commitIndex)。第二类发送更新提交索引给跟随者,跟随者将标记这些日志已提交,并发送到commit通道。
选举安全
日志在选举中也起作用了,即选举约束。raft处理选举时会根据其包含的日志是否比大多数其他节点的更全,才能赢得选举。因为这个原因,请求投票信息RV包含lastIndex和lastLogTerm字段。当候选者发送RV,会根据最新收到的日志信息来设置这两个字段。跟随者收到RV后会和自己的相应字段做比较,决定候选者是否有被投票的资格。
以下是startElection增加这个逻辑的代码:
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
votesReceived := 1
// 并发发送 RequestVote RPCs给所有其他节点
for _, peerId := range cm.peerIds {
go func(peerId int) {
cm.mu.Lock()
savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
cm.mu.Unlock()
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
LastLogIndex: savedLastLogIndex,
LastLogTerm: savedLastLogTerm,
}
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
var reply RequestVoteReply
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votesReceived++
if votesReceived*2 > len(cm.peerIds)+1 {
// 赢得选举
cm.dlog("wins election with %d votes", votes)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// 再运行一个选举计时器,以防这次选举失败
go cm.runElectionTimer()
}
lastLogIndexAndTerm辅助函数:
// lastLogIndexAndTerm 返回最新一条日志索引和日志的任期
// (没有日志返回-1)
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
if len(cm.log) > 0 {
lastIndex := len(cm.log) - 1
return lastIndex, cm.log[lastIndex].Term
} else {
return -1, -1
}
}
注意:本文的实现索引从0开始,而Raft论文中索引从1开始。-1通常用作哨兵值。
下面是更新后的RV处理程序,它实现了选举安全检查:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
//获取最新日志和任期
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
//这里对比收到的任期和日志情况
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}