Distributed Consensus Algorithms: The Backbone of Reliable Distributed Systems



In distributed systems, one of the most fundamental challenges is achieving agreement among multiple nodes that may fail independently or experience network issues. This challenge—known as the consensus problem—lies at the heart of building reliable distributed systems. From distributed databases to blockchain networks, consensus algorithms provide the critical foundation that enables these systems to function correctly despite failures and network partitions.

This article explores the most important distributed consensus algorithms, their implementations, trade-offs, and practical applications. Whether you’re designing a new distributed system or seeking to understand how existing systems work under the hood, understanding these algorithms is essential.


The Consensus Problem

Before diving into specific algorithms, let’s clearly define the consensus problem. In a distributed system with multiple nodes, consensus is the process of agreeing on a single value or state among all non-faulty nodes. A correct consensus algorithm must satisfy the following properties:

  1. Agreement: All non-faulty nodes decide on the same value.
  2. Validity: If all nodes propose the same value, then all non-faulty nodes decide on that value.
  3. Termination: All non-faulty nodes eventually decide on some value.

The challenge is achieving these properties in the presence of:

  • Node failures (crash failures or Byzantine failures)
  • Network partitions and message delays
  • Asynchronous communication

The FLP impossibility result (named after Fischer, Lynch, and Paterson) proved that in a fully asynchronous system where even a single node may crash, no deterministic algorithm can guarantee that consensus is always reached—in particular, termination cannot be guaranteed. However, by relaxing some assumptions, such as adding partial-synchrony timing constraints or randomization, practical consensus algorithms can be developed.


Paxos: The Foundation of Modern Consensus

Paxos, introduced by Leslie Lamport in 1989, is the foundation upon which many modern consensus algorithms are built. Despite its theoretical elegance, Paxos is notoriously difficult to understand and implement correctly.

How Paxos Works

Paxos operates in two phases:

Phase 1: Prepare

  1. A proposer selects a proposal number n and sends a prepare(n) message to a majority of acceptors.
  2. Each acceptor promises not to accept proposals numbered less than n and returns the highest-numbered proposal (if any) that it has accepted.

Phase 2: Accept

  1. If the proposer receives responses from a majority of acceptors, it sends an accept(n, v) message to a majority of acceptors, where v is the value of the highest-numbered proposal among the responses, or any value if no previous proposals were returned.
  2. An acceptor accepts the proposal unless it has already responded to a prepare request with a number greater than n.

Multi-Paxos Optimization

Basic Paxos (also called Single-Decree Paxos) reaches consensus on a single value. In practice, systems need to agree on a sequence of values, which is where Multi-Paxos comes in:

  1. Elect a stable leader (proposer) that runs multiple instances of Paxos.
  2. The leader can skip Phase 1 for subsequent values after the first successful round (a sketch of this appears after the proposer example below).

Paxos Implementation Example

Here’s a simplified implementation of the Paxos acceptor role:

class PaxosAcceptor:
    def __init__(self):
        self.promised_id = None     # Highest proposal number promised so far
        self.accepted_id = None     # Number of the most recently accepted proposal
        self.accepted_value = None  # Value of the most recently accepted proposal
    
    def prepare(self, proposal_id):
        # Phase 1b: promise to ignore lower-numbered proposals and report
        # any proposal already accepted.
        if self.promised_id is None or proposal_id > self.promised_id:
            self.promised_id = proposal_id
            return True, self.accepted_id, self.accepted_value
        else:
            return False, None, None
    
    def accept(self, proposal_id, value):
        # Phase 2b: accept unless a higher-numbered prepare has been promised.
        if self.promised_id is None or proposal_id >= self.promised_id:
            self.promised_id = proposal_id
            self.accepted_id = proposal_id
            self.accepted_value = value
            return True
        else:
            return False

And the proposer role:

class PaxosProposer:
    def __init__(self, acceptors, proposal_id, proposed_value):
        self.acceptors = acceptors
        self.proposal_id = proposal_id
        self.proposed_value = proposed_value
        
    def propose(self):
        # Phase 1: Prepare
        prepare_count = 0
        highest_accepted_id = None
        highest_accepted_value = None
        
        for acceptor in self.acceptors:
            success, accepted_id, accepted_value = acceptor.prepare(self.proposal_id)
            if success:
                prepare_count += 1
                if accepted_id is not None and (highest_accepted_id is None or accepted_id > highest_accepted_id):
                    highest_accepted_id = accepted_id
                    highest_accepted_value = accepted_value
        
        # Check if we have majority
        if prepare_count <= len(self.acceptors) // 2:
            return False, None
        
        # Phase 2: Accept
        # Use the highest accepted value if there is one, otherwise use our proposed value
        value_to_propose = highest_accepted_value if highest_accepted_value is not None else self.proposed_value
        
        accept_count = 0
        for acceptor in self.acceptors:
            if acceptor.accept(self.proposal_id, value_to_propose):
                accept_count += 1
        
        # Check if we have majority
        if accept_count <= len(self.acceptors) // 2:
            return False, None
        
        return True, value_to_propose
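
To tie this back to the Multi-Paxos optimization described earlier, here is a hedged sketch of a stable leader deciding a sequence of values by reusing the PaxosAcceptor class above. It is illustrative only: the MultiPaxosLeader class, its per-slot acceptor state, and the assumption that the leader's proposal number was already promised during leader election are simplifications for this example, not part of Lamport's specification.

class MultiPaxosLeader:
    """Hedged sketch: a stable leader deciding a sequence of values.

    Each node is modeled as a dict mapping log slot -> PaxosAcceptor,
    so every slot has independent acceptor state. Leader election and
    failure handling are out of scope.
    """
    def __init__(self, nodes, proposal_id):
        self.nodes = nodes              # list of {slot: PaxosAcceptor} dicts
        self.proposal_id = proposal_id  # chosen once, reused for every slot
        self.next_slot = 0

    def propose(self, value):
        slot = self.next_slot
        self.next_slot += 1
        # Phase 1 is skipped for each new slot: in Multi-Paxos the leader's
        # number was already promised when it took leadership (simplified here).
        accept_count = 0
        for node in self.nodes:
            acceptor = node.setdefault(slot, PaxosAcceptor())
            if acceptor.accept(self.proposal_id, value):
                accept_count += 1
        chosen = accept_count > len(self.nodes) // 2
        return chosen, slot


# Usage sketch: three nodes, each keeping per-slot acceptor state
nodes = [{}, {}, {}]
leader = MultiPaxosLeader(nodes, proposal_id=1)
print(leader.propose("x = 1"))  # (True, 0)
print(leader.propose("y = 2"))  # (True, 1)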

When to Use Paxos

Paxos is suitable when:

  • You need a proven, battle-tested consensus algorithm
  • You’re building a system that requires strong consistency guarantees
  • You can tolerate the complexity of implementation

Challenges with Paxos

  • Difficult to understand and implement correctly
  • Requires careful handling of edge cases
  • Performance can degrade under contention
  • Multi-Paxos leader election is not part of the core algorithm

Raft: Consensus for Humans

Raft was designed by Diego Ongaro and John Ousterhout as an alternative to Paxos, with a focus on understandability and practical implementation. It has gained significant adoption in systems like etcd, Consul, and InfluxDB.

How Raft Works

Raft divides the consensus problem into three subproblems:

1. Leader Election

  • Servers start as followers
  • If followers don’t hear from a leader, they become candidates
  • Candidates request votes from other servers
  • A candidate becomes leader if it receives votes from a majority
  • Leaders send periodic heartbeats to maintain authority

2. Log Replication

  • Clients send commands to the leader
  • The leader appends the command to its log
  • The leader replicates the log entry to followers
  • Once safely replicated, the leader commits the entry
  • The leader notifies followers of committed entries

3. Safety

  • Election Safety: At most one leader per term
  • Leader Append-Only: Leaders never overwrite or delete entries
  • Log Matching: If two logs contain an entry with the same index and term, all previous entries are identical
  • Leader Completeness: If an entry is committed, it will be present in the logs of all future leaders
  • State Machine Safety: If a server applies an entry to its state machine, no other server will apply a different entry for the same log index

Raft Implementation Example

Here’s a simplified implementation of Raft’s core components:

// Server states
const (
    Follower = iota
    Candidate
    Leader
)

type RaftServer struct {
    // Server identity
    id int
    state int
    
    // Persistent state
    currentTerm int
    votedFor int
    log []LogEntry
    
    // Volatile state
    commitIndex int
    lastApplied int
    
    // Leader state
    nextIndex map[int]int
    matchIndex map[int]int
    
    // Timers
    electionTimer *time.Timer
    heartbeatTimer *time.Timer
    
    // Channels for incoming RPCs and client proposals
    rpcCh chan RPC
    proposeCh chan interface{}
    
    // Network connections to other servers
    peers []RaftPeer
}

type LogEntry struct {
    Term int
    Command interface{}
}

func (s *RaftServer) Start() {
    s.state = Follower
    s.resetElectionTimer()
    
    for {
        switch s.state {
        case Follower:
            s.runFollower()
        case Candidate:
            s.runCandidate()
        case Leader:
            s.runLeader()
        }
    }
}

func (s *RaftServer) runFollower() {
    select {
    case <-s.electionTimer.C:
        s.state = Candidate
    case rpc := <-s.rpcCh:
        s.handleRPC(rpc)
        s.resetElectionTimer()
    }
}

func (s *RaftServer) runCandidate() {
    s.currentTerm++
    s.votedFor = s.id
    votesReceived := 1  // Vote for self (simplified: a real implementation must synchronize this counter across goroutines)
    
    // Request votes from all peers
    for _, peer := range s.peers {
        go func(p RaftPeer) {
            args := RequestVoteArgs{
                Term:         s.currentTerm,
                CandidateId:  s.id,
                LastLogIndex: len(s.log) - 1,
                LastLogTerm:  s.getLastLogTerm(),
            }
            
            response := p.RequestVote(args)
            
            if response.VoteGranted {
                votesReceived++
                if votesReceived > len(s.peers)/2 {
                    s.state = Leader
                    s.initLeaderState()
                }
            } else if response.Term > s.currentTerm {
                s.currentTerm = response.Term
                s.state = Follower
                s.votedFor = -1
            }
        }(peer)
    }
    
    s.resetElectionTimer()
}

func (s *RaftServer) runLeader() {
    s.sendHeartbeats()
    s.heartbeatTimer = time.NewTimer(100 * time.Millisecond)
    
    select {
    case <-s.heartbeatTimer.C:
        // Time to send heartbeats again
    case rpc := <-s.rpcCh:
        s.handleRPC(rpc)
    case cmd := <-s.proposeCh:
        // Append to local log
        s.log = append(s.log, LogEntry{Term: s.currentTerm, Command: cmd})
        // Replicate to followers (simplified)
        s.replicateLogs()
    }
}

func (s *RaftServer) handleRequestVote(args RequestVoteArgs) RequestVoteResponse {
    if args.Term < s.currentTerm {
        return RequestVoteResponse{Term: s.currentTerm, VoteGranted: false}
    }
    
    if args.Term > s.currentTerm {
        s.currentTerm = args.Term
        s.state = Follower
        s.votedFor = -1
    }
    
    if (s.votedFor == -1 || s.votedFor == args.CandidateId) && 
       s.isLogUpToDate(args.LastLogIndex, args.LastLogTerm) {
        s.votedFor = args.CandidateId
        return RequestVoteResponse{Term: s.currentTerm, VoteGranted: true}
    }
    
    return RequestVoteResponse{Term: s.currentTerm, VoteGranted: false}
}

func (s *RaftServer) handleAppendEntries(args AppendEntriesArgs) AppendEntriesResponse {
    if args.Term < s.currentTerm {
        return AppendEntriesResponse{Term: s.currentTerm, Success: false}
    }
    
    // Valid leader, reset election timer
    s.resetElectionTimer()
    
    if args.Term > s.currentTerm {
        s.currentTerm = args.Term
        s.state = Follower
        s.votedFor = -1
    }
    
    // Check if log contains an entry at prevLogIndex with prevLogTerm
    if args.PrevLogIndex >= len(s.log) || 
       (args.PrevLogIndex >= 0 && s.log[args.PrevLogIndex].Term != args.PrevLogTerm) {
        return AppendEntriesResponse{Term: s.currentTerm, Success: false}
    }
    
    // Process log entries
    for i, entry := range args.Entries {
        index := args.PrevLogIndex + 1 + i
        
        if index < len(s.log) {
            if s.log[index].Term != entry.Term {
                // Delete conflicting entries and all that follow
                s.log = s.log[:index]
                // Append new entries
                s.log = append(s.log, entry)
            }
            // Else entry already exists, do nothing
        } else {
            // Append new entries
            s.log = append(s.log, entry)
        }
    }
    
    // Update commit index
    if args.LeaderCommit > s.commitIndex {
        s.commitIndex = min(args.LeaderCommit, len(s.log)-1)
        s.applyCommittedEntries()
    }
    
    return AppendEntriesResponse{Term: s.currentTerm, Success: true}
}

When to Use Raft

Raft is ideal when:

  • You need a consensus algorithm that’s easier to understand and implement
  • You’re building a system where leader-based operation is acceptable
  • You want strong consistency with reasonable performance
  • You need a well-documented algorithm with many existing implementations

Challenges with Raft

  • Leader-based design can be a bottleneck
  • Leader changes require at least one round-trip time
  • Membership changes require careful handling

ZAB: ZooKeeper Atomic Broadcast

ZAB (ZooKeeper Atomic Broadcast) is the consensus protocol used by Apache ZooKeeper, a widely-used coordination service for distributed systems. While less general-purpose than Paxos or Raft, ZAB is optimized for the specific requirements of ZooKeeper.

How ZAB Works

ZAB operates in three phases:

1. Leader Election

  • Servers elect a leader using a fast leader election algorithm
  • The elected leader must have the most up-to-date transaction history

2. Discovery and Synchronization

  • The new leader discovers the most up-to-date transaction history from a quorum of followers and establishes a new epoch
  • The leader then synchronizes followers to that history, and followers acknowledge the new leader

3. Broadcast

  • The leader receives client requests and creates proposals
  • Each proposal is assigned a monotonically increasing identifier (zxid)
  • Proposals are broadcast to followers
  • When a majority of followers acknowledge a proposal, the leader commits it
  • The commit decision is broadcast to followers

ZAB Implementation Example

Here’s a simplified implementation of ZAB’s core components:

public class ZabServer {
    enum ServerState {
        LOOKING, FOLLOWING, LEADING
    }
    
    private ServerState state;
    private long currentEpoch;
    private long lastZxid;
    private List<Transaction> history;
    private Map<Long, Transaction> pendingTransactions;
    private Set<ZabServer> quorum;
    
    // Leader election
    public void startLeaderElection() {
        state = ServerState.LOOKING;
        
        // Send out vote with last zxid
        Vote myVote = new Vote(getServerId(), currentEpoch, lastZxid);
        broadcastVote(myVote);
        
        // Collect votes and determine winner
        // (Simplified - actual implementation uses multiple rounds)
        Map<Integer, Vote> receivedVotes = collectVotes();
        Vote electedLeader = selectLeader(receivedVotes);
        
        if (electedLeader.getServerId() == getServerId()) {
            becomeLeader();
        } else {
            becomeFollower(electedLeader.getServerId());
        }
    }
    
    // Leader functionality
    private void becomeLeader() {
        state = ServerState.LEADING;
        currentEpoch++;
        
        // Discovery phase
        List<FollowerInfo> followers = collectFollowerInfo();
        long newEpochZxid = createNewEpochZxid();
        
        // Find highest zxid among followers
        long highestZxid = lastZxid;
        for (FollowerInfo info : followers) {
            if (info.getLastZxid() > highestZxid) {
                highestZxid = info.getLastZxid();
            }
        }
        
        // Synchronize followers
        for (FollowerInfo follower : followers) {
            synchronizeFollower(follower, highestZxid);
        }
        
        // Start processing client requests
        startProcessingRequests();
    }
    
    // Process a client request as leader
    public void processRequest(Request request) {
        // Create transaction
        Transaction txn = new Transaction(
            createZxid(),
            request.getOperation(),
            request.getData()
        );
        
        // Store in pending transactions
        pendingTransactions.put(txn.getZxid(), txn);
        
        // Broadcast to followers
        List<ZabServer> acks = broadcastTransaction(txn);
        
        // If majority acknowledge, commit
        if (acks.size() > quorum.size() / 2) {
            commit(txn);
            broadcastCommit(txn);
        }
    }
    
    // Follower functionality
    private void becomeFollower(int leaderId) {
        state = ServerState.FOLLOWING;
        
        // Connect to leader
        connectToLeader(leaderId);
        
        // Send follower info
        sendFollowerInfo(lastZxid);
        
        // Process messages from leader
        while (state == ServerState.FOLLOWING) {
            Message message = receiveFromLeader();
            
            if (message instanceof Transaction) {
                Transaction txn = (Transaction) message;
                
                // Validate transaction
                if (isValidTransaction(txn)) {
                    // Log transaction
                    logTransaction(txn);
                    
                    // Acknowledge
                    sendAck(txn.getZxid());
                }
            } else if (message instanceof Commit) {
                Commit commit = (Commit) message;
                
                // Apply committed transaction
                applyTransaction(commit.getZxid());
            }
        }
    }
    
    // Create a new zxid
    private long createZxid() {
        // zxid is a 64-bit number:
        // - high 32 bits: epoch
        // - low 32 bits: counter
        return (currentEpoch << 32) | (++lastZxid & 0xFFFFFFFFL);
    }
    
    // Apply a transaction to the state machine
    private void applyTransaction(long zxid) {
        Transaction txn = pendingTransactions.get(zxid);
        if (txn != null) {
            // Apply to state machine
            applyToStateMachine(txn);
            
            // Add to history
            history.add(txn);
            
            // Remove from pending
            pendingTransactions.remove(zxid);
        }
    }
}

When to Use ZAB

ZAB is appropriate when:

  • You’re using ZooKeeper as your coordination service
  • You need a protocol optimized for primary-backup replication
  • You require strong ordering guarantees for operations

Challenges with ZAB

  • Specifically designed for ZooKeeper, less general-purpose
  • Less documentation and fewer implementations compared to Paxos and Raft
  • Leader-based design can be a bottleneck

Byzantine Fault Tolerance (BFT) Algorithms

The algorithms discussed so far assume crash-failure: nodes either work correctly or stop working entirely. Byzantine fault tolerance addresses a more challenging scenario where nodes can behave arbitrarily, including sending conflicting information to different parts of the system.

Practical Byzantine Fault Tolerance (PBFT)

PBFT, introduced by Castro and Liskov in 1999, was the first practical Byzantine consensus algorithm. It can tolerate up to f Byzantine failures with 3f+1 total nodes.

How PBFT Works

PBFT operates in three phases:

  1. Pre-prepare: The leader assigns a sequence number to a request and multicasts it to all replicas
  2. Prepare: Replicas verify the request and broadcast prepare messages
  3. Commit: Once a replica receives 2f prepare messages, it broadcasts a commit message

A request is executed once a replica receives 2f+1 commit messages.
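
To make the quorum counting concrete, here is a hedged Python sketch of a single PBFT replica tallying prepare and commit messages for one request. It omits signatures, view changes, checkpointing, and the client reply quorum; the PBFTReplica class and its message shapes are assumptions made for this illustration, not the protocol as published by Castro and Liskov.

class PBFTReplica:
    """Hedged sketch of PBFT quorum counting on one replica.

    With n = 3f + 1 replicas the system tolerates f Byzantine faults.
    """
    def __init__(self, n):
        self.n = n
        self.f = (n - 1) // 3
        self.prepares = {}   # (view, seq, digest) -> set of sender ids
        self.commits = {}
        self.executed = set()

    def on_prepare(self, view, seq, digest, sender):
        key = (view, seq, digest)
        self.prepares.setdefault(key, set()).add(sender)
        # The request is "prepared" once 2f prepares match the pre-prepare
        return len(self.prepares[key]) >= 2 * self.f

    def on_commit(self, view, seq, digest, sender):
        key = (view, seq, digest)
        self.commits.setdefault(key, set()).add(sender)
        # Execute once 2f + 1 matching commit messages are collected
        if len(self.commits[key]) >= 2 * self.f + 1 and key not in self.executed:
            self.executed.add(key)
            return True  # safe to execute the request
        return False


# Usage sketch with n = 4 (f = 1)
replica = PBFTReplica(n=4)
print(replica.on_prepare(0, 1, "digest", sender=1))  # False
print(replica.on_prepare(0, 1, "digest", sender=2))  # True  (2f = 2 prepares)
print(replica.on_commit(0, 1, "digest", sender=0))   # False
print(replica.on_commit(0, 1, "digest", sender=1))   # False
print(replica.on_commit(0, 1, "digest", sender=2))   # True  (2f + 1 = 3 commits)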

When to Use BFT Algorithms

BFT algorithms are necessary when:

  • You cannot trust all nodes in your system
  • Nodes might be compromised or behave maliciously
  • You’re building systems like blockchains or critical infrastructure

Challenges with BFT

  • Higher message complexity (O(n²) where n is the number of nodes)
  • Requires more nodes to tolerate the same number of failures
  • Significantly more complex to implement correctly

Performance Comparison

When selecting a consensus algorithm, performance characteristics are crucial. Here’s a comparison of the algorithms discussed:

Algorithm   | Fault Tolerance | Message Complexity | Latency (steps)      | Implementation Complexity
Paxos       | f < n/2         | O(n)               | 2 RTT                | High
Multi-Paxos | f < n/2         | O(n)               | 1 RTT (steady state) | Very High
Raft        | f < n/2         | O(n)               | 1 RTT (steady state) | Medium
ZAB         | f < n/2         | O(n)               | 1 RTT (steady state) | Medium-High
PBFT        | f < n/3         | O(n²)              | 3 RTT                | Very High

RTT = Round-Trip Time


Practical Implementation Considerations

When implementing consensus in real-world systems, consider these practical aspects:

1. State Machine Replication

Consensus algorithms are typically used to implement state machine replication:

class ReplicatedStateMachine:
    def __init__(self, consensus_algorithm):
        self.state = {}  # The actual state
        self.consensus = consensus_algorithm
        self.log = []    # Log of all commands
        
    def propose_command(self, command):
        # Use consensus to agree on the command
        success, agreed_command = self.consensus.propose(command)
        
        if success:
            # Apply the command to the state machine
            self.apply_command(agreed_command)
            return True
        return False
        
    def apply_command(self, command):
        # Add to log
        self.log.append(command)
        
        # Apply to state
        if command.operation == "SET":
            self.state[command.key] = command.value
        elif command.operation == "DELETE":
            if command.key in self.state:
                del self.state[command.key]
        # Other operations...
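
For illustration, wiring the replicated state machine to a consensus implementation might look like the following. The Command shape and the single-node consensus stub are assumptions made for this example, not part of any particular library.

from collections import namedtuple

# Hypothetical command shape matching apply_command above
Command = namedtuple("Command", ["operation", "key", "value"])

class SingleNodeConsensus:
    """Trivial stand-in that 'agrees' on every proposal (local testing only)."""
    def propose(self, command):
        return True, command

rsm = ReplicatedStateMachine(SingleNodeConsensus())
rsm.propose_command(Command("SET", "config/timeout", "500ms"))
rsm.propose_command(Command("DELETE", "config/timeout", None))
print(rsm.state)  # {} -- the key was set and then deleted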

2. Log Compaction

As the log grows, it becomes necessary to compact it to prevent unbounded growth:

import copy

def compact_log(self, compact_index):
    # Method of ReplicatedStateMachine; assumes a self.snapshots dict
    # was initialized in __init__.
    # Take a snapshot of the current state
    snapshot = copy.deepcopy(self.state)
    
    # Discard log entries up to and including compact_index
    self.log = self.log[compact_index+1:]
    
    # Store the snapshot, keyed by the last index it covers
    self.snapshots[compact_index] = snapshot

3. Membership Changes

Changing the set of nodes in the consensus group requires careful handling:

def change_membership(self, new_members):
    # Create a special configuration change command
    config_change = Command(
        operation="CONFIG_CHANGE",
        new_members=new_members
    )
    
    # Use consensus to agree on this command
    success, _ = self.consensus.propose(config_change)
    
    if success:
        # Apply the configuration change
        self.members = new_members
        
        # Reconfigure the consensus algorithm
        self.consensus.reconfigure(new_members)
        
        return True
    return False

4. Failure Detection

Reliable failure detection is crucial for leader-based algorithms:

import time

class FailureDetector:
    def __init__(self, timeout_ms=500):
        self.last_heartbeat = {}
        self.timeout_ms = timeout_ms
        
    def heartbeat(self, node_id):
        self.last_heartbeat[node_id] = time.time()
        
    def suspect_failure(self, node_id):
        if node_id not in self.last_heartbeat:
            return True
            
        elapsed_ms = (time.time() - self.last_heartbeat[node_id]) * 1000
        return elapsed_ms > self.timeout_ms
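
As a usage sketch, a follower could feed the detector from incoming heartbeats and consult it before starting an election. The node name, the start_election hook, and the surrounding control flow are illustrative only:

detector = FailureDetector(timeout_ms=500)

# Called whenever a heartbeat arrives from the current leader
detector.heartbeat("leader-1")

# Called periodically, e.g. when an election timer fires
if detector.suspect_failure("leader-1"):
    # The leader is suspected to have failed; a Raft-style follower
    # would become a candidate and start an election here.
    start_election()  # hypothetical hook into the consensus layer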

Real-World Applications

Consensus algorithms power many critical distributed systems:

Distributed Databases

  • Google Spanner: Uses Paxos for consistent replication across data centers
  • CockroachDB: Uses Raft to maintain consistency across database nodes
  • MongoDB: Uses a custom protocol similar to Raft for replica set consensus

Coordination Services

  • ZooKeeper: Uses ZAB for consistent distributed coordination
  • etcd: Uses Raft to store configuration data for Kubernetes
  • Consul: Uses Raft for service discovery and configuration

Blockchain Systems

  • Hyperledger Fabric: Uses Practical Byzantine Fault Tolerance variants
  • Tendermint: Uses a BFT consensus algorithm for blockchain applications
  • Diem (formerly Libra): Uses HotStuff, a BFT consensus algorithm

Conclusion

Distributed consensus algorithms form the backbone of reliable distributed systems, enabling them to function correctly despite failures and network issues. While implementing these algorithms correctly is challenging, understanding their principles and trade-offs is essential for building robust distributed applications.

When selecting a consensus algorithm for your system:

  1. Consider your failure model: Are you concerned only with crash failures, or do you need Byzantine fault tolerance?
  2. Evaluate performance requirements: How many nodes will participate in consensus? What latency can you tolerate?
  3. Assess implementation complexity: Do you have the resources to implement and maintain a complex algorithm like Paxos, or would a more straightforward option like Raft be more appropriate?
  4. Look at existing implementations: Can you leverage battle-tested libraries rather than implementing from scratch?

By making informed decisions about consensus algorithms, you can build distributed systems that maintain consistency and availability even in the face of failures, providing a solid foundation for your applications.
