In distributed systems, one of the most fundamental challenges is achieving agreement among multiple nodes that may fail independently or experience network issues. This challenge—known as the consensus problem—lies at the heart of building reliable distributed systems. From distributed databases to blockchain networks, consensus algorithms provide the critical foundation that enables these systems to function correctly despite failures and network partitions.
This article explores the most important distributed consensus algorithms, their implementations, trade-offs, and practical applications. Whether you're designing a new distributed system or trying to understand how existing ones work under the hood, a working knowledge of these algorithms is essential.
The Consensus Problem
Before diving into specific algorithms, let’s clearly define the consensus problem. In a distributed system with multiple nodes, consensus is the process of agreeing on a single value or state among all non-faulty nodes. A correct consensus algorithm must satisfy the following properties:
- Agreement: All non-faulty nodes decide on the same value.
- Validity: If all nodes propose the same value, then all non-faulty nodes decide on that value.
- Termination: All non-faulty nodes eventually decide on some value.
The challenge is achieving these properties in the presence of:
- Node failures (crash failures or Byzantine failures)
- Network partitions and message delays
- Asynchronous communication
The FLP impossibility result (named after Fischer, Lynch, and Paterson) proved that in an asynchronous system where even a single node may crash, no deterministic algorithm can guarantee consensus; in particular, termination cannot be guaranteed. However, by relaxing some assumptions, such as assuming partial synchrony or adding randomization, practical consensus algorithms can be developed.
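The examples later in this article implicitly assume a small propose-style interface along the following lines. This is only a hypothetical sketch (the Consensus class name and propose signature are illustrative, not from any particular library), but it is useful to keep in mind how the three properties map onto an API:
from abc import ABC, abstractmethod

class Consensus(ABC):
    """Hypothetical interface implemented by the algorithms discussed below."""

    @abstractmethod
    def propose(self, value):
        """Propose a value and return (success, decided_value).

        Agreement:   every node that decides sees the same decided_value.
        Validity:    if every node proposes the same value, that value is decided.
        Termination: every non-faulty node eventually gets a decision.
        """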
Paxos: The Foundation of Modern Consensus
Paxos, first described by Leslie Lamport in 1989 and formally published in "The Part-Time Parliament" (1998), is the foundation upon which many modern consensus algorithms are built. Despite its theoretical elegance, Paxos is notoriously difficult to understand and implement correctly.
How Paxos Works
Paxos operates in two phases:
Phase 1: Prepare
- A proposer selects a proposal number `n` and sends a `prepare(n)` message to a majority of acceptors.
- Each acceptor promises not to accept proposals numbered less than `n` and returns the highest-numbered proposal (if any) that it has already accepted.
Phase 2: Accept
- If the proposer receives responses from a majority of acceptors, it sends an `accept(n, v)` message to a majority of acceptors, where `v` is the value of the highest-numbered proposal among the responses, or any value of its choosing if no previous proposals were returned.
- An acceptor accepts the proposal unless it has already responded to a `prepare` request with a number greater than `n`.
Multi-Paxos Optimization
Basic Paxos (also called Single-Decree Paxos) reaches consensus on a single value. In practice, systems need to agree on a sequence of values, which is where Multi-Paxos comes in:
- Elect a stable leader (proposer) that runs multiple instances of Paxos.
- The leader can skip Phase 1 for subsequent values after the first successful round.
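As a rough sketch of this optimization (hypothetical names; it reuses the `accept` interface of the `PaxosAcceptor` class shown in the next section and assumes one acceptor instance per log slot on each node), a stable leader that has already completed Phase 1 for its proposal number only needs to run Phase 2 for each new log slot:
class MultiPaxosLeader:
    def __init__(self, proposal_id, acceptors_for_slot):
        self.proposal_id = proposal_id                # Established once via Phase 1
        self.acceptors_for_slot = acceptors_for_slot  # slot -> list of acceptors
        self.next_slot = 0

    def append(self, value):
        """Assign the next log slot and run only Phase 2 (accept) for it."""
        slot = self.next_slot
        self.next_slot += 1
        acceptors = self.acceptors_for_slot[slot]
        acks = sum(1 for a in acceptors if a.accept(self.proposal_id, value))
        # The value is chosen for this slot once a majority of acceptors accept it
        return acks > len(acceptors) // 2, slot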
Paxos Implementation Example
Here’s a simplified implementation of the Paxos acceptor role:
class PaxosAcceptor:
def __init__(self):
self.promised_id = None
self.accepted_id = None
self.accepted_value = None
def prepare(self, proposal_id):
if self.promised_id is None or proposal_id > self.promised_id:
self.promised_id = proposal_id
return True, self.accepted_id, self.accepted_value
else:
return False, None, None
def accept(self, proposal_id, value):
if self.promised_id is None or proposal_id >= self.promised_id:
self.promised_id = proposal_id
self.accepted_id = proposal_id
self.accepted_value = value
return True
else:
return False
And the proposer role:
class PaxosProposer:
def __init__(self, acceptors, proposal_id, proposed_value):
self.acceptors = acceptors
self.proposal_id = proposal_id
self.proposed_value = proposed_value
def propose(self):
# Phase 1: Prepare
prepare_count = 0
highest_accepted_id = None
highest_accepted_value = None
for acceptor in self.acceptors:
success, accepted_id, accepted_value = acceptor.prepare(self.proposal_id)
if success:
prepare_count += 1
if accepted_id is not None and (highest_accepted_id is None or accepted_id > highest_accepted_id):
highest_accepted_id = accepted_id
highest_accepted_value = accepted_value
# Check if we have majority
if prepare_count <= len(self.acceptors) // 2:
return False, None
# Phase 2: Accept
# Use the highest accepted value if there is one, otherwise use our proposed value
value_to_propose = highest_accepted_value if highest_accepted_value is not None else self.proposed_value
accept_count = 0
for acceptor in self.acceptors:
if acceptor.accept(self.proposal_id, value_to_propose):
accept_count += 1
# Check if we have majority
if accept_count <= len(self.acceptors) // 2:
return False, None
return True, value_to_propose
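A quick usage sketch of these two classes (hypothetical values, run in a single process):
acceptors = [PaxosAcceptor() for _ in range(3)]

# The first proposer gets its own value chosen
ok, chosen = PaxosProposer(acceptors, proposal_id=1, proposed_value="A").propose()
print(ok, chosen)  # True A

# A later proposer with a higher proposal number must adopt the already-accepted value
ok, chosen = PaxosProposer(acceptors, proposal_id=2, proposed_value="B").propose()
print(ok, chosen)  # True A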
When to Use Paxos
Paxos is suitable when:
- You need a proven, battle-tested consensus algorithm
- You’re building a system that requires strong consistency guarantees
- You can tolerate the complexity of implementation
Challenges with Paxos
- Difficult to understand and implement correctly
- Requires careful handling of edge cases
- Performance can degrade under contention
- Multi-Paxos leader election is not part of the core algorithm
Raft: Consensus for Humans
Raft was designed by Diego Ongaro and John Ousterhout as an alternative to Paxos, with a focus on understandability and practical implementation. It has gained significant adoption in systems like etcd, Consul, and InfluxDB.
How Raft Works
Raft divides the consensus problem into three subproblems:
1. Leader Election
- Servers start as followers
- If followers don’t hear from a leader, they become candidates
- Candidates request votes from other servers
- A candidate becomes leader if it receives votes from a majority
- Leaders send periodic heartbeats to maintain authority
2. Log Replication
- Clients send commands to the leader
- The leader appends the command to its log
- The leader replicates the log entry to followers
- Once safely replicated, the leader commits the entry
- The leader notifies followers of committed entries
3. Safety
- Election Safety: At most one leader per term
- Leader Append-Only: Leaders never overwrite or delete entries
- Log Matching: If two logs contain an entry with the same index and term, all previous entries are identical
- Leader Completeness: If an entry is committed, it will be present in the logs of all future leaders
- State Machine Safety: If a server applies an entry to its state machine, no other server will apply a different entry for the same log index
Raft Implementation Example
Here’s a simplified implementation of Raft’s core components:
// Server states
const (
Follower = iota
Candidate
Leader
)
type RaftServer struct {
// Server identity
id int
state int
// Persistent state
currentTerm int
votedFor int
log []LogEntry
// Volatile state
commitIndex int
lastApplied int
// Leader state
nextIndex map[int]int
matchIndex map[int]int
	// Timers for elections and heartbeats
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer
	// Channels for incoming RPCs and client proposals (simplified)
	rpcCh     chan interface{}
	proposeCh chan interface{}
	// Network connections to other servers
	peers []RaftPeer
}
type LogEntry struct {
Term int
Command interface{}
}
func (s *RaftServer) Start() {
s.state = Follower
s.resetElectionTimer()
for {
switch s.state {
case Follower:
s.runFollower()
case Candidate:
s.runCandidate()
case Leader:
s.runLeader()
}
}
}
func (s *RaftServer) runFollower() {
select {
case <-s.electionTimer.C:
s.state = Candidate
case rpc := <-s.rpcCh:
s.handleRPC(rpc)
s.resetElectionTimer()
}
}
func (s *RaftServer) runCandidate() {
	s.currentTerm++
	s.votedFor = s.id
	args := RequestVoteArgs{
		Term:         s.currentTerm,
		CandidateId:  s.id,
		LastLogIndex: len(s.log) - 1,
		LastLogTerm:  s.getLastLogTerm(),
	}
	var mu sync.Mutex  // Guards votesReceived and candidate state across goroutines
	votesReceived := 1 // Vote for self
	// Request votes from all peers in parallel
	for _, peer := range s.peers {
		go func(p RaftPeer) {
			response := p.RequestVote(args)
			mu.Lock()
			defer mu.Unlock()
			if response.VoteGranted {
				votesReceived++
				// Majority of the full cluster (all peers plus ourselves)
				if s.state == Candidate && votesReceived > (len(s.peers)+1)/2 {
					s.state = Leader
					s.initLeaderState()
				}
			} else if response.Term > s.currentTerm {
				// A peer knows of a higher term; step down to follower
				s.currentTerm = response.Term
				s.state = Follower
				s.votedFor = -1
			}
		}(peer)
	}
	s.resetElectionTimer()
}
func (s *RaftServer) runLeader() {
s.sendHeartbeats()
s.heartbeatTimer = time.NewTimer(100 * time.Millisecond)
select {
case <-s.heartbeatTimer.C:
// Time to send heartbeats again
case rpc := <-s.rpcCh:
s.handleRPC(rpc)
case cmd := <-s.proposeCh:
// Append to local log
s.log = append(s.log, LogEntry{Term: s.currentTerm, Command: cmd})
// Replicate to followers (simplified)
s.replicateLogs()
}
}
func (s *RaftServer) handleRequestVote(args RequestVoteArgs) RequestVoteResponse {
if args.Term < s.currentTerm {
return RequestVoteResponse{Term: s.currentTerm, VoteGranted: false}
}
if args.Term > s.currentTerm {
s.currentTerm = args.Term
s.state = Follower
s.votedFor = -1
}
if (s.votedFor == -1 || s.votedFor == args.CandidateId) &&
s.isLogUpToDate(args.LastLogIndex, args.LastLogTerm) {
s.votedFor = args.CandidateId
return RequestVoteResponse{Term: s.currentTerm, VoteGranted: true}
}
return RequestVoteResponse{Term: s.currentTerm, VoteGranted: false}
}
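// The snippets above call two helpers that are easy to sketch. One plausible
// implementation of the last-log-term accessor and the "log up-to-date" check
// described in the Raft paper (Section 5.4.1):
func (s *RaftServer) getLastLogTerm() int {
	if len(s.log) == 0 {
		return 0
	}
	return s.log[len(s.log)-1].Term
}

func (s *RaftServer) isLogUpToDate(lastLogIndex, lastLogTerm int) bool {
	// The candidate's log is at least as up-to-date as ours if its last entry
	// has a higher term, or the same term and an index at least as large
	ourLastTerm := s.getLastLogTerm()
	if lastLogTerm != ourLastTerm {
		return lastLogTerm > ourLastTerm
	}
	return lastLogIndex >= len(s.log)-1
}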
func (s *RaftServer) handleAppendEntries(args AppendEntriesArgs) AppendEntriesResponse {
if args.Term < s.currentTerm {
return AppendEntriesResponse{Term: s.currentTerm, Success: false}
}
// Valid leader, reset election timer
s.resetElectionTimer()
if args.Term > s.currentTerm {
s.currentTerm = args.Term
s.state = Follower
s.votedFor = -1
}
// Check if log contains an entry at prevLogIndex with prevLogTerm
if args.PrevLogIndex >= len(s.log) ||
(args.PrevLogIndex >= 0 && s.log[args.PrevLogIndex].Term != args.PrevLogTerm) {
return AppendEntriesResponse{Term: s.currentTerm, Success: false}
}
// Process log entries
for i, entry := range args.Entries {
index := args.PrevLogIndex + 1 + i
if index < len(s.log) {
if s.log[index].Term != entry.Term {
// Delete conflicting entries and all that follow
s.log = s.log[:index]
// Append new entries
s.log = append(s.log, entry)
}
// Else entry already exists, do nothing
} else {
// Append new entries
s.log = append(s.log, entry)
}
}
// Update commit index
if args.LeaderCommit > s.commitIndex {
s.commitIndex = min(args.LeaderCommit, len(s.log)-1)
s.applyCommittedEntries()
}
return AppendEntriesResponse{Term: s.currentTerm, Success: true}
}
When to Use Raft
Raft is ideal when:
- You need a consensus algorithm that’s easier to understand and implement
- You’re building a system where leader-based operation is acceptable
- You want strong consistency with reasonable performance
- You need a well-documented algorithm with many existing implementations
Challenges with Raft
- Leader-based design can be a bottleneck
- Leader changes require at least one round-trip time
- Membership changes require careful handling
ZAB: ZooKeeper Atomic Broadcast
ZAB (ZooKeeper Atomic Broadcast) is the consensus protocol used by Apache ZooKeeper, a widely-used coordination service for distributed systems. While less general-purpose than Paxos or Raft, ZAB is optimized for the specific requirements of ZooKeeper.
How ZAB Works
ZAB operates in three phases:
1. Leader Election
- Servers elect a leader using a fast leader election algorithm
- The elected leader must have the most up-to-date transaction history
2. Discovery
- The new leader synchronizes its state with followers
- Followers acknowledge the leader and confirm synchronization
3. Broadcast
- The leader receives client requests and creates proposals
- Each proposal is assigned a monotonically increasing identifier (zxid)
- Proposals are broadcast to followers
- When a majority of followers acknowledge a proposal, the leader commits it
- The commit decision is broadcast to followers
ZAB Implementation Example
Here’s a simplified implementation of ZAB’s core components:
public class ZabServer {
enum ServerState {
LOOKING, FOLLOWING, LEADING
}
private ServerState state;
private long currentEpoch;
private long lastZxid;
private List<Transaction> history;
private Map<Long, Transaction> pendingTransactions;
private Set<ZabServer> quorum;
// Leader election
public void startLeaderElection() {
state = ServerState.LOOKING;
// Send out vote with last zxid
Vote myVote = new Vote(getServerId(), currentEpoch, lastZxid);
broadcastVote(myVote);
// Collect votes and determine winner
// (Simplified - actual implementation uses multiple rounds)
Map<Integer, Vote> receivedVotes = collectVotes();
Vote electedLeader = selectLeader(receivedVotes);
if (electedLeader.getServerId() == getServerId()) {
becomeLeader();
} else {
becomeFollower(electedLeader.getServerId());
}
}
// Leader functionality
private void becomeLeader() {
state = ServerState.LEADING;
currentEpoch++;
// Discovery phase
List<FollowerInfo> followers = collectFollowerInfo();
long newEpochZxid = createNewEpochZxid();
// Find highest zxid among followers
long highestZxid = lastZxid;
for (FollowerInfo info : followers) {
if (info.getLastZxid() > highestZxid) {
highestZxid = info.getLastZxid();
}
}
// Synchronize followers
for (FollowerInfo follower : followers) {
synchronizeFollower(follower, highestZxid);
}
// Start processing client requests
startProcessingRequests();
}
// Process a client request as leader
public void processRequest(Request request) {
// Create transaction
Transaction txn = new Transaction(
createZxid(),
request.getOperation(),
request.getData()
);
// Store in pending transactions
pendingTransactions.put(txn.getZxid(), txn);
// Broadcast to followers
List<ZabServer> acks = broadcastTransaction(txn);
// If majority acknowledge, commit
if (acks.size() > quorum.size() / 2) {
commit(txn);
broadcastCommit(txn);
}
}
// Follower functionality
private void becomeFollower(int leaderId) {
state = ServerState.FOLLOWING;
// Connect to leader
connectToLeader(leaderId);
// Send follower info
sendFollowerInfo(lastZxid);
// Process messages from leader
while (state == ServerState.FOLLOWING) {
Message message = receiveFromLeader();
if (message instanceof Transaction) {
Transaction txn = (Transaction) message;
// Validate transaction
if (isValidTransaction(txn)) {
// Log transaction
logTransaction(txn);
// Acknowledge
sendAck(txn.getZxid());
}
} else if (message instanceof Commit) {
Commit commit = (Commit) message;
// Apply committed transaction
applyTransaction(commit.getZxid());
}
}
}
// Create a new zxid
private long createZxid() {
// zxid is a 64-bit number:
// - high 32 bits: epoch
// - low 32 bits: counter
        // Keep lastZxid consistent with the zxid we hand out, even across epoch changes
        long counter = (lastZxid & 0xFFFFFFFFL) + 1;
        lastZxid = (currentEpoch << 32) | counter;
        return lastZxid;
}
// Apply a transaction to the state machine
private void applyTransaction(long zxid) {
Transaction txn = pendingTransactions.get(zxid);
if (txn != null) {
// Apply to state machine
applyToStateMachine(txn);
// Add to history
history.add(txn);
// Remove from pending
pendingTransactions.remove(zxid);
}
}
}
When to Use ZAB
ZAB is appropriate when:
- You’re using ZooKeeper as your coordination service
- You need a protocol optimized for primary-backup replication
- You require strong ordering guarantees for operations
Challenges with ZAB
- Specifically designed for ZooKeeper, less general-purpose
- Less documentation and fewer implementations compared to Paxos and Raft
- Leader-based design can be a bottleneck
Byzantine Fault Tolerance (BFT) Algorithms
The algorithms discussed so far assume crash-failure: nodes either work correctly or stop working entirely. Byzantine fault tolerance addresses a more challenging scenario where nodes can behave arbitrarily, including sending conflicting information to different parts of the system.
Practical Byzantine Fault Tolerance (PBFT)
PBFT, introduced by Castro and Liskov in 1999, was the first practical Byzantine consensus algorithm. It can tolerate up to f Byzantine failures with 3f+1 total nodes.
How PBFT Works
PBFT operates in three phases:
- Pre-prepare: The leader assigns a sequence number to a request and multicasts it to all replicas
- Prepare: Replicas verify the request and broadcast prepare messages
- Commit: Once a replica has the pre-prepare and 2f matching prepare messages from other replicas, it broadcasts a commit message
A request is executed once a replica collects 2f+1 matching commit messages (including its own).
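To make these quorum thresholds concrete, here is a minimal, hypothetical sketch of the message counting a replica performs (class and method names are illustrative; it omits view changes, checkpoints, and the cryptographic authentication that a real PBFT deployment requires):
class PBFTReplica:
    def __init__(self, n):
        self.n = n                 # Total replicas, n = 3f + 1
        self.f = (n - 1) // 3      # Maximum Byzantine faults tolerated
        self.prepares = {}         # (view, seq, digest) -> set of sender ids
        self.commits = {}

    def on_prepare(self, view, seq, digest, sender):
        key = (view, seq, digest)
        self.prepares.setdefault(key, set()).add(sender)
        # "Prepared" once the pre-prepare plus 2f matching prepares are in hand
        return len(self.prepares[key]) >= 2 * self.f

    def on_commit(self, view, seq, digest, sender):
        key = (view, seq, digest)
        self.commits.setdefault(key, set()).add(sender)
        # Safe to execute once 2f + 1 matching commits (including our own) arrive
        return len(self.commits[key]) >= 2 * self.f + 1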
When to Use BFT Algorithms
BFT algorithms are necessary when:
- You cannot trust all nodes in your system
- Nodes might be compromised or behave maliciously
- You’re building systems like blockchains or critical infrastructure
Challenges with BFT
- Higher message complexity (O(n²) where n is the number of nodes)
- Requires more nodes to tolerate the same number of failures
- Significantly more complex to implement correctly
Performance Comparison
When selecting a consensus algorithm, performance characteristics are crucial. Here’s a comparison of the algorithms discussed:
| Algorithm | Fault Tolerance | Message Complexity | Latency | Implementation Complexity |
|---|---|---|---|---|
| Paxos | f < n/2 | O(n) | 2 RTT | High |
| Multi-Paxos | f < n/2 | O(n) | 1 RTT (steady state) | Very High |
| Raft | f < n/2 | O(n) | 1 RTT (steady state) | Medium |
| ZAB | f < n/2 | O(n) | 1 RTT (steady state) | Medium-High |
| PBFT | f < n/3 | O(n²) | 3 RTT | Very High |
RTT = Round-Trip Time
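The fault-tolerance column translates directly into cluster sizing. A quick back-of-the-envelope calculation (plain Python, no external dependencies):
# Minimum cluster size needed to tolerate f faulty nodes
def min_nodes_crash(f):       # Paxos, Raft, ZAB: require n > 2f
    return 2 * f + 1

def min_nodes_byzantine(f):   # PBFT: requires n > 3f
    return 3 * f + 1

for f in (1, 2, 3):
    print(f"f={f}: crash-tolerant n >= {min_nodes_crash(f)}, "
          f"Byzantine-tolerant n >= {min_nodes_byzantine(f)}")
# f=1: crash-tolerant n >= 3, Byzantine-tolerant n >= 4
# f=2: crash-tolerant n >= 5, Byzantine-tolerant n >= 7
# f=3: crash-tolerant n >= 7, Byzantine-tolerant n >= 10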
Practical Implementation Considerations
When implementing consensus in real-world systems, consider these practical aspects:
1. State Machine Replication
Consensus algorithms are typically used to implement state machine replication:
class ReplicatedStateMachine:
def __init__(self, consensus_algorithm):
self.state = {} # The actual state
self.consensus = consensus_algorithm
self.log = [] # Log of all commands
def propose_command(self, command):
# Use consensus to agree on the command
success, agreed_command = self.consensus.propose(command)
if success:
# Apply the command to the state machine
self.apply_command(agreed_command)
return True
return False
def apply_command(self, command):
# Add to log
self.log.append(command)
# Apply to state
if command.operation == "SET":
self.state[command.key] = command.value
elif command.operation == "DELETE":
if command.key in self.state:
del self.state[command.key]
# Other operations...
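The `Command` objects used above can be a simple record type. A minimal, hypothetical version that also accommodates the configuration-change command used later in this section:
class Command:
    """Minimal command record (hypothetical); real systems would serialize this."""
    def __init__(self, operation, key=None, value=None, **extra):
        self.operation = operation
        self.key = key
        self.value = value
        # Extra fields, e.g. new_members for CONFIG_CHANGE commands
        self.__dict__.update(extra)

# Example: rsm.propose_command(Command("SET", key="config/timeout", value=500))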
2. Log Compaction
As the log grows, it becomes necessary to compact it to prevent unbounded growth:
def compact_log(self, compact_index):
# Take a snapshot of the state
snapshot = copy.deepcopy(self.state)
# Truncate the log
self.log = self.log[compact_index+1:]
# Store the snapshot
self.snapshots[compact_index] = snapshot
3. Membership Changes
Changing the set of nodes in the consensus group requires careful handling:
def change_membership(self, new_members):
# Create a special configuration change command
config_change = Command(
operation="CONFIG_CHANGE",
new_members=new_members
)
# Use consensus to agree on this command
success, _ = self.consensus.propose(config_change)
if success:
# Apply the configuration change
self.members = new_members
# Reconfigure the consensus algorithm
self.consensus.reconfigure(new_members)
return True
return False
4. Failure Detection
Reliable failure detection is crucial for leader-based algorithms:
class FailureDetector:
def __init__(self, timeout_ms=500):
self.last_heartbeat = {}
self.timeout_ms = timeout_ms
def heartbeat(self, node_id):
self.last_heartbeat[node_id] = time.time()
def suspect_failure(self, node_id):
if node_id not in self.last_heartbeat:
return True
elapsed_ms = (time.time() - self.last_heartbeat[node_id]) * 1000
return elapsed_ms > self.timeout_ms
Real-World Applications
Consensus algorithms power many critical distributed systems:
Distributed Databases
- Google Spanner: Uses Paxos for consistent replication across data centers
- CockroachDB: Uses Raft to maintain consistency across database nodes
- MongoDB: Uses a custom protocol similar to Raft for replica set consensus
Coordination Services
- ZooKeeper: Uses ZAB for consistent distributed coordination
- etcd: Uses Raft to store configuration data for Kubernetes
- Consul: Uses Raft for service discovery and configuration
Blockchain Systems
- Hyperledger Fabric: Early versions used PBFT; current releases rely primarily on Raft-based ordering services
- Tendermint: Uses a BFT consensus algorithm for blockchain applications
- Diem (formerly Libra): Used DiemBFT, a consensus protocol derived from HotStuff
Conclusion
Distributed consensus algorithms form the backbone of reliable distributed systems, enabling them to function correctly despite failures and network issues. While implementing these algorithms correctly is challenging, understanding their principles and trade-offs is essential for building robust distributed applications.
When selecting a consensus algorithm for your system:
- Consider your failure model: Are you concerned only with crash failures, or do you need Byzantine fault tolerance?
- Evaluate performance requirements: How many nodes will participate in consensus? What latency can you tolerate?
- Assess implementation complexity: Do you have the resources to implement and maintain a complex algorithm like Paxos, or would a more straightforward option like Raft be more appropriate?
- Look at existing implementations: Can you leverage battle-tested libraries rather than implementing from scratch?
By making informed decisions about consensus algorithms, you can build distributed systems that maintain consistency and availability even in the face of failures, providing a solid foundation for your applications.