Spring3开始提供了@Async注解。该注解可以标注在方法或者类上,从而可以方便的实现方法的异步调用。调用者在调用异步方法时将立即返回,方法的实际执行将提交给指定的线程池中的线程执行。
一、@Async基础使用
首先在启动类中或配置类中添加 @EnableAsync 注解。
1 2 3 4 | @EnableAsync public class WriterSpringAsyncConfig { // ... } |
然后在方法或类上添加@Async注解
1 2 3 4 5 6 7 8 | public interface WriteAsyncHandleService { /** * 开始异步执行 */ @Async void startAsyncJob(); } |
@Async注意事项:
- @Async标注在类上时,表示该类的所有方法都是异步方法。
- @Async注解的方法一定要通过依赖注入调用(因为要通过代理对象调用),不能直接通过this对象调用,否则不生效。
调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | /** * 某ServiceImpl类 */ @Slf4j @Service public class ServiceImpl implements Service { @Resource private WriteAsyncHandleService service; @Override public R doSomething() { service.startAsyncJob(); } } |
乍一看。使用相当简单,但这是只理论上,在实际生产环境中,数据千变万化,若任由程序创建和执行多线程任务,很容易OOM。so,需要一个线程池来维护这些线程。
二、@Async创建线程池
假设有一个多线程写入文件的需求。 首先创建一个配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | ** * SpringAsync配置 * @author lzyz.fun */ @Slf4j @Configuration(proxyBeanMethods = false) @EnableAsync public class WriterSpringAsyncConfig { /** * 写入线程池1 */ @Bean public ThreadPoolTaskExecutor WriterPoolOne() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置 核心线程数 executor.setCorePoolSize(5); //配置 最大线程数(当前活动线程不能超过这个限制) executor.setMaxPoolSize(5); //配置 队列大小 executor.setQueueCapacity(10); //线程池维护线程所允许的空闲时间 当线程空闲时间达到keepAliveTime时,线程会被销毁,直到线程数量=corePoolSize //如果allowCoreThreadTimeout=true,则会直到线程数量=0 executor.setKeepAliveSeconds(30); //设置是否允许核心线程超时。若允许,核心线程超时后,会被销毁。默认为不允许 executor.setAllowCoreThreadTimeOut(true); // 配置线程池中的线程的名称前缀 // executor.setThreadNamePrefix("Writer-1-"); // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean executor.setWaitForTasksToCompleteOnShutdown(true); // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住 executor.setAwaitTerminationSeconds(2); // 设置线程工厂(如需) // executor.setThreadFactory(new ExportSystemThreadFactory()); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // executor.setRejectedExecutionHandler((runnable, e) -> { // log.error("队列已满。"); // throw new RejectedExecutionException("队列已满,请稍后再试。"); // }); //执行初始化 executor.initialize(); return executor; } /** * 写入线程池2 */ @Bean public ThreadPoolTaskExecutor WriterPoolTwo() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置 核心线程数 executor.setCorePoolSize(5); //配置 最大线程数(当前活动线程不能超过这个限制) executor.setMaxPoolSize(5); //配置 队列大小 executor.setQueueCapacity(10); //线程池维护线程所允许的空闲时间 executor.setKeepAliveSeconds(30); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("Writer-2-"); //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean executor.setWaitForTasksToCompleteOnShutdown(true); //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住 executor.setAwaitTerminationSeconds(2); //设置线程工厂 // executor.setThreadFactory(new ExportSystemThreadFactory()); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // executor.setRejectedExecutionHandler((runnable, e) -> { // log.error("队列已满。"); // throw new RejectedExecutionException("队列已满,请稍后再试。"); // }); //执行初始化 executor.initialize(); return executor; } // 如果资源允许。还可以继续创建其他不同线程池。 |
然后只需要在 @Async注解 中,加入你创建的线程池名称即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | /** * 多线程文件写入接口,具体业务实现此接口即可。 * @author lzyz.fun */ public interface WriterAsyncHandle { /** * 线程池Bean1 执行 */ @Async("WriterPoolOne") void startDefaultWrite(); /** * 线程池Bean2 执行 */ @Async("WriterPoolTwo") void startVipWrite(); } |
还可以在执行时,获取线程池内相关状态,如判断是否已满:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | /** * 判断指定的线程池是否已满 * @param poolName 线程池name,如WriterPoolOne * @param queueMaxSize 线程池队列最大容量 * @param PoolMaxSize 线程池最大线程数 * @return */ public static Boolean getTreadPoolQueueIsFull(String poolName, int PoolMaxSize, int queueMaxSize){ ThreadPoolTaskExecutor bean = SpringContextHolder.getBean(poolName); BlockingQueue<Runnable> queue = bean.getThreadPoolExecutor().getQueue(); // System.out.println(poolName + " - getPoolSize = "+ bean.getThreadPoolExecutor().getPoolSize()); // System.out.println(poolName + " - queuezise = " + queue.size() + ", queueInfo = " + queue.toString() ); if(bean.getThreadPoolExecutor().getPoolSize() >= PoolMaxSize && queue.size() >= queueMaxSize){ return true; } return false; } |