前言
微服务架构下,一个业务操作可能涉及多个服务,如何保证数据一致性是个问题
比如下单操作,要扣库存、创建订单、扣减积分,这三个操作在三个不同的服务里
如果其中一个失败了,怎么保证数据一致性?
其实这个问题挺经典的,今天记录一下几种解决方案
本地事务的问题
在单体应用里,用@Transactional就能保证事务一致性:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Transactional public void placeOrder(Order order) { inventoryService.deduct(order.getProductId(), order.getCount());
orderRepository.save(order);
pointService.deduct(order.getUserId(), order.getPoint());
}
|
但是在微服务架构下,这三个方法可能在三个不同的服务里,@Transactional就失效了
解决方案
方案一:2PC(两阶段提交)
Seata实现
添加依赖:
1 2 3 4
| <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>
|
配置文件:
1 2 3 4 5 6 7 8 9 10 11
| seata: enabled: true application-id: order-service tx-service-group: my_test_tx_group service: vgroup-mapping: my_test_tx_group: default grouplist: default: 127.0.0.1:8091 registry: type: file
|
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @GlobalTransactional public void placeOrder(Order order) { inventoryFeignClient.deduct(order.getProductId(), order.getCount());
orderRepository.save(order);
pointFeignClient.deduct(order.getUserId(), order.getPoint());
}
|
Seata的2PC模式:
- 准备阶段:每个分支事务执行业务SQL,提交前先注册分支
- 提交阶段:如果所有分支都成功,全局提交;否则全局回滚
优点:对业务代码侵入小
缺点:性能较差,锁资源时间长
方案二:TCC(Try-Confirm-Cancel)
TCC是应用层的2PC,需要业务接口实现三个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @LocalTCC public interface InventoryTccService {
@TwoPhaseBusinessAction(name = "inventoryDeduct", commitMethod = "confirm", rollbackMethod = "cancel") boolean deduct(@BusinessActionContextParameter(paramName = "productId") Long productId, @BusinessActionContextParameter(paramName = "count") Integer count);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context); }
|
实现类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @Service public class InventoryTccServiceImpl implements InventoryTccService {
@Autowired private InventoryRepository inventoryRepository;
@Autowired private InventoryFreezeRepository freezeRepository;
@Override @Transactional public boolean deduct(Long productId, Integer count) { Inventory inventory = inventoryRepository.findByProductId(productId); if (inventory.getStock() < count) { throw new RuntimeException("库存不足"); }
inventory.setStock(inventory.getStock() - count); inventoryRepository.save(inventory);
InventoryFreeze freeze = new InventoryFreeze(); freeze.setProductId(productId); freeze.setCount(count); freezeRepository.save(freeze);
return true; }
@Override @Transactional public boolean confirm(BusinessActionContext context) { Long productId = context.getActionContext("productId", Long.class); Integer count = context.getActionContext("count", Integer.class);
freezeRepository.deleteByProductId(productId); return true; }
@Override @Transactional public boolean cancel(BusinessActionContext context) { Long productId = context.getActionContext("productId", Long.class); Integer count = context.getActionContext("count", Integer.class);
InventoryFreeze freeze = freezeRepository.findByProductId(productId); if (freeze != null) { Inventory inventory = inventoryRepository.findByProductId(productId); inventory.setStock(inventory.getStock() + count); inventoryRepository.save(inventory);
freezeRepository.delete(freeze); } return true; } }
|
业务层调用:
1 2 3 4 5 6 7 8 9
| @GlobalTransactional public void placeOrder(Order order) { inventoryTccService.deduct(order.getProductId(), order.getCount());
orderRepository.save(order);
pointFeignClient.deduct(order.getUserId(), order.getPoint()); }
|
TCC的优点:
缺点:
- 代码侵入性强
- 需要实现三个方法,开发成本高
- 要考虑幂等性、空回滚、悬挂等异常情况
方案三:本地消息表
核心思想:使用本地事务 + 消息队列,保证最终一致性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Service public class OrderService {
@Autowired private OrderRepository orderRepository;
@Autowired private LocalMessageRepository messageRepository;
@Autowired private RabbitTemplate rabbitTemplate;
@Transactional public void placeOrder(Order order) { orderRepository.save(order);
LocalMessage message = new LocalMessage(); message.setId(UUID.randomUUID().toString()); message.setType("DEDUCT_INVENTORY"); message.setContent(JSON.toJSONString(order)); message.setStatus("PENDING"); messageRepository.save(message);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { rabbitTemplate.convertAndSend("order.exchange", "order.key", message.getId()); } }); } }
|
库存服务监听消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Component @RabbitListener(queues = "inventory.queue") public class InventoryConsumer {
@Autowired private LocalMessageRepository messageRepository;
@Autowired private InventoryService inventoryService;
@RabbitHandler public void handleMessage(String messageId) { LocalMessage message = messageRepository.findById(messageId).orElse(null); if (message == null) { return; }
try { Order order = JSON.parseObject(message.getContent(), Order.class); inventoryService.deduct(order.getProductId(), order.getCount());
message.setStatus("CONSUMED"); messageRepository.save(message); } catch (Exception e) { log.error("消费失败", e); } } }
|
定时任务扫描未消费的消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class MessageRetryScheduler {
@Autowired private LocalMessageRepository messageRepository;
@Autowired private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0 */1 * * * ?") public void retryPendingMessages() { List<LocalMessage> messages = messageRepository.findByStatus("PENDING");
for (LocalMessage message : messages) { try { rabbitTemplate.convertAndSend("order.exchange", "order.key", message.getId()); } catch (Exception e) { log.error("重试失败", e); } } } }
|
方案四:MQ事务消息
RocketMQ提供了事务消息功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Service public class OrderService {
@Autowired private RocketMQTemplate rocketMQTemplate;
public void placeOrder(Order order) { rocketMQTemplate.sendMessageInTransaction( "order-group", "order-topic", MessageBuilder.withPayload(order).build(), null ); } }
|
事务监听器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired private OrderRepository orderRepository;
@Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { Order order = (Order) msg.getPayload(); orderRepository.save(order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { Order order = (Order) msg.getPayload(); Order saved = orderRepository.findById(order.getId()).orElse(null); if (saved != null) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.UNKNOWN; } }
|
总结
分布式事务没有银弹,根据业务场景选择合适的方案:
| 方案 |
一致性 |
性能 |
复杂度 |
适用场景 |
| 2PC(Seata AT) |
强一致 |
低 |
低 |
对一致性要求高的核心业务 |
| TCC |
强一致 |
中 |
高 |
对一致性要求高,且资源有限 |
| 本地消息表 |
最终一致 |
高 |
中 |
对实时性要求不高的业务 |
| MQ事务消息 |
最终一致 |
高 |
中 |
异步解耦,最终一致性 |
建议:
- 能用本地事务就别用分布式事务
- 对一致性要求高的用2PC或TCC
- 对实时性要求不高的用消息队列
- 尽量避免分布式事务,通过业务设计规避
暂时就先记录这么多