Updated on 2025-03-03

Raft log synchronization using Go language

In Raft, the cluster can serve requests once a leader has been elected. A normal client proposal proceeds as follows:

  • The client connects to the leader and submits a proposal
  • The leader receives the proposal and wraps it in a log entry
  • The leader caches the entry in its list of uncommitted entries
  • The leader sends the entry to the other nodes in the cluster
  • Each follower that receives the entry caches it in its own list of uncommitted entries and responds to the leader
  • The leader receives the follower responses and checks whether a majority of the nodes in the cluster have responded
  • Once a majority of the nodes have cached the entry, the leader commits it and sends an empty log message telling the followers to commit it as well
  • The leader responds to the client's proposal

The log rules in raft are as follows:

1. If two entries in the logs of different nodes have the same term and index, they store the same content

  • A leader creates at most one entry with a given index in a single term
  • A leader never changes the position of an entry (it never overwrites or deletes entries in its own log)

2. If two entries in the logs of different nodes have the same term and index, all entries preceding them are also identical

  • When the leader sends entries to a follower, it includes the index and term of the entry that immediately precedes the new ones

When a follower receives an append request, it performs a consistency check. If the check fails, the follower rejects the request, so its log never diverges further from the leader's. After a failed check the leader forces the follower to adopt the leader's log, which restores consistency. A consistency check can fail in two ways.

1. The follower is missing entries. After receiving the failure response, the leader decrements the index it will send next by one (alternatively, the follower can return its latest log index in the failure response) and resends until the missing entries have been delivered.

Entries go missing when a node has been offline for a while or when messages are lost because of network errors.

2. The follower has extra or inconsistent entries. After receiving the failure response, the leader decrements the index it will send next by one (alternatively, the follower can return the index of its last committed entry in the failure response), finds the latest index at which the two logs agree, and has the follower delete its inconsistent entries after that point.

This happens when a node is elected leader, appends a proposal only to its local log, and then goes offline abnormally before replicating it to the cluster. The inconsistency is confined to uncommitted entries held in memory; committed entries are never inconsistent.
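
The check and its two failure modes can be illustrated with a minimal sketch. It assumes a simplified `Entry` type and a follower log held entirely in one slice with indexes 1..len(log). `appendEntries` is a hypothetical helper written for this illustration, not the implementation built later in this article.

```go
package main

import "fmt"

// Entry is a simplified log record used only for this illustration.
type Entry struct {
	Index uint64
	Term  uint64
}

// appendEntries applies a leader's append request to a follower log.
// Assumption: log holds entries with indexes 1..len(log), in order.
// prevIndex and prevTerm identify the entry right before the new ones.
func appendEntries(log []Entry, prevIndex, prevTerm uint64, entries []Entry) ([]Entry, bool) {
	// Failure 1: the follower is missing entries.
	if prevIndex > uint64(len(log)) {
		return log, false
	}
	// Failure 2: same index but a different term, so the logs diverge here.
	if prevIndex > 0 && log[prevIndex-1].Term != prevTerm {
		return log, false
	}
	// The logs agree up to prevIndex: drop whatever follows and append.
	kept := append([]Entry(nil), log[:prevIndex]...)
	return append(kept, entries...), true
}

func main() {
	// Entry 3 was appended locally by a stale leader in term 2.
	follower := []Entry{{1, 1}, {2, 1}, {3, 2}}

	// The leader's entry 3 has term 3, so the first attempt is rejected.
	_, ok := appendEntries(follower, 3, 3, []Entry{{4, 3}})
	fmt.Println("first attempt accepted:", ok)

	// The leader backs off by one index and retries.
	follower, ok = appendEntries(follower, 2, 1, []Entry{{3, 3}, {4, 3}})
	fmt.Println("retry accepted:", ok, "log length:", len(follower))
}
```

Backing off one index at a time is the simplest strategy. Returning the follower's own index in the failure response, as mentioned above, lets the leader skip several rounds.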

Log synchronization between nodes is completed in two steps.

1. The leader copies the entries to the other nodes via RPC.

2. Once the leader confirms that an entry has been copied to a majority of the nodes in the cluster, it persists the entry to disk. The leader tracks the highest committed index and sends it with every log RPC (including heartbeats) to tell followers which entries to commit.

When an entry is committed, that entry and all uncommitted entries before it are persisted.

Each entry contains the client's actual proposal content, the leader's term, and the log index. The log entry is defined with the following fields:

  • type: the entry type; currently there is only one type, the client proposal
  • term: the term in which the entry was created; entries with the same index and term are the same entry
  • index: the log index, monotonically increasing
  • data: the actual content of the proposal
enum EntryType {
  NORMAL = 0;
}
message LogEntry {
  EntryType type = 1;
  uint64 term = 2;
  uint64 index = 3;
  bytes data = 4;
}

Define the overall structure of the log

Uncommitted entries are kept in an in-memory slice.

When entries are committed, the section to be committed is taken out of the slice and persisted.

type WaitApply struct {
    done  bool
    index uint64
    ch    chan struct{}
}
type RaftLog struct {
    logEnties        []*pb.LogEntry // Uncommitted entries
    storage          Storage        // Storage for committed entries
    commitIndex      uint64         // Commit progress
    lastAppliedIndex uint64         // Index of the last committed entry
    lastAppliedTerm  uint64         // Term of the last committed entry
    lastAppendIndex  uint64         // Index of the last appended entry
    logger           *
}

Define the log persistence interface; the actual storage implementation is supplied from outside

type Storage interface {
    Append(entries []*pb.LogEntry)
    GetEntries(startIndex, endIndex uint64) []*pb.LogEntry
    GetTerm(index uint64) uint64
    GetLastLogIndexAndTerm() (uint64, uint64)
    Close()
}
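
To make the interface concrete, here is a minimal in-memory sketch of an implementation. `LogEntry` is a stand-in for the generated protobuf type, `MemStorage` is a hypothetical name, and treating `GetEntries` as the half-open range [startIndex, endIndex) is an assumption. It is meant for experiments only, since nothing is actually persisted.

```go
package main

import "fmt"

// LogEntry stands in for the generated protobuf type in this sketch.
type LogEntry struct {
	Term  uint64
	Index uint64
	Data  []byte
}

// Storage mirrors the interface from the article, using the stand-in type.
type Storage interface {
	Append(entries []*LogEntry)
	GetEntries(startIndex, endIndex uint64) []*LogEntry
	GetTerm(index uint64) uint64
	GetLastLogIndexAndTerm() (uint64, uint64)
	Close()
}

// MemStorage keeps committed entries in a slice (hypothetical, not durable).
type MemStorage struct {
	entries []*LogEntry
}

// Compile-time check that MemStorage satisfies Storage.
var _ Storage = (*MemStorage)(nil)

func (m *MemStorage) Append(entries []*LogEntry) {
	m.entries = append(m.entries, entries...)
}

// GetEntries returns entries with startIndex <= Index < endIndex (assumed semantics).
func (m *MemStorage) GetEntries(startIndex, endIndex uint64) []*LogEntry {
	var out []*LogEntry
	for _, e := range m.entries {
		if e.Index >= startIndex && e.Index < endIndex {
			out = append(out, e)
		}
	}
	return out
}

// GetTerm returns the term of the entry at index, or 0 if it is not stored.
func (m *MemStorage) GetTerm(index uint64) uint64 {
	for _, e := range m.entries {
		if e.Index == index {
			return e.Term
		}
	}
	return 0
}

func (m *MemStorage) GetLastLogIndexAndTerm() (uint64, uint64) {
	if len(m.entries) == 0 {
		return 0, 0
	}
	last := m.entries[len(m.entries)-1]
	return last.Index, last.Term
}

func (m *MemStorage) Close() {}

func main() {
	s := &MemStorage{}
	s.Append([]*LogEntry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 2, Index: 3}})
	index, term := s.GetLastLogIndexAndTerm()
	fmt.Println(index, term, len(s.GetEntries(2, 4)))
}
```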

Implement consistency checks

Persisted entries are always consistent with the leader, so the check only needs to examine the in-memory slice. There are several cases:

1. The entry that precedes the leader's new entries is found in the node's log

I. It is the node's last appended entry

II. It is an entry inside the node's in-memory slice

  • Network fluctuation kept the node's response from reaching the leader, so the leader resent the entries; the duplicate entries are cleared.
  • While the node was leader, some entries were never replicated to other nodes and became invalid. After the cluster re-elected a leader, the subsequent entries became inconsistent, so the conflicting entries (the later entries in memory) are cleared.

III. It is the node's last committed entry

If there are entries in memory, all of them are inconsistent, so the in-memory entries are cleared.

2. The entry that precedes the leader's new entries is not found in the node's log

I. An entry with the same index exists, but its term differs

While the node was leader, some entries were never replicated to other nodes and became invalid. After re-election the new leader reused the same indexes. The conflicting entries (those from the conflicting term) are cleared, and the leader resends starting from the part of the node's log that does not conflict.

II. No entry with the same index exists

Entries are missing, and the leader must resend starting from the node's last committed entry.

func (l *RaftLog) HasPrevLog(lastIndex, lastTerm uint64) bool {
    if lastIndex == 0 {
        return true
    }
    var term uint64
    size := len(l.logEnties)
    if size > 0 {
        lastlog := l.logEnties[size-1]
        if lastlog.Index == lastIndex {
            term = lastlog.Term
        } else if lastlog.Index > lastIndex {
            // Check against the last committed entry
            if lastIndex == l.lastAppliedIndex { // Committed entries are always consistent
                l.logEnties = l.logEnties[:0]
                return true
            } else if lastIndex > l.lastAppliedIndex {
                // Check the uncommitted entries
                for i, entry := range l.logEnties[:size] {
                    if entry.Index == lastIndex {
                        term = entry.Term
                        // Clear the entries after the leader's previous entry. They are either
                        // duplicates (the leader resent after a lost response) or entries of an
                        // old leader that were invalidated by a re-election.
                        l.logEnties = l.logEnties[:i+1]
                        break
                    }
                }
            }
        }
    } else if lastIndex == l.lastAppliedIndex {
        return true
    }
    b := term == lastTerm
    if !b {
        ("Latest log: %d, term: %d, local record term: %d", lastIndex, lastTerm, term)
        if term != 0 { // Inconsistent with the leader: drop the in-memory entries from the conflicting term onward
            for i, entry := range l.logEnties {
                if entry.Term == term {
                    l.logEnties = l.logEnties[:i]
                    break
                }
            }
        }
    }
    return b
}

Implement log append: add the new entries to the in-memory slice and update the index of the last appended entry

func (l *RaftLog) AppendEntry(entry []*pb.LogEntry) {
    size := len(entry)
    if size == 0 {
        return
    }
    l.logEnties = append(l.logEnties, entry...)
    l.lastAppendIndex = entry[size-1].Index
}

Implement log commit

  • A follower may not have received all entries yet. If its log already contains everything up to the leader's commit index, it commits up to that index; otherwise it commits only up to its own last entry.
  • Take the section to be committed out of the in-memory slice, append it to persistent storage, and update the commit progress and the slice
func (l *RaftLog) Apply(lastCommit, lastLogIndex uint64) {
	// Update the commit index
	if lastCommit > l.commitIndex {
		if lastLogIndex > lastCommit {
			l.commitIndex = lastCommit
		} else {
			l.commitIndex = lastLogIndex
		}
	}
	// Commit entries up to the commit index
	if l.commitIndex > l.lastAppliedIndex {
		n := 0
		for i, entry := range l.logEnties {
			if l.commitIndex >= entry.Index {
				n = i
			} else {
				break
			}
		}
		entries := l.logEnties[:n+1]
		l.storage.Append(entries)
		l.lastAppliedIndex = l.logEnties[n].Index
		l.lastAppliedTerm = l.logEnties[n].Term
		l.logEnties = l.logEnties[n+1:]
        ()
	}
}
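
The commit-index rule at the top of Apply can be isolated as a small pure function. This is a sketch for illustration only; `nextCommitIndex` is a hypothetical name that does not appear in the article's code.

```go
package main

import "fmt"

// nextCommitIndex returns the commit index a node should move to, given its
// current commit index, the leader's commit index, and the index of the last
// entry the node actually holds. A node never commits past its own log.
func nextCommitIndex(current, leaderCommit, lastLogIndex uint64) uint64 {
	if leaderCommit <= current {
		return current // nothing new to commit
	}
	if lastLogIndex > leaderCommit {
		return leaderCommit // the node holds everything the leader committed
	}
	return lastLogIndex // the node is behind: commit only what it holds
}

func main() {
	fmt.Println(nextCommitIndex(3, 8, 10)) // 8
	fmt.Println(nextCommitIndex(3, 8, 5))  // 5
	fmt.Println(nextCommitIndex(8, 6, 10)) // 8
}
```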

Define a constructor; the storage implementation is supplied when the instance is created

func NewRaftLog(storage Storage, logger *) *RaftLog {
	lastIndex, lastTerm := storage.GetLastLogIndexAndTerm()
	return &RaftLog{
		logEnties:        make([]*pb.LogEntry, 0),
		storage:          storage,
		commitIndex:      lastIndex,
		lastAppliedIndex: lastIndex,
		lastAppliedTerm:  lastTerm,
		lastAppendIndex:  lastIndex,
		logger:           logger,
	}
}

With the consistency check, append, and commit of the log in place, we next implement the log handling logic in Raft. First, the leader needs to keep track of how far each of the other nodes in the cluster has synchronized.

The node resets this progress when it becomes leader

  • Each node returns its latest log information in its vote response
  • If no vote response was received from a node, the leader starts from its own latest log index and adjusts it dynamically after consistency checks

While the cluster is running, the first message is used to confirm that the network is available. If the network is healthy and that message succeeds, later messages are sent without waiting for the node's response until a synchronization failure occurs.

  • prevResp records the result of the last send; it is initially false
  • pending records the indexes of entries that have been sent but not yet acknowledged
  • When sending a message, if !prevResp && len(pending) > 0 is true, the last send has not completed and subsequent messages are delayed
  • After a message is sent successfully, prevResp is set to true and subsequent entries are sent directly
type ReplicaProgress struct {
    MatchIndex     uint64   // Highest index the node has received
    NextIndex      uint64   // Index of the next entry to send
    pending        []uint64 // Entries whose send has not completed
    prevResp       bool     // Result of the last send
    maybeLostIndex uint64   // Entry that may have been lost; recorded so it can be resent
}

The leader appends entries to its local log and then broadcasts them to the cluster

func (r *Raft) BroadcastAppendEntries() {
	(func(id uint64, _ *ReplicaProgress) {
		if id ==  {
			return
		}
		(id)
	})
}
func (r *Raft) SendAppendEntries(to uint64) {
	p := [to]
	if p == nil || () {
		return
	}
	nextIndex := (to)
	lastLogIndex := nextIndex - 1
	lastLogTerm := (lastLogIndex)
	maxSize := MAX_LOG_ENTRY_SEND
	if ! {
		maxSize = 1
	}
	// var entries []*
	entries := (nextIndex, maxSize)
	size := len(entries)
	if size > 0  {
		(to, entries[size-1].Index)
	}
	(&{
		MsgType:      pb.MessageType_APPEND_ENTRY,
		Term:         ,
		From:         ,
		To:           to,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
		LastCommit:   ,
		Entry:        entries,
	})
}
  • Get the latest log index from the log, iterate over the entries to be appended, and assign each its index
  • Append the entries to the in-memory slice
  • Update the leader's own append progress
  • Broadcast the entries to the cluster
func (r *Raft) AppendEntry(entries []*) {
	lastLogIndex, _ := ()
	for i, entry := range entries {
		 = lastLogIndex + 1 + uint64(i)
		 = 
	}
	(entries)
	(, entries[len(entries)-1].Index)
	()
}
func (c *Cluster) UpdateLogIndex(id uint64, lastIndex uint64) {
	p := [id]
	if p != nil {
		 = lastIndex
		 = lastIndex + 1
	}
}

Broadcasting entries works the same way as the heartbeat broadcast described earlier: iterate over the cluster information and send to each node. The sending process is as follows

Check the send status. If the last send has not completed, sending is paused for now.

func (rp *ReplicaProgress) IsPause() bool {
    return (!rp.prevResp && len(rp.pending) > 0)
}
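
The effect of prevResp and pending can be seen in a small standalone simulation. The `progress` type below is a simplified, hypothetical stand-in for ReplicaProgress that keeps only the fields involved in pausing; it is a sketch of the idea, not the article's code.

```go
package main

import "fmt"

// progress is a reduced stand-in for ReplicaProgress (illustration only).
type progress struct {
	nextIndex uint64
	pending   []uint64 // last index of each batch still awaiting a response
	prevResp  bool     // whether the previous send was acknowledged
}

// paused reports whether sending must wait for the outstanding probe.
func (p *progress) paused() bool {
	return !p.prevResp && len(p.pending) > 0
}

// sent records a batch ending at lastIndex. Once a response has been seen,
// the next index advances optimistically without waiting.
func (p *progress) sent(lastIndex uint64) {
	p.pending = append(p.pending, lastIndex)
	if p.prevResp {
		p.nextIndex = lastIndex + 1
	}
}

// acked handles a successful response for the batch ending at lastIndex.
func (p *progress) acked(lastIndex uint64) {
	for i, v := range p.pending {
		if v == lastIndex {
			p.pending = p.pending[i+1:]
			break
		}
	}
	if !p.prevResp {
		p.prevResp = true
		p.nextIndex = lastIndex + 1
	}
}

func main() {
	p := &progress{nextIndex: 5}
	p.sent(5)
	fmt.Println("paused after first send:", p.paused()) // true: wait for the probe
	p.acked(5)
	p.sent(8)
	p.sent(12)
	fmt.Println("paused while pipelining:", p.paused()) // false
	fmt.Println("next index:", p.nextIndex)             // 13
}
```

A new leader therefore sends one small probe per follower and only starts pipelining after that probe is acknowledged.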

Get the index of the next entry to send from the node's synchronization progress

func (c *Cluster) GetNextIndex(id uint64) uint64 {
    p := [id]
    if p != nil {
        return 
    }
    return 0
}

Get the entries to send from the leader's log

func (l *RaftLog) GetEntries(index uint64, maxSize int) []*pb.LogEntry {
    // The requested entries are already committed; read them from storage
    if index <= l.lastAppliedIndex {
        endIndex := index + MAX_APPEND_ENTRY_SIZE
        if endIndex >= l.lastAppliedIndex {
            endIndex = l.lastAppliedIndex + 1
        }
        return l.storage.GetEntries(index, endIndex)
    } else { // The requested entries are not committed; read them from the in-memory slice
        var entries []*pb.LogEntry
        for i, entry := range l.logEnties {
            if entry.Index == index {
                if len(l.logEnties)-i > maxSize {
                    entries = l.logEnties[i : i+maxSize]
                } else {
                    entries = l.logEnties[i:]
                }
                break
            }
        }
        return entries
    }
}

Update the node's send progress: add the index of the last sent entry to the pending slice and, if the previous send succeeded, advance the node's next index past it

When the previous send succeeded, this send is assumed to succeed as well. If it fails, the send progress is rolled back.

func (c *Cluster) AppendEntry(id uint64, lastIndex uint64) {
	p := [id]
	if p != nil {
		(lastIndex)
	}
}
func (rp *ReplicaProgress) AppendEntry(lastIndex uint64) {
	rp.pending = append(rp.pending, lastIndex)
	if rp.prevResp {
		rp.NextIndex = lastIndex + 1
	}
}

After the entries are sent, the follower receives and processes them.

Consistency check

  • If the check succeeds, the entries are appended to the follower's memory and the append is marked as successful
  • If the check fails, the conflicting entries have already been handled inside the consistency check, and the append is simply marked as failed

Try to commit. Every log message carries the leader's commit progress, and the follower commits its own entries according to it.

Respond to the leader with the result of the append

func (r *Raft) ReciveAppendEntries(mLeader, mTerm, mLastLogTerm, mLastLogIndex, mLastCommit uint64, mEntries []*) {
	var accept bool
	if !(mLastLogIndex, mLastLogTerm) { // Check whether the node's log is consistent with the leader's
		("The node does not contain the last appended log: Index: %d, Term: %d ", mLastLogIndex, mLastLogTerm)
		accept = false
	} else {
		(mEntries)
		accept = true
	}
	lastLogIndex, lastLogTerm := ()
	(mLastCommit, lastLogIndex)
	(&{
		MsgType:      pb.MessageType_APPEND_ENTRY_RESP,
		Term:         ,
		From:         ,
		To:           mLeader,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
		Success:      accept,
	})
}

The leader handles the follower's append response, which is either a success or a failure

func (r *Raft) ReciveAppendEntriesResult(from, term, lastLogIndex uint64, success bool) {
    leaderLastLogIndex, _ := ()
    if success {
        (from, lastLogIndex)
        if lastLogIndex >  {
            // Use the synchronized index to update the commit index
            if (lastLogIndex) {
                prevApplied := 
                (lastLogIndex, lastLogIndex)
                ()
            }
        } else if len() > 0 {
            ()
        }
        if (from) <= leaderLastLogIndex {
            (from)
        }
    } else {
        ("Node %s failed to append log, Leader records the latest node log: %d, and the latest node log: %d ", (from, 16), (from)-1, lastLogIndex)
        (from, lastLogIndex, leaderLastLogIndex)
        (from)
    }
}

When the append succeeds

Update the synchronization progress: update the index the node has received, remove the acknowledged part from the pending slice, and mark the last send as successful

func (c *Cluster) AppendEntryResp(id uint64, lastIndex uint64) {
    p := [id]
    if p != nil {
        (lastIndex)
    }
}
func (rp *ReplicaProgress) AppendEntryResp(lastIndex uint64) {
    if rp.MatchIndex < lastIndex {
        rp.MatchIndex = lastIndex
    }
    idx := -1
    for i, v := range rp.pending {
        if v == lastIndex {
            idx = i
        }
    }
    // Mark the previous send as successful and update the next index to send
    if !rp.prevResp {
        rp.prevResp = true
        rp.NextIndex = lastIndex + 1
    }
    if idx > -1 {
        // Clear the entries sent up to and including this one
        rp.pending = rp.pending[idx+1:]
    }
}

Check the followers' synchronization progress to determine whether the entry at the index in the response has been replicated to a majority of the cluster.

func (c *Cluster) CheckCommit(index uint64) bool {
    // Commit is allowed only when a majority of the cluster agrees
    incomingLogged := 0
    for id := range  {
        if index <= [id].MatchIndex {
            incomingLogged++
        }
    }
    incomingCommit := incomingLogged >= len()/2+1
    return incomingCommit
}
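
A related way to express the majority rule, shown here as a sketch rather than as the article's method, is to compute the highest index that a majority already holds directly from the match indexes. `quorumIndex` is a hypothetical helper, and the slice is assumed to include the leader's own match index.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest log index stored on a majority of nodes.
// matchIndex holds one value per node, including the leader (assumption).
func quorumIndex(matchIndex []uint64) uint64 {
	if len(matchIndex) == 0 {
		return 0
	}
	// Sort a copy in descending order so the input is left untouched.
	sorted := append([]uint64(nil), matchIndex...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	// A majority is len/2+1 nodes, so the value at position len/2 is held
	// by at least that many nodes.
	return sorted[len(sorted)/2]
}

func main() {
	fmt.Println(quorumIndex([]uint64{9, 7, 7, 4, 2})) // 7: three of five nodes hold index 7
	fmt.Println(quorumIndex([]uint64{5, 5, 1, 1}))    // 1: three of four nodes are needed
}
```

CheckCommit answers "is this index on a majority?" for one index at a time. The sorted form answers "which index is the latest one on a majority?" in a single pass, which can be convenient when several responses arrive together.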

When a majority of the cluster agrees, commit the entry and continue broadcasting entries

Keep sending entries while the follower's next index is not greater than the leader's latest index

When the append fails

Reset the synchronization progress according to the log progress reported by the follower, and mark the last send as failed so that further entries are held back until an entry is appended successfully. This avoids sending entries whose starting index is inconsistent with the follower's log.

func (c *Cluster) ResetLogIndex(id uint64, lastIndex uint64, leaderLastIndex uint64) {
    p := [id]
    if p != nil {
        (lastIndex, leaderLastIndex)
    }
}
func (rp *ReplicaProgress) ResetLogIndex(lastLogIndex uint64, leaderLastLogIndex uint64) {
    // If the node's last entry is behind the leader's latest entry, reset the progress
    // from the node's log; otherwise reset it from the leader's log
    if lastLogIndex < leaderLastLogIndex {
        rp.NextIndex = lastLogIndex + 1
        rp.MatchIndex = lastLogIndex
    } else {
        rp.NextIndex = leaderLastLogIndex + 1
        rp.MatchIndex = leaderLastLogIndex
    }
    if rp.prevResp {
        rp.prevResp = false
        rp.pending = nil
    }
}

Resend entries according to the updated synchronization progress

Modify the Raft constructor and add the storage interface to its parameters

func NewRaft(id uint64, storage Storage, peers map[uint64]string, logger *) *Raft {
    raftlog := NewRaftLog(storage, logger)
    ...
}

The basic Raft log synchronization logic is now in place. Next, the Propose method in raftNode is implemented to append entries. The raftNode main loop already reads the recv channel and calls the Raft message handler, and a leader appends proposals to its log, so for now the proposal message only needs to be added to the recv channel.

At present the leader treats a proposal as successfully written as soon as it has been added to the channel. Strictly, the client should be answered only after a majority of the cluster has agreed.

To notify the caller after majority agreement, add a new structure that holds a RaftMessage and a channel, and add a wait queue to raftlog. When Raft processes the append message, it returns the index of the last appended entry to the Propose method through the channel. The Propose method then puts a channel into the raftlog wait queue. When entries are committed, the wait queue is checked and the Propose method is notified through the channel that the given index has been committed.
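
The wait queue idea can be sketched in isolation as follows. The `waitQueue` and `waiter` types and their methods are hypothetical names chosen for this illustration, and the sketch assumes that all calls come from a single goroutine (for example the Raft main loop), so it uses no locking.

```go
package main

import "fmt"

// waiter is one caller waiting for a log index to be committed.
type waiter struct {
	index uint64
	ch    chan struct{}
}

// waitQueue holds the callers that are still waiting (illustration only,
// not safe for concurrent use).
type waitQueue struct {
	waiters []waiter
}

// wait registers interest in index and returns a channel that is closed
// once that index has been committed.
func (q *waitQueue) wait(index uint64) <-chan struct{} {
	ch := make(chan struct{})
	q.waiters = append(q.waiters, waiter{index: index, ch: ch})
	return ch
}

// notify releases every waiter whose index is at or below committed.
func (q *waitQueue) notify(committed uint64) {
	remaining := q.waiters[:0]
	for _, w := range q.waiters {
		if w.index <= committed {
			close(w.ch)
		} else {
			remaining = append(remaining, w)
		}
	}
	q.waiters = remaining
}

func main() {
	var q waitQueue
	done := q.wait(3)

	q.notify(2) // index 3 is not committed yet
	select {
	case <-done:
		fmt.Println("committed")
	default:
		fmt.Println("still waiting")
	}

	q.notify(3)
	<-done // returns immediately because the channel is closed
	fmt.Println("index 3 committed")
}
```

Closing the channel rather than sending on it means the committing side never blocks, even if the proposer has already given up waiting.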

func (n *RaftNode) Propose(ctx , entries []*) error {
    msg := &{
        MsgType: pb.MessageType_PROPOSE,
        Term:    ,
        Entry:   entries,
    }
    return (ctx, msg)
}

Modify the raftNode constructor to accept the storage interface; the storage implementation itself is provided in the next part, on LSM

func NewRaftNode(id uint64, storage Storage, peers map[uint64]string, logger *) *RaftNode {
	node := &RaftNode{
		raft:       NewRaft(id, storage, peers, logger),
		...
	}
    ...
}

Modify the batch send method in the Raft server to merge multiple log messages into a single Raft message before sending

func (p *Peer) SendBatch(msgs []*) {
	(1)
	var appEntryMsg *
	var propMsg *
	for _, msg := range msgs {
		if  == pb.MessageType_APPEND_ENTRY {
			if appEntryMsg == nil {
				appEntryMsg = msg
			} else {
				size := len()
				if size == 0 || len() == 0 || [size-1].Index+1 == [0].Index {
					 = 
					 = append(, ...)
				} else if [0].Index >= [0].Index {
					appEntryMsg = msg
				}
			}
		} else if  == pb.MessageType_PROPOSE {
			if propMsg == nil {
				propMsg = msg
			} else {
				 = append(, ...)
			}
		} else {
			(msg)
		}
	}
	if appEntryMsg != nil {
		(appEntryMsg)
	}
	if propMsg != nil {
		(propMsg)
	}
	()
}

The code above implements the flow in which a proposal reaches the leader, is wrapped as a log entry, and is synchronized to the cluster. Next, log storage is implemented with LSM and the Raft server is used as a simple KV database.

Complete code

refer to:/etcd-io/etcd
