分布式事务
分布式事务
概述
分布式事务是指跨越多个网络节点的事务处理,需要保证多个数据源的数据一致性。由于网络分区和节点故障的存在,分布式事务比本地事务更加复杂。
分布式事务的挑战
1. 网络不可靠
- 网络延迟和丢包
- 网络分区导致节点无法通信
- 消息乱序和重复
2. 节点故障
- 进程崩溃
- 机器宕机
- 存储故障
3. 并发控制
- 多个事务同时访问共享资源
- 死锁检测和处理
- 性能与一致性的权衡
分布式事务解决方案
1. 两阶段提交(2PC)
原理
两阶段提交协议包含一个协调者和多个参与者,分为准备阶段和提交阶段。
实现示例
java
public class TwoPhaseCommitCoordinator {
private List<Participant> participants;
public boolean executeTransaction(Transaction transaction) {
// 第一阶段:准备阶段
List<Boolean> prepareResults = new ArrayList<>();
for (Participant participant : participants) {
try {
boolean prepared = participant.prepare(transaction);
prepareResults.add(prepared);
} catch (Exception e) {
// 准备失败,回滚所有参与者
rollbackAll(transaction);
return false;
}
}
// 检查所有参与者是否都准备好
boolean allPrepared = prepareResults.stream().allMatch(Boolean::booleanValue);
if (allPrepared) {
// 第二阶段:提交阶段
return commitAll(transaction);
} else {
// 回滚所有参与者
rollbackAll(transaction);
return false;
}
}
private boolean commitAll(Transaction transaction) {
for (Participant participant : participants) {
try {
participant.commit(transaction);
} catch (Exception e) {
// 提交失败,记录日志,后续重试
log.error("Commit failed for participant: " + participant.getId(), e);
return false;
}
}
return true;
}
private void rollbackAll(Transaction transaction) {
for (Participant participant : participants) {
try {
participant.rollback(transaction);
} catch (Exception e) {
log.error("Rollback failed for participant: " + participant.getId(), e);
}
}
}
}
public class TwoPhaseCommitCoordinator {
private List<Participant> participants;
public boolean executeTransaction(Transaction transaction) {
// 第一阶段:准备阶段
List<Boolean> prepareResults = new ArrayList<>();
for (Participant participant : participants) {
try {
boolean prepared = participant.prepare(transaction);
prepareResults.add(prepared);
} catch (Exception e) {
// 准备失败,回滚所有参与者
rollbackAll(transaction);
return false;
}
}
// 检查所有参与者是否都准备好
boolean allPrepared = prepareResults.stream().allMatch(Boolean::booleanValue);
if (allPrepared) {
// 第二阶段:提交阶段
return commitAll(transaction);
} else {
// 回滚所有参与者
rollbackAll(transaction);
return false;
}
}
private boolean commitAll(Transaction transaction) {
for (Participant participant : participants) {
try {
participant.commit(transaction);
} catch (Exception e) {
// 提交失败,记录日志,后续重试
log.error("Commit failed for participant: " + participant.getId(), e);
return false;
}
}
return true;
}
private void rollbackAll(Transaction transaction) {
for (Participant participant : participants) {
try {
participant.rollback(transaction);
} catch (Exception e) {
log.error("Rollback failed for participant: " + participant.getId(), e);
}
}
}
}
优缺点
优点:
- 强一致性保证
- 实现相对简单
缺点:
- 阻塞协议,性能较差
- 单点故障问题
- 数据不可用时间长
2. 三阶段提交(3PC)
改进点
- 增加超时机制
- 减少阻塞时间
- 引入预提交阶段
java
public class ThreePhaseCommitCoordinator {
private static final int TIMEOUT = 30000; // 30秒超时
public boolean executeTransaction(Transaction transaction) {
// 第一阶段:CanCommit
if (!canCommitPhase(transaction)) {
return false;
}
// 第二阶段:PreCommit
if (!preCommitPhase(transaction)) {
rollbackAll(transaction);
return false;
}
// 第三阶段:DoCommit
return doCommitPhase(transaction);
}
private boolean canCommitPhase(Transaction transaction) {
CompletableFuture<Boolean>[] futures = participants.stream()
.map(p -> CompletableFuture.supplyAsync(() -> p.canCommit(transaction)))
.toArray(CompletableFuture[]::new);
try {
CompletableFuture.allOf(futures)
.get(TIMEOUT, TimeUnit.MILLISECONDS);
return Arrays.stream(futures)
.allMatch(f -> f.join());
} catch (TimeoutException e) {
return false;
}
}
}
public class ThreePhaseCommitCoordinator {
private static final int TIMEOUT = 30000; // 30秒超时
public boolean executeTransaction(Transaction transaction) {
// 第一阶段:CanCommit
if (!canCommitPhase(transaction)) {
return false;
}
// 第二阶段:PreCommit
if (!preCommitPhase(transaction)) {
rollbackAll(transaction);
return false;
}
// 第三阶段:DoCommit
return doCommitPhase(transaction);
}
private boolean canCommitPhase(Transaction transaction) {
CompletableFuture<Boolean>[] futures = participants.stream()
.map(p -> CompletableFuture.supplyAsync(() -> p.canCommit(transaction)))
.toArray(CompletableFuture[]::new);
try {
CompletableFuture.allOf(futures)
.get(TIMEOUT, TimeUnit.MILLISECONDS);
return Arrays.stream(futures)
.allMatch(f -> f.join());
} catch (TimeoutException e) {
return false;
}
}
}
3. TCC(Try-Confirm-Cancel)
原理
TCC是一种补偿型事务,将业务逻辑分为Try、Confirm、Cancel三个阶段。
实现示例
java
// 账户服务的TCC实现
@Service
public class AccountTccService {
@Autowired
private AccountRepository accountRepository;
// Try阶段:预留资源
@Tcc(confirmMethod = "confirmTransfer", cancelMethod = "cancelTransfer")
public boolean tryTransfer(String fromAccount, String toAccount, BigDecimal amount) {
Account from = accountRepository.findById(fromAccount);
if (from.getBalance().compareTo(amount) < 0) {
return false; // 余额不足
}
// 冻结金额
from.setFrozenAmount(from.getFrozenAmount().add(amount));
accountRepository.save(from);
return true;
}
// Confirm阶段:确认执行
public boolean confirmTransfer(String fromAccount, String toAccount, BigDecimal amount) {
Account from = accountRepository.findById(fromAccount);
Account to = accountRepository.findById(toAccount);
// 扣减余额和冻结金额
from.setBalance(from.getBalance().subtract(amount));
from.setFrozenAmount(from.getFrozenAmount().subtract(amount));
// 增加目标账户余额
to.setBalance(to.getBalance().add(amount));
accountRepository.save(from);
accountRepository.save(to);
return true;
}
// Cancel阶段:取消操作
public boolean cancelTransfer(String fromAccount, String toAccount, BigDecimal amount) {
Account from = accountRepository.findById(fromAccount);
// 释放冻结金额
from.setFrozenAmount(from.getFrozenAmount().subtract(amount));
accountRepository.save(from);
return true;
}
}
// 账户服务的TCC实现
@Service
public class AccountTccService {
@Autowired
private AccountRepository accountRepository;
// Try阶段:预留资源
@Tcc(confirmMethod = "confirmTransfer", cancelMethod = "cancelTransfer")
public boolean tryTransfer(String fromAccount, String toAccount, BigDecimal amount) {
Account from = accountRepository.findById(fromAccount);
if (from.getBalance().compareTo(amount) < 0) {
return false; // 余额不足
}
// 冻结金额
from.setFrozenAmount(from.getFrozenAmount().add(amount));
accountRepository.save(from);
return true;
}
// Confirm阶段:确认执行
public boolean confirmTransfer(String fromAccount, String toAccount, BigDecimal amount) {
Account from = accountRepository.findById(fromAccount);
Account to = accountRepository.findById(toAccount);
// 扣减余额和冻结金额
from.setBalance(from.getBalance().subtract(amount));
from.setFrozenAmount(from.getFrozenAmount().subtract(amount));
// 增加目标账户余额
to.setBalance(to.getBalance().add(amount));
accountRepository.save(from);
accountRepository.save(to);
return true;
}
// Cancel阶段:取消操作
public boolean cancelTransfer(String fromAccount, String toAccount, BigDecimal amount) {
Account from = accountRepository.findById(fromAccount);
// 释放冻结金额
from.setFrozenAmount(from.getFrozenAmount().subtract(amount));
accountRepository.save(from);
return true;
}
}
4. Saga模式
原理
Saga将长事务分解为多个本地事务,每个本地事务都有对应的补偿操作。
编排式Saga
java
@Component
public class OrderSagaOrchestrator {
@Autowired
private OrderService orderService;
@Autowired
private PaymentService paymentService;
@Autowired
private InventoryService inventoryService;
public void processOrder(OrderRequest request) {
SagaTransaction saga = SagaTransaction.builder()
.addStep("createOrder",
() -> orderService.createOrder(request),
() -> orderService.cancelOrder(request.getOrderId()))
.addStep("processPayment",
() -> paymentService.processPayment(request.getPayment()),
() -> paymentService.refund(request.getPayment()))
.addStep("reserveInventory",
() -> inventoryService.reserve(request.getItems()),
() -> inventoryService.release(request.getItems()))
.build();
saga.execute();
}
}
@Component
public class OrderSagaOrchestrator {
@Autowired
private OrderService orderService;
@Autowired
private PaymentService paymentService;
@Autowired
private InventoryService inventoryService;
public void processOrder(OrderRequest request) {
SagaTransaction saga = SagaTransaction.builder()
.addStep("createOrder",
() -> orderService.createOrder(request),
() -> orderService.cancelOrder(request.getOrderId()))
.addStep("processPayment",
() -> paymentService.processPayment(request.getPayment()),
() -> paymentService.refund(request.getPayment()))
.addStep("reserveInventory",
() -> inventoryService.reserve(request.getItems()),
() -> inventoryService.release(request.getItems()))
.build();
saga.execute();
}
}
协调式Saga
java
@SagaOrchestrationStart
public class OrderSaga {
@SagaOrchestrationStart
public void processOrder(OrderCreatedEvent event) {
// 发送支付命令
commandGateway.send(new ProcessPaymentCommand(event.getOrderId(), event.getAmount()));
}
@SagaOrchestrationAssociationProperty("orderId")
@EventHandler
public void handle(PaymentProcessedEvent event) {
// 支付成功,预留库存
commandGateway.send(new ReserveInventoryCommand(event.getOrderId(), event.getItems()));
}
@SagaOrchestrationAssociationProperty("orderId")
@EventHandler
public void handle(PaymentFailedEvent event) {
// 支付失败,取消订单
commandGateway.send(new CancelOrderCommand(event.getOrderId()));
}
}
@SagaOrchestrationStart
public class OrderSaga {
@SagaOrchestrationStart
public void processOrder(OrderCreatedEvent event) {
// 发送支付命令
commandGateway.send(new ProcessPaymentCommand(event.getOrderId(), event.getAmount()));
}
@SagaOrchestrationAssociationProperty("orderId")
@EventHandler
public void handle(PaymentProcessedEvent event) {
// 支付成功,预留库存
commandGateway.send(new ReserveInventoryCommand(event.getOrderId(), event.getItems()));
}
@SagaOrchestrationAssociationProperty("orderId")
@EventHandler
public void handle(PaymentFailedEvent event) {
// 支付失败,取消订单
commandGateway.send(new CancelOrderCommand(event.getOrderId()));
}
}
5. 本地消息表
原理
通过本地消息表保证本地事务和消息发送的一致性。
java
@Service
@Transactional
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageRepository messageRepository;
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 保存消息到本地消息表
Message message = new Message();
message.setTopic("order.created");
message.setPayload(JSON.toJSONString(order));
message.setStatus(MessageStatus.PENDING);
messageRepository.save(message);
// 本地事务提交后,异步发送消息
}
}
@Component
public class MessagePublisher {
@Scheduled(fixedDelay = 5000)
public void publishPendingMessages() {
List<Message> pendingMessages = messageRepository.findByStatus(MessageStatus.PENDING);
for (Message message : pendingMessages) {
try {
messageProducer.send(message.getTopic(), message.getPayload());
message.setStatus(MessageStatus.SENT);
messageRepository.save(message);
} catch (Exception e) {
log.error("Failed to send message: " + message.getId(), e);
}
}
}
}
@Service
@Transactional
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageRepository messageRepository;
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 保存消息到本地消息表
Message message = new Message();
message.setTopic("order.created");
message.setPayload(JSON.toJSONString(order));
message.setStatus(MessageStatus.PENDING);
messageRepository.save(message);
// 本地事务提交后,异步发送消息
}
}
@Component
public class MessagePublisher {
@Scheduled(fixedDelay = 5000)
public void publishPendingMessages() {
List<Message> pendingMessages = messageRepository.findByStatus(MessageStatus.PENDING);
for (Message message : pendingMessages) {
try {
messageProducer.send(message.getTopic(), message.getPayload());
message.setStatus(MessageStatus.SENT);
messageRepository.save(message);
} catch (Exception e) {
log.error("Failed to send message: " + message.getId(), e);
}
}
}
}
6. 消息事务
基于消息队列的事务
java
@Service
public class OrderTransactionService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrderWithTransaction(Order order) {
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction("order-topic", order, order.getId());
}
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String orderId = (String) arg;
// 执行本地事务
orderService.createOrder(orderId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
String orderId = new String(msg.getBody());
Order order = orderRepository.findById(orderId);
return order != null ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
}
@Service
public class OrderTransactionService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrderWithTransaction(Order order) {
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction("order-topic", order, order.getId());
}
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String orderId = (String) arg;
// 执行本地事务
orderService.createOrder(orderId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
String orderId = new String(msg.getBody());
Order order = orderRepository.findById(orderId);
return order != null ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
}
分布式事务框架
Seata
配置
yaml
seata:
enabled: true
application-id: order-service
tx-service-group: order-service-group
service:
vgroup-mapping:
order-service-group: default
grouplist:
default: 127.0.0.1:8091
seata:
enabled: true
application-id: order-service
tx-service-group: order-service-group
service:
vgroup-mapping:
order-service-group: default
grouplist:
default: 127.0.0.1:8091
使用示例
java
@Service
public class BusinessService {
@Autowired
private StorageService storageService;
@Autowired
private OrderService orderService;
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
// 扣减库存
storageService.deduct(commodityCode, orderCount);
// 创建订单
orderService.create(userId, commodityCode, orderCount);
// 如果发生异常,Seata会自动回滚所有操作
}
}
@Service
public class BusinessService {
@Autowired
private StorageService storageService;
@Autowired
private OrderService orderService;
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
// 扣减库存
storageService.deduct(commodityCode, orderCount);
// 创建订单
orderService.create(userId, commodityCode, orderCount);
// 如果发生异常,Seata会自动回滚所有操作
}
}
选择策略
场景分析
场景 | 一致性要求 | 性能要求 | 推荐方案 |
---|---|---|---|
金融转账 | 强一致性 | 中等 | 2PC/TCC |
订单处理 | 最终一致性 | 高 | Saga/消息事务 |
库存扣减 | 强一致性 | 高 | TCC |
数据同步 | 最终一致性 | 高 | 本地消息表 |
技术选型建议
-
强一致性场景:
- 简单场景:2PC + Seata
- 复杂场景:TCC
-
最终一致性场景:
- 事件驱动:Saga模式
- 消息驱动:本地消息表
-
高性能场景:
- 避免使用2PC
- 优先考虑异步方案
最佳实践
- 幂等性设计:确保重试不会产生副作用
- 超时处理:设置合理的超时时间
- 监控告警:监控事务执行状态
- 补偿机制:设计完善的补偿逻辑
- 测试验证:充分测试各种异常场景
- 性能优化:减少跨网络调用
- 故障恢复:设计故障恢复机制
总结
分布式事务是分布式系统中的重要问题,需要根据业务特点选择合适的解决方案。在保证数据一致性的同时,也要考虑性能、可用性和复杂度的平衡。