PolarSPARC

Raft Consensus Algorithm Unraveled


Bhaskar S 08/22/2020


Overview

Modern applications are being built the Cloud Native way. This implies distributed processing systems (including Data Stores) running on different hosts and communicating using APIs and/or Messaging.

Designing and building distributed fault-tolerant systems is hard; in particular, keeping the state replicated and in-sync across the different nodes of a distributed data store (Distributed Consensus) is even more challenging. The reason: since many different nodes are involved, there are multiple points of failure - one or more nodes can crash, communication between nodes can fail, messages can be lost, and so on.

Look at any of the modern distributed data stores - etcd, TiDB, Kudu, etc. What do they all have in common w.r.t. Distributed Consensus ???

YES !!! They all use the Raft Consensus Algorithm under-the-hood to achieve consensus on the replicated state.

In this article, we will unravel the inner workings of the Raft Consensus algorithm by implementing our own simplified version of the algorithm in Golang.

There are 3 important aspects of the Raft Consensus algorithm for the distributed consensus to work properly:

Leader Election - electing a single Leader node that coordinates all updates to the cluster
Log Replication - replicating every update from the Leader node to the Follower nodes in the cluster
Safety - guaranteeing that the replicated state stays consistent across the nodes of the cluster

We will now drill into each of the above 3 aspects. Before we do that, let us get some terms defined.

Term Description
Quorum If there are N (typically odd number - 3, 5, 7, etc) nodes in the cluster, then a quorum count is defined as (N/2 + 1)
Follower All nodes in a cluster start in this state
Candidate To initiate the Leader election process, one of the nodes becomes a candidate to request votes from the other nodes in the cluster
Leader After a successful Leader election, only one node in the cluster is the Leader and all updates to the cluster is through the Leader
Term A monotonically increasing number that represents an instance of the Leader election. Once a Leader is chosen, it does not change until the next Leader election
Index A monotonically increasing number that represents the next update to be stored and committed
Election Timer A timer that is started on each node of the cluster. The first timer that expires is the node that becomes the Candidate node and initiates the Leader election process
Heartbeat A regular periodic ping from the Leader node to all the other Follower nodes in the cluster to indicate the Leader is alive and working
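
For example, in a 5 node cluster the quorum count is 5/2 + 1 = 3 (integer division). The QuorumCount and AllCount methods referenced later in the code follow the same formula; a tiny illustrative sketch (the function name quorum below is purely for illustration) is shown here:

// quorum returns the minimum number of nodes (a majority) needed to
// agree on a decision in a cluster of n nodes: n/2 + 1
func quorum(n int) int {
  return n/2 + 1 // quorum(3) == 2, quorum(5) == 3, quorum(7) == 4
}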

Every node in the cluster starts a network server, listening for incoming messages. The following code segment shows the starting of the network server and the handling of incoming requests by the goroutine handleRequest:

rpc.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

const MaxBufferSz int = 512

// Goroutine
func StartServer(node RaftNode) {
  fmt.Printf("++ [%s] Ready to start server on Raft node\n", node.Addr)

  listen, err := net.Listen("tcp", node.Addr)
  if err != nil {
    fmt.Printf("-- [%s] Could not start service on - %s\n", node.Addr, err)
    os.Exit(1)
  }

  StartRaftTimer(&node)

  fmt.Printf("++ [%s] Ready to accept remote connections\n", node.Addr)

  for {
    conn, err := listen.Accept()
    if err != nil {
      fmt.Printf("-- [%s] Could not establish network connection - %s", node.Addr, err)
      continue
    }
    go handleRequest(&node, conn)
  }
}

// Goroutine
func handleRequest(node *RaftNode, conn net.Conn) {
  buf, err := connRead(*node, conn, "Request")
  if err == nil {
    msg := Deserialize(*node, buf)
    switch msg.(type) {
      case Dummy:
        fmt.Printf("-- [%s] ***** Deserialized event *Dummy*\n", node.Addr)
        _ = conn.Close()
      case ClientUpdate:
        fmt.Printf("++ [%s] ClientUpdate event from %s\n", node.Addr, conn.RemoteAddr().String())

        go handleClient(node, conn, msg)
      default:
        node.PeerC <-msg
        _ = conn.Close()
    }
  }
}

// Defined in utils.go
func connRead(node RaftNode, conn net.Conn, source string) ([]byte, error) {
  to := conn.RemoteAddr().String()
  buf := make([]byte, MaxBufferSz)
  _, err := conn.Read(buf)
  if err != nil {
    fmt.Printf("-- [%s] %s - Could not read from endpoint [%s] - %s\n", node.Addr, source, to, err)
  }
  return buf, err
}

Every message between nodes is serialized into an array of bytes, with the first byte indicating the type of the message payload. The following diagram illustrates the layout of the network payload exchanged between the nodes in the cluster:


Network Payload Layout
Figure.1

The following code segment shows the functions for serialization and deserialization of the different messages:

io.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

const (
  RVType byte = 0x01
  GVType byte = 0x02
  HBType byte = 0x03
  AEType byte = 0x04
  CAType byte = 0x05
  CUType byte = 0x06
  UAType byte = 0x07
)

func Serialize(node RaftNode, t interface{}) []byte {
  buf := bytes.NewBuffer([]byte{})
  enc := gob.NewEncoder(buf)

  switch v := t.(type) {
    case RequestVote:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode RequestVote - %s", node.Addr, err)
      } else {
        return copyBytes(RVType, buf)
      }
    case GrantVote:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode GrantVote - %s", node.Addr, err)
      } else {
        return copyBytes(GVType, buf)
      }
    case Heartbeat:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode Heartbeat - %s", node.Addr, err)
      } else {
        return copyBytes(HBType, buf)
      }
    case AppendEntries:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode AppendEntries - %s", node.Addr, err)
      } else {
        return copyBytes(AEType, buf)
      }
    case CommitAck:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode CommitAck - %s", node.Addr, err)
      } else {
        return copyBytes(CAType, buf)
      }
    case ClientUpdate:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode ClientUpdate - %s", node.Addr, err)
      } else {
        return copyBytes(CUType, buf)
      }
    case UpdateAck:
      err := enc.Encode(v)
      if err != nil {
        fmt.Printf("-- [%s] Could not encode UpdateAck - %s", node.Addr, err)
      } else {
        return copyBytes(UAType, buf)
      }
  }

  return []byte{}
}

func Deserialize(node RaftNode, b []byte) interface{} {
  dec := gob.NewDecoder(bytes.NewBuffer(b[1:]))

  switch b[0] {
    case RVType:
      t1 := RequestVote{}
      err := dec.Decode(&t1)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode RequestVote - %s", node.Addr, err)
      }
      return t1
    case GVType:
      t2 := GrantVote{}
      err := dec.Decode(&t2)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode GrantVote - %s", node.Addr, err)
      }
      return t2
    case HBType:
      t3 := Heartbeat{}
      err := dec.Decode(&t3)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode Heartbeat - %s", node.Addr, err)
      }
      return t3
    case AEType:
      t4 := AppendEntries{}
      err := dec.Decode(&t4)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode AppendEntries - %s", node.Addr, err)
      }
      return t4
    case CAType:
      t5 := CommitAck{}
      err := dec.Decode(&t5)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode CommitAck - %s", node.Addr, err)
      }
      return t5
    case CUType:
      t6 := ClientUpdate{}
      err := dec.Decode(&t6)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode ClientUpdate - %s", node.Addr, err)
      }
      return t6
    case UAType:
      t7 := UpdateAck{}
      err := dec.Decode(&t7)
      if err != nil {
        fmt.Printf("-- [%s] Could not decode UpdateAck - %s", node.Addr, err)
      }
      return t7
  }

  return Dummy{}
}
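
The helper copyBytes used by Serialize above is not listed in the article; a minimal sketch, assuming it simply prepends the one-byte message type to the gob-encoded payload (producing the layout shown in Figure.1), could look like the following:

// copyBytes prepends the message type byte to the gob-encoded payload
func copyBytes(msgType byte, buf *bytes.Buffer) []byte {
  b := make([]byte, 0, buf.Len()+1)
  b = append(b, msgType)
  b = append(b, buf.Bytes()...)
  return b
}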

A node serializes a message and writes it on the network connection of another node as a payload. The target node reads the network payload and deserializes it into a message. The target node then sends the message via a go channel for further processing. The following diagram illustrates the flow of a message between two nodes in the cluster:


Message Flow Between Nodes
Figure.2
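
The article does not list the event loop that drains the PeerC channel; a minimal sketch of such a dispatcher (the name peerEventLoop is purely illustrative), assuming one handler function per message type as shown or sketched later in this article, could look like this:

// Goroutine - drains the peer channel and dispatches each message
// to the appropriate handler (a sketch; the actual loop may differ)
func peerEventLoop(node *RaftNode) {
  for evt := range node.PeerC {
    switch msg := evt.(type) {
    case RequestVote:
      HandleRequestVote(node, msg)
    case GrantVote:
      HandleGrantVote(node, msg)
    case Heartbeat:
      HandleHeartbeat(node, msg)
    case AppendEntries:
      HandleAppendEntries(node, msg)
    case CommitAck:
      HandleCommitAck(node, msg)
    }
  }
}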

Let us now dive into the first aspect, Leader Election, of the Raft Consensus algorithm.

Leader Election

The following code segment lists all the data types relevant for Leader Election:

data.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

const MinElectionDuration int = 1150
const MaxElectionDuration int = 1300
const HeartbeatDuration   int = 1000
const DurationIncrRange   int = 100
const MaxWaitCycles       int = 3

/*
  AckCounter is used for tracking acknowledgements from the nodes in the Raft cluster.
  It is used in the two cases - Leader Election and Log Replication
*/

type YesNo struct {
  Yes int
  No  int
}

type AckCounter map[string]*YesNo

func (c AckCounter) Reset(from string, to string) {
  k := compositeAddr(from, to)
  c[k] = &YesNo{0, 0}
}

func (c AckCounter) GetYes(from string, to string) int {
  k := compositeAddr(from, to)
  if v, ok := c[k]; ok {
    return v.Yes
  } else {
    c[k] = &YesNo{0, 0}
  }
  return 0
}

func (c AckCounter) GetNo(from string, to string) int {
  k := compositeAddr(from, to)
  if v, ok := c[k]; ok {
    return v.No
  } else {
    c[k] = &YesNo{0, 0}
  }
  return 0
}

func (c AckCounter) IncrementYes(from string, to string) {
  k := compositeAddr(from, to)
  if v, ok := c[k]; ok {
    v.Yes++
  } else {
    c[k] = &YesNo{1, 0}
  }
}

func (c AckCounter) IncrementNo(from string, to string) {
  k := compositeAddr(from, to)
  if v, ok := c[k]; ok {
    v.No++
  } else {
    c[k] = &YesNo{0, 1}
  }
}

/*
  RaftNode encapsulates the state of a Raft node
*/

type RaftNodeType byte

const (
  Follower  RaftNodeType = 0x01
  Candidate RaftNodeType = 0x02
  Leader    RaftNodeType = 0x03
)

type RaftNode struct {
  Type           RaftNodeType        // Type of the node - Follower, Candidate, or Leader
  GrantedVote    bool                // Flag that indicates if this node granted a vote to a Candidate
  CurrentTerm    int                 // Tracks the current election term of each node
  CurrentIndex   int                 // Tracks the current index on each node
  CommittedIndex int                 // Tracks the index committed on the Leader
  Duration       int                 // Represents the random duration for election timeout
  VotesCycles    int                 // Count of wait cycles during Leader Election
  Addr           string              // Network endpoint of this node
  LeaderAddr     string              // Network endpoint of the Leader node
  PeerAddr       []string            // Network endpoints of the other nodes in the Raft cluster
  LastHB         time.Time           // Tracks the last Heartbeat from the Leader node
  PeerC          chan interface{}    // Used for receiving events from peer nodes
  LeaderC        chan interface{}    // Used for channeling events from clients to the Leader
  VotesAck       AckCounter          // Acknowledgement counter for votes
  CommitAck      AckCounter          // Acknowledgement counter for commits on nodes of the cluster
  CommitCycles   UpdateAckCycles     // Count of wait cycles per client during Log Replication
  ClientC        ClientChannel       // Used for channeling events to clients
  Log            AppendLog           // Log where client updates are appended
}

func NewRaftNode(addr []string) RaftNode {
  n := RaftNode{}

  n.Type = Follower
  n.GrantedVote = false
  n.CurrentTerm = 0
  n.CurrentIndex = 0
  n.CommittedIndex = 0
  n.Duration = rand.Intn(MaxElectionDuration-MinElectionDuration+1) + MinElectionDuration
  n.VotesCycles = 0
  n.Addr = addr[0]
  n.LeaderAddr = ""
  n.PeerAddr = addr[1:]
  n.LastHB = time.Now()
  n.PeerC = make(chan interface{})
  n.LeaderC = make(chan interface{})
  n.ClientC = ClientChannel{}
  n.VotesAck = AckCounter{}
  n.CommitAck = AckCounter{}
  n.CommitCycles = UpdateAckCycles{}
  n.Log = MemLog{Cache: map[int]string{}}

  return n
}

/*
  RequestVote message is sent by a Candidate node during Leader Election
*/

type RequestVote struct {
  Term  int
  Index int
  From  string
}

func NewRequestVote(term int, index int, from string) RequestVote {
  r := RequestVote{}

  r.Term = term
  r.Index = index
  r.From = from

  return r
}

/*
  GrantVote message is sent out by Follower node(s) in response to a RequestVote during Leader Election
*/

type GrantVote struct {
  Vote  bool
  Term  int
  Index int
  From  string
}

func NewGrantVote(vote bool, term int, index int, from string) GrantVote {
  g := GrantVote{}

  g.Vote = vote
  g.Term = term
  g.Index = index
  g.From = from

  return g
}

/*
  Heartbeat message is sent out by the Leader node to all the Follower nodes in the cluster
*/

type Heartbeat struct {
  Term  int
  Index int
  From  string
}

func NewHeartbeat(term int, index int, from string) Heartbeat {
  h := Heartbeat{}

  h.Term = term
  h.Index = index
  h.From = from

  return h
}

The type RaftNodeType indicates the type of the node in the cluster - Follower, Candidate, or Leader (defined as constants).

The type RaftNode encapsulates the necessary state of a node in the cluster; the inline comments in the code segment above describe each of the fields.
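
The AppendLog interface and its in-memory MemLog implementation (the Log field above, initialized in NewRaftNode) are not listed in the article; a minimal sketch, consistent with the Log.Write calls used later during Log Replication (the Read method is an assumption, used only by the syncPeer sketch further below), could be:

// AppendLog abstracts the log where client updates are appended
type AppendLog interface {
  Write(index int, entry string) error
  Read(index int) (string, error)
}

// MemLog is a simple map-backed, in-memory implementation of AppendLog
type MemLog struct {
  Cache map[int]string
}

func (l MemLog) Write(index int, entry string) error {
  l.Cache[index] = entry
  return nil
}

func (l MemLog) Read(index int) (string, error) {
  if entry, ok := l.Cache[index]; ok {
    return entry, nil
  }
  return "", fmt.Errorf("no entry at index %d", index)
}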

Let us assume a cluster of 3 nodes identified as A, B, and C respectively. When a node starts up, it starts as a Follower node. Also, it randomly picks an Election timer duration value (in milliseconds), typically between 150 and 300. In order to observe and reason about how the Leader Election works, for our demonstration we choose values between 1150 and 1300.

The following diagram illustrates the initial state of the cluster:


Initial State
Figure.3
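
The StartRaftTimer function (invoked from StartServer in rpc.go) is not listed in the article; a minimal sketch, assuming the timer simply fires every Duration milliseconds and invokes HandleElectionTimeout (shown next), could look like this:

// Goroutine - started on every node of the cluster
// (a sketch; the actual timer logic may differ)
func StartRaftTimer(node *RaftNode) {
  fmt.Printf("++ [%s] Election timer duration %d\n", node.Addr, node.Duration)

  ed := time.Duration(node.Duration) * time.Millisecond
  et := time.NewTimer(ed)

  go func() {
    fmt.Printf("++ [%s] Election timer started @ %v\n", node.Addr, time.Now())
    for {
      <-et.C

      HandleElectionTimeout(node)

      // Reset should always be invoked on stopped or expired timers
      et.Reset(ed)
    }
  }()
}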

From the illustration above in Figure.3, we see that node B's timer will go off first. Node B promotes itself to a Candidate node and broadcasts a RequestVote command to all the other Follower nodes in the cluster. The RequestVote is basically asking the other Follower nodes to grant a vote for the Candidate node to become the Leader node.

The following diagram illustrates the broadcast of the RequestVote command to the nodes of the cluster:


RequestVote Command
Figure.4

The following code segment shows the handling of the election timer expiry, where the node broadcasts the RequestVote message:

timers.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

func HandleElectionTimeout(node *RaftNode) {
  fmt.Printf("++ [%s] Election timeout - {Type: %s, IsLeaderOk: %v, GrantedVote: %v, LastHB: %v}\n",
    node.Addr, node.GetType(), node.isLeaderOk(), node.GrantedVote, node.LastHB)

  // No leader yet ???
  if node.IsFollower() && !node.isLeaderOk() && !node.GrantedVote {
    fmt.Printf("++ [%s] Election timeout - Follower ready to start Leader election\n", node.Addr)

    nodeStateReset(false, node)
    node.ToCandidate()
    node.VotesAck.IncrementYes(node.Addr, node.Addr) // Self vote YES

    rv := NewRequestVote(node.CurrentTerm+1, node.CurrentIndex, node.Addr)
    b := Serialize(*node, rv)
    if len(b) > 0 {
      count := 1 // Self count
      for _, addr := range node.PeerAddr {
        if SendToPeer(*node, addr, b) {
          count++
        }
      }
      // No Quorum of Peers ???
      if count < node.QuorumCount() {
        fmt.Printf("-- [%s] Election timeout - No QUORUM of Peer connections, RESET back to Follower\n",
          node.Addr)

        node.SetCurrentTerm(rv.Term)
        nodeStateReset(false, node)
      }
    }
  } else if node.IsCandidate() {
      ...
      ...
      ...
  } else if node.IsLeader() && node.CurrentIndex != node.CommittedIndex {
      ...
      ...
      ...
  } else {
    if node.GrantedVote {
      nodeStateReset(false, node)
    }
  }
}

!!! ATTENTION !!!

Code segments in the article have been trimmed to just show the pieces relevant to the context.
Also, lines with three dots '...' indicate code that has been truncated.

On receiving the RequestVote command, the Follower nodes A and C of the cluster respond with a GrantVote command. The GrantVote command from a Follower node indicates either an approval or a denial of the RequestVote from the Candidate node.

The following diagram illustrates the GrantVote command from the Follower nodes of the cluster:


GrantVote Command
Figure.5

The following code segment shows the handling of the RequestVote by the Follower nodes, each responding with a GrantVote message:

election.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

// To be executed by all Peer nodes
func HandleRequestVote(node *RaftNode, msg RequestVote) {
  fmt.Printf("++ [%s] RequestVote event - {Term: %d, Index: %d, Leader: [%s]} => " +
    "{Term: %d, Index: %d, From: [%s]}\n", node.Addr, node.CurrentTerm, node.CurrentIndex, node.LeaderAddr,
    msg.Term, msg.Index, msg.From)

  if node.IsFollower() {
    if node.CurrentTerm <= msg.Term && node.CurrentIndex <= msg.Index {
      fmt.Printf("++ [%s] RequestVote - Follower will GrantVote to [%s]\n", node.Addr, msg.From)

      node.SetCurrentTerm(msg.Term)
      node.SetCurrentIndex(msg.Index)
      node.SetLeaderAddr(msg.From)
      sendGrantVote(node, true, msg.Term, msg.Index, msg.From)
    } else {
      fmt.Printf("-- [%s] RequestVote - Follower out-of-sync {CurrentTerm: %d, Term:%d, " +
        "CurrentIndex: %d, Index: %d}, DENY GrantVote to [%s]\n",
        node.Addr, node.CurrentTerm, msg.Term, node.CurrentIndex, msg.Index, msg.From)

      sendGrantVote(node, false, node.CurrentTerm, node.CurrentIndex, msg.From)
    }
  } else { // Could be Candidate or Leader
    fmt.Printf("-- [%s] RequestVote - NOT Follower, DENY GrantVote to [%s]\n", node.Addr, msg.From)

    sendGrantVote(node, false, msg.Term, msg.Index, msg.From)
  }
}

// Defined in utils.go
func connWrite(node RaftNode, conn net.Conn, b []byte) error {
  to := conn.RemoteAddr().String()
  _, err := conn.Write(b)
  if err != nil {
    fmt.Printf("-- [%s] Could not send command to endpoint [%s] - %s\n", node.Addr, to, err)
  }
  return err
}

// Defined in rpc.go
func SendToPeer(node RaftNode, to string, b []byte) bool {
  conn, err := net.Dial("tcp", to)
  if err != nil {
    fmt.Printf("-- [%s] Could not establish connection to peer [%s] - %s\n", node.Addr, to, err)

    return false
  } else {
    defer conn.Close()
    err := connWrite(node, conn, b)
    if err != nil {
      return false
    }
  }
  return true
}

// Defined in utils.go
func sendGrantVote(node *RaftNode, vote bool, term int, index int, from string) {
  gv := NewGrantVote(vote, term, index, node.Addr)
  b := Serialize(*node, gv)
  if len(b) > 0 {
    if SendToPeer(*node, from, b) {
      if vote {
        node.SetGrantedVote()
      }
    }
  }
}

Once the Candidate node B receives a quorum of approved GrantVote commands, the Candidate node promotes itself to be the Leader node of the cluster. All the other nodes in the cluster reset themselves to Follower nodes.

The following diagram illustrates the successful election of node B as the Leader of the cluster:


Leader Success
Figure.6

The following code segment shows the Candidate node handling the GrantVote commands from the Follower nodes, promoting itself to the Leader node, and starting the Heartbeat messages:

election.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

// To be executed by the Candidate node
func HandleGrantVote(node *RaftNode, msg GrantVote) {
  fmt.Printf("++ [%s] GrantVote event - {Term: %d, Index: %d, Leader: [%s]} => " +
    "{Vote: %v, Term: %d, Index: %d, From: [%s]}\n", node.Addr, node.CurrentTerm, node.CurrentIndex,
    node.LeaderAddr, msg.Vote, msg.Term, msg.Index, msg.From)

  if node.IsCandidate() {
    if node.CurrentTerm <= msg.Term && node.CurrentIndex <= msg.Index {
      if msg.Vote {
        node.VotesAck.IncrementYes(msg.From, node.Addr) // Peer vote YES
      } else {
        node.VotesAck.IncrementNo(msg.From, node.Addr) // Peer vote NO
      }

      yes := 1 // Self Ack YES
      no := 0
      for _, addr := range node.PeerAddr {
        yes += node.VotesAck.GetYes(addr, node.Addr)
        no += node.VotesAck.GetNo(addr, node.Addr)
      }

      fmt.Printf("++ [%s] GrantVote - Candidate {Votes: [YES: %d, NO: %d], Quorum: %d}\n",
        node.Addr, yes, no, node.QuorumCount())

      if yes >= node.QuorumCount() {
        fmt.Printf("++ [%s] GrantVote - Candidate PROMOTED to Leader\n", node.Addr)

        node.SetCurrentTerm(msg.Term)
        node.SetLeaderAddr(node.Addr)
        nodeStateReset(true, node)
        sendHeartbeat(node)

        StartHeartbeatTimer(node)
      } else {
        // Did we get all the votes ???
        if (yes + no) >= node.AllCount() {
          fmt.Printf("-- [%s] GrantVote - No QUORUM, Candidate RESET to Follower\n", node.Addr)

          node.SetCurrentTerm(msg.Term)
          nodeStateReset(false, node)
        }
      }
    } else {
      fmt.Printf("-- [%s] GrantVote - Candidate Out-of-sync, RESET to Follower\n", node.Addr)

      node.SetCurrentTerm(msg.Term)
      nodeStateReset(false, node)
    }
  } else {
    // Only apply for Follower
    if node.IsFollower() {
      fmt.Printf("-- [%s] GrantVote - Is Follower, IGNORE GrantVote from [%s]\n", node.Addr, msg.From)

      node.SetCurrentTerm(msg.Term)
      nodeStateReset(false, node)
    }
  }
}

// Defined in timers.go
// Goroutine - Only started on the Leader peer
func StartHeartbeatTimer(node *RaftNode) {
  fmt.Printf("++ [%s] Heartbeat timer duration %d\n", node.Addr, HeartbeatDuration)

  // Heartbeat timer
  hd := time.Duration(HeartbeatDuration) * time.Millisecond
  ht := time.NewTimer(hd)

  go func() {
    for {
      <-ht.C

      if node.IsLeader() {
        fmt.Printf("++ [%s] Heartbeat timer - {Term: %d, Index: %d, Committed: %d}\n",
          node.Addr, node.CurrentTerm, node.CurrentIndex, node.CommittedIndex)

        if !sendHeartbeat(node) {
          ht.Stop()
          break
        }
      } else {
        fmt.Printf("-- [%s] Heartbeat timer - NOT Leader, terminating Heartbeat timer @ %v\n",
          node.Addr, time.Now())

        ht.Stop()
        nodeStateReset(false, node)
        break
      }

      // Reset should always be invoked on stopped or expired channels
      ht.Reset(hd)
    }
  }()
}

// Defined in utils.go
func nodeStateReset(leader bool, node *RaftNode) {
  if leader {
    node.ToLeader()
  } else {
    node.ToFollower()
  }
  node.ResetVoteCycles()
  node.VotesAck.Reset(node.Addr, node.Addr)
  for _, addr := range node.PeerAddr {
    node.VotesAck.Reset(addr, node.Addr)
  }
  node.ResetGrantedVote()
}

// Defined in utils.go
func sendHeartbeat(node *RaftNode) bool {
  hb := NewHeartbeat(node.CurrentTerm, node.CurrentIndex, node.LeaderAddr)
  b := Serialize(*node, hb)
  if len(b) > 0 {
    count := 1 // Self count
    for _, addr := range node.PeerAddr {
      if SendToPeer(*node, addr, b) {
        count++
      }
    }
    // No Quorum of Peers ???
    if count < node.QuorumCount() {
      fmt.Printf("-- [%s] Send Heartbeat - No QUORUM of Peer connections, RESET back to Follower\n",
        node.Addr)

      nodeStateReset(false, node)
      return false
    }
    node.SetLastHB(time.Now())
  } else {
    return false
  }
  return true
}

Once a Leader node is elected, it establishes a Term and keeps the state (CurrentTerm) synchronized with the other nodes in the cluster via Heartbeat messages sent at regular intervals.
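
The Follower-side handler for the Heartbeat message is not listed in the article; a minimal sketch, assuming the Follower records the Leader's term and endpoint along with the time of the last heartbeat, and requests a re-sync via the sendCommitAck helper used during Log Replication (sketched later in this article) when its index falls behind, could look like the following:

// To be executed by the Follower nodes
// (a sketch; the actual handler may differ)
func HandleHeartbeat(node *RaftNode, msg Heartbeat) {
  fmt.Printf("++ [%s] Heartbeat event - {Term: %d, Index: %d, Leader: [%s]} => "+
    "{Term: %d, Index: %d, From: [%s]}\n", node.Addr, node.CurrentTerm, node.CurrentIndex,
    node.LeaderAddr, msg.Term, msg.Index, msg.From)

  if node.IsFollower() {
    // Record the Leader endpoint and term, and remember when the last heartbeat arrived
    node.SetCurrentTerm(msg.Term)
    node.SetLeaderAddr(msg.From)
    node.SetLastHB(time.Now())

    // Is this node missing entries ??? Ask the Leader to re-sync from the next index
    if node.CurrentIndex < msg.Index {
      fmt.Printf("-- [%s] Heartbeat timeout - Node index BEHIND from [%s]\n", node.Addr, msg.From)

      sendCommitAck(node, false, true, msg.Term, node.CurrentIndex, msg.From)
    }
  }
}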

We will now demonstrate the Leader Election with a 3 node cluster. We will open 3 Terminals, one for each of the nodes, and refer to them as A, B, and C respectively.

The following is the output of node B, which has been elected as the Leader:

Output.1

++ [192.168.1.179:9002] Ready to start server on Raft node
++ [192.168.1.179:9002] Election timer duration 1234
++ [192.168.1.179:9002] Ready to accept remote connections
++ [192.168.1.179:9002] Election timer started @ 2020-08-22 19:21:59.208772097 -0400 EDT m=+0.001042014
++ [192.168.1.179:9002] Election timeout - {Type: Follower, IsLeaderOk: false, GrantedVote: false, LastHB: 2020-08-22 19:21:59.208549237 -0400 EDT m=+0.000819154}
++ [192.168.1.179:9002] Election timeout - Follower ready to start Leader election
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9001] - dial tcp 192.168.1.179:9001: connect: connection refused
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9003] - dial tcp 192.168.1.179:9003: connect: connection refused
-- [192.168.1.179:9002] Election timeout - No QUORUM of Peer connections, RESET back to Follower
++ [192.168.1.179:9002] Election timeout - {Type: Follower, IsLeaderOk: false, GrantedVote: false, LastHB: 2020-08-22 19:21:59.208549237 -0400 EDT m=+0.000819154}
++ [192.168.1.179:9002] Election timeout - Follower ready to start Leader election
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9001] - dial tcp 192.168.1.179:9001: connect: connection refused
++ [192.168.1.179:9002] GrantVote event - {Term: 1, Index: 0, Leader: []} => {Vote: true, Term: 2, Index: 0, From: [192.168.1.179:9003]}
++ [192.168.1.179:9002] GrantVote - Candidate {Votes: [YES: 2, NO: 0], Quorum: 2}
++ [192.168.1.179:9002] GrantVote - Candidate PROMOTED to Leader
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9001] - dial tcp 192.168.1.179:9001: connect: connection refused
++ [192.168.1.179:9002] Heartbeat timer duration 1000
++ [192.168.1.179:9002] Heartbeat timer - {Term: 2, Index: 0, Committed: 0}
++ [192.168.1.179:9002] Election timeout - {Type: Leader, IsLeaderOk: true, GrantedVote: false, LastHB: 2020-08-22 19:22:02.680284618 -0400 EDT m=+3.472554585}
++ [192.168.1.179:9002] Heartbeat timer - {Term: 2, Index: 0, Committed: 0}
++ [192.168.1.179:9002] Election timeout - {Type: Leader, IsLeaderOk: true, GrantedVote: false, LastHB: 2020-08-22 19:22:03.680932944 -0400 EDT m=+4.473202981}
...
...
...

The following is the output of node C that granted a vote for node B:

Output.2

++ [192.168.1.179:9003] Ready to start server on Raft node
++ [192.168.1.179:9003] Election timer duration 1245
++ [192.168.1.179:9003] Ready to accept remote connections
++ [192.168.1.179:9003] Election timer started @ 2020-08-22 19:22:00.728157567 -0400 EDT m=+0.000939241
++ [192.168.1.179:9003] RequestVote event - {Term: 0, Index: 0, Leader: []} => {Term: 2, Index: 0, From: [192.168.1.179:9002]}
++ [192.168.1.179:9003] RequestVote - Follower will GrantVote to [192.168.1.179:9002]
++ [192.168.1.179:9003] Heartbeat event - {Term: 2, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 2, Index: 0, From: [192.168.1.179:9002]}
++ [192.168.1.179:9003] Heartbeat event - {Term: 2, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 2, Index: 0, From: [192.168.1.179:9002]}
...
...
...

The following is the output of node A:

Output.3

++ [192.168.1.179:9001] Ready to start server on Raft node
++ [192.168.1.179:9001] Election timer duration 1268
++ [192.168.1.179:9001] Ready to accept remote connections
++ [192.168.1.179:9001] Election timer started @ 2020-08-22 19:22:02.240564441 -0400 EDT m=+0.001018941
++ [192.168.1.179:9001] Heartbeat event - {Term: 0, Index: 0, Leader: []} => {Term: 2, Index: 0, From: [192.168.1.179:9002]}
++ [192.168.1.179:9001] Heartbeat event - {Term: 2, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 2, Index: 0, From: [192.168.1.179:9002]}
++ [192.168.1.179:9001] Heartbeat event - {Term: 2, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 2, Index: 0, From: [192.168.1.179:9002]}
...
...
...

Let us now TERMINATE the Leader node B by pressing CTRL-C.

The following is the output of node A, which has now been elected as the new Leader:

Output.4

++ [192.168.1.179:9001] Election timeout - {Type: Follower, IsLeaderOk: false, GrantedVote: false, LastHB: 2020-08-22 19:22:09.684588286 -0400 EDT m=+7.445042755}
++ [192.168.1.179:9001] Election timeout - Follower ready to start Leader election
-- [192.168.1.179:9001] Could not establish connection to peer [192.168.1.179:9002] - dial tcp 192.168.1.179:9002: connect: connection refused
++ [192.168.1.179:9001] GrantVote event - {Term: 2, Index: 0, Leader: [192.168.1.179:9002]} => {Vote: true, Term: 3, Index: 0, From: [192.168.1.179:9003]}
++ [192.168.1.179:9001] GrantVote - Candidate {Votes: [YES: 2, NO: 0], Quorum: 2}
++ [192.168.1.179:9001] GrantVote - Candidate PROMOTED to Leader
-- [192.168.1.179:9001] Could not establish connection to peer [192.168.1.179:9002] - dial tcp 192.168.1.179:9002: connect: connection refused
++ [192.168.1.179:9001] Heartbeat timer duration 1000
++ [192.168.1.179:9001] Heartbeat timer - {Term: 3, Index: 0, Committed: 0}
-- [192.168.1.179:9001] Could not establish connection to peer [192.168.1.179:9002] - dial tcp 192.168.1.179:9002: connect: connection refused
...
...
...

The following is the output of node C, which has granted a vote for node A:

Output.5

++ [192.168.1.179:9003] Election timeout - {Type: Follower, IsLeaderOk: false, GrantedVote: true, LastHB: 2020-08-22 19:22:09.684801758 -0400 EDT m=+8.957583532}
++ [192.168.1.179:9003] RequestVote event - {Term: 2, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 3, Index: 0, From: [192.168.1.179:9001]}
++ [192.168.1.179:9003] RequestVote - Follower will GrantVote to [192.168.1.179:9001]
++ [192.168.1.179:9003] Heartbeat event - {Term: 3, Index: 0, Leader: [192.168.1.179:9001]} => {Term: 3, Index: 0, From: [192.168.1.179:9001]}
++ [192.168.1.179:9003] Heartbeat event - {Term: 3, Index: 0, Leader: [192.168.1.179:9001]} => {Term: 3, Index: 0, From: [192.168.1.179:9001]}
...
...
...

Now, let us dive into the second aspect, Log Replication, of the Raft Consensus algorithm.

Log Replication

It starts with a remote client sending a ClientUpdate message to the Leader for replication across a quorum of nodes. The message is published to the go channel called LeaderC so the Leader can process it. Once the Leader successfully processes the ClientUpdate message, it publishes an UpdateAck back to a client specific go channel, which is then serialized and dispatched to the remote client.
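
The ClientUpdate and UpdateAck message types, along with the ClientChannel type that maps each client endpoint to its go channel, are not listed in the article; a minimal sketch, consistent with the fields referenced in the code and output below, could be:

// ClientUpdate message is sent by a remote client to the Leader node
type ClientUpdate struct {
  Addr  string    // Network endpoint of the client
  Entry string    // The update to be appended to the replicated Log
  Time  time.Time // Time at which the client sent the update
}

// UpdateAck message is sent by the Leader node back to the client
type UpdateAck struct {
  Ack  bool   // true if the update was committed on a quorum of nodes
  Addr string // Network endpoint of the client
  From string // Network endpoint of the Leader node
}

func NewUpdateAck(ack bool, addr string, from string) UpdateAck {
  return UpdateAck{Ack: ack, Addr: addr, From: from}
}

// ClientChannel maps a client endpoint to the go channel used to deliver
// UpdateAck events back to that client's handler goroutine
type ClientChannel map[string]chan interface{}

func (c ClientChannel) Clients() []string {
  clients := make([]string, 0, len(c))
  for addr := range c {
    clients = append(clients, addr)
  }
  return clients
}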

The following code segment shows the client sending the ClientUpdate message to the Leader:

rpc.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

// Goroutine - Keep the client connection open and handle future updates from the client
func handleClient(node *RaftNode, conn net.Conn, msg interface{}) {
  defer conn.Close()
  ca := msg.(ClientUpdate)
  node.ClientC[ca.Addr] = make(chan interface{})
  for {
    node.LeaderC <-msg

    evt := <-node.ClientC[ca.Addr]

    ua := evt.(UpdateAck)

    fmt.Printf("++ [%s] Client - UpdateAck event from [%s] {Ack: %v, Addr: %s}\n",
      node.Addr, ua.From, ua.Ack, ua.Addr)

    // Send response to client
    b := Serialize(*node, ua)
    if len(b) > 0 {
      err := connWrite(*node, conn, b)
      if err != nil {
        break
      }
    } else {
      fmt.Printf("-- [%s] Client - Serialization error, disconnect client [%s]\n", node.Addr, ua.Addr)
      break
    }

    if !ua.Ack {
      fmt.Printf("-- [%s] Client - Leader Update REJECT, disconnect client [%s]\n", node.Addr, ua.Addr)

      // Give a little time for client to read
      time.Sleep(50 * time.Millisecond)
      break
    }

    // Read next update from client
    buf, err := connRead(*node, conn, "Client")
    if err == nil {
      msg = Deserialize(*node, buf)
      ca = msg.(ClientUpdate)
    } else {
      fmt.Printf("-- [%s] Client - Network read error, disconnect client [%s]\n", node.Addr, ua.Addr)

      break
    }
  }
}

Once the Leader receives a ClientUpdate message, it appends the update to its own Log (assigning the next Index value to CurrentIndex), and sends an AppendEntries message to all the Follower nodes in the cluster for replication.
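
The AppendEntries message type and the Leader-side handling of a ClientUpdate are not listed in the article; a minimal sketch (the name HandleClientUpdate is purely illustrative), consistent with the fields referenced in HandleAppendEntries below, could be:

// AppendEntries message is sent by the Leader node to replicate an entry on the Follower nodes
type AppendEntries struct {
  Term   int    // Current term on the Leader
  Index  int    // Index assigned to this entry
  Client string // Network endpoint of the originating client
  Entry  string // The update entry to be replicated
  Sync   bool   // true when re-syncing a Follower that has fallen behind
  Leader string // Network endpoint of the Leader node
}

// To be executed by the Leader node (a sketch; the actual handler may differ)
func HandleClientUpdate(node *RaftNode, msg ClientUpdate) {
  fmt.Printf("++ [%s] ClientUpdate event - {Addr: [%s], Entry: %s, Time: %v}\n",
    node.Addr, msg.Addr, msg.Entry, msg.Time)

  if node.IsLeader() {
    // Append to the Leader's own Log at the next index
    index := node.CurrentIndex + 1
    if err := node.Log.Write(index, msg.Entry); err == nil {
      node.SetCurrentIndex(index)

      // Broadcast the entry to all the Follower nodes for replication
      ae := AppendEntries{Term: node.CurrentTerm, Index: index, Client: msg.Addr,
        Entry: msg.Entry, Sync: false, Leader: node.Addr}
      b := Serialize(*node, ae)
      if len(b) > 0 {
        for _, addr := range node.PeerAddr {
          SendToPeer(*node, addr, b)
        }
      }
    }
  }
}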

The following diagram illustrates the Leader getting an update from a client and sending a Log Replication message to the Follower nodes:


Append Entries
Figure.7

The following code segment shows the Leader sending an AppendEntries message (in response to a ClientUpdate message) to the Follower nodes:

replication.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

// To be executed by the Follower nodes
func HandleAppendEntries(node *RaftNode, msg AppendEntries) {
  fmt.Printf("++ [%s] AppendEntries event - {Term: %d, Index: %d, Leader: [%s]} => " +
    "{Term: %d, Index: %d, Client: [%s], Entry: %s}\n", node.Addr, node.CurrentTerm, node.CurrentIndex,
    node.LeaderAddr, msg.Term, msg.Index, msg.Client, msg.Entry)

  if node.IsFollower() && node.CurrentTerm == msg.Term && node.LeaderAddr == msg.Leader {
    if node.CurrentIndex+1 <= msg.Index {
      if node.CurrentIndex+1 == msg.Index {
        ack := false
        sync := false

        if msg.Sync {
          fmt.Printf("++ [%s] AppendEntries - SYNC accepted {Index: %d} with {Entry: %s}\n",
            node.Addr, msg.Index, msg.Entry)

          sync = true
        } else {
          fmt.Printf("++ [%s] AppendEntries - UPDATE accepted {Index: %d} with {Entry: %s}\n",
            node.Addr, msg.Index, msg.Entry)

          ack = true
        }

        err := node.Log.Write(msg.Index, msg.Entry)
        if err == nil {
          node.SetCurrentIndex(msg.Index)
          sendCommitAck(node, ack, sync, msg.Term, msg.Index, msg.Client)
        }
      } else {
        fmt.Printf("-- [%s] AppendEntries - Node index BEHIND, REJECT entry from [%s]\n",
          node.Addr, msg.Client)

        sendCommitAck(node, false, true, msg.Term, node.CurrentIndex, msg.Client)
      }
    } else {
      // Did the CommitAck not reach the Leader node ???
      if node.CurrentIndex == msg.Index {
        sendCommitAck(node, true, false, msg.Term, msg.Index, msg.Client)
      } else {
        fmt.Printf("-- [%s] AppendEntries - Node index AHEAD, REJECT entry from [%s]\n",
          node.Addr, msg.Client)

        sendCommitAck(node, false, false, msg.Term, node.CurrentIndex, msg.Client)
      }
    }
  } else {
    fmt.Printf("-- [%s] AppendEntries - Node NOT in sync, REJECT entry from [%s]\n", node.Addr, msg.Client)

    sendCommitAck(node, false, false, node.CurrentTerm, node.CurrentIndex, msg.Client)
  }
}

Once a Follower node processes the AppendEntries message, it updates its Index (on success) and, based on the processing status (success or failure), sends a CommitAck message back to the Leader.
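
The CommitAck message type and the sendCommitAck helper are not listed in the article; a minimal sketch, consistent with how they are used in HandleAppendEntries above and in HandleCommitAck below, could be:

// CommitAck message is sent by a Follower node back to the Leader after processing an AppendEntries
type CommitAck struct {
  Ack    bool   // true if the entry was accepted and written to the Follower's Log
  Sync   bool   // true if the Follower is requesting (or acknowledging) a re-sync
  Term   int    // Current term on the Follower
  Index  int    // Current index on the Follower
  Addr   string // Network endpoint of the Follower node
  Client string // Network endpoint of the originating client
}

// A sketch of the sendCommitAck helper (the actual implementation may differ)
func sendCommitAck(node *RaftNode, ack bool, sync bool, term int, index int, client string) {
  ca := CommitAck{Ack: ack, Sync: sync, Term: term, Index: index, Addr: node.Addr, Client: client}
  b := Serialize(*node, ca)
  if len(b) > 0 {
    SendToPeer(*node, node.LeaderAddr, b)
  }
}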

The following diagram illustrates the Leader sending replication message to Follower nodes. Once a Follower node processes the replication, it sends an acknowledgement back to the Leader:


Commit Acknowledgement
Figure.8

The following code segment shows the Leader processing the CommitAck messages from the Follower nodes:

replication.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

// To be executed by the Leader node
func HandleCommitAck(node *RaftNode, msg CommitAck) {
  fmt.Printf("++ [%s] CommitAck event - {Term: %d, Index: %d} => {Ack: %v, Term: %d, Index: %d, " +
    "Peer: [%s], Client: [%s]}\n",
    node.Addr, node.CurrentTerm, node.CurrentIndex, msg.Ack, msg.Term, msg.Index, msg.Addr, msg.Client)

  if node.IsLeader() {
    // Is Follower behind ???
    if msg.Sync && node.CurrentIndex > msg.Index {
      syncPeer(node, msg.Index+1, msg.Addr)
    } else if node.CommittedIndex < node.CurrentIndex {
      if msg.Ack {
        node.CommitAck.IncrementYes(msg.Client, msg.Addr)
      } else {
        node.CommitAck.IncrementNo(msg.Client, msg.Addr)
      }

      yes := 1 // Self Ack YES
      no := 0
      for _, peer := range node.PeerAddr {
        yes += node.CommitAck.GetYes(msg.Client, peer)
        no += node.CommitAck.GetNo(msg.Client, peer)
      }

      if yes >= node.QuorumCount() {
        fmt.Printf("++ [%s] CommitAck - Leader ACCEPTED update {Client: [%s], Index: %d}\n",
          node.Addr, msg.Client, node.CurrentIndex)

        ua := NewUpdateAck(true, msg.Client, node.Addr)
        node.ClientC[msg.Client] <-ua
        commitAckReset(node, msg.Client)
        node.SetCommittedIndex(node.CurrentIndex)
      } else {
        // Did we not get all the votes ???
        if (yes + no) >= node.AllCount() {
          fmt.Printf("-- [%s] CommitAck - NO Quorum, Leader REJECTED update from client [%s]\n",
            node.Addr, msg.Client)

          ua := NewUpdateAck(false, msg.Client, node.Addr)
          node.ClientC[msg.Client] <-ua
          commitAckReset(node, msg.Client)
        }
      }
    }
  } else {
    fmt.Printf("-- [%s] CommitAck - NOT Leader, Ignore CommitAck from [%s]\n", node.Addr, msg.Client)
  }
}
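
The syncPeer helper invoked above (when a lagging Follower requests a re-sync) is not listed in the article; a minimal sketch, assuming it replays the missed Log entries one index at a time as Sync AppendEntries messages (relying on the Read method assumed in the MemLog sketch earlier), could be:

// A sketch of the syncPeer helper - replays the Log entry at the given index
// to a Follower that has fallen behind (the actual implementation may differ)
func syncPeer(node *RaftNode, index int, to string) {
  entry, err := node.Log.Read(index)
  if err != nil {
    return
  }
  ae := AppendEntries{Term: node.CurrentTerm, Index: index, Client: node.Addr,
    Entry: entry, Sync: true, Leader: node.Addr}
  b := Serialize(*node, ae)
  if len(b) > 0 {
    SendToPeer(*node, to, b)
  }
}

The Follower acknowledges each Sync entry with a CommitAck, which in turn triggers the next syncPeer call in HandleCommitAck above, until the Follower catches up with the Leader.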

Once the Leader node receives a quorum of CommitAck messages (from the Followers), it finalizes the Index number by updating CommittedIndex. If the Leader does not receive a quorum, it waits a few cycles (to accommodate delays in the network delivering the CommitAck messages) before deciding. Finally, the Leader sends an UpdateAck message back to the client.

The following diagram illustrates the Leader processing commit acknowledgement messages from the Follower nodes. Once the Leader node gets either a quorum or exhausts the wait cycles (3 election timeout durations in our case), it sends an update acknowledgement back to the client:


Update Acknowledgement
Figure.9

The following code segment shows the Leader processing the CommitAck messages via the wait cycles in the election timer:

timers.go
/*
  @Author: Bhaskar S
  @Blog:   https://www.polarsparc.com
  @Date:   22 Aug 2020
*/

func HandleElectionTimeout(node *RaftNode) {
  fmt.Printf("++ [%s] Election timeout - {Type: %s, IsLeaderOk: %v, GrantedVote: %v, LastHB: %v}\n",
    node.Addr, node.GetType(), node.isLeaderOk(), node.GrantedVote, node.LastHB)

...
...
...

  } else if node.IsLeader() && node.CurrentIndex != node.CommittedIndex {
    clients := node.ClientC.Clients()
    for _, addr := range clients {
      yes := 1 // Self Ack YES
      no := 0
      for _, peer := range node.PeerAddr {
        yes += node.CommitAck.GetYes(addr, peer)
        no += node.CommitAck.GetNo(addr, peer)
      }

      fmt.Printf("++ [%s] Election timeout - Leader {Client: [%s], Votes: [YES: %d, NO: %d], Quorum: %d}\n",
        node.Addr, addr, yes, no, node.QuorumCount())

      if yes >= node.QuorumCount() {
        fmt.Printf("++ [%s] Election timeout - Leader ACCEPTED update {Client: [%s], Index: %d}\n",
          node.Addr, addr, node.CurrentIndex)

        ua := NewUpdateAck(true, addr, node.Addr)
        node.ClientC[addr] <-ua
        commitAckReset(node, addr)
        node.SetCommittedIndex(node.CurrentIndex)
      } else {
        if node.CommitCycles.Count(addr) >= MaxWaitCycles {
          fmt.Printf("-- [%s] Election timeout - Maxed wait cycles, Leader REJECTED "+
            "update {Client: [%s], Index: %d, Committed: %d}\n",
            node.Addr, addr, node.CurrentIndex, node.CommittedIndex)

          ua := NewUpdateAck(false, addr, node.Addr)
          node.ClientC[addr] <-ua
          commitAckReset(node, addr)
        } else {
          node.CommitCycles.Increment(addr)
        }
      }
    }
  } else {
    if node.GrantedVote {
      nodeStateReset(false, node)
    }
  }
}
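
The UpdateAckCycles type (the CommitCycles field on RaftNode) and the commitAckReset helper are not listed in the article; a minimal sketch, consistent with the Count and Increment calls above (the reset behavior here is an assumption), could be:

// UpdateAckCycles tracks, per client endpoint, how many election timer
// cycles the Leader has waited for a quorum of CommitAck messages
type UpdateAckCycles map[string]int

func (c UpdateAckCycles) Count(addr string) int {
  return c[addr]
}

func (c UpdateAckCycles) Increment(addr string) {
  c[addr]++
}

// A sketch of commitAckReset - clears the commit acknowledgement counters and
// the wait cycle count for the given client (the actual helper may differ)
func commitAckReset(node *RaftNode, client string) {
  for _, addr := range node.PeerAddr {
    node.CommitAck.Reset(client, addr)
  }
  node.CommitCycles[client] = 0
}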

We will now demonstrate the Log Replication capability with the same 3 node cluster. We will start a client that sends updates to the Leader node.

The following is the output of node B, which has been elected as the Leader, receives updates from the client, replicates to the Followers, receives commit acknowledgement from the Followers, and finally sends an update acknowledgement to the client:

Output.6

++ [192.168.1.179:9002] Ready to start server on Raft node
++ [192.168.1.179:9002] Election timer duration 1263
++ [192.168.1.179:9002] Ready to accept remote connections
++ [192.168.1.179:9002] Election timer started @ 2020-08-22 21:48:52.642250222 -0400 EDT m=+0.001114231
++ [192.168.1.179:9002] Election timeout - {Type: Follower, IsLeaderOk: false, GrantedVote: false, LastHB: 2020-08-22 21:48:52.641940769 -0400 EDT m=+0.000804798}
++ [192.168.1.179:9002] Election timeout - Follower ready to start Leader election
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9001] - dial tcp 192.168.1.179:9001: connect: connection refused
++ [192.168.1.179:9002] GrantVote event - {Term: 0, Index: 0, Leader: []} => {Vote: true, Term: 1, Index: 0, From: [192.168.1.179:9003]}
++ [192.168.1.179:9002] GrantVote - Candidate {Votes: [YES: 2, NO: 0], Quorum: 2}
++ [192.168.1.179:9002] GrantVote - Candidate PROMOTED to Leader
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9001] - dial tcp 192.168.1.179:9001: connect: connection refused
++ [192.168.1.179:9002] Heartbeat timer duration 1000
++ [192.168.1.179:9002] Heartbeat timer - {Term: 1, Index: 0, Committed: 0}
++ [192.168.1.179:9002] Election timeout - {Type: Leader, IsLeaderOk: true, GrantedVote: false, LastHB: 2020-08-22 21:48:54.908250491 -0400 EDT m=+2.267114470}
...
...
...
++ [192.168.1.179:9002] ClientUpdate event from 192.168.1.179:40316
++ [192.168.1.179:9002] ClientUpdate event - {Addr: [192.168.1.179:40316], Entry: It does not matter how slowly you go as long as you do not stop, Time: 2020-08-22 21:49:42.008284085 -0400 EDT}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 1} => {Ack: true, Term: 1, Index: 1, Peer: [192.168.1.179:9001], Client: [192.168.1.179:40316]}
++ [192.168.1.179:9002] CommitAck - Leader ACCEPTED update {Client: [192.168.1.179:40316], Index: 1}
++ [192.168.1.179:9002] Client - UpdateAck event from [192.168.1.179:9002] {Ack: true, Addr: 192.168.1.179:40316}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 1} => {Ack: true, Term: 1, Index: 1, Peer: [192.168.1.179:9003], Client: [192.168.1.179:40316]}
...
...
...
++ [192.168.1.179:9002] ClientUpdate event - {Addr: [192.168.1.179:40316], Entry: The secret of getting ahead is getting started, Time: 2020-08-22 21:49:52.010354848 -0400 EDT}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 2} => {Ack: true, Term: 1, Index: 2, Peer: [192.168.1.179:9001], Client: [192.168.1.179:40316]}
++ [192.168.1.179:9002] CommitAck - Leader ACCEPTED update {Client: [192.168.1.179:40316], Index: 2}
++ [192.168.1.179:9002] Client - UpdateAck event from [192.168.1.179:9002] {Ack: true, Addr: 192.168.1.179:40316}
...
...
...
++ [192.168.1.179:9002] ClientUpdate event - {Addr: [192.168.1.179:40316], Entry: Be kind whenever possible. It is always possible, Time: 2020-08-22 21:50:02.012186897 -0400 EDT}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 3} => {Ack: true, Term: 1, Index: 3, Peer: [192.168.1.179:9001], Client: [192.168.1.179:40316]}
++ [192.168.1.179:9002] CommitAck - Leader ACCEPTED update {Client: [192.168.1.179:40316], Index: 3}
++ [192.168.1.179:9002] Client - UpdateAck event from [192.168.1.179:9002] {Ack: true, Addr: 192.168.1.179:40316}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 3} => {Ack: true, Term: 1, Index: 3, Peer: [192.168.1.179:9003], Client: [192.168.1.179:40316]}
...
...
...

The following is the output of node C that granted a vote for node B, receives replication messages from the Leader, updates its own Log, and finally acknowledges the replication commit back to the Leader:

Output.7

++ [192.168.1.179:9003] Ready to start server on Raft node
++ [192.168.1.179:9003] Election timer duration 1152
++ [192.168.1.179:9003] Ready to accept remote connections
++ [192.168.1.179:9003] Election timer started @ 2020-08-22 21:48:53.873848859 -0400 EDT m=+0.000953318
++ [192.168.1.179:9003] RequestVote event - {Term: 0, Index: 0, Leader: []} => {Term: 1, Index: 0, From: [192.168.1.179:9002]}
++ [192.168.1.179:9003] RequestVote - Follower will GrantVote to [192.168.1.179:9002]
++ [192.168.1.179:9003] Heartbeat event - {Term: 1, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 0, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 1, Client: [192.168.1.179:40316], Entry: It does not matter how slowly you go as long as you do not stop}
++ [192.168.1.179:9003] AppendEntries - UPDATE accepted {Index: 1} with {Entry: It does not matter how slowly you go as long as you do not stop}
++ [192.168.1.179:9003] Heartbeat event - {Term: 1, Index: 1, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 1, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 1, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 2, Client: [192.168.1.179:40316], Entry: The secret of getting ahead is getting started}
++ [192.168.1.179:9003] AppendEntries - UPDATE accepted {Index: 2} with {Entry: The secret of getting ahead is getting started}
++ [192.168.1.179:9003] Heartbeat event - {Term: 1, Index: 2, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 2, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 2, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 3, Client: [192.168.1.179:40316], Entry: Be kind whenever possible. It is always possible}
++ [192.168.1.179:9003] AppendEntries - UPDATE accepted {Index: 3} with {Entry: Be kind whenever possible. It is always possible}
++ [192.168.1.179:9003] Heartbeat event - {Term: 1, Index: 3, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 3, From: [192.168.1.179:9002]}
...
...
...

The following is the output of node A that receives replication messages from the Leader, updates its own Log, and finally acknowledges the replication commit back to the Leader:

Output.8

++ [192.168.1.179:9001] Ready to start server on Raft node
++ [192.168.1.179:9001] Election timer duration 1245
++ [192.168.1.179:9001] Ready to accept remote connections
++ [192.168.1.179:9001] Election timer started @ 2020-08-22 21:48:54.626840575 -0400 EDT m=+0.001222294
++ [192.168.1.179:9001] Heartbeat event - {Term: 0, Index: 0, Leader: []} => {Term: 1, Index: 0, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9001] AppendEntries event - {Term: 1, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 1, Client: [192.168.1.179:40316], Entry: It does not matter how slowly you go as long as you do not stop}
++ [192.168.1.179:9001] AppendEntries - UPDATE accepted {Index: 1} with {Entry: It does not matter how slowly you go as long as you do not stop}
++ [192.168.1.179:9001] Heartbeat event - {Term: 1, Index: 1, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 1, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9001] AppendEntries event - {Term: 1, Index: 1, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 2, Client: [192.168.1.179:40316], Entry: The secret of getting ahead is getting started}
++ [192.168.1.179:9001] AppendEntries - UPDATE accepted {Index: 2} with {Entry: The secret of getting ahead is getting started}
++ [192.168.1.179:9001] Heartbeat event - {Term: 1, Index: 2, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 2, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9001] AppendEntries event - {Term: 1, Index: 2, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 3, Client: [192.168.1.179:40316], Entry: Be kind whenever possible. It is always possible}
++ [192.168.1.179:9001] AppendEntries - UPDATE accepted {Index: 3} with {Entry: Be kind whenever possible. It is always possible}
++ [192.168.1.179:9001] Heartbeat event - {Term: 1, Index: 3, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 3, From: [192.168.1.179:9002]}
...
...
...

Let us now TERMINATE the Follower node C by pressing CTRL-C.

The following is the output of node B, which continues to be a Leader and processes updates from the client as there is still a quorum (2 nodes):

Output.9

...
...
...
++ [192.168.1.179:9002] ClientUpdate event - {Addr: [192.168.1.179:40316], Entry: It always seems impossible until it's done, Time: 2020-08-22 21:50:12.0141336 -0400 EDT}
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9003] - dial tcp 192.168.1.179:9003: connect: connection refused
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 4} => {Ack: true, Term: 1, Index: 4, Peer: [192.168.1.179:9001], Client: [192.168.1.179:40316]}
++ [192.168.1.179:9002] CommitAck - Leader ACCEPTED update {Client: [192.168.1.179:40316], Index: 4}
++ [192.168.1.179:9002] Client - UpdateAck event from [192.168.1.179:9002] {Ack: true, Addr: 192.168.1.179:40316}
++ [192.168.1.179:9002] Heartbeat timer - {Term: 1, Index: 4, Committed: 4}
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9003] - dial tcp 192.168.1.179:9003: connect: connection refused
++ [192.168.1.179:9002] Election timeout - {Type: Leader, IsLeaderOk: true, GrantedVote: false, LastHB: 2020-08-22 21:50:12.951802123 -0400 EDT m=+80.310666183}
++ [192.168.1.179:9002] Heartbeat timer - {Term: 1, Index: 4, Committed: 4}
-- [192.168.1.179:9002] Could not establish connection to peer [192.168.1.179:9003] - dial tcp 192.168.1.179:9003: connect: connection refused
...
...
...
++ [192.168.1.179:9002] ClientUpdate event - {Addr: [192.168.1.179:40316], Entry: If you're going through hell, keep going, Time: 2020-08-22 21:50:22.015607258 -0400 EDT}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 5} => {Ack: true, Term: 1, Index: 5, Peer: [192.168.1.179:9001], Client: [192.168.1.179:40316]}
++ [192.168.1.179:9002] CommitAck - Leader ACCEPTED update {Client: [192.168.1.179:40316], Index: 5}
++ [192.168.1.179:9002] Client - UpdateAck event from [192.168.1.179:9002] {Ack: true, Addr: 192.168.1.179:40316}
++ [192.168.1.179:9002] CommitAck event - {Term: 1, Index: 5} => {Ack: true, Term: 1, Index: 5, Peer: [192.168.1.179:9003], Client: [192.168.1.179:40316]}
++ [192.168.1.179:9002] Heartbeat timer - {Term: 1, Index: 5, Committed: 5}
++ [192.168.1.179:9002] Election timeout - {Type: Leader, IsLeaderOk: true, GrantedVote: false, LastHB: 2020-08-22 21:50:22.957223561 -0400 EDT m=+90.316087591}
...
...
...

The following is the output of node A, which continues to process replication messages from the Leader:

Output.10

...
...
...
++ [192.168.1.179:9001] AppendEntries event - {Term: 1, Index: 3, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 4, Client: [192.168.1.179:40316], Entry: It always seems impossible until it's done}
++ [192.168.1.179:9001] AppendEntries - UPDATE accepted {Index: 4} with {Entry: It always seems impossible until it's done}
++ [192.168.1.179:9001] Heartbeat event - {Term: 1, Index: 4, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 4, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9001] AppendEntries event - {Term: 1, Index: 4, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 5, Client: [192.168.1.179:40316], Entry: If you're going through hell, keep going}
++ [192.168.1.179:9001] AppendEntries - UPDATE accepted {Index: 5} with {Entry: If you're going through hell, keep going}
++ [192.168.1.179:9001] Heartbeat event - {Term: 1, Index: 5, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 5, From: [192.168.1.179:9002]}
...
...
...

Now, let us bring BACK node C by restarting it.

The following is the output of node C, which synchronizes with the Leader on the old missed updates and catches up:

Output.11

++ [192.168.1.179:9003] Ready to start server on Raft node
++ [192.168.1.179:9003] Election timer duration 1212
++ [192.168.1.179:9003] Ready to accept remote connections
++ [192.168.1.179:9003] Election timer started @ 2020-08-22 21:50:17.482606286 -0400 EDT m=+0.000887964
++ [192.168.1.179:9003] Heartbeat event - {Term: 0, Index: 0, Leader: []} => {Term: 1, Index: 4, From: [192.168.1.179:9002]}
-- [192.168.1.179:9003] Heartbeat timeout - Node index BEHIND from [192.168.1.179:9002]
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 0, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 1, Client: [192.168.1.179:9002], Entry: It does not matter how slowly you go as long as you do not stop}
++ [192.168.1.179:9003] AppendEntries - SYNC accepted {Index: 1} with {Entry: It does not matter how slowly you go as long as you do not stop}
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 1, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 2, Client: [192.168.1.179:9002], Entry: The secret of getting ahead is getting started}
++ [192.168.1.179:9003] AppendEntries - SYNC accepted {Index: 2} with {Entry: The secret of getting ahead is getting started}
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 2, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 3, Client: [192.168.1.179:9002], Entry: Be kind whenever possible. It is always possible}
++ [192.168.1.179:9003] AppendEntries - SYNC accepted {Index: 3} with {Entry: Be kind whenever possible. It is always possible}
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 3, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 4, Client: [192.168.1.179:9002], Entry: It always seems impossible until it's done}
++ [192.168.1.179:9003] AppendEntries - SYNC accepted {Index: 4} with {Entry: It always seems impossible until it's done}
++ [192.168.1.179:9003] Heartbeat event - {Term: 1, Index: 4, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 4, From: [192.168.1.179:9002]}
...
...
...
++ [192.168.1.179:9003] AppendEntries event - {Term: 1, Index: 4, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 5, Client: [192.168.1.179:40316], Entry: If you're going through hell, keep going}
++ [192.168.1.179:9003] AppendEntries - UPDATE accepted {Index: 5} with {Entry: If you're going through hell, keep going}
++ [192.168.1.179:9003] Heartbeat event - {Term: 1, Index: 5, Leader: [192.168.1.179:9002]} => {Term: 1, Index: 5, From: [192.168.1.179:9002]}
...
...
...

Now, let us dive into the final aspect, Safety, of the Raft Consensus algorithm.

Safety

The Raft Consensus algorithm guarantees consistency at all times if the following set of properties hold *TRUE*:

Election Safety - at most one Leader can be elected in a given Term
Leader Append-Only - a Leader never overwrites or deletes entries in its Log; it only appends new entries
Log Matching - if the Logs on two nodes contain an entry with the same Index and Term, then the Logs are identical in all entries up through that Index
Leader Completeness - if an entry is committed in a given Term, then that entry will be present in the Logs of the Leaders of all higher Terms
State Machine Safety - if a node has applied the entry at a given Index to its state machine, then no other node will ever apply a different entry for the same Index

This concludes our practical hands-on approach to unraveling the Raft Consensus algorithm.

!!! DISCLAIMER !!!

The code in this article is a simple implementation of Raft Consensus to understand the inner workings of the algorithm. It has by NO means been EXHAUSTIVELY and THOROUGHLY tested. It is PURELY for learning purposes.

References

The Raft Consensus Algorithm

The Raft Consensus Paper

Understanding Distributed Consensus with Raft

GitHub - SimpleRaft Implementation



© PolarSPARC