关于解决分布式事务一致性问题的实践

前言

微服务架构下,一个业务操作可能涉及多个服务,如何保证数据一致性是个问题

比如下单操作,要扣库存、创建订单、扣减积分,这三个操作在三个不同的服务里

如果其中一个失败了,怎么保证数据一致性?

其实这个问题挺经典的,今天记录一下几种解决方案

本地事务的问题

在单体应用里,用@Transactional就能保证事务一致性:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public void placeOrder(Order order) {
// 1. 扣库存
inventoryService.deduct(order.getProductId(), order.getCount());

// 2. 创建订单
orderRepository.save(order);

// 3. 扣减积分
pointService.deduct(order.getUserId(), order.getPoint());

// 任何一个失败,整个事务回滚
}

但是在微服务架构下,这三个方法可能在三个不同的服务里,@Transactional就失效了

解决方案

方案一:2PC(两阶段提交)

Seata实现

添加依赖:

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

配置文件:

1
2
3
4
5
6
7
8
9
10
11
seata:
enabled: true
application-id: order-service
tx-service-group: my_test_tx_group
service:
vgroup-mapping:
my_test_tx_group: default
grouplist:
default: 127.0.0.1:8091
registry:
type: file

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
@GlobalTransactional  // Seata全局事务注解
public void placeOrder(Order order) {
// 1. 扣库存(远程调用)
inventoryFeignClient.deduct(order.getProductId(), order.getCount());

// 2. 创建订单(本地事务)
orderRepository.save(order);

// 3. 扣减积分(远程调用)
pointFeignClient.deduct(order.getUserId(), order.getPoint());

// Seata会自动协调各个参与者,要么全部提交,要么全部回滚
}

Seata的2PC模式:

  1. 准备阶段:每个分支事务执行业务SQL,提交前先注册分支
  2. 提交阶段:如果所有分支都成功,全局提交;否则全局回滚

优点:对业务代码侵入小
缺点:性能较差,锁资源时间长

方案二:TCC(Try-Confirm-Cancel)

TCC是应用层的2PC,需要业务接口实现三个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@LocalTCC
public interface InventoryTccService {

/**
* Try:资源预留
*/
@TwoPhaseBusinessAction(name = "inventoryDeduct", commitMethod = "confirm", rollbackMethod = "cancel")
boolean deduct(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);

/**
* Confirm:确认执行
*/
boolean confirm(BusinessActionContext context);

/**
* Cancel:取消执行
*/
boolean cancel(BusinessActionContext context);
}

实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Service
public class InventoryTccServiceImpl implements InventoryTccService {

@Autowired
private InventoryRepository inventoryRepository;

@Autowired
private InventoryFreezeRepository freezeRepository;

@Override
@Transactional
public boolean deduct(Long productId, Integer count) {
// Try:冻结库存
Inventory inventory = inventoryRepository.findByProductId(productId);
if (inventory.getStock() < count) {
throw new RuntimeException("库存不足");
}

inventory.setStock(inventory.getStock() - count);
inventoryRepository.save(inventory);

// 记录冻结库存
InventoryFreeze freeze = new InventoryFreeze();
freeze.setProductId(productId);
freeze.setCount(count);
freezeRepository.save(freeze);

return true;
}

@Override
@Transactional
public boolean confirm(BusinessActionContext context) {
Long productId = context.getActionContext("productId", Long.class);
Integer count = context.getActionContext("count", Integer.class);

// Confirm:删除冻结记录,实际库存已经在Try中扣除了
freezeRepository.deleteByProductId(productId);
return true;
}

@Override
@Transactional
public boolean cancel(BusinessActionContext context) {
Long productId = context.getActionContext("productId", Long.class);
Integer count = context.getActionContext("count", Integer.class);

// Cancel:恢复库存
InventoryFreeze freeze = freezeRepository.findByProductId(productId);
if (freeze != null) {
Inventory inventory = inventoryRepository.findByProductId(productId);
inventory.setStock(inventory.getStock() + count);
inventoryRepository.save(inventory);

freezeRepository.delete(freeze);
}
return true;
}
}

业务层调用:

1
2
3
4
5
6
7
8
9
@GlobalTransactional
public void placeOrder(Order order) {
// 使用TCC接口
inventoryTccService.deduct(order.getProductId(), order.getCount());

orderRepository.save(order);

pointFeignClient.deduct(order.getUserId(), order.getPoint());
}

TCC的优点:

  • 性能比2PC好,锁资源时间短
  • 一致性保证强

缺点:

  • 代码侵入性强
  • 需要实现三个方法,开发成本高
  • 要考虑幂等性、空回滚、悬挂等异常情况

方案三:本地消息表

核心思想:使用本地事务 + 消息队列,保证最终一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Service
public class OrderService {

@Autowired
private OrderRepository orderRepository;

@Autowired
private LocalMessageRepository messageRepository;

@Autowired
private RabbitTemplate rabbitTemplate;

@Transactional
public void placeOrder(Order order) {
// 1. 创建订单(本地事务)
orderRepository.save(order);

// 2. 创建本地消息(同一个事务)
LocalMessage message = new LocalMessage();
message.setId(UUID.randomUUID().toString());
message.setType("DEDUCT_INVENTORY");
message.setContent(JSON.toJSONString(order));
message.setStatus("PENDING");
messageRepository.save(message);

// 3. 发送消息到MQ(事务提交后)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
rabbitTemplate.convertAndSend("order.exchange", "order.key", message.getId());
}
});
}
}

库存服务监听消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {

@Autowired
private LocalMessageRepository messageRepository;

@Autowired
private InventoryService inventoryService;

@RabbitHandler
public void handleMessage(String messageId) {
LocalMessage message = messageRepository.findById(messageId).orElse(null);
if (message == null) {
return;
}

try {
// 处理业务
Order order = JSON.parseObject(message.getContent(), Order.class);
inventoryService.deduct(order.getProductId(), order.getCount());

// 更新消息状态
message.setStatus("CONSUMED");
messageRepository.save(message);
} catch (Exception e) {
// 消费失败,稍后重试
log.error("消费失败", e);
}
}
}

定时任务扫描未消费的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class MessageRetryScheduler {

@Autowired
private LocalMessageRepository messageRepository;

@Autowired
private RabbitTemplate rabbitTemplate;

// 每分钟扫描一次
@Scheduled(cron = "0 */1 * * * ?")
public void retryPendingMessages() {
List<LocalMessage> messages = messageRepository.findByStatus("PENDING");

for (LocalMessage message : messages) {
try {
rabbitTemplate.convertAndSend("order.exchange", "order.key", message.getId());
} catch (Exception e) {
log.error("重试失败", e);
}
}
}
}

方案四:MQ事务消息

RocketMQ提供了事务消息功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class OrderService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void placeOrder(Order order) {
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order-group",
"order-topic",
MessageBuilder.withPayload(order).build(),
null
);
}
}

事务监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderRepository orderRepository;

@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) msg.getPayload();
orderRepository.save(order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务是否执行成功
Order order = (Order) msg.getPayload();
Order saved = orderRepository.findById(order.getId()).orElse(null);
if (saved != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}

总结

分布式事务没有银弹,根据业务场景选择合适的方案:

方案 一致性 性能 复杂度 适用场景
2PC(Seata AT) 强一致 对一致性要求高的核心业务
TCC 强一致 对一致性要求高,且资源有限
本地消息表 最终一致 对实时性要求不高的业务
MQ事务消息 最终一致 异步解耦,最终一致性

建议:

  1. 能用本地事务就别用分布式事务
  2. 对一致性要求高的用2PC或TCC
  3. 对实时性要求不高的用消息队列
  4. 尽量避免分布式事务,通过业务设计规避

暂时就先记录这么多