Seata分布式事务实战笔记

前言

微服务架构下,分布式事务是个头疼的问题

之前试过本地消息表、RocketMQ事务消息,但感觉都有点复杂

所以决定用Seata试试,据说使用起来比较简单

记录一下使用过程

Seata简介

Seata是阿里开源的分布式事务解决方案,支持AT、TCC、SAGA、XA四种模式

  • AT模式:默认模式,对业务无侵入
  • TCC模式:性能好,但要写三个方法
  • SAGA模式:长事务场景
  • XA模式:强一致性,性能较差

安装Seata Server

Docker安装

1
2
3
4
5
6
7
8
9
# Pull the Seata Server 1.7.0 image.
docker pull seataio/seata-server:1.7.0

# Start the server detached as container "seata-server", publishing ports 8091 and 7091
# and passing SEATA_PORT / SEATA_IP into the container as environment variables.
# NOTE(review): 192.168.1.100 is the example host address used throughout these notes;
# replace it with an address the client services can actually reach.
docker run -d \
--name seata-server \
-p 8091:8091 \
-p 7091:7091 \
-e SEATA_PORT=8091 \
-e SEATA_IP=192.168.1.100 \
seataio/seata-server:1.7.0

本地安装

1
2
3
4
5
6
7
8
9
10
11
12
# Download the 1.7.0 server distribution
wget https://github.com/seata/seata/releases/download/v1.7.0/seata-server-1.7.0.zip

# Unpack it and enter the extracted directory
# NOTE(review): confirm the directory name the archive actually extracts to.
unzip seata-server-1.7.0.zip
cd seata-server-1.7.0

# Edit the server configuration file
# NOTE(review): newer Seata server releases are reportedly configured through
# conf/application.yml — confirm that conf/file.conf is still read by 1.7.0.
vim conf/file.conf

# Start the server on port 8091, bound/advertised as 192.168.1.100
sh bin/seata-server.sh -p 8091 -h 192.168.1.100

配置中心

使用Nacos作为配置中心

修改Seata配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# conf/file.conf
# Three sections follow: network transport, service (transaction-group routing),
# and store (where the server persists transaction logs).

# Network transport settings: TCP with the NIO server type, heartbeat enabled.
transport {
type = "TCP"
server = "NIO"
heartbeat = true
# Thread naming and sizing for the server/client thread pools.
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelectorThread"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
worker-thread-size = "default"
boss-thread-size = 1
}
shutdown {
wait = 3
}
}

# Routes the transaction group "my_test_tx_group" to the cluster named "default",
# whose server list is 192.168.1.100:8091. The group name must match the
# tx-service-group configured in the client application.
service {
vgroup-mapping.my_test_tx_group = "default"
default.grouplist = "192.168.1.100:8091"
enable-degrade = false
disable = false
# NOTE(review): "-1" presumably means no retry timeout (retry indefinitely) — confirm.
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

## transaction log store, only used in seata-server
store {
# Persist transaction logs in a database (MySQL, accessed through a Druid pool).
mode = "db"
db {
datasource = "druid"
db-type = "mysql"
driver-class-name = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
# NOTE(review): example credentials only — do not ship root/123456 in plaintext.
user = "root"
password = "123456"
min-conn = 5
max-conn = 10
# Table names must match the tables created by the server schema script.
global-table = "global_table"
branch-table = "branch_table"
lock-table = "lock_table"
distributed-lock-table = "distributed_lock"
query-limit = 100
max-wait = 5000
}
}

创建数据库表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
-- Schema for the Seata server's "db" store mode.
-- Every statement uses IF NOT EXISTS so the whole script can be re-run safely.

-- Create the database (IF NOT EXISTS added so a second run does not fail here)
CREATE DATABASE IF NOT EXISTS seata;

USE seata;

-- Global table: one row per global transaction, keyed by xid
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
);

-- Branch table: one row per branch transaction, linked to its global transaction by xid
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
);

-- Lock table: one row per locked record, keyed by row_key
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    -- Widened from VARCHAR(96) to match the xid width in global_table and branch_table,
    -- so any xid that fits there also fits here.
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`)
);

-- Distributed lock table: named locks with an expiry value
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`   VARCHAR(20) NOT NULL,
    `lock_value` VARCHAR(20) NOT NULL,
    `expire`     BIGINT,
    PRIMARY KEY (`lock_key`)
);

SpringBoot整合

添加依赖

1
2
3
4
5
6
7
8
9
10
<!-- Spring Cloud Alibaba integration for Seata.
     NOTE(review): no <version> is given here, so it is presumably managed by a
     BOM / dependencyManagement section elsewhere in the POM; confirm. -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

<!-- Seata Spring Boot starter, pinned to 1.7.0 to match the server version installed above. -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.7.0</version>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Seata client settings for the order service.
# Nesting restored: without indentation the document is not valid YAML
# (duplicate top-level keys such as "type" and "nacos").
seata:
  enabled: true
  application-id: order-service
  # Must match the vgroup-mapping key below and the one configured on the server side.
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
    # NOTE(review): a static grouplist is presumably only consulted with a file-based
    # registry; with registry.type=nacos the server address comes from Nacos. Confirm.
    grouplist:
      default: 192.168.1.100:8091
  config:
    type: nacos
    nacos:
      server-addr: 192.168.1.100:8848
      namespace: seata
      group: SEATA_GROUP
      username: nacos
      password: nacos
  registry:
    type: nacos
    nacos:
      server-addr: 192.168.1.100:8848
      namespace: seata
      group: SEATA_GROUP
      application: seata-server

业务库创建undo_log表

每个业务数据库都要创建undo_log表:

1
2
3
4
5
6
7
8
9
10
11
12
13
-- Undo-log table; per the note above, it must exist in every business database.
-- One row is identified by the (xid, branch_id) pair, enforced by the unique key below.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME(6) NOT NULL,
`log_modified` DATETIME(6) NOT NULL,
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
-- NOTE(review): the table declares no AUTO_INCREMENT column, so this option looks inert.
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;

使用示例

订单服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Order service that drives the "create-order" global transaction:
 * it writes the order locally and calls the inventory and account services remotely.
 */
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private InventoryFeignClient inventoryClient;

    @Autowired
    private AccountFeignClient accountClient;

    /**
     * Creates an order inside a Seata global transaction.
     *
     * <p>Steps, in order: insert the order as {@code CREATED}, deduct stock, deduct the
     * account balance, then update the order to {@code FINISHED}. The annotation is
     * configured with {@code rollbackFor = Exception.class}.
     *
     * <p>NOTE(review): the rollback is presumably triggered only when an exception
     * reaches this method, so Feign fallbacks that swallow remote failures would hide
     * them from the transaction; confirm the client configuration.
     *
     * @param order the order to persist; its status is overwritten by this method
     */
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void createOrder(Order order) {
        insertAsCreated(order);
        deductStockAndBalance(order);
        markFinished(order);
    }

    /** Step 1: persist the order with its initial status. */
    private void insertAsCreated(Order order) {
        order.setStatus("CREATED");
        orderMapper.insert(order);
    }

    /** Steps 2 and 3: deduct stock first, then the buyer's account balance. */
    private void deductStockAndBalance(Order order) {
        inventoryClient.deduct(order.getProductId(), order.getCount());
        accountClient.deduct(order.getUserId(), order.getAmount());
    }

    /** Final step: flip the order status once both remote deductions returned. */
    private void markFinished(Order order) {
        order.setStatus("FINISHED");
        orderMapper.updateById(order);
    }
}

库存服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Inventory-side service that deducts product stock on behalf of the order flow.
 */
@Service
public class InventoryService {

    @Autowired
    private InventoryMapper inventoryMapper;

    /**
     * Deducts {@code count} units from the stock of the given product.
     *
     * <p>No extra annotation is declared here; per the original note, Seata proxies the
     * data access automatically.
     *
     * <p>NOTE(review): this is a read-then-write sequence without a visible row lock, so
     * concurrent deductions may overwrite each other; confirm how the mapper issues
     * the update.
     *
     * @param productId id of the product whose stock is reduced
     * @param count     number of units to deduct; must be non-null and not negative
     * @throws IllegalArgumentException if {@code count} is null or negative
     * @throws RuntimeException         if the product has no inventory row or stock is insufficient
     */
    public void deduct(Long productId, Integer count) {
        // A negative count would silently increase stock; a null count would NPE on unboxing.
        if (count == null || count < 0) {
            throw new IllegalArgumentException("扣减数量不能为空或负数: " + count);
        }

        Inventory inventory = inventoryMapper.selectById(productId);
        // An unknown product yields no row; fail with a clear message instead of an NPE.
        if (inventory == null) {
            throw new RuntimeException("库存记录不存在: productId=" + productId);
        }
        if (inventory.getStock() < count) {
            throw new RuntimeException("库存不足");
        }

        inventory.setStock(inventory.getStock() - count);
        inventoryMapper.updateById(inventory);
    }
}

账户服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Account-side service that deducts a user's balance on behalf of the order flow.
 */
@Service
public class AccountService {

    @Autowired
    private AccountMapper accountMapper;

    /**
     * Deducts {@code amount} from the balance of the given user.
     *
     * @param userId id of the user whose account is charged
     * @param amount amount to deduct; must be non-null and not negative
     * @throws IllegalArgumentException if {@code amount} is null or negative
     * @throws RuntimeException         if the user has no account row or the balance is insufficient
     */
    public void deduct(Long userId, BigDecimal amount) {
        // A negative amount would credit the account instead of charging it.
        if (amount == null || amount.signum() < 0) {
            throw new IllegalArgumentException("扣减金额不能为空或负数: " + amount);
        }

        Account account = accountMapper.selectById(userId);
        // An unknown user yields no row; fail with a clear message instead of an NPE.
        if (account == null) {
            throw new RuntimeException("账户不存在: userId=" + userId);
        }
        if (account.getBalance().compareTo(amount) < 0) {
            throw new RuntimeException("余额不足");
        }

        account.setBalance(account.getBalance().subtract(amount));
        accountMapper.updateById(account);
    }
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * REST entry point used to exercise order creation.
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * Creates an order and reports the outcome as plain text.
     *
     * @param order order payload taken from the request body
     * @return a success message, or a failure message carrying the exception message
     */
    @PostMapping("/create")
    public String createOrder(@RequestBody Order order) {
        String outcome;
        try {
            orderService.createOrder(order);
            outcome = "订单创建成功";
        } catch (Exception e) {
            // Any failure is turned into a response body rather than propagated.
            outcome = "订单创建失败:" + e.getMessage();
        }
        return outcome;
    }
}

TCC模式

如果对性能要求高,可以用TCC模式

定义TCC接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * TCC contract for stock deduction: one Try method plus the Confirm and Cancel
 * methods that the Try method's annotation names as its second phase.
 */
@LocalTCC
public interface InventoryTccService {

/**
* Try: reserve the resource.
* The annotation registers {@code commit} as the confirm method and {@code rollback}
* as the cancel method. Both arguments are stored in the action context under the
* names "productId" and "count".
*
* @param productId id of the product whose stock is reserved
* @param count number of units to reserve
* @return result flag of the Try phase
*/
@TwoPhaseBusinessAction(name = "inventoryDeduct", commitMethod = "commit", rollbackMethod = "rollback")
boolean deduct(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);

/**
* Confirm: finalize the reservation made by the Try phase.
*
* @param context action context of the transaction being confirmed
* @return result flag of the Confirm phase
*/
boolean commit(BusinessActionContext context);

/**
* Cancel: undo the reservation made by the Try phase.
*
* @param context action context of the transaction being cancelled
* @return result flag of the Cancel phase
*/
boolean rollback(BusinessActionContext context);
}

实现TCC接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * TCC implementation of stock deduction.
 *
 * <p>Try deducts the stock and writes a freeze record keyed by the global transaction id;
 * Confirm deletes that record; Cancel adds the stock back and deletes the record.
 *
 * <p>NOTE(review): freeze records are looked up and deleted by xid only. If one global
 * transaction deducts several products through this service, the records would collide;
 * confirm whether that can happen. A Cancel that arrives before a late Try is also not
 * guarded against here.
 */
@Service
public class InventoryTccServiceImpl implements InventoryTccService {

    @Autowired
    private InventoryMapper inventoryMapper;

    @Autowired
    private InventoryFreezeMapper freezeMapper;

    /**
     * Try phase: deducts the stock and records the frozen quantity.
     *
     * @param productId id of the product whose stock is reserved
     * @param count     number of units to reserve; must be non-null and not negative
     * @return {@code true} when the reservation was written
     * @throws IllegalArgumentException if {@code count} is null or negative
     * @throws RuntimeException         if the product has no inventory row or stock is insufficient
     */
    @Override
    @Transactional
    public boolean deduct(Long productId, Integer count) {
        // A negative count would silently increase stock; a null count would NPE on unboxing.
        if (count == null || count < 0) {
            throw new IllegalArgumentException("扣减数量不能为空或负数: " + count);
        }

        Inventory inventory = inventoryMapper.selectById(productId);
        // An unknown product yields no row; fail with a clear message instead of an NPE.
        if (inventory == null) {
            throw new RuntimeException("库存记录不存在: productId=" + productId);
        }
        if (inventory.getStock() < count) {
            throw new RuntimeException("库存不足");
        }

        inventory.setStock(inventory.getStock() - count);
        inventoryMapper.updateById(inventory);

        // Record the frozen quantity so Cancel can tell whether Try actually ran.
        InventoryFreeze freeze = new InventoryFreeze();
        freeze.setProductId(productId);
        freeze.setCount(count);
        freeze.setTransactionId(RootContext.getXID());
        freezeMapper.insert(freeze);

        return true;
    }

    /**
     * Confirm phase: the stock was already deducted in Try, so only the freeze record
     * is removed.
     *
     * @param context action context of the transaction being confirmed
     * @return always {@code true}
     */
    @Override
    @Transactional
    public boolean commit(BusinessActionContext context) {
        String xid = context.getXid();
        freezeMapper.deleteByTransactionId(xid);
        return true;
    }

    /**
     * Cancel phase: restores the stock deducted in Try and removes the freeze record.
     *
     * <p>Returns {@code true} without touching the stock when no freeze record exists,
     * which covers both "Try never ran" and "Cancel already completed".
     *
     * @param context action context carrying "productId" and "count" from the Try call
     * @return always {@code true}
     */
    @Override
    @Transactional
    public boolean rollback(BusinessActionContext context) {
        String xid = context.getXid();

        InventoryFreeze freeze = freezeMapper.selectByTransactionId(xid);
        if (freeze == null) {
            return true;
        }

        Long productId = context.getActionContext("productId", Long.class);
        Integer count = context.getActionContext("count", Integer.class);

        Inventory inventory = inventoryMapper.selectById(productId);
        // If the inventory row no longer exists there is nothing to restore; skipping
        // avoids an NPE that would make this Cancel call fail every time it is invoked.
        if (inventory != null) {
            inventory.setStock(inventory.getStock() + count);
            inventoryMapper.updateById(inventory);
        }

        freezeMapper.deleteByTransactionId(xid);
        return true;
    }
}

常见问题

问题1:事务超时

1
2
3
4
5
6
7
8
9
10
11
# Client-side retry settings for the resource manager (rm) and transaction manager (tm).
# NOTE(review): indentation was lost in this snippet, so it is not valid YAML as shown.
# rm/tm presumably nest under seata.client, but the intended parent of "transaction"
# is unclear — confirm against the Seata configuration reference before copying.
seata:
client:
rm:
report-success-enable: true
report-retry-count: 5
async-commit-buffer-limit: 10000
tm:
commit-retry-count: 5
rollback-retry-count: 5
transaction:
rollback-retry-timeout: 30000

问题2:锁超时

1
2
3
4
5
6
# Lock retry settings for the client resource manager.
# Nesting restored so the keys resolve to seata.client.rm.lock.*.
# NOTE(review): retry-interval is presumably in milliseconds — confirm.
seata:
  client:
    rm:
      lock:
        retry-interval: 10
        retry-times: 30

问题3:循环依赖

如果服务互相调用,要注意避免循环依赖,可以用消息队列解耦

总结

Seata用起来确实比较简单,特别是AT模式,对业务代码无侵入

适用场景:

  • 对一致性要求高的业务
  • 微服务架构
  • 跨库事务

注意事项:

  • 分布式事务有性能损耗,不要滥用
  • 考虑使用TCC模式提升性能
  • 做好监控和日志
  • 充分测试各种异常场景

暂时就先记录这么多