跳至主要內容

19、ThreadPoolTaskExecutor 线程池的使用

安图新大约 6 分钟

19、ThreadPoolTaskExecutor 线程池的使用

一、线程池简介

1.1 为什么使用线程池

  • 降低系统资源消耗: 通过重用已存在的线程,降低线程创建和销毁造成的消耗;
  • 提高系统响应速度: 当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
  • 方便线程并发数的管控: 因为线程若是无限制的创建,可能会导致内存占用过多而产生 OOM,并且会造成 CPU 过度切换(CPU 切换线程是有时间成本的,需要保持当前执行线程的现场,并恢复要执行线程的现场);
  • 提供更强大的功能: 延时定时线程池。

1.2 线程池为什么需要使用队列

因为线程若是无限制的创建,可能会导致 内存 占用过多而产生 OOM,并且会造成 CPU 过度切换。

创建线程池的消耗较高,或者线程池创建线程需要获取 mainlock 这个全局锁,影响并发效率,阻塞队列可以很好的缓冲。

1.3 线程池为什么要使用阻塞队列而不是用非阻塞队列

阻塞队列 可以保证任务队列中没有任务是阻塞获取任务的线程,使得线程进入 wait 状态,释放 CPU 资源,当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。

使得线程不至于一直占用 CPU 资源。(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢?

1.4 如何配置线程池

  • CPU 密集型任务:

尽量使用较小的线程池,一般为 CPU 核心数 +1。因为 CPU密集型任务使得 CPU 使用率很高,若开很多的线程数,会造成 CPU 过度切换。

  • IO 密集型任务:

可以使用较大的线程池,一般为 2*CPU 核心数。IO密集型任务 CPU 使用率并不高,因此可以让 CPU 在等待 IO 的时候有其他线程去处理别的任务,充分利用 CPU 时间。

  • 混合型任务:

可以将任务分为 IO密集型和 CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。

因为如果划分之后两个任务执行时间有数据级别的差距,那么拆分没有意义。因为先执行完的任务就要等候执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

1.5 execute() 和 submit() 方法

  • execute():执行一个任务,没有返回值。
  • submit():提交一个任务,有返回值。

submit() 方法的使用如下:

  • submit(Callable<T> task): 能够获取到它的返回值,通过 feature.get() 获取(阻塞直到任务执行完)。一般使用 FutureTask + Callable 配合使用。
  • submit(Runnable task, T result):能通过传入的载体 result 间接获得线程的返回值。
  • submit(Runnable task):则是没有返回值的,就算获取它的返回值也是 null。

future.get() 方法会使获取结果的线程进入 阻塞状态,直到线程执行完成之后,唤醒获取结果的线程,然后返回结果。


二、ThreadPoolTaskExecutor 线程池简介

2.1 简介

  • ThreadPoolTaskExecutor 是 Spring Framework 提供的一个线程池执行器,它基于 java.util.concurrent 包中的 ThreadPoolExecutor 实现,并提供了更方便的 Spring 配置和生命周期管理。

在 Spring 应用程序中,你可以通过配置一个 ThreadPoolTaskExecutor 来创建并管理一个自定义的线程池。这个线程池可以根据你的需求设置 核心线程数最大线程数队列容量线程存活时间 以及 拒绝策略 等属性。

2.2 核心参数配置

  • corePoolSize:最小线程数,默认为 1
  • maxPoolSize:最大线程数,默认为 Integer.MAX_VALUE
  • keepAliveSeconds:(maxPoolSize-corePoolSzie)部分线程空闲最大存活时间,默认存活时间是 60s
  • queueCapacity:阻塞队列的大小,默认为 Integer.MAX_VALUE,默认使用 LinkedBlockingQueue
  • allowCoreThreadTimeOut:是否允许核心线程过期,设置为 true 的话,keepAliveSeconds 参数设置的有效时间对 corePoolSize 线程也有效,默认是 false
  • threadNamePrefix:线程名称前缀,为 ThreadPoolTaskExecutor 的增强功能,默认为“类名-”。
  • threadFactory:设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字。
  • rejectedExecutionHandler:拒绝策略,当队列 workQueue 和线程池 maxPoolSize 都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。

2.3 ThreadPoolTaskExecutor 内部执行流程

 
 

我们通过 execute(Runnable) 方法或者 submit(Runnable) 方法将 Runnable 任务添加到线程池时:

  • 如果线程池中的数量小于 corePoolSize,即使线程池中的线程都处于空闲状态,也要 创建新的线程来处理被添加的任务

  • 如果线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue 未满,那么 任务被放入缓冲队列

  • 如果线程池中的数量大于 corePoolSize,缓冲队列 workQueue 满了,并且:

  • 线程池中的数量小于 maxPoolSize,则 创建新的线程来处理被添加的任务

  • 线程池中的数量等于 maxPoolSize,则 通过 handler 所指定的拒绝策略来处理被添加的任务

也就是说,处理任务的优先级为:

核心线程corePoolSize > 任务队列workQueue > 最大线程maxPoolSize > 拒绝策略handler


三、ThreadPoolTaskExecutor 的代码示例

3.1 配置线程池示例

ThreadPoolConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * <p> @Title ThreadPoolConfig
 * <p> @Description 线程池配置
 *
 * @author ACGkaka
 * @date 2024/01/18 21:08
 */
@Configuration
public class ThreadPoolConfig {



    /** 最佳线程数:操作系统线程数+2 */
    private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 2;
    /** 最大线程数 */
    private static final int MAX_POOL_SIZE = CORE_POOL_SIZE * 2;
    /** 队列长度 */
    private static final int QUEUE_CAPACITY = 10000;

    /**
     * 发送短信线程池
     */
    @Bean
    public ThreadPoolTaskExecutor sendSMSThreadPool() {


        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setThreadNamePrefix("SendSMS-");
        // 等待任务执行完毕后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

3.2 使用线程池示例

import com.demo.service.DemoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * <p> @Title DemoServiceImpl
 * <p> @Description 测试ServiceImpl
 *
 * @author ACGkaka
 * @date 2023/4/24 18:14
 */
@Slf4j
@Service
public class DemoServiceImpl implements DemoService {



    @Resource
    private ThreadPoolTaskExecutor sendSMSThreadPool;

    @Override
    public void sendSMS() {


        // 线程池-发送短信
        sendSMSThreadPool.execute(() -> {


            log.info("Thread: {}, SMS sending...", Thread.currentThread().getName());
        });
    }
}

3.3 执行结果

请求后,日志打印结果如下:

(可以看到线程名称前缀已成功生效。)

 
 

整理完毕,完结撒花~ 🌻

参考地址:

1、 Spring 线程池 ThreadPoolTaskExecutor 的使用,https://blog.csdn.net/u012060033/article/details/111934507;

2、 线程池 ThreadPoolTaskExcutor 详解,https://juejin.cn/post/7073459521691222024;

3、 Java 线程池详解,https://blog.csdn.net/weixin_40096160/article/details/130542750;