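Spring Boot Source Code Series (10): How @Async Works
Preface
In Spring Boot, time-consuming tasks can be handed off to asynchronous execution. Typically we annotate the method that should run asynchronously with @Async and annotate the main application class with @EnableAsync to turn the feature on. For usage, see my earlier article: "The @Async Annotation in Spring Boot". This article addresses the following questions:
- How do @EnableAsync and @Async work?
- How is the default thread pool/executor chosen in Spring Boot?
- Why can the value attribute of @Async select which executor is used?
Original article: https://xuedongyun.cn/post/24765/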
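How asynchronous processing works (@EnableAsync)
EnableAsync
The main application class is annotated with @EnableAsync. Through @Import, this annotation imports AsyncConfigurationSelector.

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {
        // attributes omitted here (the annotation declares mode, among others)
    }
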
AsyncConfigurationSelector
Inheritance: ImportSelector <- AdviceModeImportSelector<EnableAsync> <- AsyncConfigurationSelector
Let us first look at the methods of the parent types. When a @Configuration class uses @Import to import a class that implements ImportSelector, selectImports is called with the AnnotationMetadata of that configuration class and returns the names of the classes that should be imported into the container.

    public interface ImportSelector {

        String[] selectImports(AnnotationMetadata importingClassMetadata);

        @Nullable
        default Predicate<String> getExclusionFilter() {
            return null;
        }
    }

AdviceModeImportSelector<EnableAsync> implements selectImports(AnnotationMetadata). It reads the value of the mode attribute from the @EnableAsync annotation and calls selectImports(adviceMode), which returns the classes to import. That abstract method is exactly the one AsyncConfigurationSelector implements.

    public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

        public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

        protected String getAdviceModeAttributeName() {
            return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
        }

        @Override
        public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
            // Resolve the generic parameter A, which is EnableAsync here
            Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
            Assert.state(annType != null, "...");

            // Read the attributes of that annotation from the importing class
            AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
            if (attributes == null) {
                throw new IllegalArgumentException("...");
            }

            // Delegate to the subclass with the configured mode
            AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
            String[] imports = selectImports(adviceMode);
            return imports;
        }

        @Nullable
        protected abstract String[] selectImports(AdviceMode adviceMode);
    }

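The call to GenericTypeResolver.resolveTypeArgument is what lets the base class discover that A is EnableAsync. As a rough illustration of the idea using only JDK reflection, the sketch below reads the type argument from a direct subclass. ModeSelector, DemoSelector, EnableDemo and TypeArgumentDemo are hypothetical names invented for this example, and the sketch only handles a direct subclass, which Spring's resolver does not require.

```java
import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical annotation standing in for @EnableAsync.
@interface EnableDemo {}

// Hypothetical base class standing in for AdviceModeImportSelector<A>.
abstract class ModeSelector<A extends Annotation> {

    // Reads the actual type argument A from the direct subclass declaration.
    Class<?> annotationType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return (Class<?>) arg;
            }
        }
        return null; // not a direct, concretely parameterized subclass
    }
}

// Hypothetical subclass standing in for AsyncConfigurationSelector.
class DemoSelector extends ModeSelector<EnableDemo> {}

class TypeArgumentDemo {

    static String resolvedName() {
        Class<?> type = new DemoSelector().annotationType();
        return type == null ? null : type.getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(resolvedName());
    }
}
```

Because the type argument is recorded in the subclass's class file, the base class can stay generic and still know which annotation's attributes to read.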
With the default mode (AdviceMode.PROXY), ProxyAsyncConfiguration is therefore imported into the container.

    public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

        private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
                "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

        @Override
        @Nullable
        public String[] selectImports(AdviceMode adviceMode) {
            switch (adviceMode) {
                case PROXY:
                    return new String[] {ProxyAsyncConfiguration.class.getName()};
                case ASPECTJ:
                    return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
                default:
                    return null;
            }
        }
    }

ProxyAsyncConfiguration
Inheritance: AbstractAsyncConfiguration <- ProxyAsyncConfiguration
The imported ProxyAsyncConfiguration is a @Configuration class. Through a @Bean method it registers an AsyncAnnotationBeanPostProcessor.

    @Configuration(proxyBeanMethods = false)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

        @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
            AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
            // executor and exceptionHandler are inherited from AbstractAsyncConfiguration
            bpp.configure(this.executor, this.exceptionHandler);
            return bpp;
        }
    }

Its parent class AbstractAsyncConfiguration uses an @Autowired setter to obtain the AsyncConfigurer bean from the container, if there is one, and takes the executor (thread pool) from it. This is where this.executor gets its value.

    @Configuration(proxyBeanMethods = false)
    public abstract class AbstractAsyncConfiguration implements ImportAware {

        @Nullable
        protected Supplier<Executor> executor;

        @Nullable
        protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

        @Autowired
        void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
            Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
                List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
                if (CollectionUtils.isEmpty(candidates)) {
                    return null;
                }
                if (candidates.size() > 1) {
                    throw new IllegalStateException("Only one AsyncConfigurer may exist");
                }
                return candidates.get(0);
            });
            // adapt(...) wraps each lookup in a lazily evaluated Supplier
            this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
            this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
        }
    }

Summary 1: we can register a bean that implements the AsyncConfigurer interface to supply the thread pool used for asynchronous methods. At most one such bean may exist.
AsyncAnnotationBeanPostProcessor
Inheritance: BeanFactoryAware <- AbstractBeanFactoryAwareAdvisingPostProcessor <- AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor in turn creates an advisor, AsyncAnnotationAdvisor, and keeps it in its advisor field.
In Spring, if a bean implements the BeanFactoryAware interface, the container calls its setBeanFactory method during initialization and passes in the BeanFactory instance.

    public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            super.setBeanFactory(beanFactory);
            AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
            if (this.asyncAnnotationType != null) {
                advisor.setAsyncAnnotationType(this.asyncAnnotationType);
            }
            advisor.setBeanFactory(beanFactory);
            this.advisor = advisor;
        }

        public void configure(@Nullable Supplier<Executor> executor,
                @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
            this.executor = executor;
            this.exceptionHandler = exceptionHandler;
        }
    }

AsyncAnnotationAdvisor
AsyncAnnotationAdvisor creates the advice this.advice, an AnnotationAsyncExecutionInterceptor. The advice itself also implements the BeanFactoryAware interface, so the advisor passes the BeanFactory on to it.

    public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

        public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor,
                @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
            // @Async is always matched; further annotation types are omitted here
            Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
            asyncAnnotationTypes.add(Async.class);
            this.advice = buildAdvice(executor, exceptionHandler);
            this.pointcut = buildPointcut(asyncAnnotationTypes);
        }

        protected Advice buildAdvice(@Nullable Supplier<Executor> executor,
                @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
            AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
            // configure(...) is defined in AsyncExecutionAspectSupport, shown in the next section
            interceptor.configure(executor, exceptionHandler);
            return interceptor;
        }

        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            if (this.advice instanceof BeanFactoryAware) {
                ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
            }
        }
    }

AnnotationAsyncExecutionInterceptor
Inheritance: MethodInterceptor & AsyncExecutionAspectSupport <- AsyncExecutionInterceptor <- AnnotationAsyncExecutionInterceptor
The base class AsyncExecutionAspectSupport holds the configure method and the lookup of the default executor:

    public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {

        public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

        public void configure(@Nullable Supplier<Executor> defaultExecutor,
                @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
            // Fall back to getDefaultExecutor(...) when no executor was supplied
            this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
            this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
        }

        @Nullable
        protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
            if (beanFactory != null) {
                try {
                    // 1. A unique bean of type TaskExecutor
                    return beanFactory.getBean(TaskExecutor.class);
                }
                catch (NoUniqueBeanDefinitionException ex) {
                    try {
                        // 2a. Several TaskExecutor beans: look up by name
                        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                    }
                    catch (NoSuchBeanDefinitionException ex2) {
                        // logging omitted; falls through to return null
                    }
                }
                catch (NoSuchBeanDefinitionException ex) {
                    try {
                        // 2b. No TaskExecutor bean: look up by name
                        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                    }
                    catch (NoSuchBeanDefinitionException ex2) {
                        // logging omitted; falls through to return null
                    }
                }
            }
            return null;
        }
    }

Going one step further, AsyncExecutionInterceptor overrides getDefaultExecutor and adds one more rule: if no default executor was found, use SimpleAsyncTaskExecutor.

    public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

        @Override
        @Nullable
        protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
            Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
            return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
        }
    }

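Summary 2: if the container holds exactly one TaskExecutor bean, that bean is the default executor. If there are several, or none, Spring looks for a bean named "taskExecutor" of type Executor. If that also fails, SimpleAsyncTaskExecutor is used as the default executor; it starts a separate Thread for every invocation of an annotated method and does not reuse threads.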
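The lookup order can be sketched with plain JDK types. This is a simplified model and not Spring's code: the Map stands in for the bean factory, the class name DefaultExecutorLookup is made up for this example, and the model does not distinguish between the TaskExecutor and Executor types the way the real lookup does.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Simplified model of the default-executor lookup; the Map plays the role
// of the bean factory (bean name -> executor bean). Not Spring API.
class DefaultExecutorLookup {

    static final String DEFAULT_NAME = "taskExecutor";

    static Executor resolve(Map<String, Executor> beans) {
        // Step 1: exactly one candidate, use it regardless of its name.
        if (beans.size() == 1) {
            return beans.values().iterator().next();
        }
        // Step 2: none or several candidates, fall back to the well-known name.
        Executor named = beans.get(DEFAULT_NAME);
        if (named != null) {
            return named;
        }
        // Step 3: nothing usable, start a new thread for every task
        // (the role SimpleAsyncTaskExecutor plays in the real code).
        return task -> new Thread(task).start();
    }

    public static void main(String[] args) {
        Map<String, Executor> beans = new LinkedHashMap<>();
        beans.put("ioPool", Runnable::run);
        beans.put(DEFAULT_NAME, Runnable::run);
        // Two candidates exist, so the one named "taskExecutor" is chosen.
        System.out.println(resolve(beans) == beans.get(DEFAULT_NAME));
    }
}
```

The practical consequence is that adding a second executor bean to an application can silently change which executor @Async methods run on, unless one of them is named taskExecutor.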
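AsyncExecutionInterceptor implements the invoke method. It wraps the call to the original method in a Callable and submits it to the selected executor, which is what makes the method asynchronous.

    public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

        public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
            super(defaultExecutor);
        }

        @Override
        @Nullable
        public Object invoke(final MethodInvocation invocation) throws Throwable {
            // Resolve the method the user actually declared (target class, bridge methods)
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
            Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
            final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

            AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);

            Callable<Object> task = () -> {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            };

            return doSubmit(task, executor, invocation.getMethod().getReturnType());
        }
    }
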
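To make the interception step concrete, here is a minimal sketch of the same idea with a JDK dynamic proxy instead of Spring AOP. It is an illustration under assumptions, not the framework's implementation: ReportService, AsyncProxyDemo and the thread name demo-async-1 are invented for the example, and only methods returning CompletableFuture are made asynchronous.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical service interface used only for this sketch.
interface ReportService {
    CompletableFuture<String> build(String name);
}

class AsyncProxyDemo {

    // Wraps the target in a proxy whose handler hands the real call to the
    // executor, loosely mirroring the Callable around invocation.proceed().
    static ReportService asyncProxy(ReportService target, Executor executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!CompletableFuture.class.isAssignableFrom(method.getReturnType())) {
                return method.invoke(target, args); // e.g. toString(), hashCode()
            }
            return CompletableFuture.supplyAsync(() -> {
                try {
                    // The target returns a future; unwrap it on the worker thread,
                    // as the interceptor does with Future#get().
                    Object result = method.invoke(target, args);
                    return ((CompletableFuture<?>) result).get();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        };
        return (ReportService) Proxy.newProxyInstance(
                ReportService.class.getClassLoader(),
                new Class<?>[] {ReportService.class},
                handler);
    }

    // Returns the name of the thread that actually ran the target method.
    static String runOnce() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-async-1"));
        try {
            ReportService target = name ->
                    CompletableFuture.completedFuture(Thread.currentThread().getName());
            return asyncProxy(target, pool).build("q1").join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

The sketch also shows why calling an @Async method from within the same object does not run asynchronously: only calls that go through the proxy reach the handler.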
How asynchronous processing works (@Async)
AnnotationAsyncExecutionInterceptor
Inheritance: MethodInterceptor & AsyncExecutionAspectSupport <- AsyncExecutionInterceptor <- AnnotationAsyncExecutionInterceptor
Continuing from the previous section: AsyncExecutionAspectSupport implements the determineAsyncExecutor method, which decides which executor to use for a given method and caches the result per method.

    @Nullable
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                // An executor was named explicitly: look it up in the bean factory
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                // Otherwise use the default executor
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

AnnotationAsyncExecutionInterceptor implements the getExecutorQualifier method. It reads @Async from the method first and falls back to the declaring class.

    @Override
    @Nullable
    protected String getExecutorQualifier(Method method) {
        Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
        if (async == null) {
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
        }
        return (async != null ? async.value() : null);
    }

Summary 3: with @Async(value = "xxx") we can explicitly name the executor to use, where the value is the bean name (or qualifier) of an Executor in the container.
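The method-then-class fallback can be sketched with plain reflection. DemoAsync, MailJobs and QualifierLookupDemo are hypothetical names for this example; the real code uses Spring's merged-annotation lookup and caches the resolved executor per method, whereas this sketch caches only the qualifier string.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @Async; only the value attribute is modelled.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface DemoAsync {
    String value() default "";
}

// Hypothetical bean with a class-level and a method-level annotation.
@DemoAsync("classLevelPool")
class MailJobs {
    @DemoAsync("mailPool")
    public void send() {}

    public void archive() {}
}

class QualifierLookupDemo {

    // One lookup per method; later calls hit the cache.
    private static final Map<Method, String> CACHE = new ConcurrentHashMap<>();

    // The method-level annotation wins; otherwise fall back to the declaring class.
    static String qualifierFor(Method method) {
        return CACHE.computeIfAbsent(method, m -> {
            DemoAsync async = m.getAnnotation(DemoAsync.class);
            if (async == null) {
                async = m.getDeclaringClass().getAnnotation(DemoAsync.class);
            }
            return async != null ? async.value() : "";
        });
    }

    // Convenience overload for the MailJobs methods used in this demo.
    static String qualifierFor(String methodName) {
        try {
            return qualifierFor(MailJobs.class.getMethod(methodName));
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualifierFor("send"));
        System.out.println(qualifierFor("archive"));
    }
}
```

An empty qualifier corresponds to the branch in determineAsyncExecutor that falls back to the default executor.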
The default thread pool in Spring Boot
TaskExecutionAutoConfiguration
The thread pool in Spring Boot is set up by the auto-configuration class TaskExecutionAutoConfiguration. Let us look at its source.
- The taskExecutorBuilder method creates a builder.
- The applicationTaskExecutor method uses that builder to create the thread pool.
- The TaskExecutionProperties class binds all the configuration properties.
The pool is registered under two bean names, applicationTaskExecutor and taskExecutor, so the by-name lookup described in Summary 2 can find it. Because of @ConditionalOnMissingBean(Executor.class), it is only created when the application defines no Executor bean of its own.

    @ConditionalOnClass(ThreadPoolTaskExecutor.class)
    @AutoConfiguration
    @EnableConfigurationProperties(TaskExecutionProperties.class)
    public class TaskExecutionAutoConfiguration {

        public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

        @Bean
        @ConditionalOnMissingBean
        public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
                ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
                ObjectProvider<TaskDecorator> taskDecorator) {
            TaskExecutionProperties.Pool pool = properties.getPool();
            TaskExecutorBuilder builder = new TaskExecutorBuilder();
            builder = builder.queueCapacity(pool.getQueueCapacity());
            builder = builder.corePoolSize(pool.getCoreSize());
            builder = builder.maxPoolSize(pool.getMaxSize());
            builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
            builder = builder.keepAlive(pool.getKeepAlive());
            Shutdown shutdown = properties.getShutdown();
            builder = builder.awaitTermination(shutdown.isAwaitTermination());
            builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
            builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
            builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
            builder = builder.taskDecorator(taskDecorator.getIfUnique());
            return builder;
        }

        @Lazy
        @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
                AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
        @ConditionalOnMissingBean(Executor.class)
        public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
            return builder.build();
        }
    }

TaskExecutionProperties
This properties class is bound to the spring.task.execution prefix, which lets us customize the basic settings of the Spring Boot thread pool.

    @ConfigurationProperties("spring.task.execution")
    public class TaskExecutionProperties {

        private final Pool pool = new Pool();
        private final Shutdown shutdown = new Shutdown();
        private String threadNamePrefix = "task-";

        public static class Pool {
            private int queueCapacity = Integer.MAX_VALUE;
            private int coreSize = 8;
            private int maxSize = Integer.MAX_VALUE;
            // remaining fields (allowCoreThreadTimeout, keepAlive) and accessors omitted
        }

        // Shutdown class and accessors omitted
    }

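One consequence of these defaults is easy to miss. The sketch below assumes that the pool built from these properties behaves like a plain java.util.concurrent.ThreadPoolExecutor backed by a LinkedBlockingQueue of the configured capacity, and uses only JDK classes. With an effectively unbounded queue, extra tasks are queued rather than given new threads, so the pool stays at the core size and the maximum size never comes into play. UnboundedQueueDemo is a name made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    // Submits `tasks` blocking jobs and reports {poolSize, queuedTasks}
    // while all running jobs are still parked on the latch.
    static int[] poolSizeAndQueued(int coreSize, int maxSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, maxSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(Integer.MAX_VALUE));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let the parked jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Mirrors the defaults above: coreSize 8, maxSize and queue capacity Integer.MAX_VALUE.
        int[] result = poolSizeAndQueued(8, Integer.MAX_VALUE, 20);
        System.out.println(result[0] + " threads, " + result[1] + " queued");
    }
}
```

With 20 blocking tasks the demo reports 8 threads and 12 queued tasks. If the pool should grow beyond the core size under load, the queue capacity has to be bounded through the spring.task.execution settings.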