Distributed Transactions

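Overview

A distributed transaction is a transaction that spans multiple network nodes and must keep the data in several data sources consistent. Because network partitions and node failures can occur at any point during the transaction, distributed transactions are more complex than local transactions.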

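Challenges of distributed transactions

1. Unreliable network

  • Network latency and packet loss
  • Network partitions that prevent nodes from communicating
  • Out-of-order and duplicate messages

2. Node failures

  • Process crashes
  • Machine outages
  • Storage failures

3. Concurrency control

  • Multiple transactions accessing shared resources at the same time
  • Deadlock detection and handling
  • Trade-off between performance and consistency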
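
To make the "out-of-order and duplicate messages" challenge concrete, here is a minimal sketch of a receiver that tolerates both. It assumes the sender attaches a sequence number starting at 1 to every message; the class name `OrderedDeduplicatingReceiver` and its methods are hypothetical and chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Delivers each message once and in sequence order, even if the network duplicates or reorders it. */
class OrderedDeduplicatingReceiver {
    private long nextExpected = 1;                                  // next sequence number to deliver
    private final TreeMap<Long, String> pending = new TreeMap<>();  // early arrivals, keyed by sequence
    private final List<String> delivered = new ArrayList<>();       // messages handed to the application

    /** Accepts a message from the network; returns false if it is a duplicate. */
    boolean receive(long sequence, String payload) {
        if (sequence < nextExpected || pending.containsKey(sequence)) {
            return false; // already delivered or already buffered
        }
        pending.put(sequence, payload);
        // Deliver the contiguous run of messages that starts at nextExpected
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            delivered.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return true;
    }

    List<String> delivered() {
        return delivered;
    }
}
```

A message that arrives early is buffered until the gap before it is filled, and a message whose sequence number has already been seen is dropped. A production receiver would also bound the buffer and persist `nextExpected`, which this sketch leaves out.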

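Distributed transaction solutions

1. Two-Phase Commit (2PC)

Principle

The two-phase commit protocol involves one coordinator and multiple participants and runs in two phases: a prepare phase, in which every participant votes on whether it can commit, and a commit phase, in which the coordinator tells all participants to commit or roll back.

Implementation example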

java
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Participant and Transaction are application-defined types that the source does not show.
public class TwoPhaseCommitCoordinator {
    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCommitCoordinator.class);

    private final List<Participant> participants;

    public TwoPhaseCommitCoordinator(List<Participant> participants) {
        this.participants = participants;
    }

    public boolean executeTransaction(Transaction transaction) {
        // Phase 1 (prepare): every participant must vote "yes"
        for (Participant participant : participants) {
            try {
                if (!participant.prepare(transaction)) {
                    // A single "no" vote aborts the whole transaction
                    rollbackAll(transaction);
                    return false;
                }
            } catch (Exception e) {
                // A failed or unreachable participant counts as a "no" vote
                rollbackAll(transaction);
                return false;
            }
        }

        // Phase 2 (commit): all participants are prepared, so the decision is final
        return commitAll(transaction);
    }

    private boolean commitAll(Transaction transaction) {
        boolean allCommitted = true;
        for (Participant participant : participants) {
            try {
                participant.commit(transaction);
            } catch (Exception e) {
                // The commit decision cannot be revoked: log the failure, keep committing
                // the remaining participants, and retry this one later
                log.error("Commit failed for participant: " + participant.getId(), e);
                allCommitted = false;
            }
        }
        return allCommitted;
    }

    private void rollbackAll(Transaction transaction) {
        for (Participant participant : participants) {
            try {
                participant.rollback(transaction);
            } catch (Exception e) {
                log.error("Rollback failed for participant: " + participant.getId(), e);
            }
        }
    }
}
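
The coordinator code covers only one side of the protocol. As a complement, the following is a minimal sketch of what a participant might do in each phase. The class `InMemoryParticipant`, its method signatures, and the "value must not go negative" rule are assumptions made for illustration; they are not the `Participant` interface used by the coordinator, and locking between prepare and commit is deliberately omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory 2PC participant: stages a write in prepare, applies it in commit, discards it in rollback. */
class InMemoryParticipant {
    private final Map<String, Integer> committed = new HashMap<>();            // simulated durable state
    private final Map<String, Map<String, Integer>> staged = new HashMap<>();  // txId -> pending writes

    /** Phase 1: validate and stage the write. Returning true is a promise that commit will succeed. */
    boolean prepare(String txId, String key, int delta) {
        int current = committed.getOrDefault(key, 0);
        if (current + delta < 0) {
            return false; // vote "no": the update would make the value negative
        }
        staged.computeIfAbsent(txId, id -> new HashMap<>()).put(key, current + delta);
        return true;
    }

    /** Phase 2, commit: apply the staged writes. A repeated commit finds nothing staged and does nothing. */
    void commit(String txId) {
        Map<String, Integer> writes = staged.remove(txId);
        if (writes != null) {
            committed.putAll(writes);
        }
    }

    /** Phase 2, rollback: discard the staged writes. Also safe to call more than once. */
    void rollback(String txId) {
        staged.remove(txId);
    }

    int get(String key) {
        return committed.getOrDefault(key, 0);
    }
}
```

Making `commit` and `rollback` safe to repeat matters because the coordinator retries them after failures. A real participant would also write the staged state to durable storage before voting "yes" and would lock the affected rows until the decision arrives.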

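Advantages and disadvantages

Advantages

  • Strong consistency guarantee
  • Relatively simple to implement

Disadvantages

  • Blocking protocol with poor performance: participants hold their resources until the coordinator's decision arrives
  • The coordinator is a single point of failure
  • Locked data can remain unavailable for a long time

2. Three-Phase Commit (3PC)

Improvements over 2PC

  • Adds timeout mechanisms
  • Reduces blocking time
  • Introduces a pre-commit phase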
java
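import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

// Participant and Transaction are application-defined types. The source only shows canCommit;
// preCommit, doCommit, and abort are assumed to exist on Participant in the same style.
public class ThreePhaseCommitCoordinator {
    private static final long TIMEOUT_MS = 30_000; // 30-second timeout per phase

    private final List<Participant> participants;

    public ThreePhaseCommitCoordinator(List<Participant> participants) {
        this.participants = participants;
    }

    public boolean executeTransaction(Transaction transaction) {
        // Phase 1: CanCommit. Nothing has been changed yet, so a "no" needs no rollback
        if (!runPhase(p -> p.canCommit(transaction))) {
            return false;
        }

        // Phase 2: PreCommit
        if (!runPhase(p -> p.preCommit(transaction))) {
            participants.forEach(p -> p.abort(transaction));
            return false;
        }

        // Phase 3: DoCommit
        return runPhase(p -> p.doCommit(transaction));
    }

    // Sends one request to every participant in parallel and returns true only if
    // all of them answer "yes" before the timeout expires
    private boolean runPhase(Function<Participant, Boolean> request) {
        List<CompletableFuture<Boolean>> futures = participants.stream()
            .map(p -> CompletableFuture.supplyAsync(() -> request.apply(p)))
            .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return futures.stream().allMatch(CompletableFuture::join);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // A participant threw an exception or did not answer in time
            return false;
        }
    }
}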
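
The timeout mechanism in 3PC also applies on the participant side: a participant that stops hearing from the coordinator takes a default action instead of blocking forever. The sketch below models this as a small state machine. The class and method names are hypothetical, and the default actions follow the usual textbook description of 3PC (abort if PreCommit never arrived, commit if it did).

```java
/** Participant-side state machine of 3PC, showing the default action taken on a coordinator timeout. */
class ThreePhaseParticipant {
    enum State { INITIAL, READY, PRE_COMMITTED, COMMITTED, ABORTED }

    private State state = State.INITIAL;

    State state() {
        return state;
    }

    /** Phase 1: the participant votes "yes" and waits for PreCommit. */
    void onCanCommit() {
        if (state == State.INITIAL) state = State.READY;
    }

    /** Phase 2: PreCommit tells the participant that every other participant voted "yes" too. */
    void onPreCommit() {
        if (state == State.READY) state = State.PRE_COMMITTED;
    }

    /** Phase 3: final commit. */
    void onDoCommit() {
        if (state == State.PRE_COMMITTED) state = State.COMMITTED;
    }

    /** The coordinator may abort at any time before the participant has committed. */
    void onAbort() {
        if (state != State.COMMITTED) state = State.ABORTED;
    }

    /** Called when the next coordinator message does not arrive in time. */
    void onTimeout() {
        switch (state) {
            case READY:
                state = State.ABORTED;   // no PreCommit seen, so no participant can have committed yet
                break;
            case PRE_COMMITTED:
                state = State.COMMITTED; // everyone voted "yes", so committing is the default
                break;
            default:
                break;                   // nothing to decide in the other states
        }
    }
}
```

The commit-by-default rule is what reduces blocking compared with 2PC, but it is also why 3PC can still end in inconsistent states when a network partition separates participants that received PreCommit from those that did not.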

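3. TCC (Try-Confirm-Cancel)

Principle

TCC is a compensation-based transaction model that splits the business logic into three phases: Try reserves the resources, Confirm executes the operation using the reserved resources, and Cancel releases them.

Implementation example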

java
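import java.math.BigDecimal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// TCC implementation for the account service.
// @Tcc belongs to a TCC framework that the source does not name;
// Account and AccountRepository are application-defined types.
@Service
public class AccountTccService {

    @Autowired
    private AccountRepository accountRepository;

    // Try phase: reserve resources
    @Tcc(confirmMethod = "confirmTransfer", cancelMethod = "cancelTransfer")
    @Transactional
    public boolean tryTransfer(String fromAccount, String toAccount, BigDecimal amount) {
        Account from = accountRepository.findById(fromAccount);
        // Money frozen by other in-flight transfers is not available
        BigDecimal available = from.getBalance().subtract(from.getFrozenAmount());
        if (available.compareTo(amount) < 0) {
            return false; // insufficient funds
        }

        // Freeze the amount
        from.setFrozenAmount(from.getFrozenAmount().add(amount));
        accountRepository.save(from);

        return true;
    }

    // Confirm phase: execute the transfer.
    // The framework may call this more than once, so production code must make it idempotent.
    @Transactional
    public boolean confirmTransfer(String fromAccount, String toAccount, BigDecimal amount) {
        Account from = accountRepository.findById(fromAccount);
        Account to = accountRepository.findById(toAccount);

        // Deduct both the balance and the frozen amount
        from.setBalance(from.getBalance().subtract(amount));
        from.setFrozenAmount(from.getFrozenAmount().subtract(amount));

        // Credit the target account
        to.setBalance(to.getBalance().add(amount));

        accountRepository.save(from);
        accountRepository.save(to);

        return true;
    }

    // Cancel phase: undo the reservation (the same idempotency requirement applies)
    @Transactional
    public boolean cancelTransfer(String fromAccount, String toAccount, BigDecimal amount) {
        Account from = accountRepository.findById(fromAccount);

        // Release the frozen amount
        from.setFrozenAmount(from.getFrozenAmount().subtract(amount));
        accountRepository.save(from);

        return true;
    }
}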
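
The account service above subtracts the frozen amount every time Confirm or Cancel runs, so a retried call would corrupt the balances. One common way to guard against this is to record the phase of each transaction and let each phase run at most once. The following is a minimal in-memory sketch of that idea; `TccPhaseGuard` and its methods are hypothetical names, and a real service would keep this record in the same database transaction as the business update.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks the TCC phase of each transaction so that Confirm and Cancel take effect at most once. */
class TccPhaseGuard {
    enum Phase { TRIED, CONFIRMED, CANCELLED }

    private final Map<String, Phase> phases = new ConcurrentHashMap<>();

    /** Try may run once per transaction, and never after a Cancel has already been recorded. */
    boolean beginTry(String txId) {
        return phases.putIfAbsent(txId, Phase.TRIED) == null;
    }

    /** Returns true only for the first Confirm that follows a successful Try. */
    boolean beginConfirm(String txId) {
        return phases.replace(txId, Phase.TRIED, Phase.CONFIRMED);
    }

    /**
     * Returns true if reserved resources must be released now.
     * If Cancel arrives before Try, it is recorded so that the late Try is rejected,
     * and false is returned because there is nothing to release.
     */
    boolean beginCancel(String txId) {
        if (phases.putIfAbsent(txId, Phase.CANCELLED) == null) {
            return false;
        }
        return phases.replace(txId, Phase.TRIED, Phase.CANCELLED);
    }
}
```

In this sketch each TCC method would first ask the guard and skip its business logic when the guard returns false, for example `if (!guard.beginConfirm(txId)) return true;` at the top of a confirm method. This assumes the framework passes a transaction ID to every phase, which the original example does not show.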

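4. Saga pattern

Principle

A saga breaks a long-running transaction into a sequence of local transactions, each of which has a corresponding compensating action. If a step fails, the compensations of the steps that already completed are executed to undo their effects.

Orchestration-based Saga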

java
@Component
public class OrderSagaOrchestrator {
    
    @Autowired
    private OrderService orderService;
    @Autowired
    private PaymentService paymentService;
    @Autowired
    private InventoryService inventoryService;
    
    public void processOrder(OrderRequest request) {
        SagaTransaction saga = SagaTransaction.builder()
            .addStep("createOrder", 
                () -> orderService.createOrder(request),
                () -> orderService.cancelOrder(request.getOrderId()))
            .addStep("processPayment",
                () -> paymentService.processPayment(request.getPayment()),
                () -> paymentService.refund(request.getPayment()))
            .addStep("reserveInventory",
                () -> inventoryService.reserve(request.getItems()),
                () -> inventoryService.release(request.getItems()))
            .build();
            
        saga.execute();
    }
}
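
The orchestrator relies on a `SagaTransaction` helper that the document does not define. The following is one possible minimal sketch of such a helper, written to match the `builder().addStep(name, action, compensation).build()` calls used above. It is an assumption about how the helper could work, not the implementation of any particular library, and it treats any `RuntimeException` from a step as a failure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Minimal saga: runs the steps in order and, on failure, compensates completed steps in reverse order. */
class SagaTransaction {
    private static final class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    static final class Builder {
        private final List<Step> steps = new ArrayList<>();

        Builder addStep(String name, Runnable action, Runnable compensation) {
            steps.add(new Step(name, action, compensation));
            return this;
        }

        SagaTransaction build() {
            return new SagaTransaction(new ArrayList<>(steps));
        }
    }

    private final List<Step> steps;

    private SagaTransaction(List<Step> steps) {
        this.steps = steps;
    }

    static Builder builder() {
        return new Builder();
    }

    /** Returns true if every step succeeded, false if a step failed and the saga was compensated. */
    boolean execute() {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step);
            } catch (RuntimeException e) {
                // Undo the completed steps, most recent first; the failed step is assumed to have had no effect
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }
}
```

Compensating in reverse order mirrors how the steps built on each other: in the order example, inventory is released before the payment is refunded and the order cancelled. A production version would also persist the saga's progress so that compensation can resume after a crash, and would retry compensations that fail.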

Choreography-based Saga

java
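// Event-driven saga for an order.
// The annotations are kept as written in the source; they belong to a saga framework
// that the source does not name.
@SagaOrchestrationStart
public class OrderSaga {

    // Assumption: the command gateway is injected by the framework.
    // The source uses this field without declaring it.
    @Autowired
    private CommandGateway commandGateway;

    @SagaOrchestrationStart
    public void processOrder(OrderCreatedEvent event) {
        // Send the payment command
        commandGateway.send(new ProcessPaymentCommand(event.getOrderId(), event.getAmount()));
    }

    @SagaOrchestrationAssociationProperty("orderId")
    @EventHandler
    public void handle(PaymentProcessedEvent event) {
        // Payment succeeded: reserve inventory
        commandGateway.send(new ReserveInventoryCommand(event.getOrderId(), event.getItems()));
    }

    @SagaOrchestrationAssociationProperty("orderId")
    @EventHandler
    public void handle(PaymentFailedEvent event) {
        // Payment failed: cancel the order
        commandGateway.send(new CancelOrderCommand(event.getOrderId()));
    }
}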

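5. Local message table

Principle

The business data and the outgoing message are written in the same local transaction, with the message stored in a local message table. A background task then publishes the stored messages, which keeps the local transaction and the message delivery consistent.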

java
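import java.util.List;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Order, Message, MessageStatus, and the repositories are application-defined types.
@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private MessageRepository messageRepository;

    public void createOrder(Order order) {
        // 1. Save the order
        orderRepository.save(order);

        // 2. Save the message to the local message table in the same local transaction
        Message message = new Message();
        message.setTopic("order.created");
        message.setPayload(JSON.toJSONString(order));
        message.setStatus(MessageStatus.PENDING);
        messageRepository.save(message);

        // The message is published asynchronously after the local transaction commits
    }
}

@Component
public class MessagePublisher {
    private static final Logger log = LoggerFactory.getLogger(MessagePublisher.class);

    @Autowired
    private MessageRepository messageRepository;
    // Assumption: MessageProducer is an application-defined wrapper around the broker client
    @Autowired
    private MessageProducer messageProducer;

    @Scheduled(fixedDelay = 5000) // poll every 5 seconds
    public void publishPendingMessages() {
        List<Message> pendingMessages = messageRepository.findByStatus(MessageStatus.PENDING);

        for (Message message : pendingMessages) {
            try {
                messageProducer.send(message.getTopic(), message.getPayload());
                // A crash between send and save causes a resend on the next run,
                // so consumers must handle duplicates
                message.setStatus(MessageStatus.SENT);
                messageRepository.save(message);
            } catch (Exception e) {
                // The message stays PENDING and is retried on the next run
                log.error("Failed to send message: " + message.getId(), e);
            }
        }
    }
}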

6. Transactional messages

Transactions based on a message queue

java
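import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderTransactionService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void createOrderWithTransaction(Order order) {
        // Send a transactional message; the order is also handed to the listener as its argument.
        // Assumption: the three-argument overload of rocketmq-spring 2.1 or later is available.
        Message<Order> message = MessageBuilder.withPayload(order).build();
        rocketMQTemplate.sendMessageInTransaction("order-topic", message, order);
    }
}

// Declared as a top-level class so that Spring can register it as a bean
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;
    @Autowired
    private OrderRepository orderRepository;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // Execute the local transaction
            orderService.createOrder((Order) arg);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Check the state of the local transaction when the broker asks back.
        // Assumption: the payload arrives as the JSON-serialized order in a byte array.
        String json = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        Order sent = JSON.parseObject(json, Order.class);
        Order order = orderRepository.findById(sent.getId());
        // As in the original, a missing order is treated as a failed local transaction
        return order != null ?
            RocketMQLocalTransactionState.COMMIT :
            RocketMQLocalTransactionState.ROLLBACK;
    }
}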

Distributed transaction frameworks

Seata

Configuration

yaml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: order-service-group
  service:
    vgroup-mapping:
      order-service-group: default
    grouplist:
      default: 127.0.0.1:8091

Usage example

java
@Service
public class BusinessService {

    @Autowired
    private StorageService storageService;
    @Autowired
    private OrderService orderService;

    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        // Deduct stock
        storageService.deduct(commodityCode, orderCount);

        // Create the order
        orderService.create(userId, commodityCode, orderCount);

        // If an exception propagates out of this method, Seata rolls back all of the operations above
    }
}

Selection strategy

Scenario analysis

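Scenario               Consistency requirement   Performance requirement   Recommended approach
Financial transfer     Strong consistency        Medium                    2PC/TCC
Order processing       Eventual consistency      (not specified)           Saga / transactional messages
Inventory deduction    Strong consistency        (not specified)           TCC
Data synchronization   Eventual consistency      (not specified)           Local message table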

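Technology selection recommendations

  1. Strong consistency scenarios

    • Simple scenarios: 2PC + Seata
    • Complex scenarios: TCC
  2. Eventual consistency scenarios

    • Event-driven: Saga pattern
    • Message-driven: local message table
  3. High-performance scenarios

    • Avoid 2PC
    • Prefer asynchronous approaches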

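Best practices

  1. Idempotent design: make sure that retries do not cause additional side effects
  2. Timeout handling: set reasonable timeouts
  3. Monitoring and alerting: monitor the execution state of transactions
  4. Compensation: design complete compensation logic
  5. Testing: thoroughly test the different failure scenarios
  6. Performance: reduce the number of cross-network calls
  7. Failure recovery: design recovery mechanisms for failures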
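
Several of these practices meet in the retry logic that surrounds a remote call. The sketch below shows one possible shape: a bounded number of attempts with an exponentially growing delay between them. The class `Retry` and its parameters are hypothetical, and the sketch assumes that the retried operation is idempotent, as practice 1 requires.

```java
import java.util.function.Supplier;

/** Retries an idempotent operation a bounded number of times with exponential backoff. */
class Retry {
    /**
     * Calls the operation up to maxAttempts times. After each failure it waits,
     * starting at initialDelayMs and doubling every time, then tries again.
     * Rethrows the last failure once all attempts are used up.
     */
    static <T> T withBackoff(Supplier<T> operation, int maxAttempts, long initialDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMs;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the wait
            }
            if (attempt < maxAttempts) {
                sleep(delay);
                delay *= 2;
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

A call such as `Retry.withBackoff(() -> client.commit(txId), 5, 100)` would try up to five times, waiting 100 ms, 200 ms, 400 ms, and 800 ms between attempts; `client` here stands for any hypothetical remote client. Bounding the attempts keeps a permanently failing participant from blocking the caller forever, and the final exception is the natural place to raise an alert.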

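Summary

Distributed transactions are a central problem in distributed systems, and the right solution depends on the characteristics of the business. While guaranteeing data consistency, you also need to balance performance, availability, and complexity.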