Distributed Protocols

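Overview

Distributed protocols are the rules and algorithms that let the nodes of a distributed system agree on shared state. Their central problem is keeping the system consistent, and as available as possible, despite network partitions, node crashes, and other failures.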

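Raft

Basic Concepts

Node States

  • Leader: handles all client requests (followers redirect clients to it) and drives log replication
  • Follower: passive; accepts log entries from the leader and answers vote requests
  • Candidate: a transient state used during a leader election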
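The transitions between these three states can be summarized as a small function. This is a minimal sketch for illustration only. The `Event` names and the `RaftTransitions.next` helper are hypothetical and do not come from any Raft library.

```java
// Hypothetical names for illustration; not taken from any specific Raft library.
enum NodeState { FOLLOWER, CANDIDATE, LEADER }

enum Event {
    ELECTION_TIMEOUT,   // no valid leader heartbeat (or granted vote) within the election timeout
    WON_MAJORITY,       // candidate received votes from a majority of the cluster
    SAW_HIGHER_TERM,    // any RPC request or response carried a larger term
    LEADER_DISCOVERED   // candidate received AppendEntries from a leader of its term or newer
}

final class RaftTransitions {
    private RaftTransitions() {}

    /** Returns the next state, or the current state if the event does not apply to it. */
    static NodeState next(NodeState current, Event event) {
        switch (event) {
            case SAW_HIGHER_TERM:
                // Any node that learns of a newer term steps down immediately.
                return NodeState.FOLLOWER;
            case ELECTION_TIMEOUT:
                // Followers start an election; a candidate whose election timed out starts another.
                // Leaders do not run election timers.
                return current == NodeState.LEADER ? NodeState.LEADER : NodeState.CANDIDATE;
            case WON_MAJORITY:
                return current == NodeState.CANDIDATE ? NodeState.LEADER : current;
            case LEADER_DISCOVERED:
                return current == NodeState.CANDIDATE ? NodeState.FOLLOWER : current;
            default:
                return current;
        }
    }
}
```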

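Core Mechanisms

  • Leader election: elect at most one leader per term
  • Log replication: the leader replicates its log entries to the followers
  • Safety: once a log entry is committed, it is never lost or overwritten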
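Both election and commitment depend on majority quorums. Any two majorities of the same cluster share at least one node, so the voters that elect a new leader always include a node that stores each committed entry. The sketch below shows the arithmetic; the `Quorum` class is a hypothetical helper.

```java
final class Quorum {
    private Quorum() {}

    /** Smallest number of nodes that forms a majority of an n-node cluster. */
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** Number of crashed nodes an n-node majority-quorum cluster can tolerate. */
    static int maxCrashFaults(int clusterSize) {
        return (clusterSize - 1) / 2;
    }

    /** Minimum overlap between any two majorities: 2 * majority - n, always at least 1. */
    static int minOverlap(int clusterSize) {
        return 2 * majority(clusterSize) - clusterSize;
    }
}
```

A 4-node cluster tolerates no more crashes than a 3-node one. This is why clusters are usually sized with an odd number of nodes.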

Leader Election

Election Flow

java
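import java.util.List;

// Simplified sketch: RPC transport, persistence, and timers are left abstract, and the
// NodeState, VoteRequest, and VoteResponse types are assumed to be defined elsewhere.
// Thread safety is omitted for brevity.
public abstract class RaftNode {
    protected final String nodeId;
    protected final List<String> peers;              // ids of the other cluster members
    protected NodeState state = NodeState.FOLLOWER;
    protected int currentTerm = 0;                   // must be persisted before replying to any RPC
    protected String votedFor = null;                // must be persisted before replying to any RPC
    protected long lastHeartbeat = System.currentTimeMillis();

    protected RaftNode(String nodeId, List<String> peers) {
        this.nodeId = nodeId;
        this.peers = peers;
    }

    protected abstract VoteResponse sendVoteRequest(String peer, VoteRequest request);
    protected abstract int lastLogIndex();           // -1 if the log is empty
    protected abstract int lastLogTerm();            // 0 if the log is empty
    protected abstract boolean isLogUpToDate(int lastLogIndex, int lastLogTerm); // see Safety Guarantees
    protected abstract void becomeLeader();

    // Called when the election timeout fires without hearing from a leader:
    // become a candidate and request votes.
    public void startElection() {
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = nodeId;                           // vote for ourselves
        lastHeartbeat = System.currentTimeMillis();  // restart the (randomized) election timer
        int electionTerm = currentTerm;

        int votes = 1;                               // our own vote
        // Sequential for clarity; real implementations send these RPCs in parallel.
        for (String peer : peers) {
            VoteRequest request = new VoteRequest(electionTerm, nodeId, lastLogIndex(), lastLogTerm());
            VoteResponse response = sendVoteRequest(peer, request);
            if (response.getTerm() > currentTerm) {
                // A newer term exists: adopt it, step down, and abandon this election.
                currentTerm = response.getTerm();
                votedFor = null;
                state = NodeState.FOLLOWER;
                return;
            }
            if (response.isVoteGranted()) {
                votes++;
            }
        }

        // A majority of the whole cluster (peers + self) is required, not of the peers alone.
        int clusterSize = peers.size() + 1;
        if (state == NodeState.CANDIDATE && currentTerm == electionTerm && votes > clusterSize / 2) {
            becomeLeader();
        }
        // Otherwise stay a candidate: the next election timeout starts a new election with a
        // higher term. If a valid leader contacts us meanwhile, we become a follower.
    }

    // Handles an incoming RequestVote RPC.
    public VoteResponse handleVoteRequest(VoteRequest request) {
        // 1. Reject candidates from an older term.
        if (request.getTerm() < currentTerm) {
            return new VoteResponse(currentTerm, false);
        }

        // 2. A newer term: adopt it, clear our vote, and revert to follower.
        if (request.getTerm() > currentTerm) {
            currentTerm = request.getTerm();
            votedFor = null;
            state = NodeState.FOLLOWER;
        }

        // 3. At most one vote per term (re-granting to the same candidate is allowed).
        if (votedFor == null || votedFor.equals(request.getCandidateId())) {
            // 4. Only vote for a candidate whose log is at least as up to date as ours.
            if (isLogUpToDate(request.getLastLogIndex(), request.getLastLogTerm())) {
                votedFor = request.getCandidateId();
                lastHeartbeat = System.currentTimeMillis(); // granting a vote resets our election timer
                return new VoteResponse(currentTerm, true);
            }
        }

        return new VoteResponse(currentTerm, false);
    }
}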
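The voting rules can be exercised without any networking. The `Voter` class below is a hypothetical, self-contained stand-in. It applies the same four checks as `handleVoteRequest`, plus the log comparison from the Safety Guarantees section. Persistence and election timers are omitted.

```java
final class Voter {
    private int currentTerm = 0;
    private String votedFor = null;
    private final int lastLogIndex; // index of this voter's last log entry (-1 if empty)
    private final int lastLogTerm;  // term of that entry (0 if empty)

    Voter(int lastLogIndex, int lastLogTerm) {
        this.lastLogIndex = lastLogIndex;
        this.lastLogTerm = lastLogTerm;
    }

    int currentTerm() { return currentTerm; }

    /** Applies Raft's RequestVote rules and returns whether the vote is granted. */
    boolean requestVote(int term, String candidateId, int candLastIndex, int candLastTerm) {
        if (term < currentTerm) {
            return false;                    // stale candidate
        }
        if (term > currentTerm) {
            currentTerm = term;              // newer term: adopt it and forget any old vote
            votedFor = null;
        }
        boolean canVote = votedFor == null || votedFor.equals(candidateId);
        boolean upToDate = candLastTerm != lastLogTerm
                ? candLastTerm > lastLogTerm  // a later last term wins regardless of length
                : candLastIndex >= lastLogIndex;
        if (canVote && upToDate) {
            votedFor = candidateId;          // at most one candidate per term
            return true;
        }
        return false;
    }
}
```

For example, a voter whose last entry is at index 4 in term 2 grants a vote in term 3 to candidate A. In the same term it then refuses B, even though B has a longer log.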

Log Replication

Replication Flow

java
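import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Simplified sketch: transport, persistence, heartbeats, and applying entries to the state
// machine are omitted; LogEntry and the AppendEntries types are assumed to be defined elsewhere.
public abstract class RaftLeader {
    private static final Logger LOGGER = Logger.getLogger(RaftLeader.class.getName());

    protected final String nodeId;
    protected final List<String> peers;
    protected final List<LogEntry> log = new CopyOnWriteArrayList<>();           // 0-based indices
    protected final Map<String, Integer> nextIndex = new ConcurrentHashMap<>();  // next index to send, per follower
    protected final Map<String, Integer> matchIndex = new ConcurrentHashMap<>(); // highest replicated index, per follower
    protected volatile int currentTerm;
    protected volatile int commitIndex = -1;

    protected RaftLeader(String nodeId, List<String> peers, int currentTerm) {
        this.nodeId = nodeId;
        this.peers = peers;
        this.currentTerm = currentTerm;
        for (String peer : peers) {
            nextIndex.put(peer, log.size());   // leader's last index + 1
            matchIndex.put(peer, -1);
        }
    }

    protected abstract AppendEntriesResponse sendAppendEntries(String follower, AppendEntriesRequest request);
    protected abstract void stepDown(int newerTerm);   // revert to follower on seeing a newer term

    // Handles a client command: append locally, replicate, and report whether it committed.
    public boolean appendEntry(String command) {
        LogEntry entry;
        synchronized (this) {
            entry = new LogEntry(currentTerm, log.size(), command);
            log.add(entry);
        }

        // The leader already stores the entry, so it needs (majority - 1) follower acknowledgements.
        int majority = (peers.size() + 1) / 2 + 1;
        CountDownLatch acks = new CountDownLatch(majority - 1);
        for (String peer : peers) {
            CompletableFuture.supplyAsync(() -> replicateToFollower(peer))
                .thenAccept(ok -> { if (ok) acks.countDown(); });
        }

        try {
            // Wait for a majority, not for every follower: one slow follower must not block commits.
            if (acks.await(1000, TimeUnit.MILLISECONDS)) {
                synchronized (this) {
                    // Safe to commit by counting because the entry is from the current term.
                    commitIndex = Math.max(commitIndex, entry.getIndex());
                }
                return true;
            }
            LOGGER.warning("Timed out waiting for a majority to replicate index " + entry.getIndex());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    // Sends every entry from nextIndex onward; on a consistency-check failure, backs off and retries.
    // Simplified: concurrent calls for the same follower are not coordinated.
    private boolean replicateToFollower(String follower) {
        while (true) {
            List<LogEntry> snapshot = List.copyOf(log);   // consistent view of the log
            int next = nextIndex.get(follower);
            int prevLogIndex = next - 1;
            int prevLogTerm = prevLogIndex >= 0 ? snapshot.get(prevLogIndex).getTerm() : 0;
            List<LogEntry> entries = snapshot.subList(next, snapshot.size());

            AppendEntriesRequest request = new AppendEntriesRequest(
                currentTerm, nodeId, prevLogIndex, prevLogTerm, entries, commitIndex);
            AppendEntriesResponse response = sendAppendEntries(follower, request);

            if (response.getTerm() > currentTerm) {
                stepDown(response.getTerm());             // a newer leader exists
                return false;
            }
            if (response.isSuccess()) {
                int lastSent = prevLogIndex + entries.size();
                nextIndex.put(follower, lastSent + 1);
                matchIndex.put(follower, lastSent);
                return true;
            }
            if (next == 0) {
                return false;                             // cannot back off any further
            }
            // Log mismatch at prevLogIndex: back off one entry and retry.
            nextIndex.put(follower, next - 1);
        }
    }
}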
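The follower side of AppendEntries is what makes the `prevLogIndex`/`prevLogTerm` back-off work. The sketch below assumes 0-based indices as in the leader code and uses hypothetical `FollowerLog` and `RaftEntry` types. Term checks and persistence are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration.
record RaftEntry(int term, String command) {}

final class FollowerLog {
    final List<RaftEntry> entries = new ArrayList<>();
    int commitIndex = -1; // highest index known to be committed (-1 = nothing yet)

    /** Follower side of AppendEntries; prevLogIndex is -1 when sending from the start of the log. */
    boolean appendEntries(int prevLogIndex, int prevLogTerm, List<RaftEntry> newEntries, int leaderCommit) {
        // 1. Consistency check: we must already hold an entry at prevLogIndex with prevLogTerm.
        if (prevLogIndex >= 0
                && (prevLogIndex >= entries.size() || entries.get(prevLogIndex).term() != prevLogTerm)) {
            return false; // the leader will decrement nextIndex and retry
        }
        // 2. Append, truncating only at the first conflicting entry.
        int index = prevLogIndex + 1;
        for (RaftEntry e : newEntries) {
            if (index < entries.size()) {
                if (entries.get(index).term() != e.term()) {
                    entries.subList(index, entries.size()).clear(); // drop the conflict and everything after
                    entries.add(e);
                }
                // Same index and term means the same entry (Log Matching), so keep it.
            } else {
                entries.add(e);
            }
            index++;
        }
        // 3. Advance commitIndex, but never past the last entry covered by this request.
        int lastNew = prevLogIndex + newEntries.size();
        if (leaderCommit > commitIndex) {
            commitIndex = Math.min(leaderCommit, lastNew);
        }
        return true;
    }
}
```

A follower truncates only at the first conflicting entry, never at a matching one. As a result, a delayed duplicate request cannot erase entries the follower has already accepted.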

Safety Guarantees

Election Restriction

java
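// Election restriction: a node grants its vote only to a candidate whose log is at least as
// up to date as its own. A winner needs votes from a majority, and every committed entry is
// stored on a majority, so the new leader's log contains every committed entry.
protected boolean isLogUpToDate(int lastLogIndex, int lastLogTerm) {
    int myLastLogTerm = log.isEmpty() ? 0 : log.get(log.size() - 1).getTerm();
    int myLastLogIndex = log.size() - 1;   // 0-based; -1 when the log is empty

    // A later last term means a more up-to-date log, regardless of length.
    if (lastLogTerm != myLastLogTerm) {
        return lastLogTerm > myLastLogTerm;
    }

    // Same last term: the longer log is more up to date.
    return lastLogIndex >= myLastLogIndex;
}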
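A related safety rule governs commitment. The leader may advance `commitIndex` to the largest index N stored on a majority only if entry N is from its current term; entries from earlier terms then become committed indirectly. Committing an old-term entry just by counting its replicas is unsafe, because a later leader could still overwrite it. The sketch below expresses the rule over plain arrays, using a hypothetical `CommitRule.advance` helper.

```java
import java.util.Arrays;

final class CommitRule {
    private CommitRule() {}

    /**
     * @param matchIndex  highest replicated index per follower (0-based, -1 = none)
     * @param leaderLast  index of the leader's last entry (the leader always matches itself)
     * @param termAt      termAt[i] = term of log entry i
     * @param currentTerm the leader's current term
     * @param commitIndex current commit index
     * @return the new commit index (never smaller than the old one)
     */
    static int advance(int[] matchIndex, int leaderLast, int[] termAt, int currentTerm, int commitIndex) {
        int n = matchIndex.length + 1;                 // followers + leader
        int[] all = Arrays.copyOf(matchIndex, n);
        all[n - 1] = leaderLast;
        Arrays.sort(all);                              // ascending
        // After sorting, at least a majority of nodes hold an index >= all[n - majority].
        int majority = n / 2 + 1;
        int replicatedOnMajority = all[n - majority];
        for (int idx = replicatedOnMajority; idx > commitIndex; idx--) {
            if (termAt[idx] == currentTerm) {
                return idx;                            // commits idx and every entry before it
            }
        }
        return commitIndex;
    }
}
```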

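ZAB (ZooKeeper Atomic Broadcast)

Basic Concepts

Node Roles

  • Leader: handles all write requests (followers forward writes to it) and issues transaction proposals
  • Follower: serves reads, votes in leader elections, and acknowledges proposals
  • Observer: receives committed transactions and serves reads, but votes in neither elections nor proposals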
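Because observers do not vote, quorums are computed over the leader and followers only. Adding observers therefore scales read capacity without making writes wait for more acknowledgements. The sketch below uses hypothetical `Member` and `ZabQuorum` types.

```java
import java.util.List;
import java.util.Set;

// Hypothetical types for illustration.
enum Role { LEADER, FOLLOWER, OBSERVER }

record Member(long sid, Role role) {}

final class ZabQuorum {
    private ZabQuorum() {}

    /** ACKs (including the leader's own) needed to commit: a majority of voting members. */
    static int ackQuorum(List<Member> ensemble) {
        long voters = ensemble.stream().filter(m -> m.role() != Role.OBSERVER).count();
        return (int) (voters / 2 + 1);
    }

    /** True if the acknowledging server ids include a quorum of voting members. */
    static boolean hasQuorum(List<Member> ensemble, Set<Long> ackedSids) {
        long votingAcks = ensemble.stream()
                .filter(m -> m.role() != Role.OBSERVER && ackedSids.contains(m.sid()))
                .count();
        return votingAcks >= ackQuorum(ensemble);
    }
}
```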

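Protocol Phases

  • Election: choose a prospective leader
  • Discovery: the leader collects the followers' latest epochs and transaction state, then establishes a new epoch
  • Synchronization: the leader brings the followers' data up to date with its own
  • Broadcast: normal transaction processing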
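In ZooKeeper, the zxid is a 64-bit number: the high 32 bits hold the leader epoch and the low 32 bits hold a counter within that epoch. A leader that completes discovery starts a new, higher epoch with the counter reset, so all of its proposals order after those of earlier leaders. The `Zxid` helper below is a hypothetical sketch of that encoding, not ZooKeeper's own code.

```java
final class Zxid {
    private Zxid() {}

    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    /** Starting zxid for a newly established leader: next epoch, counter 0 (first proposal gets 1). */
    static long startOfNextEpoch(long lastSeenZxid) {
        return make(epochOf(lastSeenZxid) + 1, 0);
    }
}
```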

Leader Election

Election Algorithm

java
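import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified sketch modeled on ZooKeeper's FastLeaderElection. Networking and the quorum check
// (termPredicate) are left abstract; Vote, Notification, and ServerState are assumed types.
public abstract class ZabElection {
    protected long myId;
    protected long myZxid;              // last zxid in this server's log
    protected long myPeerEpoch;         // epoch of that zxid
    protected long logicalClock;        // election round (not the ZAB epoch)
    protected volatile ServerState state = ServerState.LOOKING;
    protected BlockingQueue<Notification> recvqueue;

    protected abstract void sendNotifications(Vote vote);
    protected abstract boolean termPredicate(Map<Long, Vote> votes, Vote candidate); // quorum agrees?

    public Vote lookForLeader() throws InterruptedException {
        Map<Long, Vote> recvset = new HashMap<>();   // votes in the current round, by sender id

        // Start a new round by voting for ourselves.
        logicalClock++;
        Vote currentVote = new Vote(myId, myZxid, myPeerEpoch);
        sendNotifications(currentVote);

        while (state == ServerState.LOOKING) {
            Notification n = recvqueue.poll(200, TimeUnit.MILLISECONDS);
            if (n == null) {
                sendNotifications(currentVote);      // nothing received: resend our vote
                continue;
            }

            if (n.getElectionEpoch() > logicalClock) {
                // We are behind: join the newer round, discard old votes, and choose between
                // our own initial vote and the one just received.
                logicalClock = n.getElectionEpoch();
                recvset.clear();
                if (totalOrderPredicate(n.getLeader(), n.getZxid(), n.getPeerEpoch(), myId, myZxid, myPeerEpoch)) {
                    currentVote = new Vote(n.getLeader(), n.getZxid(), n.getPeerEpoch());
                } else {
                    currentVote = new Vote(myId, myZxid, myPeerEpoch);
                }
                sendNotifications(currentVote);
            } else if (n.getElectionEpoch() < logicalClock) {
                continue;                            // vote from an old round: ignore it
            } else if (totalOrderPredicate(n.getLeader(), n.getZxid(), n.getPeerEpoch(),
                                           currentVote.getId(), currentVote.getZxid(), currentVote.getEpoch())) {
                // Same round, better candidate: switch our vote and announce it.
                currentVote = new Vote(n.getLeader(), n.getZxid(), n.getPeerEpoch());
                sendNotifications(currentVote);
            }

            // Record the sender's vote and our own, then check for a quorum on our candidate.
            recvset.put(n.getSid(), new Vote(n.getLeader(), n.getZxid(), n.getPeerEpoch()));
            recvset.put(myId, currentVote);
            if (termPredicate(recvset, currentVote)) {
                // ZooKeeper additionally waits briefly for a better vote before finalizing.
                state = (currentVote.getId() == myId) ? ServerState.LEADING : ServerState.FOLLOWING;
                return currentVote;
            }
        }
        return null;
    }

    // Vote order: higher epoch wins, then higher zxid, then higher server id.
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                          long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }
}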
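ZooKeeper's vote comparison (`totalOrderPredicate` in `FastLeaderElection`) checks the proposed leader's epoch first, then the zxid, then the server id. Every server applies the same total order, so once all initial votes have circulated, every server proposes the same candidate. The sketch below is a self-contained version of that ordering; `VoteProposal` and `VoteOrder` are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical types for illustration.
record VoteProposal(long epoch, long zxid, long sid) {}

final class VoteOrder {
    private VoteOrder() {}

    /** Higher epoch wins; ties are broken by higher zxid, then by higher server id. */
    static final Comparator<VoteProposal> ORDER = Comparator
            .comparingLong(VoteProposal::epoch)
            .thenComparingLong(VoteProposal::zxid)
            .thenComparingLong(VoteProposal::sid);

    /** The vote every server converges to once it has seen all initial votes. */
    static VoteProposal winner(List<VoteProposal> initialVotes) {
        return initialVotes.stream().max(ORDER).orElseThrow();
    }

    /** True if newVote should replace current. */
    static boolean supersedes(VoteProposal newVote, VoteProposal current) {
        return ORDER.compare(newVote, current) > 0;
    }
}
```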

Transaction Processing

Two-Phase Commit (without Aborts)

java
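import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

// Simplified sketch of ZAB's broadcast phase, one proposal at a time. Transport, the transaction
// log, and observers (which receive INFORM instead of PROPOSAL) are omitted; types are assumed.
public abstract class ZabLeader {
    protected final long myId;
    protected long zxid;                    // high 32 bits: epoch, low 32 bits: counter
    protected List<Follower> followers;     // voting followers only
    protected BlockingQueue<Ack> ackQueue;

    protected ZabLeader(long myId, long epoch) {
        this.myId = myId;
        this.zxid = epoch << 32;            // a new epoch starts with counter 0
    }

    protected abstract void logProposal(Proposal proposal);       // append and fsync locally
    protected abstract void applyToStateMachine(Proposal proposal);

    public void processRequest(Request request) throws InterruptedException {
        // Assign the next transaction id in this epoch.
        zxid++;
        Proposal proposal = new Proposal(zxid, request);
        logProposal(proposal);              // the leader's own ACK is only valid once logged

        // Phase 1: send PROPOSAL to every follower.
        for (Follower follower : followers) {
            follower.propose(proposal);
        }

        // Wait until a quorum of the voting ensemble (leader + followers) has acknowledged.
        waitForAcks(proposal);

        // Phase 2: send COMMIT. Unlike classic 2PC there is no abort path.
        for (Follower follower : followers) {
            follower.commit(proposal);
        }

        // Apply locally, in zxid order.
        applyToStateMachine(proposal);
    }

    private void waitForAcks(Proposal proposal) throws InterruptedException {
        Set<Long> acked = new HashSet<>();
        acked.add(myId);                                  // the leader's own ACK
        int quorum = (followers.size() + 1) / 2 + 1;      // majority of leader + followers

        while (acked.size() < quorum) {
            Ack ack = ackQueue.take();
            if (ack.getZxid() == proposal.getZxid()) {
                acked.add(ack.getSid());                  // count each server at most once
            }
        }
    }
}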
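On the follower side, ZAB logs proposals in zxid order and commits them in the same order. A COMMIT must therefore refer to the oldest outstanding proposal, and ZooKeeper's follower treats a mismatch as a fatal error. The sketch below uses hypothetical `TxnProposal` and `ZabFollower` types and omits disk persistence and networking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical types for illustration.
record TxnProposal(long zxid, String op) {}

final class ZabFollower {
    private final Deque<TxnProposal> pending = new ArrayDeque<>(); // logged and ACKed, not yet committed
    private final List<String> applied = new ArrayList<>();        // stand-in for the state machine
    private long lastLoggedZxid = 0;

    /** Handles PROPOSAL: log it (persistence omitted) and return the zxid to ACK. */
    long onProposal(TxnProposal p) {
        if (p.zxid() <= lastLoggedZxid) {
            throw new IllegalStateException("proposal out of order: 0x" + Long.toHexString(p.zxid()));
        }
        pending.addLast(p);
        lastLoggedZxid = p.zxid();
        return p.zxid();
    }

    /** Handles COMMIT: it must refer to the oldest pending proposal. */
    void onCommit(long zxid) {
        TxnProposal head = pending.peekFirst();
        if (head == null || head.zxid() != zxid) {
            throw new IllegalStateException("commit out of order: 0x" + Long.toHexString(zxid));
        }
        pending.removeFirst();
        applied.add(head.op());
    }

    List<String> applied() { return applied; }

    int pendingCount() { return pending.size(); }
}
```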

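PBFT (Practical Byzantine Fault Tolerance)

Basic Concepts

Byzantine Faults

  • Faulty nodes may send incorrect or malicious messages, not merely stop responding
  • Tolerating up to f Byzantine nodes requires at least 3f+1 nodes in total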
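The 3f+1 bound follows from quorum intersection. A replica can only wait for n - f replies, because f replicas may never answer. Any two such quorums overlap in at least n - 2f replicas. With n ≥ 3f+1, that overlap still contains at least one honest replica after removing up to f faulty ones. The sketch below shows the arithmetic; `BftQuorum` is a hypothetical helper.

```java
final class BftQuorum {
    private BftQuorum() {}

    /** Largest number of Byzantine replicas an n-replica cluster tolerates. */
    static int maxFaulty(int n) {
        return (n - 1) / 3;
    }

    /** Quorum size n - f; equals 2f + 1 when n = 3f + 1. */
    static int quorum(int n) {
        return n - maxFaulty(n);
    }

    /** Minimum number of replicas shared by any two quorums: 2q - n. */
    static int minOverlap(int n) {
        return 2 * quorum(n) - n;
    }

    /** Minimum honest replicas in that overlap (overlap minus up to f faulty ones). */
    static int minHonestOverlap(int n) {
        return minOverlap(n) - maxFaulty(n);
    }
}
```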

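Protocol Phases

  • Pre-prepare: the primary assigns a sequence number to the request and multicasts a pre-prepare message
  • Prepare: each backup multicasts a prepare message, agreeing on that ordering within the current view
  • Commit: every replica multicasts a commit message and executes the request once 2f+1 matching commits arrive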
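The prepare and commit phases are all-to-all, so each request costs on the order of n² messages. A leader-based crash-fault protocol needs only a linear number. The sketch below counts normal-case messages for one request, assuming no batching and no view change, and ignoring client request and reply traffic. `PbftMessageCount` is a hypothetical helper.

```java
final class PbftMessageCount {
    private PbftMessageCount() {}

    static long prePrepare(int n) { return n - 1; }                     // primary -> each backup
    static long prepare(int n)    { return (long) (n - 1) * (n - 1); }  // each backup -> every other replica
    static long commit(int n)     { return (long) n * (n - 1); }        // each replica -> every other replica

    static long total(int n) {
        return prePrepare(n) + prepare(n) + commit(n);
    }

    /** For comparison: one Raft AppendEntries round (requests plus responses). */
    static long raftRound(int n) {
        return 2L * (n - 1);
    }
}
```

For n = 4, this gives 24 PBFT messages versus 6 for one Raft AppendEntries round. For n = 7, it gives 84 versus 12.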

Implementation

java
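import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch of PBFT's normal case (no view changes, checkpoints, or signatures).
// Certificates are keyed by (view, sequence number, digest) and count distinct senders,
// so a faulty replica cannot inflate a count by resending. Message types are assumed.
public abstract class PbftNode {
    protected final int nodeId;
    protected final int n;                   // total replicas, n >= 3f + 1
    protected final int f;                   // maximum Byzantine replicas tolerated
    protected int viewNumber = 0;
    protected int sequenceNumber = 0;

    private final Map<String, String> acceptedPrePrepare = new HashMap<>(); // "view:seq" -> digest
    private final Map<String, Set<Integer>> prepareVotes = new HashMap<>();
    private final Map<String, Set<Integer>> commitVotes = new HashMap<>();
    private final Set<String> sentCommit = new HashSet<>();
    private final Set<String> executed = new HashSet<>();

    protected PbftNode(int nodeId, int n) {
        this.nodeId = nodeId;
        this.n = n;
        this.f = (n - 1) / 3;
    }

    protected abstract String computeDigest(Request request);
    protected abstract boolean verifyMessage(PrePrepareMessage message); // signature, digest, watermarks
    protected abstract void broadcast(Object message);
    protected abstract void executeRequest(String digest);

    protected boolean isPrimary() {
        return viewNumber % n == nodeId;     // the primary of view v is replica v mod n
    }

    private static String key(int view, int seq, String digest) {
        return view + ":" + seq + ":" + digest;
    }

    // Primary: assign a sequence number and multicast PRE-PREPARE.
    public synchronized void handleClientRequest(Request request) {
        if (!isPrimary()) {
            return;                          // backups relay requests to the primary (not shown)
        }
        sequenceNumber++;
        String digest = computeDigest(request);
        acceptedPrePrepare.put(viewNumber + ":" + sequenceNumber, digest);
        broadcast(new PrePrepareMessage(viewNumber, sequenceNumber, digest, request));
        // The primary sends no PREPARE; its PRE-PREPARE plays that role.
    }

    // Backup: accept at most one PRE-PREPARE per (view, seq), then multicast PREPARE.
    public synchronized void handlePrePrepare(PrePrepareMessage m) {
        if (m.getViewNumber() != viewNumber || !verifyMessage(m)) {
            return;
        }
        String slot = m.getViewNumber() + ":" + m.getSequenceNumber();
        String previous = acceptedPrePrepare.putIfAbsent(slot, m.getDigest());
        if (previous != null && !previous.equals(m.getDigest())) {
            return;                          // conflicting digest for the same slot: reject
        }
        PrepareMessage prepare = new PrepareMessage(viewNumber, m.getSequenceNumber(), m.getDigest(), nodeId);
        broadcast(prepare);
        handlePrepare(prepare);              // count our own PREPARE
    }

    // Prepared = matching PRE-PREPARE + 2f PREPAREs from distinct backups; send COMMIT once.
    public synchronized void handlePrepare(PrepareMessage m) {
        if (m.getViewNumber() != viewNumber) {
            return;
        }
        String k = key(m.getViewNumber(), m.getSequenceNumber(), m.getDigest());
        prepareVotes.computeIfAbsent(k, x -> new HashSet<>()).add(m.getSenderId());

        boolean hasPrePrepare = m.getDigest().equals(
            acceptedPrePrepare.get(m.getViewNumber() + ":" + m.getSequenceNumber()));
        if (hasPrePrepare && prepareVotes.get(k).size() >= 2 * f && sentCommit.add(k)) {
            CommitMessage commit = new CommitMessage(viewNumber, m.getSequenceNumber(), m.getDigest(), nodeId);
            broadcast(commit);
            handleCommit(commit);            // count our own COMMIT
        }
    }

    // Committed-local = prepared + 2f + 1 COMMITs from distinct replicas; execute once.
    public synchronized void handleCommit(CommitMessage m) {
        if (m.getViewNumber() != viewNumber) {
            return;
        }
        String k = key(m.getViewNumber(), m.getSequenceNumber(), m.getDigest());
        commitVotes.computeIfAbsent(k, x -> new HashSet<>()).add(m.getSenderId());

        if (sentCommit.contains(k) && commitVotes.get(k).size() >= 2 * f + 1 && executed.add(k)) {
            // Real PBFT also executes requests in sequence-number order; that is omitted here.
            executeRequest(m.getDigest());
        }
    }
}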
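The thresholds can be sanity-checked with a toy count of the normal case. The sketch assumes view 0 with replica 0 as primary, synchronous delivery, and faulty replicas that simply stay silent. Real Byzantine replicas can also lie, which certificates and signatures guard against. The hypothetical `PbftNormalCase.run` reports how many honest replicas execute the request.

```java
import java.util.Set;

final class PbftNormalCase {
    private PbftNormalCase() {}

    /** silent: ids (0..n-1, distinct) of replicas that send nothing. Returns honest replicas that execute. */
    static int run(int n, Set<Integer> silent) {
        int f = (n - 1) / 3;
        if (silent.contains(0)) {
            return 0; // silent primary: backups would time out and start a view change (not modeled)
        }
        int honest = n - silent.size();
        // Pre-prepare: the primary sends PRE-PREPARE; honest backups accept it.
        // Prepare: every honest backup multicasts PREPARE; the primary sends none.
        int prepares = honest - 1;
        // Prepared: the PRE-PREPARE plus 2f matching PREPAREs from distinct backups.
        if (prepares < 2 * f) {
            return 0; // no prepared certificate: nothing executes (safe, but not live)
        }
        // Commit: every prepared honest replica, including the primary, multicasts COMMIT.
        int commits = honest;
        // Execute once 2f + 1 matching COMMITs from distinct replicas have arrived.
        return commits >= 2 * f + 1 ? honest : 0;
    }
}
```

With n = 4, one silent backup still lets the other three execute. Two silent backups exceed f = 1, so nothing executes: the protocol stays safe but stops making progress.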

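Protocol Comparison

| Protocol | Fault model | Faults tolerated (n nodes) | Performance | Complexity | Typical use |
|---|---|---|---|---|---|
| Raft | Crash | f < n/2 (n ≥ 2f+1) | High | Moderate | Distributed storage |
| ZAB | Crash | f < n/2 (n ≥ 2f+1) | High | Moderate | ZooKeeper |
| PBFT | Byzantine | f < n/3 (n ≥ 3f+1) | Low (O(n²) messages per request) | High | Blockchain |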

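Real-World Use

Raft in Practice

  • etcd: the backing store for Kubernetes cluster configuration and state
  • Consul: service discovery and configuration management
  • TiKV: distributed key-value store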

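ZAB in Practice

  • ZooKeeper: distributed coordination service
  • Kafka: older versions store cluster metadata in ZooKeeper; newer versions replace ZooKeeper with KRaft, a Raft-based metadata quorum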

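PBFT in Practice

  • Hyperledger Fabric: enterprise blockchain. Early releases (v0.6) used PBFT; later releases ordered transactions with crash-fault-tolerant services (Kafka, then Raft), and v3.0 added a BFT ordering service
  • R3 Corda: financial blockchain platform whose notary service can run (experimentally) as a BFT cluster based on BFT-SMaRt, a PBFT-family protocol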

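Selection Guidance

Scenarios

  1. General-purpose distributed systems: choose Raft, which is comparatively easy to understand and implement and performs well
  2. A proven, off-the-shelf coordination service: use ZooKeeper (ZAB), which has been validated at large scale; Raft offers the same strong consistency, so consistency alone does not favor ZAB
  3. Byzantine environments, where nodes may be malicious: choose PBFT or another BFT protocol, since crash-fault protocols such as Raft and ZAB offer no protection there
  4. High-performance requirements: avoid PBFT, whose all-to-all message rounds limit throughput, and choose Raft or ZAB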

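Implementation Considerations

  1. Network partitions: ensure that only the side holding a majority can elect a leader or commit writes, so a stale leader cut off in a minority cannot acknowledge client writes
  2. Performance: use batching and pipelining (sending new entries before earlier ones are acknowledged) to amortize network and disk costs
  3. Failure recovery: persist the state a restarted node needs (in Raft, currentTerm, votedFor, and the log) before replying to any RPC
  4. Membership changes: add or remove nodes without ever allowing two disjoint majorities (Raft uses joint consensus or one-server-at-a-time changes)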
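Batching amortizes one replication round, and one disk flush, over many client commands. The sketch below is a size-triggered batcher that could sit in front of a leader's append path. The `CommandBatcher` class and its flush callback are hypothetical, and a real system would also flush on a short timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CommandBatcher {
    private final int maxBatch;
    private final Consumer<List<String>> flush; // e.g. append the whole batch in one replication round
    private final List<String> buffer = new ArrayList<>();

    CommandBatcher(int maxBatch, Consumer<List<String>> flush) {
        if (maxBatch <= 0) {
            throw new IllegalArgumentException("maxBatch must be positive");
        }
        this.maxBatch = maxBatch;
        this.flush = flush;
    }

    /** Buffers a command and flushes automatically once the batch is full. */
    synchronized void submit(String command) {
        buffer.add(command);
        if (buffer.size() >= maxBatch) {
            flushNow();
        }
    }

    /** Flushes whatever is buffered (a timer would call this to bound latency). */
    synchronized void flushNow() {
        if (buffer.isEmpty()) {
            return;
        }
        flush.accept(List.copyOf(buffer));
        buffer.clear();
    }
}
```

Larger batches raise throughput but make early commands wait longer. This is why the size threshold is normally paired with a timer.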

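Summary

Distributed consensus protocols are the foundation of reliable distributed systems. Choosing one means weighing the fault model (crash or Byzantine), performance requirements, and implementation complexity. Raft's understandability and efficiency have made it widely adopted in modern distributed systems.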