Spring3开始提供了@Async注解。该注解可以标注在方法或者类上,从而可以方便的实现方法的异步调用。调用者在调用异步方法时将立即返回,方法的实际执行将提交给指定的线程池中的线程执行。
一、@Async基础使用
首先在启动类中或配置类中添加 @EnableAsync 注解。
@EnableAsync
public class WriterSpringAsyncConfig {
// ...
}
然后在方法或类上添加@Async注解
public interface WriteAsyncHandleService {
/**
* 开始异步执行
*/
@Async
void startAsyncJob();
}
@Async注意事项:
- @Async标注在类上时,表示该类的所有方法都是异步方法。
- @Async注解的方法一定要通过依赖注入调用(因为要通过代理对象调用),不能直接通过this对象调用,否则不生效。
调用
/**
* 某ServiceImpl类
*/
@Slf4j
@Service
public class ServiceImpl implements Service {
@Resource
private WriteAsyncHandleService service;
@Override
public R doSomething() {
service.startAsyncJob();
}
}
乍一看。使用相当简单,但这是只理论上,在实际生产环境中,数据千变万化,若任由程序创建和执行多线程任务,很容易OOM。so,需要一个线程池来维护这些线程。
二、@Async创建线程池
假设有一个多线程写入文件的需求。 首先创建一个配置类:
**
* SpringAsync配置
* @author lzyz.fun
*/
@Slf4j
@Configuration(proxyBeanMethods = false)
@EnableAsync
public class WriterSpringAsyncConfig {
/**
* 写入线程池1
*/
@Bean
public ThreadPoolTaskExecutor WriterPoolOne() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置 核心线程数
executor.setCorePoolSize(5);
//配置 最大线程数(当前活动线程不能超过这个限制)
executor.setMaxPoolSize(5);
//配置 队列大小
executor.setQueueCapacity(10);
//线程池维护线程所允许的空闲时间 当线程空闲时间达到keepAliveTime时,线程会被销毁,直到线程数量=corePoolSize
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
executor.setKeepAliveSeconds(30);
//设置是否允许核心线程超时。若允许,核心线程超时后,会被销毁。默认为不允许
executor.setAllowCoreThreadTimeOut(true);
// 配置线程池中的线程的名称前缀
// executor.setThreadNamePrefix("Writer-1-");
// 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(2);
// 设置线程工厂(如需)
// executor.setThreadFactory(new ExportSystemThreadFactory());
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
*
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// executor.setRejectedExecutionHandler((runnable, e) -> {
// log.error("队列已满。");
// throw new RejectedExecutionException("队列已满,请稍后再试。");
// });
//执行初始化
executor.initialize();
return executor;
}
/**
* 写入线程池2
*/
@Bean
public ThreadPoolTaskExecutor WriterPoolTwo() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置 核心线程数
executor.setCorePoolSize(5);
//配置 最大线程数(当前活动线程不能超过这个限制)
executor.setMaxPoolSize(5);
//配置 队列大小
executor.setQueueCapacity(10);
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(30);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("Writer-2-");
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(2);
//设置线程工厂
// executor.setThreadFactory(new ExportSystemThreadFactory());
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
*
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// executor.setRejectedExecutionHandler((runnable, e) -> {
// log.error("队列已满。");
// throw new RejectedExecutionException("队列已满,请稍后再试。");
// });
//执行初始化
executor.initialize();
return executor;
}
// 如果资源允许。还可以继续创建其他不同线程池。
然后只需要在 @Async注解 中,加入你创建的线程池名称即可。
/**
* 多线程文件写入接口,具体业务实现此接口即可。
* @author lzyz.fun
*/
public interface WriterAsyncHandle {
/**
* 线程池Bean1 执行
*/
@Async("WriterPoolOne")
void startDefaultWrite();
/**
* 线程池Bean2 执行
*/
@Async("WriterPoolTwo")
void startVipWrite();
}
还可以在执行时,获取线程池内相关状态,如判断是否已满:
/**
* 判断指定的线程池是否已满
* @param poolName 线程池name,如WriterPoolOne
* @param queueMaxSize 线程池队列最大容量
* @param PoolMaxSize 线程池最大线程数
* @return
*/
public static Boolean getTreadPoolQueueIsFull(String poolName, int PoolMaxSize, int queueMaxSize){
ThreadPoolTaskExecutor bean = SpringContextHolder.getBean(poolName);
BlockingQueue<Runnable> queue = bean.getThreadPoolExecutor().getQueue();
// System.out.println(poolName + " - getPoolSize = "+ bean.getThreadPoolExecutor().getPoolSize());
// System.out.println(poolName + " - queuezise = " + queue.size() + ", queueInfo = " + queue.toString() );
if(bean.getThreadPoolExecutor().getPoolSize() >= PoolMaxSize && queue.size() >= queueMaxSize){
return true;
}
return false;
}