SpringBoot源码系列(10):@Async原理

布鸽不鸽 Lv4

前言

在SpringBoot中,我们可以使用异步操作来处理耗时的任务。通常我们需要在想异步执行的方法上标注@Async,然后在主启动类上标注@EnableAsync开启异步功能。关于其应用,可以查看我之前的文章:SpringBoot中的@Asnyc注解。本文将讨论以下问题:

  • @EnableAsync@Async的原理是什么?
  • SpringBoot中默认的线程池/执行器是如何被指定的?
  • @Async为什么能通过value属性指定使用的执行器?

原文地址:https://xuedongyun.cn/post/24765/

异步处理的原理(@EnableAsync)

EnableAsync

我们在主启动类上标注了@EnableAsync注解。该注解通过@Import注解,导入了AsyncConfigurationSelector

1
2
3
4
5
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync { }

AsyncConfigurationSelector

继承关系:ImportSelector<-AdviceModeImportSelector<EnableAsync><-AsyncConfigurationSelector

我们先看一看父类中的方法:

@Configuration配置类上,使用@Import注解导入一个实现了ImportSelector接口的类时:会根据@Configuration配置类的AnnotationMetadata,返回需要导入到容器中的类的名称

1
2
3
4
5
6
7
8
9
10
11
public interface ImportSelector {

// 根据@Configuration配置类的AnnotationMetadata,返回一个class名称列表,来决定向容器中导入哪些类
String[] selectImports(AnnotationMetadata importingClassMetadata);

@Nullable
default Predicate<String> getExclusionFilter() {
return null;
}

}

AdviceModeImportSelector<EnableAsync>类,实现了selectImports方法。该方法用于获取@EnableAsync注解中的mode对应的值,并调用selectImports(adviceMode)方法。最终返回需要导入的类。这正是我们AsyncConfigurationSelector实现的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 代码有删减
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

protected String getAdviceModeAttributeName() {
return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
}

@Override
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
// 用于当前类,泛型的具体类型(此处是EnableAsync)
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "...");

// 获取@Configuration配置类的@EnableAsync的属性
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) { throw new IllegalArgumentException("..."); }

// 此处getAdviceModeAttributeName()返回"mode",@EnableAsync注解的"mode"属性默认值是"PROXY"
AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());

// 调用了selectImports(adviceMode)方法
String[] imports = selectImports(adviceMode);
return imports;
}

@Nullable
protected abstract String[] selectImports(AdviceMode adviceMode);

}

此时,我们将向容器中导入ProxyAsyncConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
// 向容器中导入ProxyAsyncConfiguration
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}

}

ProxyAsyncConfiguration

继承关系:AbstractAsyncConfiguration<-ProxyAsyncConfiguration

导入的ProxyAsyncConfiguration是一个@Configuration配置类,它通过@Bean注入了AsyncAnnotationBeanPostProcessor类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 代码有删改
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {

// 创建了一个PostProcessor,放到容器中
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();

// 设置了执行器和错误处理器
bpp.configure(this.executor, this.exceptionHandler);

// ...
return bpp;
}
}

在其父类AbstractAsyncConfiguration中,通过@Autowired拿到了容器中实现了AsyncConfigurer的组件(获取了线程池)。this.executor就是在此获取值的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {

// 通过@Autowired拿到了容器中实现了AsyncConfigurer的组件(获取了线程池)
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
// 没有就返回空
return null;
}
if (candidates.size() > 1) {
// 只能有一个实现AsyncConfigurer接口的组件
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});

// this.executor在此处获取
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}
}

小结1:我们可以向容器中放入实现了AsyncConfigurer接口的组件,来向容器中放入线程池

AsyncAnnotationBeanPostProcessor

继承关系:BeanFactoryAware<-AbstractBeanFactoryAwareAdvisingPostProcessor<-AsyncAnnotationBeanPostProcessor

此时AsyncAnnotationBeanPostProcessor往容器中创建了一个增强器AsyncAnnotationAdvisor

Spring中,如果Bean实现了BeanNameAware 接口,则会默认调用setBeanFactory方法,获取BeanFactory实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 代码有删改
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//创建一个增强器
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}

public void configure(@Nullable Supplier<Executor> executor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

this.executor = executor;
this.exceptionHandler = exceptionHandler;
}

}

AsyncAnnotationAdvisor

AsyncAnnotationAdvisor中,创建了增强方法this.advice,它其实也实现了BeanNameAware接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 代码有删改
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

// 增强方法this.advice真实类型:AnnotationAsyncExecutionInterceptor
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}

protected Advice buildAdvice(@Nullable Supplier<Executor> executor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

// 拦截器
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置此切面的执行器和异常处理器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}

// 配置此切面的执行器和异常处理器
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}

// 此处this.advice也实现了BeanNameAware接口
@Override
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
}

AnnotationAsyncExecutionInterceptor

继承关系:MethodInterceptor&AsyncExecutionAspectSupport<-AsyncExecutionInterceptor<-AnnotationAsyncExecutionInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {

public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 拿到了默认的执行器/线程池,
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}

@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 拿唯一的类型为TaskExecutor的组件
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
try {
// 容器里有多个TaskExecutor,拿名字叫"taskExecutor"的,类型为Executor的组件
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
// 容器里没有TaskExecutor,拿名字叫"taskExecutor"的,类型为Executor的组件
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
}
}
return null;
}
}

更进一步的,AsyncExecutionInterceptor继续重写了getDefaultExecutor,加入一个新的逻辑:如果没有默认执行器,就用SimpleAsyncTaskExecutor

1
2
3
4
5
6
7
8
9
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
}

小结2:若容器中只有一个TaskExecutor组件,其为默认执行器;若不唯一,拿名字叫”taskExecutor”的,类型为Executor的组件。若都不满足,使用SimpleAsyncTaskExecutor作为默认执行器(每次执行被注解方法时,单独创建一个Thread来执行)

AsyncExecutionInterceptor中,实现了invoke方法,把原来方法的调用提交到新的线程池执行,从而实现了方法的异步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 代码有删改
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}

@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {

// 决定使用哪个executor
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);

// 创建task(去执行原来方法)
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
return null;
};

// 提交到新的线程池执行
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

}

异步处理的原理(@Async)

AnnotationAsyncExecutionInterceptor

继承关系:MethodInterceptor&AsyncExecutionAspectSupport<-AsyncExecutionInterceptor<-AnnotationAsyncExecutionInterceptor

我们继续上面一节的内容,AsyncExecutionAspectSupport中实现了determineAsyncExecutor方法,用于确定使用哪个executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 根据@Async注解的value属性,查找对应类型为Executor的异步执行器
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
// 如果没找到,获取defaultExecutor(之前已经获取过了)
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
// 缓存,为某个方法指定executor
this.executors.put(method, executor);
}
return executor;
}

AnnotationAsyncExecutionInterceptor中实现了getExecutorQualifier方法

1
2
3
4
5
6
7
8
9
10
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// 核心:拿到@Async注解标注的value
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}

小结3:我们可以通过@Async(value="xxx")来手动指定想要使用的执行器

SpringBoot中的默认线程池

TaskExecutionAutoConfiguration

SpringBoot中的线程池,是通过TaskExecutionAutoConfiguration这个自动配置类加载的。我们来看一下其中的源码。

  1. taskExecutorBuilder方法用于创建一个构造器
  2. applicationTaskExecutor方法中,使用该构造器创建了线程池
  3. TaskExecutionProperties.class属性类,绑定了所有的属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@AutoConfiguration
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {

// Bean name of the application {@link TaskExecutor}.
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

// 构造器,用于创建线程池
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
builder = builder.taskDecorator(taskDecorator.getIfUnique());
return builder;
}

// 使用构造器,创建线程池Bean,放到容器中
// name = {applicationTaskExecutor, taskExecutor}
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}

}

TaskExecutionProperties

该属性配置类绑定到spring.task.execution,我们可以定制SpringBoot线程池中的基本属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 代码有删改
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

private final Pool pool = new Pool();
private final Shutdown shutdown = new Shutdown();
private String threadNamePrefix = "task-";

public static class Pool {
private int queueCapacity = Integer.MAX_VALUE;
private int coreSize = 8;
private int maxSize = Integer.MAX_VALUE;
}

// ...
}
  • 标题: SpringBoot源码系列(10):@Async原理
  • 作者: 布鸽不鸽
  • 创建于 : 2023-06-12 22:49:56
  • 更新于 : 2023-06-25 22:57:34
  • 链接: https://xuedongyun.cn//post/24765/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论