程序员社区

Raft实现(3):客户端指令和日志复制

这是关于raft分布式共识算法及其Go实现系列文章的第三部分。下面是文章目录:
第1部分:介绍
第2部分:选举
第3部分:指令和日志复制(本文)
第4部分:持久化和优化
本文我们聚焦在处理客户端提交的命令,并在Raft集群中复制指令,进一步完善Raft实现。代码结构与第2部分相同。有一些新的结构体和函数以及对现有代码的一些更新——我将在稍后介绍并解释所有这些内容。

本文所有代码链接在这里。

客户端交互

在第一部分已经介绍了一点关于客户端交互。我强烈建议你回头重新阅读这部分内容。这里我们不关注客户端如何找到领导者节点,我们将讨论找到领导者节点后会发生哪些操作。

先对一些术语进行说明:和前面讨论的一样,客户端使用Raft复制一系列指令,可以将其视为通用状态机的输入。就Raft实现而言,这些请求是任意类型的,我们用Go的interface接口类型来实现。在Raft共识中,请求指令会经历以下过程:
1、首先指令从客户端发送到领导者节点。在Raft集群中,请求通常只提交给单个节点。
2、领导者节点将指令复制到跟随者。
3、最后一旦领导者接收到大多数跟随者(超过一半)节点返回的复制成功信息,指令就被提交并通知客户端。
注意领导者接收到指令并复制到跟随者节点这个过程,以及领导者判断指令提交是否成功是很关键的,我们将很快讨论这些。

Raft实现(3):客户端指令和日志复制插图

回忆下第一部分中的图,状态机代表任意使用Raft算法服务;例如可能是一个KV数据库。提交指令改变服务状态(比如添加一个key/value到数据库中)。

我们从raft共识模块角度来讨论客户端,因为请求是来源于客户端发起的。换句话说,上面图中箭头就是代表客户端向服务端发送请求。

commit channel(提交通道实现)

在代码实现中,当consensusModule共识对象被创建,会接收一个commit channel,用于向调用者发送提交指令:commitChan chan<- CommitEntry。定义如下:

//CommitEntry是raft发送到通道中的数据。
type CommitEntry struct {
  // 客户端发送的请求指令
  Command interface{}

  //是客户端请求提交的日志索引。
  Index int

  // Term 提交请求时raft的任期
  Term int
}

使用channel是一个设计上的选择,不是唯一的解决方案。我们可以使用回调代替。当创建ConsensusModule调用者可以注册一个回调函数,有请求提交就触发回调。

我们将很快看到在通道上发送条目的代码;首先,我们必须讨论Raft服务器如何复制命令并决定是否提交命令。

Raft日志

Raft日志已经在本系列文章中多次提到,但我们还没有过多地介绍它。日志指的是会应用到状态机上的一系列请求指令;如果需要,状态机可以根据日志恢复原来的状态。正常运行时,所有Raft节点的日志是相同的;当leader获得一个新请求指令时,它将其保存在自己的日志中,然后将其复制给跟随者。跟随者将命令添加在日志中,并向leader确认,leader将记录成功复制到集群中的大多数服务器上的最新日志索引的计数。日志内容可以用下图说明:

Raft实现(3):客户端指令和日志复制插图1
raft日志

每一个框代表一条日志;每条日志包含索引和Term任期数以及请求数据。同一颜色表示任期相同。如果这个日志在一个空的key-value存储中生效,结果是x=4,y=7。日志类型定义:

type LogEntry struct {
  Command interface{}
  Term    int
}

每个ConsensusModule的日志使用[]LogEntry切片存储。客户端通常不关心任期term;然而,term对Raft的正确性至关重要,因此在阅读代码时务必记住。

提交一个请求指令

让我们从Submit方法开始,它允许客户端提交命令:

func (cm *ConsensusModule) Submit(command interface{}) bool {
  cm.mu.Lock()
  defer cm.mu.Unlock()

  cm.dlog("Submit received by %v: %v", cm.state, command)
  if cm.state == Leader {
    cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
    cm.dlog("... log=%v", cm.log)
    return true
  }
  return false
}

非常直接,如果CM状态是领导者,新的请求指令会添加到日志当中并返回true。否则忽略请求返回false。
问题:Submit返回的true值是否足以表明客户端已经向leader提交了命令?
答案: 很不幸不行。在极少数情况下,一个领导者可能会与其他Raft服务器分区,后者将在一段时间后继续选举一个新的领导者。然而,客户可能仍在与原来的领导者联系。客户端应该等待一段合理的时间,让它提交的命令出现在提交通道上;如果没有,这意味着它联系了错误的领导者,它应该选择另一个领导者重试。

复制日志

我们刚刚看到请求指令提交到leader并添加到日志末尾。跟随者如何收到这条请求呢?是通过leaderSendHeartbeats方法实现的,日志在心跳中一起发送给跟随者:

func (cm *ConsensusModule) leaderSendHeartbeats() {
  cm.mu.Lock()
  savedCurrentTerm := cm.currentTerm
  cm.mu.Unlock()

  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      cm.mu.Lock()
      ni := cm.nextIndex[peerId]
      prevLogIndex := ni - 1
      prevLogTerm := -1
      if prevLogIndex >= 0 {
        prevLogTerm = cm.log[prevLogIndex].Term
      }
      entries := cm.log[ni:]

      args := AppendEntriesArgs{
        Term:         savedCurrentTerm,
        LeaderId:     cm.id,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        LeaderCommit: cm.commitIndex,
      }
      cm.mu.Unlock()
      cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
      var reply AppendEntriesReply
      if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in heartbeat reply")
          cm.becomeFollower(reply.Term)
          return
        }

        if cm.state == Leader && savedCurrentTerm == reply.Term {
          if reply.Success {
            cm.nextIndex[peerId] = ni + len(entries)
            cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
            cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)

            savedCommitIndex := cm.commitIndex
            for i := cm.commitIndex + 1; i < len(cm.log); i++ {
              if cm.log[i].Term == cm.currentTerm {
                matchCount := 1
                for _, peerId := range cm.peerIds {
                  if cm.matchIndex[peerId] >= i {
                    matchCount++
                  }
                }
                if matchCount*2 > len(cm.peerIds)+1 {
                  cm.commitIndex = i
                }
              }
            }
            if cm.commitIndex != savedCommitIndex {
              cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
              cm.newCommitReadyChan <- struct{}{}
            }
          } else {
            cm.nextIndex[peerId] = ni - 1
            cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
          }
        }
      }
    }(peerId)
  }
}

这部分代码比较复杂,但它还是按照raft算法来实现的。代码说明:

  • 心跳AE RPC调用,创建AppendEntriesArgs对象。
  • AE响应中有一个success字段,跟随者向领导者反馈preLogIndex和preLogTerm信息。根据这些字段领导者为跟随者更新nextIndex。
  • commitIndex:日志提交索引,如果收到跟随者返回成功提交的数量超过一半,就增加1。
    这部分代码与我们之前关于客户端交互的讨论特别重要:
if cm.commitIndex != savedCommitIndex {
  cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
  cm.newCommitReadyChan <- struct{}{}
}

newCommitReadyChan该通道是consensusModule内部使用的,用于通知客户端请求数据已经发送,由以下函数接收该通道数据,并进行逻辑处理:

func (cm *ConsensusModule) commitChanSender() {
  for range cm.newCommitReadyChan {
    // Find which entries we have to apply.
    cm.mu.Lock()
    savedTerm := cm.currentTerm
    savedLastApplied := cm.lastApplied
    var entries []LogEntry
    if cm.commitIndex > cm.lastApplied {
      entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
      cm.lastApplied = cm.commitIndex
    }
    cm.mu.Unlock()
    cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)

    for i, entry := range entries {
      cm.commitChan <- CommitEntry{
        Command: entry.Command,
        Index:   savedLastApplied + i + 1,
        Term:    savedTerm,
      }
    }
  }
  cm.dlog("commitChanSender done")
}

lastApplied存储的是最新的日志索引,因此需要更新当前值。并将需要返回给客户端的响应信息写入commitChan中。

follower更新日志

已经看到了leader如何处理新的日志条目。下面看下跟随者怎么处理新的日志信息,确切地说,AppendEntries RPC如何调用。

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("AppendEntries: %+v", args)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in AppendEntries")
    cm.becomeFollower(args.Term)
  }

  reply.Success = false
  if args.Term == cm.currentTerm {
    if cm.state != Follower {
      cm.becomeFollower(args.Term)
    }
    cm.electionResetEvent = time.Now()

    // 查看PreLogIndex是否匹配
    //注意PrevLogIndex == -1是特殊情况
    if args.PrevLogIndex == -1 ||
      (args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
      reply.Success = true

      // 找到日志插入位置
      logInsertIndex := args.PrevLogIndex + 1
      newEntriesIndex := 0

      for {
        if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
          break
        }
        if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
          break
        }
        logInsertIndex++
        newEntriesIndex++
      }
     
      if newEntriesIndex < len(args.Entries) {
        cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
        cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
        cm.dlog("... log is now: %v", cm.log)
      }

      // 设置commitIndex
      if args.LeaderCommit > cm.commitIndex {
        cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
        cm.dlog("... setting commitIndex=%d", cm.commitIndex)
        cm.newCommitReadyChan <- struct{}{}
      }
    }
  }

  reply.Term = cm.currentTerm
  cm.dlog("AppendEntries reply: %+v", *reply)
  return nil
}

当代码发现领导者的LeaderCommit比跟随者的cm.commitIndex更大时候,这是追随者得知领导考虑提交额外的日志,需向ch.newCommitReadyChan通道发送通知。
当leader使用AE发送新的日志条目时,会发生以下情况:

  • follower将新条目添加到其日志中,并向leader回复success=true。
  • leader会更新这个跟随者的matchIndex。当足够数量的follower在下一个索引中拥有他们的matchIndex时,leader更新commitIndex并在下一个AE中发送给所有的跟随者(在leaderCommit字段中)。
    问题:提交一个新的请求指令需要发送多少RPC调用?
    答案:两类RPC,第一类是领导者节点发送下一条日志给跟随者,然后跟随者确认收到日志信息。当领导者处理AE返回,会根据响应信息更新提交索引(commitIndex)。第二类发送更新提交索引给跟随者,跟随者将标记这些日志已提交,并发送到commit通道。

选举安全

日志在选举中也起作用了,即选举约束。raft处理选举时会根据其包含的日志是否比大多数其他节点的更全,才能赢得选举。因为这个原因,请求投票信息RV包含lastIndex和lastLogTerm字段。当候选者发送RV,会根据最新收到的日志信息来设置这两个字段。跟随者收到RV后会和自己的相应字段做比较,决定候选者是否有被投票的资格。

以下是startElection增加这个逻辑的代码:

func (cm *ConsensusModule) startElection() {
  cm.state = Candidate
  cm.currentTerm += 1
  savedCurrentTerm := cm.currentTerm
  cm.electionResetEvent = time.Now()
  cm.votedFor = cm.id
  cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)

  votesReceived := 1

  // 并发发送 RequestVote RPCs给所有其他节点
  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      cm.mu.Lock()
      savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
      cm.mu.Unlock()

      args := RequestVoteArgs{
        Term:         savedCurrentTerm,
        CandidateId:  cm.id,
        LastLogIndex: savedLastLogIndex,
        LastLogTerm:  savedLastLogTerm,
      }

      cm.dlog("sending RequestVote to %d: %+v", peerId, args)
      var reply RequestVoteReply
      if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        cm.dlog("received RequestVoteReply %+v", reply)

        if cm.state != Candidate {
          cm.dlog("while waiting for reply, state = %v", cm.state)
          return
        }

        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in RequestVoteReply")
          cm.becomeFollower(reply.Term)
          return
        } else if reply.Term == savedCurrentTerm {
          if reply.VoteGranted {
            votesReceived++
            if votesReceived*2 > len(cm.peerIds)+1 {
              // 赢得选举
              cm.dlog("wins election with %d votes", votes)
              cm.startLeader()
              return
            }
          }
        }
      }
    }(peerId)
  }

  // 再运行一个选举计时器,以防这次选举失败
  go cm.runElectionTimer()
}

lastLogIndexAndTerm辅助函数:

// lastLogIndexAndTerm  返回最新一条日志索引和日志的任期
// (没有日志返回-1) 
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
  if len(cm.log) > 0 {
    lastIndex := len(cm.log) - 1
    return lastIndex, cm.log[lastIndex].Term
  } else {
    return -1, -1
  }
}

注意:本文的实现索引从0开始,而Raft论文中索引从1开始。-1通常用作哨兵值。
下面是更新后的RV处理程序,它实现了选举安全检查:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
//获取最新日志和任期
  lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
  cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in RequestVote")
    cm.becomeFollower(args.Term)
  }

//这里对比收到的任期和日志情况
  if cm.currentTerm == args.Term &&
    (cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
    (args.LastLogTerm > lastLogTerm ||
      (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
    reply.VoteGranted = true
    cm.votedFor = args.CandidateId
    cm.electionResetEvent = time.Now()
  } else {
    reply.VoteGranted = false
  }
  reply.Term = cm.currentTerm
  cm.dlog("... RequestVote reply: %+v", reply)
  return nil
}
赞(0) 打赏
未经允许不得转载:IDEA激活码 » Raft实现(3):客户端指令和日志复制

一个分享Java & Python知识的社区