Distributed Protocols
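Overview
A distributed protocol is the set of rules and algorithms that the nodes of a distributed system follow to coordinate and reach agreement. Its main purpose is to keep the system consistent and available despite failures such as network partitions and node crashes.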
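Raft and ZAB stay consistent under a network partition by using majority quorums. An operation needs acknowledgements from more than half of the cluster. Two disjoint groups cannot both hold a majority, so at most one side of a partition can keep making progress.

The sketch below shows only that arithmetic. The class and method names are hypothetical and do not come from any library.

```java
import java.util.List;

/** Hypothetical helper: majority-quorum arithmetic for a cluster of n nodes. */
final class MajorityQuorum {
    private MajorityQuorum() {}

    /** Smallest number of nodes that forms a strict majority of n. */
    static int majority(int n) {
        return n / 2 + 1;
    }

    /** A partition may keep committing writes only if it still holds a majority of the full cluster. */
    static boolean canMakeProgress(int partitionSize, int clusterSize) {
        return partitionSize >= majority(clusterSize);
    }

    /** How many sides of a partition can make progress; for any split this is 0 or 1. */
    static long sidesWithQuorum(List<Integer> partitionSizes) {
        int clusterSize = partitionSizes.stream().mapToInt(Integer::intValue).sum();
        return partitionSizes.stream().filter(s -> canMakeProgress(s, clusterSize)).count();
    }
}
```

Here are two example splits of a 5-node cluster:
- A 3/2 split leaves only the 3-node side able to make progress.
- A 2/2/1 split leaves no side able to make progress. This is why such protocols give up availability, not consistency, when no majority can be formed.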
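Raft Protocol
Basic Concepts
Node States
- Leader: handles all client requests and drives log replication
- Follower: passive; accepts log entries from the Leader and votes in elections
- Candidate: a transient state a node enters while it tries to become Leader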
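The three states and the transitions between them can be encoded as a small state machine. This follows the state diagram in the Raft paper.

This is an illustrative sketch. The `RaftRole` and `RaftEvent` names are hypothetical, and terms and timers are deliberately left out.

```java
/** Hypothetical events that drive Raft role changes. */
enum RaftEvent { ELECTION_TIMEOUT, WON_MAJORITY, SAW_HIGHER_TERM, HEARD_FROM_LEADER }

/** Raft roles with the transitions of Raft's server-state diagram (illustrative sketch). */
enum RaftRole {
    FOLLOWER, CANDIDATE, LEADER;

    RaftRole on(RaftEvent event) {
        switch (event) {
            case SAW_HIGHER_TERM:
                return FOLLOWER;                              // every role steps down on a higher term
            case ELECTION_TIMEOUT:
                return this == LEADER ? LEADER : CANDIDATE;   // leaders run no election timer
            case WON_MAJORITY:
                return this == CANDIDATE ? LEADER : this;     // only candidates collect votes
            case HEARD_FROM_LEADER:
                return this == CANDIDATE ? FOLLOWER : this;   // a valid Leader exists for this term
            default:
                throw new IllegalArgumentException("unknown event " + event);
        }
    }
}
```

A Candidate whose election times out stays a Candidate and starts a new election in the next term.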
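Core Mechanisms
- Leader election: elect at most one Leader per term
- Log replication: the Leader replicates log entries to the Followers
- Safety: once a log entry is committed, it is never lost or overwritten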
Leader Election
Election Flow
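```java
import java.util.List;

// Election sketch. VoteRequest/VoteResponse and the abstract helpers at the
// bottom are assumed to be defined elsewhere (transport, log storage).
public abstract class RaftNode {
    enum NodeState { FOLLOWER, CANDIDATE, LEADER }

    protected String nodeId;
    protected List<String> peers;            // the other cluster members, excluding this node
    protected NodeState state = NodeState.FOLLOWER;
    // currentTerm and votedFor must be persisted before answering any RPC
    protected int currentTerm = 0;
    protected String votedFor = null;        // candidate that received our vote in currentTerm
    protected long lastHeartbeat = System.currentTimeMillis();

    // The election timeout elapsed without hearing from a Leader: become a Candidate
    public void startElection() {
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = nodeId;                   // vote for ourselves
        lastHeartbeat = System.currentTimeMillis();
        int electionTerm = currentTerm;

        // Ask every peer for its vote (sequential for clarity; real nodes send in parallel)
        int votes = 1;                       // our own vote
        for (String peer : peers) {
            VoteRequest request = new VoteRequest(electionTerm, nodeId, lastLogIndex(), lastLogTerm());
            VoteResponse response = sendVoteRequest(peer, request);
            if (response.getTerm() > currentTerm) {
                // Someone is already in a newer term: abandon this election
                becomeFollower(response.getTerm());
                return;
            }
            if (response.isVoteGranted()) {
                votes++;
            }
        }

        // Win with a majority of the whole cluster (peers + this node), not of the peers alone.
        // State may have changed if another RPC arrived concurrently, so re-check it.
        int clusterSize = peers.size() + 1;
        boolean stillCandidate = state == NodeState.CANDIDATE && currentTerm == electionTerm;
        if (stillCandidate && votes > clusterSize / 2) {
            becomeLeader();
        }
        // Otherwise stay a Candidate: a new election starts when the next
        // (randomized) election timeout fires, unless a valid Leader appears first
    }

    // Handle a RequestVote RPC
    public VoteResponse handleVoteRequest(VoteRequest request) {
        // 1. Reject candidates from an older term
        if (request.getTerm() < currentTerm) {
            return new VoteResponse(currentTerm, false);
        }
        // 2. Newer term: adopt it and step down to Follower (this clears votedFor)
        if (request.getTerm() > currentTerm) {
            becomeFollower(request.getTerm());
        }
        // 3. At most one vote per term (repeating a vote for the same candidate is fine)
        boolean canVote = votedFor == null || votedFor.equals(request.getCandidateId());
        // 4. Only vote for a candidate whose log is at least as up-to-date as ours
        if (canVote && isLogUpToDate(request.getLastLogIndex(), request.getLastLogTerm())) {
            votedFor = request.getCandidateId();
            lastHeartbeat = System.currentTimeMillis();   // granting a vote resets the election timer
            return new VoteResponse(currentTerm, true);
        }
        return new VoteResponse(currentTerm, false);
    }

    protected void becomeFollower(int newTerm) {
        state = NodeState.FOLLOWER;
        currentTerm = newTerm;
        votedFor = null;
    }

    // Assumed helpers
    protected abstract VoteResponse sendVoteRequest(String peer, VoteRequest request);
    protected abstract boolean isLogUpToDate(int lastLogIndex, int lastLogTerm); // see "Election Restriction"
    protected abstract int lastLogIndex();
    protected abstract int lastLogTerm();
    protected abstract void becomeLeader();
}
```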
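An election can end without a winner, for example when votes split between two Candidates. In that case a new election starts after the next election timeout. Raft draws each timeout at random from a fixed range; the Raft paper uses 150–300 ms as an example. Because of this, split votes are rare and usually resolve in the next round.

The following is a minimal sketch under that assumption. The `ElectionTimer` class and its bounds are illustrative, and time is passed in explicitly to keep it testable.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative election timer: each reset draws a fresh random timeout in [minMillis, maxMillis). */
final class ElectionTimer {
    private final long minMillis;
    private final long maxMillis;
    private long deadlineMillis;

    ElectionTimer(long minMillis, long maxMillis, long nowMillis) {
        if (minMillis <= 0 || maxMillis <= minMillis) {
            throw new IllegalArgumentException("require 0 < min < max");
        }
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
        reset(nowMillis);
    }

    /** Call on a valid heartbeat, when granting a vote, or when starting an election. */
    long reset(long nowMillis) {
        long timeout = ThreadLocalRandom.current().nextLong(minMillis, maxMillis);
        deadlineMillis = nowMillis + timeout;
        return timeout;
    }

    /** True once the deadline passed without a reset: the node should start an election. */
    boolean expired(long nowMillis) {
        return nowMillis >= deadlineMillis;
    }
}
```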
Log Replication
Replication Flow
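```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

// Replication sketch. LogEntry, AppendEntriesRequest/Response and
// sendAppendEntries are assumed; log indexes are 0-based.
public abstract class RaftLeader {
    private static final Logger LOGGER = Logger.getLogger(RaftLeader.class.getName());

    protected String nodeId;
    protected List<String> peers;
    protected int currentTerm;
    protected volatile int commitIndex = -1;             // -1: nothing committed yet
    // Read and written by replication tasks on other threads, so use thread-safe collections
    private final List<LogEntry> log = new CopyOnWriteArrayList<>();
    private final Map<String, Integer> nextIndex = new ConcurrentHashMap<>();
    private final Map<String, Integer> matchIndex = new ConcurrentHashMap<>();

    // Handle a client command: append locally, replicate, commit once a majority stores it
    public synchronized boolean appendEntry(String command) {
        LogEntry entry = new LogEntry(currentTerm, log.size(), command);
        log.add(entry);

        // Replicate to all followers in parallel; a failed RPC simply counts as "not stored"
        List<CompletableFuture<Boolean>> futures = peers.stream()
                .map(peer -> CompletableFuture
                        .supplyAsync(() -> replicateToFollower(peer, entry))
                        .exceptionally(ex -> false))
                .collect(Collectors.toList());
        try {
            // Wait at most 1 s; one slow follower must not block the commit
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(1000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            LOGGER.log(Level.FINE, "Some followers did not respond in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        // Followers that have acknowledged so far, plus the leader itself
        long successCount = futures.stream().filter(f -> f.getNow(false)).count() + 1;
        // Commit when a majority of the cluster (peers + leader) stores the entry
        if (successCount > (peers.size() + 1) / 2) {
            commitIndex = Math.max(commitIndex, entry.getIndex());
            return true;
        }
        return false;
    }

    // Send the follower everything from its nextIndex up to and including `entry`
    private boolean replicateToFollower(String follower, LogEntry entry) {
        // Default: the leader's last index + 1 before this append (Raft's post-election initial value)
        int next = nextIndex.getOrDefault(follower, entry.getIndex());
        int prevLogIndex = next - 1;
        int prevLogTerm = prevLogIndex >= 0 ? log.get(prevLogIndex).getTerm() : 0;
        // A lagging follower also needs the entries before `entry`, not just the newest one
        List<LogEntry> entries = new ArrayList<>(log.subList(next, entry.getIndex() + 1));
        AppendEntriesRequest request = new AppendEntriesRequest(
                currentTerm, nodeId, prevLogIndex, prevLogTerm, entries, commitIndex);
        AppendEntriesResponse response = sendAppendEntries(follower, request);
        if (response.isSuccess()) {
            nextIndex.put(follower, entry.getIndex() + 1);
            matchIndex.put(follower, entry.getIndex());
            return true;
        }
        // Log mismatch: move nextIndex back so the next attempt sends an earlier prefix
        // (a real leader retries, and steps down if the response carries a higher term)
        nextIndex.put(follower, Math.max(0, next - 1));
        return false;
    }

    protected abstract AppendEntriesResponse sendAppendEntries(String follower, AppendEntriesRequest request);
}
```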
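In Raft, the Leader advances commitIndex from its matchIndex values. It commits the highest index N that a majority of servers have stored, but only if log[N] was written in the Leader's current term. Entries from earlier terms then become committed indirectly, because everything before N is committed along with it.

The following is a standalone sketch of that rule. The `CommitRule` class and its array-based inputs are illustrative assumptions, not an existing API.

```java
import java.util.Arrays;

/** Illustrative sketch of the Raft leader's commit rule (0-based log indexes). */
final class CommitRule {
    private CommitRule() {}

    /**
     * @param matchIndex  highest index known to be replicated on each follower (-1 if none)
     * @param leaderLast  index of the leader's last log entry
     * @param termAt      term of each entry in the leader's log
     * @param currentTerm the leader's current term
     * @param commitIndex the current commit index (-1 if nothing is committed)
     * @return the new commit index, which never decreases
     */
    static int advance(int[] matchIndex, int leaderLast, int[] termAt, int currentTerm, int commitIndex) {
        int clusterSize = matchIndex.length + 1;
        // Collect every server's match position (leader included) and sort ascending
        int[] all = Arrays.copyOf(matchIndex, clusterSize);
        all[clusterSize - 1] = leaderLast;
        Arrays.sort(all);
        // At least `majority` servers have stored everything up to this index
        int majority = clusterSize / 2 + 1;
        int candidate = all[clusterSize - majority];
        // Only an entry from the current term may be committed by counting replicas
        for (int n = candidate; n > commitIndex; n--) {
            if (termAt[n] == currentTerm) {
                return n;
            }
        }
        return commitIndex;
    }
}
```

For example, take a 5-node cluster whose entries 0–4 have terms 1, 1, 2, 3, 3:
- If the followers have matched up to 4, 3, 1 and 0, index 3 (term 3) is on a majority and gets committed.
- If a majority holds only index 2, written in term 2, nothing is committed yet while the term is 3.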
Safety Guarantees
Election Restriction
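```java
// Election restriction: a voter grants its vote only if the candidate's log is at
// least as up-to-date as its own. This guarantees that the elected Leader already
// holds every committed entry. (A RaftNode method; `log` is the node's
// List<LogEntry> with 0-based indexes.)
protected boolean isLogUpToDate(int lastLogIndex, int lastLogTerm) {
    int myLastLogTerm = log.isEmpty() ? 0 : log.get(log.size() - 1).getTerm();
    int myLastLogIndex = log.size() - 1;        // -1 for an empty log
    // Different last terms: the log ending in the higher term is more up-to-date
    if (lastLogTerm != myLastLogTerm) {
        return lastLogTerm > myLastLogTerm;
    }
    // Same last term: the longer log is more up-to-date
    return lastLogIndex >= myLastLogIndex;
}
```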
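ZAB Protocol (ZooKeeper Atomic Broadcast)
Basic Concepts
Node Roles
- Leader: handles all write requests (other servers forward writes to it) and issues transaction proposals
- Follower: votes in Leader elections and acknowledges (votes on) transaction proposals
- Observer: receives committed transactions but votes in neither elections nor proposals; used to scale reads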
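Observers do not vote, so quorums are computed over the voting members (the Leader plus the Followers) only. Adding Observers therefore scales reads without changing how many ACKs a write needs.

The following is a small sketch. The `ZkRole` enum and `ZabQuorum` class are hypothetical names for illustration.

```java
import java.util.List;

/** Hypothetical role tags for the members of a ZooKeeper-style ensemble. */
enum ZkRole { LEADER, FOLLOWER, OBSERVER }

final class ZabQuorum {
    private ZabQuorum() {}

    /** ACKs needed (Leader included) to commit: a majority of the voting members only. */
    static int quorumSize(List<ZkRole> ensemble) {
        long voters = ensemble.stream().filter(r -> r != ZkRole.OBSERVER).count();
        return (int) (voters / 2 + 1);
    }
}
```

With three voters the quorum is 2. Adding two Observers leaves it at 2.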
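Protocol Phases
- Election: choose a prospective Leader
- Discovery: the prospective Leader collects the Followers' latest epochs and transaction histories, then establishes a new epoch
- Synchronization: the Leader brings the Followers' state up to date with its own history
- Broadcast: normal transaction processing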
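Epochs tie these phases together. In ZooKeeper a zxid is a 64-bit number:
- The high 32 bits hold the Leader's epoch.
- The low 32 bits hold a counter that restarts in each new epoch.

As a result, any transaction from a newer epoch orders after every transaction from an older one. That is why comparing zxids during election favors servers that saw the most recent epoch.

The following is a sketch of the encoding. The helper names are illustrative.

```java
/** Illustrative helpers for ZooKeeper's zxid layout: (epoch << 32) | counter. */
final class Zxid {
    private Zxid() {}

    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }
}
```

For example, `Zxid.make(2, 0)` compares greater than `Zxid.make(1, 1000)`, although its counter is smaller.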
Leader Election
Election Algorithm
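```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified sketch of ZooKeeper-style fast leader election. Vote, Notification,
// ServerState and the abstract helpers are assumed to be defined elsewhere.
public abstract class ZabElection {
    protected long myId;
    protected long myZxid;          // zxid of the last transaction in this server's log
    protected int myEpoch;          // logical clock: the current election round
    protected volatile ServerState state = ServerState.LOOKING;
    protected BlockingQueue<Notification> recvqueue;

    public Vote lookForLeader() throws InterruptedException {
        // Latest vote from each server in the current round
        // (assumes sendNotifications also delivers our own vote to ourselves)
        Map<Long, Vote> recvset = new HashMap<>();
        // Start by voting for ourselves
        Vote currentVote = new Vote(myId, myZxid, myEpoch);
        sendNotifications(currentVote);

        while (state == ServerState.LOOKING) {
            Notification n = recvqueue.poll(200, TimeUnit.MILLISECONDS);
            if (n == null) {
                // Timed out without news: re-send our vote
                sendNotifications(currentVote);
                continue;
            }
            if (n.getEpoch() > myEpoch) {
                // We are behind: join the newer round, drop old votes, and compare the
                // received vote against our *initial* vote (our old choice is stale)
                myEpoch = n.getEpoch();
                recvset.clear();
                currentVote = totalOrderPredicate(n.getLeader(), n.getZxid(), myId, myZxid)
                        ? new Vote(n.getLeader(), n.getZxid(), myEpoch)
                        : new Vote(myId, myZxid, myEpoch);
                sendNotifications(currentVote);
            } else if (n.getEpoch() < myEpoch) {
                // Vote from an older round: ignore it
                continue;
            } else if (totalOrderPredicate(n.getLeader(), n.getZxid(),
                    currentVote.getId(), currentVote.getZxid())) {
                // Same round and the received vote is better: switch to it
                currentVote = new Vote(n.getLeader(), n.getZxid(), myEpoch);
                sendNotifications(currentVote);
            }
            // Record the sender's vote (for both the newer-round and same-round cases)
            recvset.put(n.getSid(), new Vote(n.getLeader(), n.getZxid(), n.getEpoch()));
            // Finish once a quorum agrees with our current vote. (ZooKeeper also
            // waits briefly for a better vote before finalizing.)
            if (termPredicate(recvset, currentVote)) {
                return currentVote;
            }
        }
        return null;
    }

    // Vote ordering: larger zxid wins; on a tie, larger server id wins.
    // (ZooKeeper compares the proposed epoch before the zxid; omitted here.)
    private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) {
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }

    protected abstract void sendNotifications(Vote vote);
    // True when a quorum of voting servers in recvset agree with `vote`
    protected abstract boolean termPredicate(Map<Long, Vote> recvset, Vote vote);
}
```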
Transaction Processing
Two-Phase Commit
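```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

// Broadcast sketch. Request, Proposal, Ack, Follower and applyToStateMachine are assumed.
// `followers` lists voting Followers only; Observers never ACK.
public abstract class ZabLeader {
    protected long zxid = 0;     // simplified: real zxids are (epoch << 32) | counter
    protected List<Follower> followers;
    protected BlockingQueue<Ack> ackQueue;

    public void processRequest(Request request) throws InterruptedException {
        // Assign the next transaction id
        zxid++;
        Proposal proposal = new Proposal(zxid, request);
        // Phase 1: broadcast the PROPOSAL (the Leader also logs it locally)
        for (Follower follower : followers) {
            follower.propose(proposal);
        }
        // Wait for ACKs from a quorum (Leader included)
        waitForAcks(proposal);
        // Phase 2: broadcast COMMIT. Unlike classic 2PC there is no abort:
        // only a quorum, not every participant, has to ACK
        for (Follower follower : followers) {
            follower.commit(proposal);
        }
        // Apply to the local state machine (in zxid order)
        applyToStateMachine(proposal);
    }

    private void waitForAcks(Proposal proposal) throws InterruptedException {
        int quorum = (followers.size() + 1) / 2 + 1;   // majority of Followers + Leader
        Set<Long> ackedBy = new HashSet<>();            // count each Follower once
        // The 1 is the Leader's own ACK
        while (1 + ackedBy.size() < quorum) {
            Ack ack = ackQueue.take();
            // ACKs for other proposals are dropped in this one-at-a-time sketch
            if (ack.getZxid() == proposal.getZxid()) {
                ackedBy.add(ack.getSid());
            }
        }
    }

    protected abstract void applyToStateMachine(Proposal proposal);
}
```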
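PBFT Protocol (Practical Byzantine Fault Tolerance)
Basic Concepts
Byzantine Faults
- A faulty node may behave arbitrarily: it may crash, or send incorrect, conflicting, or malicious messages
- Tolerating up to f Byzantine nodes requires at least 3f + 1 nodes in total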
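The 3f + 1 bound comes from quorum intersection, and it rests on two requirements:
- A quorum must be formable without waiting for the f possibly faulty replicas, so its size is q = n − f.
- Any two quorums must overlap in more than f replicas, so that at least one honest replica belongs to both.

With n = 3f + 1, q = 2f + 1, and two quorums overlap in at least 2(2f + 1) − n = f + 1 replicas.

The following is a quick check of that arithmetic. The class name is illustrative.

```java
/** Illustrative arithmetic behind PBFT's n >= 3f + 1 requirement. */
final class ByzantineQuorum {
    private ByzantineQuorum() {}

    static int minReplicas(int f) {
        return 3 * f + 1;
    }

    static int quorumSize(int f) {
        return 2 * f + 1;                 // n - f when n = 3f + 1
    }

    /** Minimum overlap of two quorums of size q drawn from n replicas. */
    static int minOverlap(int n, int q) {
        return Math.max(0, 2 * q - n);
    }

    /** True if any two quorums always share at least one non-faulty replica. */
    static boolean overlapHasHonestReplica(int n, int q, int f) {
        return minOverlap(n, q) > f;
    }
}
```

With only 3f replicas (for example n = 3, f = 1, q = 2), two quorums may share just one replica, and that replica could be the faulty one.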
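Protocol Phases
- Pre-prepare: the primary assigns a sequence number to a request and multicasts a pre-prepare message
- Prepare: each backup that accepts the pre-prepare multicasts a prepare message
- Commit: once prepared, every replica multicasts a commit message; it executes the request after collecting 2f + 1 matching commits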
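These phases explain PBFT's cost. Prepare and Commit are all-to-all exchanges, so the number of messages per request grows quadratically with the number of replicas n.

The following is a rough count. It assumes point-to-point multicast and no batching, and it ignores the client's request and the replies. It is an illustration, not a measured result.

```java
/** Rough per-request message count for PBFT's normal case (no batching, client messages excluded). */
final class PbftMessageCount {
    private PbftMessageCount() {}

    static long prePrepare(long n) { return n - 1; }             // primary -> each backup
    static long prepare(long n)    { return (n - 1) * (n - 1); } // each backup -> every other replica
    static long commit(long n)     { return n * (n - 1); }       // every replica -> every other replica

    static long total(long n) {
        return prePrepare(n) + prepare(n) + commit(n);
    }
}
```

For n = 4 (f = 1) this gives 24 messages, and for n = 7 (f = 2) already 84. A Raft replication round costs 2(n − 1) messages: one AppendEntries request and one response per follower.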
Protocol Implementation
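```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Normal-case PBFT sketch. Message types and the abstract helpers are assumed;
// view changes, checkpoints, watermarks and signature checks are omitted.
// Assumes broadcast() also delivers each message to this node itself.
public abstract class PbftNode {
    protected int nodeId;
    protected int f;                         // max Byzantine replicas tolerated (n >= 3f + 1)
    protected int viewNumber = 0;
    protected int sequenceNumber = 0;
    // Distinct senders per (view, sequence, digest): a faulty node's duplicates count once
    private final Map<String, Set<Integer>> prepares = new HashMap<>();
    private final Map<String, Set<Integer>> commits = new HashMap<>();
    private final Set<String> prePrepared = new HashSet<>();
    private final Set<String> commitSent = new HashSet<>();
    private final Set<String> executed = new HashSet<>();

    private static String key(int view, int seq, String digest) {
        return view + ":" + seq + ":" + digest;
    }

    // Primary: assign a sequence number and multicast PRE-PREPARE
    public void handleClientRequest(Request request) {
        if (!isPrimary()) {
            return;
        }
        sequenceNumber++;
        String digest = computeDigest(request);
        broadcast(new PrePrepareMessage(viewNumber, sequenceNumber, digest, request));
    }

    // Every replica (the primary too, via self-delivery) records a valid PRE-PREPARE;
    // backups also multicast a PREPARE
    public void handlePrePrepare(PrePrepareMessage message) {
        if (message.getViewNumber() != viewNumber) {
            return;
        }
        // verifyMessage is assumed to check that the sender is the primary, the digest
        // matches the request, and no other digest was accepted for this sequence number
        if (!verifyMessage(message)) {
            return;
        }
        int seq = message.getSequenceNumber();
        prePrepared.add(key(viewNumber, seq, message.getDigest()));
        if (!isPrimary()) {
            broadcast(new PrepareMessage(viewNumber, seq, message.getDigest(), nodeId));
        }
        maybeSendCommit(viewNumber, seq, message.getDigest());
    }

    public void handlePrepare(PrepareMessage message) {
        String k = key(message.getViewNumber(), message.getSequenceNumber(), message.getDigest());
        prepares.computeIfAbsent(k, x -> new HashSet<>()).add(message.getNodeId());
        maybeSendCommit(message.getViewNumber(), message.getSequenceNumber(), message.getDigest());
    }

    // "Prepared" = PRE-PREPARE + 2f matching PREPAREs from distinct replicas: multicast COMMIT once
    private void maybeSendCommit(int view, int seq, String digest) {
        String k = key(view, seq, digest);
        int prepareVotes = prepares.getOrDefault(k, Set.of()).size();
        if (prePrepared.contains(k) && prepareVotes >= 2 * f && commitSent.add(k)) {
            broadcast(new CommitMessage(view, seq, digest, nodeId));
        }
    }

    // "Committed-local" = prepared + 2f+1 matching COMMITs from distinct replicas: execute once
    public void handleCommit(CommitMessage message) {
        String k = key(message.getViewNumber(), message.getSequenceNumber(), message.getDigest());
        commits.computeIfAbsent(k, x -> new HashSet<>()).add(message.getNodeId());
        if (commitSent.contains(k) && commits.get(k).size() >= 2 * f + 1 && executed.add(k)) {
            // Real PBFT executes requests strictly in sequence-number order
            executeRequest(message.getDigest());
        }
    }

    protected abstract boolean isPrimary();
    protected abstract String computeDigest(Request request);
    protected abstract boolean verifyMessage(PrePrepareMessage message);
    protected abstract void broadcast(Object message);
    protected abstract void executeRequest(String digest);
}
```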
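Protocol Comparison
| Protocol | Fault model | Faults tolerated | Performance | Complexity | Typical use |
|---|---|---|---|---|---|
| Raft | Crash faults | f < n/2 | High | Medium | Distributed storage |
| ZAB | Crash faults | f < n/2 | High | Medium | ZooKeeper |
| PBFT | Byzantine faults | f < n/3 | Low | High | Permissioned blockchains |
Here n is the total number of nodes and f the number of faulty nodes tolerated.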
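The "Faults tolerated" column translates directly into cluster sizing:
- Tolerating f crash faults needs 2f + 1 nodes.
- Tolerating f Byzantine faults needs 3f + 1 nodes.

The following sketch computes the largest tolerable f for a given cluster size. The names are illustrative.

```java
/** Illustrative: maximum tolerated faults for a cluster of n nodes under each fault model. */
final class FaultTolerance {
    private FaultTolerance() {}

    /** Raft / ZAB: f < n/2, i.e. n >= 2f + 1. */
    static int maxCrashFaults(int n) {
        return (n - 1) / 2;
    }

    /** PBFT: f < n/3, i.e. n >= 3f + 1. */
    static int maxByzantineFaults(int n) {
        return (n - 1) / 3;
    }
}
```

A 4-node Raft cluster tolerates no more crashes than a 3-node one, since both survive one failure. This is why odd cluster sizes are the usual choice.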
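Real-World Applications
Raft Applications
- etcd: the configuration and state store behind Kubernetes
- Consul: service discovery and configuration management
- TiKV: distributed key-value store
ZAB Applications
- ZooKeeper: distributed coordination service
- Kafka: cluster metadata management through ZooKeeper in older versions (newer versions replace ZooKeeper with KRaft, a Raft-based protocol)
PBFT Applications
- Hyperledger Fabric: enterprise blockchain (early versions used PBFT; later versions order transactions with Raft and, more recently, a BFT ordering service)
- R3 Corda: financial blockchain platform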
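Selection Guidelines
Scenario Analysis
- General-purpose distributed systems: choose Raft, which is easy to understand and efficient
- Strong consistency: both Raft and ZAB provide it; choose ZAB (through ZooKeeper) when you want a mature coordination service proven at large scale
- Byzantine environments (nodes may be malicious): choose PBFT, the only one of the three that tolerates Byzantine faults
- High-throughput requirements: avoid PBFT, whose all-to-all message rounds scale poorly; choose Raft or ZAB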
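Implementation Considerations
- Network partitions: make sure the protocol behaves correctly when the network splits (for example, a minority side must not commit writes)
- Performance optimization: techniques such as batching and pipelining
- Failure recovery: restore a node's state (for example, from its persisted log and snapshots) after a restart
- Configuration changes: handle changes in cluster membership safely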
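Batching amortizes the cost of a replication round: the system sends one round, or one proposal, per batch instead of per command.

The following is a minimal sketch of a batcher that flushes on either size or age. The `CommandBatcher` class and its thresholds are hypothetical. A real system would call `flushIfDue` from a timer and hand each returned batch to its replication path.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batcher: groups client commands so that one replication round carries many. */
final class CommandBatcher {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final List<String> pending = new ArrayList<>();
    private long firstPendingAtMillis;

    CommandBatcher(int maxBatchSize, long maxDelayMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Adds a command; returns a full batch to replicate, or an empty list while still accumulating. */
    List<String> add(String command, long nowMillis) {
        if (pending.isEmpty()) {
            firstPendingAtMillis = nowMillis;
        }
        pending.add(command);
        return pending.size() >= maxBatchSize ? drain() : List.of();
    }

    /** Flushes a partial batch once its oldest command has waited maxDelayMillis. */
    List<String> flushIfDue(long nowMillis) {
        boolean due = !pending.isEmpty() && nowMillis - firstPendingAtMillis >= maxDelayMillis;
        return due ? drain() : List.of();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

The two thresholds trade latency for throughput. A larger batch means fewer rounds, but each command may wait up to `maxDelayMillis` longer.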
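Summary
Distributed protocols are the foundation of reliable distributed systems. Choosing the right one means weighing the failure model, the performance requirements, and the implementation complexity. Because Raft is understandable and efficient, it has been widely adopted in modern distributed systems.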