Spring中@Async注解创建自定义线程池

Spring中@Async注解创建自定义线程池

Spring3开始提供了@Async注解。该注解可以标注在方法或者类上,从而可以方便的实现方法的异步调用。调用者在调用异步方法时将立即返回,方法的实际执行将提交给指定的线程池中的线程执行。

一、@Async基础使用

首先在启动类中或配置类中添加 @EnableAsync 注解。

@EnableAsync
public class WriterSpringAsyncConfig  {
  // ...
}

然后在方法或类上添加@Async注解

public interface WriteAsyncHandleService {

    /**
     * 开始异步执行
     */
    @Async
    void startAsyncJob();
}

@Async注意事项:

  • @Async标注在类上时,表示该类的所有方法都是异步方法。
  • @Async注解的方法一定要通过依赖注入调用(因为要通过代理对象调用),不能直接通过this对象调用,否则不生效。

调用

/**
 * 某ServiceImpl类
 */
@Slf4j
@Service
public class ServiceImpl implements Service {

    @Resource
    private WriteAsyncHandleService service;


    @Override
    public R doSomething() {
         service.startAsyncJob();
    }
}

乍一看。使用相当简单,但这是只理论上,在实际生产环境中,数据千变万化,若任由程序创建和执行多线程任务,很容易OOM。so,需要一个线程池来维护这些线程。

二、@Async创建线程池

假设有一个多线程写入文件的需求。 首先创建一个配置类:

**
 * SpringAsync配置
 * @author lzyz.fun
 */
@Slf4j
@Configuration(proxyBeanMethods = false)
@EnableAsync
public class WriterSpringAsyncConfig  {

   /**
     * 写入线程池1
     */
    @Bean
    public ThreadPoolTaskExecutor WriterPoolOne() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置 核心线程数
        executor.setCorePoolSize(5);

        //配置 最大线程数(当前活动线程不能超过这个限制)
        executor.setMaxPoolSize(5);

        //配置 队列大小
        executor.setQueueCapacity(10);

        //线程池维护线程所允许的空闲时间 当线程空闲时间达到keepAliveTime时,线程会被销毁,直到线程数量=corePoolSize
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);

        //设置是否允许核心线程超时。若允许,核心线程超时后,会被销毁。默认为不允许
        executor.setAllowCoreThreadTimeOut(true);


        // 配置线程池中的线程的名称前缀
        // executor.setThreadNamePrefix("Writer-1-");

        // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        executor.setWaitForTasksToCompleteOnShutdown(true);

        // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        executor.setAwaitTerminationSeconds(2);

        // 设置线程工厂(如需)
        // executor.setThreadFactory(new ExportSystemThreadFactory());

        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         *
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//        executor.setRejectedExecutionHandler((runnable, e) -> {
//            log.error("队列已满。");
//            throw new RejectedExecutionException("队列已满,请稍后再试。");
//        });

        //执行初始化
        executor.initialize();

        return executor;
    }

    /**
     * 写入线程池2
     */
    @Bean
    public ThreadPoolTaskExecutor WriterPoolTwo() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置 核心线程数
        executor.setCorePoolSize(5);

        //配置 最大线程数(当前活动线程不能超过这个限制)
        executor.setMaxPoolSize(5);

        //配置 队列大小
        executor.setQueueCapacity(10);

        //线程池维护线程所允许的空闲时间
        executor.setKeepAliveSeconds(30);

        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("Writer-2-");

        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        executor.setWaitForTasksToCompleteOnShutdown(true);

        //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        executor.setAwaitTerminationSeconds(2);

        //设置线程工厂
        // executor.setThreadFactory(new ExportSystemThreadFactory());

        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         *
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//        executor.setRejectedExecutionHandler((runnable, e) -> {
//            log.error("队列已满。");
//            throw new RejectedExecutionException("队列已满,请稍后再试。");
//        });

        //执行初始化
        executor.initialize();

        return executor;
    }

        // 如果资源允许。还可以继续创建其他不同线程池。

然后只需要在 @Async注解 中,加入你创建的线程池名称即可。

/**
 * 多线程文件写入接口,具体业务实现此接口即可。
 * @author lzyz.fun
 */
public interface WriterAsyncHandle {

    /**
     * 线程池Bean1 执行
     */
    @Async("WriterPoolOne")
    void startDefaultWrite();

    /**
     * 线程池Bean2 执行
     */
    @Async("WriterPoolTwo")
    void startVipWrite();
}

还可以在执行时,获取线程池内相关状态,如判断是否已满:


    /**
     * 判断指定的线程池是否已满
     * @param poolName     线程池name,如WriterPoolOne
     * @param queueMaxSize 线程池队列最大容量
     * @param PoolMaxSize  线程池最大线程数
     * @return
     */
    public static Boolean getTreadPoolQueueIsFull(String poolName, int PoolMaxSize, int queueMaxSize){
        ThreadPoolTaskExecutor bean = SpringContextHolder.getBean(poolName);
        BlockingQueue<Runnable> queue = bean.getThreadPoolExecutor().getQueue();

//        System.out.println(poolName + " - getPoolSize = "+ bean.getThreadPoolExecutor().getPoolSize());
//        System.out.println(poolName + " - queuezise = " + queue.size() + ", queueInfo = " + queue.toString() );
        if(bean.getThreadPoolExecutor().getPoolSize() >= PoolMaxSize && queue.size() >= queueMaxSize){
            return true;
        }
        return false;
    }

Comments

No comments yet. Why don’t you start the discussion?

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注