SpringBoot异步处理与线程池配置详解

前言

最近做项目的时候,有些耗时操作比如发送邮件、生成报表什么的,会影响接口响应速度

所以用异步处理来优化,顺便记录一下SpringBoot异步处理的使用和线程池配置

@Async注解

SpringBoot提供了@Async注解,可以让方法异步执行

开启异步支持

首先在启动类或者配置类上添加@EnableAsync注解:

1
2
3
4
5
6
7
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class EmailService {

@Async
public void sendEmail(String to, String subject, String content) {
// 模拟耗时操作
try {
Thread.sleep(3000);
System.out.println("发送邮件给:" + to);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

调用的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
public class UserController {

@Autowired
EmailService emailService;

@PostMapping("/register")
public String register(@RequestBody User user) {
// 保存用户
// ...

// 异步发送欢迎邮件
emailService.sendEmail(user.getEmail(), "欢迎注册", "欢迎来到我们的平台");

return "注册成功";
}
}

这样sendEmail方法会在另一个线程中执行,不会阻塞主线程

返回值的处理

如果需要获取异步方法的返回值,可以用Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class ReportService {

@Async
public Future<String> generateReport(Long userId) {
try {
Thread.sleep(5000);
String reportUrl = "http://example.com/report/" + userId + ".pdf";
return new AsyncResult<>(reportUrl);
} catch (InterruptedException e) {
e.printStackTrace();
return new AsyncResult<>(null);
}
}
}

调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@RestController
public class ReportController {

@Autowired
ReportService reportService;

@GetMapping("/report/{userId}")
public String generateReport(@PathVariable Long userId) {
Future<String> future = reportService.generateReport(userId);

// 可以做其他事情

try {
// 获取结果
String reportUrl = future.get();
return "报表生成成功:" + reportUrl;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return "报表生成失败";
}
}

@GetMapping("/report/{userId}/async")
public String generateReportAsync(@PathVariable Long userId) {
// 异步生成,先返回任务ID
Future<String> future = reportService.generateReport(userId);
String taskId = UUID.randomUUID().toString();

// 可以把future和taskId的对应关系保存起来,后续查询结果

return "任务已提交,任务ID:" + taskId;
}
}

自定义线程池

默认的线程池配置可能不太够用,建议自定义线程池

方式一:配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Configuration
@EnableAsync
public class AsyncConfig {

@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 核心线程数
executor.setCorePoolSize(5);

// 最大线程数
executor.setMaxPoolSize(10);

// 队列容量
executor.setQueueCapacity(200);

// 线程名前缀
executor.setThreadNamePrefix("async-task-");

// 拒绝策略:由调用线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);

// 等待时间(秒)
executor.setAwaitTerminationSeconds(60);

executor.initialize();
return executor;
}

@Bean(name = "emailExecutor")
public Executor emailExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("email-task-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

使用指定的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class EmailService {

@Async("emailExecutor")
public void sendEmail(String to, String subject, String content) {
// 使用emailExecutor线程池执行
}

@Async("taskExecutor")
public void sendEmail2(String to, String subject, String content) {
// 使用taskExecutor线程池执行
}
}

线程池参数说明

核心参数

  • corePoolSize:核心线程数,即使空闲也会保留在池中
  • maxPoolSize:最大线程数
  • queueCapacity:队列容量,当核心线程都在运行时,新任务会放入队列
  • keepAliveTime:非核心线程的空闲存活时间
  • threadNamePrefix:线程名前缀,方便排查问题

拒绝策略

当队列满了并且线程数达到最大值时,新任务会被拒绝

JDK提供了4种拒绝策略:

  1. AbortPolicy(默认):直接抛出异常
  2. CallerRunsPolicy:由调用线程执行该任务
  3. DiscardPolicy:直接丢弃,不抛异常
  4. DiscardOldestPolicy:丢弃队列中最老的任务,然后重新尝试

一般推荐用CallerRunsPolicy,这样可以减缓提交速度

1
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

线程池大小配置

CPU密集型任务

1
2
// 核心线程数 = CPU核心数 + 1
int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;

IO密集型任务

1
2
// 核心线程数 = CPU核心数 * 2
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;

混合型任务

根据实际业务压测结果调整,一般:

  • 核心线程数:10-20
  • 最大线程数:50-100
  • 队列容量:100-1000

异常处理

异步方法里的异常不会直接抛到调用方,需要特殊处理

方式一:在方法内部处理

1
2
3
4
5
6
7
8
9
@Async
public void asyncMethod() {
try {
// 业务逻辑
} catch (Exception e) {
// 处理异常
log.error("异步任务执行失败", e);
}
}

方式二:使用AsyncUncaughtExceptionHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("async-task-");
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncExceptionHandler();
}

static class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("异步方法[{}]执行异常,参数:{}", method.getName(), params, ex);
}
}
}

监控线程池

生产环境建议监控线程池状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
public class ThreadPoolMonitor {

@Autowired
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor taskExecutor;

@Scheduled(cron = "0/30 * * * * ?")
public void monitor() {
ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();
log.info("线程池状态 - " +
"核心线程数: {}, " +
"最大线程数: {}, " +
"当前线程数: {}, " +
"活跃线程数: {}, " +
"队列大小: {}, " +
"已完成任务数: {}",
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getPoolSize(),
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
);
}
}

注意事项

  1. 不要在同一个类中调用异步方法,Spring的AOP代理会失效
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class UserService {

@Async
public void asyncMethod() {
// ...
}

public void method() {
// 这样调用不会异步执行
this.asyncMethod();
}
}

应该通过注入的bean调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class UserService {

@Autowired
private UserService self;

public void method() {
// 这样才会异步执行
self.asyncMethod();
}

@Async
public void asyncMethod() {
// ...
}
}
  1. 异步方法不能是private的

  2. 注意事务,异步方法和调用方法不在同一个线程,事务可能失效

总结

SpringBoot的异步处理用起来还挺方便的,主要是配置好线程池

关键点:

  1. @EnableAsync开启异步支持
  2. @Async标注异步方法
  3. 合理配置线程池参数
  4. 注意异常处理
  5. 避免在同类中调用异步方法

暂时就先记录这么多