百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

技术专家带你彻底掌握线程池(技术专家带你彻底掌握线程池工程师)

wxin55 2024-11-07 13:13 12 浏览 0 评论

1. 导读

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。说到线程池,几乎是项目中必备的、面试中必问的,但是很多人实际并没有彻底掌握这项技能。如生产如何设置核心线程与最大线程配比、线程池的拒绝策略取舍等等。

本文包含以下内容:

  • 什么是线程池?
  • 线程池相关类讲解
  • JDK 定义的四类线程池
  • 线程池的 7 大参数详解
  • Spring/Spring Boot 使用线程池
  • 根据设备 CPU 动态配置线程池
  • 常见面试题精讲

2. 什么是线程池?

2.1 基本概念

线程池,顾名思义,就是存放预先创建好的线程的池子,需要使用的时候直接从池子里拿即可。池化技术,可以类比数据库连接池,存放预先创建好的数据库连接的池子。

2.2 线程池优点

我们主张项目中,用线程池代替自己创建的线程,那么为什么这样建议呢?下面就来说一说,线程池的优点,为什么选择使用线程池。

合理分配

设想一下这样的情景,项目中使用到线程的地方,都是 new Thread 的方式,也就是说每次执行方法时都创建线程。那么当大量请求涌入,方法被疯狂调用,那么线程是不是也在疯狂地递增,这样,用不了多久服务器 CPU 就会被挤爆,而从导致宕机、瘫痪等问题。而这,显然不是我们愿意看到的,线程池的出现,很好的解决了这个问题。

线程池可以指定核心线程数和最大线程数,以及任务队列,限制了线程不能被无限创建,集中由线程池进行分配,避免了可能由线程引发的资源耗尽问题。

线程预热

项目启动,线程池就会预先创建一部分线程以供使用。需要使用时,直接使用即可,减少了创建线程所需要的时间。

资源复用

线程的创建到销毁是比较消耗 CPU 资源的,使用线程池,线程可以重复使用,提高了资源利用率。

2.3 进程和线程

本来想省略此节,但是由于面试中经常会提问,我们还是拿出来说一说。

  • 进程:一个正在执行的计算机程序就是一个进程
  • 线程:CPU 调度的最小单位,一个进程由一个或多个线程组成

2.4 线程的状态

此小节为高频面试点,最好做到倒背如流。线程拥有生命周期,生命周期的各个阶段就是线程的状态。

线程有以下状态:

  • 新建
  • 就绪
  • 阻塞
  • 等待(等待/等待超时)
  • 终止

线程状态源码

源码位置:java.lang.Thread

public enum State {
        /**
         * 线程被创建但还未启动
         */
        NEW,

        /**
         * 线程为就绪(可运行)状态,在 jvm 中执行,但是可能需要等待其他操作系统资源执行
         */
        RUNNABLE,

        /**
         * 线程被监控器锁阻塞
         */
        BLOCKED,

        /**
         * 线程处于等待状态,需要被唤醒才能继续执行
         */
        WAITING,

        /**
         * 等待超时,正在等待的线程超过了指定的等待时间。
         */
        TIMED_WAITING,

        /**
         * 线程终止,线程执行完成
         */
        TERMINATED;
    }

2.5 并发和并行

记得有一次面试问到过这个问题,在这里也给大家分享一下,并发和并行。

  • 并发:多个线程访问同一个资源
  • 并行:同一时间执行多个任务

2.6 创建线程的几种方式

  • new Thread 类
  • 实现 Runnable 接口(无返回值)
  • 实现 Callable 接口(有返回值)
  • 使用线程池

3. 线程池相关类讲解

3.1 简单但有设计的 Executor 接口

线程池顶层接口是 Executor,它提供了一个 execute 执行方法。Executor 顶层接口的设计,用户只需要提供实现 Runnable 接口的实现类即可,不需要关心线程的创建和具体的执行。任务提交与创建和执行进行了解耦。

public interface Executor {

    void execute(Runnable command);
}

3.2 进一步增强的 ExecutorService 接口

ExecutorService 在 Executor 的基础上,增加了一些能力:

  • 停止和关闭任务线程
  • 批量执行或指定执行用户提交的任务
  • 提交一个用户执行的 Runnable 的任务

常用方法

// 提交 Runnable 任务
submit(Callable<T> task);
submit(Runnable task);
submit(Runnable task, T result);
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行
awaitTermination(long timeout, TimeUnit unit);
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
shutdown();
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
shutdownNow();
// 批量执行给定的任务
invokeAll(Collection<? extends Callable<T>> tasks);
// 执行单个指定的任务
invokeAny(Collection<? extends Callable<T>> tasks);

3.3 AbstractExecutorService 抽象类

AbstractExecutorService 抽象类比较简单,其大部分方法都继承于 ExecutorService,在此基础上增加了两个 protected 方法,供子类重写。

// 为给定可运行任务和默认值返回一个 RunnableFuture。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value);
// 为给定可运行任务返回一个 RunnableFuture。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)

从 3.1~3.3 都是一些接口和抽象类的设计,并没有具体实现,可见设计在前、实现在后的重要性。

3.4 主角 ThreadPoolExecutor 类

从图 1 中,我们看到了 ThreadPoolExecutor 的继承关系图,即 ThreadPoolExecutor 实现了以上所有的接口和抽象类所具备的能力。我们平时说的 Java 线程池的真身,其实就是 ThreadPoolExecutor。

3.4.1 ThreadPoolExecutor 的运行原理图

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。

3.4.2 任务提交执行流程

用户提交任务,ThreadPoolExecutor 进行任务分配,分以下四种种情况:

  • 存在空闲核心线程,线程分配核心线程直接执行
  • 无空闲核心线程,阻塞队列未满,则缓冲执行。此时如果核心线程有空闲了,线程分配核心线程从阻塞队列中获取任务执行
  • 无空闲核心线程,阻塞队列队满,则线程分配新的线程执行任务。新线程的数量上限即最大线程数
  • 无空闲核心线程,阻塞队列队满,已到达最大线程数,则会执行饱和策略(任务拒绝)

看图更好理解:

3.4.3 线程池生命周期

线程的生命周期和线程池的生命周期是有区别的,ThreadPoolExecutor 的运行状态有 5 种,分别为:

状态的转换流程图如下:

线程池的状态,不是用户显示设置的,而是由线程池内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量(workerCount)两个关键参数的维护放在了一起。

代码如下:

// 原子整形, 底层采用 CAS 原理控制并发
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 变量是用于控制线程池状态和有效线程数量的一个字段,它包含两部分信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。用一个变量存储两个值的设计,可以避免在做出相关决策时出现不一致的情况,不必为了维护两者的一致,而占用锁资源。

ctl 变量的相关计算是使用位运算来完成的,相比于基础运算,位运算速度较快。

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 计算当前运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 计算当前线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过状态和线程数生成 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

3.4.4 添加线程源码讲解

private boolean addWorker(Runnable firstTask, boolean core) {
       //相当于 goto
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 如果线程池的状态到了 SHUTDOWN 或者之上的状态时候,只有一种情况还需要继续添加线程,
            // 那就是线程池已经 SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务(firstTask 为 null)
            // 这里还继续添加线程的原因是加快执行等待队列中的任务,尽快让线程池关闭
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
               // 传入的 core 的参数,唯一用到的地方,如果线程数超过理论最大容量,如果 core 是 true 跟最大核心线程数比较,否则跟最大线程数比较
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过 CAS 自旋,增加线程数+1,增加成功跳出双层循环,继续往下执行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               // 检测当前线程状态如果发生了变化,则继续回到 retry,重新开始循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 走到这里,说明我们已经成功的将线程数+1 了,但是真正的线程还没有被添加
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 添加线程,Worker 是继承了 AQS,实现了 Runnable 接口的包装类
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
               // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 检查线程状态, 逻辑和之前一样
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                       // 线程只能被 start 一次
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      // workers 是一个 HashSet,添加我们新增的 Worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                  // 启动 Worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3.4.5 Worker 的工作流程

3.5 线程工具类 Executors

Executors 是线程的工具类,用于帮助用户快速创建线程池。 此类包含所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。此类支持以下各种方法:

  • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
  • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
  • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
  • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
  • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。

具体的应用我们将在下一节详细讲解,剩下相关的线程池相关类,我们将在后续逐步讲解。

4. JDK 定义的四类线程池

小建议:建议先阅读第五节——线程池 7 大参数详解,这样有助于大家阅读理解。

JDK 中定义了四类线程池:

  • 固定数量线程池
  • 单线程线程池
  • 带缓存的线程池
  • 定时任务线程池

下面我们将来一步步解析这四类线程池,这四类线程池可直接使用 3.5 节中的 Executors 创建。

4.1 固定数量线程池

4.1.1 创建固定数量线程池

    /** 使用 Executors 工具类创建固定数量线程池 */
    private ExecutorService executorService = Executors.newFixedThreadPool(3);

    public void start() {
        // 提交一个 Runnable 任务
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

4.1.2 固定数量线程池源码解读

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定数量线程池的底层就是我们第 3 节讲解的 ThreadPoolExecutor 类,通过构造方法设置参数。特点:核心线程数和最大线程数相等,存活时间为 0,即始终活跃,阻塞队列使用的是 LinkedBlockingQueue。

4.1.3 阻塞队列 LinkedBlockingQueue

上一小节提到,固定数量线程池使用的是 LinkedBlockingQueue 作为阻塞队列,那么 LinkedBlockingQueue 队列有什么特点呢?为什么选择它作为阻塞队列呢?

  • 由链表结构组成的队列,队列中的元素按 FIFO(先进先出的原则对元素进行排序)
  • 排在队列头部的元素是时间最长的元素,排在队尾的元素是时间最短的元素
  • 链接队列的吞吐量通常要高于基于数组的队列

缺点:

  • 如果指定容量,则可以在一定程度上防止队列过度拓展,队满时无法插入。如果不指定容量,则使用 Integer.MAX_VALUE 作为默认容量。

由于设定了固定数量的线程,那么用户提交的任务很可能就超出了核心线程数,此时任务队列对插入和取出的要求就比较高,链表结构在插入和删除的效率较高,故选择此队列。

4.2 单线程线程池

4.2.1 创建单线程线程池

    /** 使用 Executors 工具类创建 */
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void start() {
        // 提交一个 Runnable 任务
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

4.2.2 单线程线程池源码

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

单线程线程池指定核心线程和最大线程均为一,即从始至终线程池中只会存在一个线程,线程始终活跃,阻塞队列为 LinkedBlockingQueue

4.3 带缓存的线程池

4.3.1 创建带缓存的线程池

    private ExecutorService executorService = Executors.newCachedThreadPool();

    public void start() {
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

4.3.2 带缓存线程池源码

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

带缓存的线程池,这里的“缓存”不是指的数据缓存,而是指按需创建线程,并设置了存活时间,在存活时间内线程还可以处理其他任务。我们可以看到,它设置的核心线程数是 0,最大线程数是 Integer.MAX_VALUE。也就是说,线程池创建时,不初始化存放线程,当用户提交任务时,只要任务数小于 Integer.MAX_VALUE,则直接创建线程执行。线程执行完成后并不会立即销毁,而会缓存存活 60 秒,在 60 秒内,如果还有用户任务提交,且任务数小于等于存活的线程数,则由存活的线程执行。如果大于存活线程数,且小于 Integer.MAX_VALUE,则创建 任务数 - 存活线程数 的差值个线程,进行处理。

4.3.3 阻塞队列 SynchronousQueue

我们发现,带缓存的线程池没有使用 LinkedBlockingQueue 阻塞队列,而是使用的 SynchronousQueue 队列。

特点:队列中的元素插入和移出必须是同时操作的,也就是说一个任务被取出的同时,也要有一个任务被插入。二者同时进行,是一个同步队列。

同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

支持公平和非公平,看源码。

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

当指定为公平队列时,会创建一个 FIFO 的有序队列,否则顺序是未指定的。

4.4 定时任务线程池

4.4.1 创建一个定时任务线程池

    // 创建一个定时任务线程池, 并指定核心线程数
    private ExecutorService executorService = Executors.newScheduledThreadPool(10);

    public void start() {
        // 提交一个 Runnable 任务
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

4.4.2 定时任务线程池源码

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

底层使用的时 ScheduledThreadPoolExecutor, 我们追踪进去看一下,发现它是 super 调用父类的构造方法。

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

其父类就是 ThreadPoolExecutor。创建的是一个指定核心线程数,最大线程数为 Integer.MAX_VALUE,阻塞队列为 DelayedWorkQueue 的线程池。

DelayedWorkQueue 基于堆的数据结构 类似于 DelayQueue 和 PriorityQueue,每个 ScheduledFutureTask 将其索引记录到 堆数组。这弥补了查找任务的损失的效率 ,大大加快删除速度(从 O(n) 到 O(log n))。

5. 线程池的 7 大参数详解

从上一节我们知道,JDK 自带的四类线程池都是根据配置 ThreadPoolExecutor 而得到的。不同的参数组合诞生不同线程池,这 7 大参数几乎是面试中的必考题,也是实际生产中必须要使用到的。掌握它,让你的线程池使用游刃有余。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

七大参数分别是:

  • 核心线程数
  • 最大线程数
  • 线程存活时间
  • 存活时间单位
  • 阻塞队列
  • 线程工厂
  • 拒绝策略

5.1 核心线程数

核心线程数指的是初始化时就需要创建的线程,核心线程始终活跃,不管有没有需要执行的任务,核心线程都不会销毁。可以理解为,随时待命!

5.2 最大线程数

顾名思义,线程池中最多允许存在多少个线程。当核心线程繁忙,队列队满的情况下,如果“最大线程数 - 核心线程数 > 0”,线程池则会新建线程执行任务。

5.3 线程存活时间

当线程数大于核心数时,这是多余的空闲线程(即存活于蓝色区域的线程)在终止前等待新任务的最长时间。和时间单位参数连用。

5.4 存活时间单位

和线程存活时间一起使用,指定的是一段时间。常用单位有:

  • TimeUnit.NANOSECONDS 纳秒
  • TimeUnit.MILLISECONDS 毫秒,1 秒 = 1000 毫秒
  • TimeUnit.SECONDS 秒
  • TimeUnit.MINUTES 分

举例:30,TimeUnit.SECONDS ==> 存活时间:30 秒

5.5 阻塞队列

线程池中的阻塞队列类型也挺多的,特性也不尽相同,这也提升了线程池的灵活及多样性。参数类型是 BlockingQueue,BlockingQueue 是一个接口,它的实现类都可以使用。

实现类有:ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue。

它们之间的特性:

BlockingQueue 的四类方法:

这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。

5.6 线程工厂

线程工厂可以用户自定义,也可以使用默认的线程工程。线程工厂就是用来创建线程的。

使用默认的线程工厂:Executors.defaultThreadFactory()

源码如下:

   static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            // 从安全管理器中拿到线程组
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // 指定线程的名字
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // 设置用户进程
            if (t.isDaemon())
                t.setDaemon(false);
            // 设置优先级
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

默认的线程工厂主要是设置了一些线程名称规则,用户线程,以及线程默认的优先级。

当然,你也可以自定义线程工厂,参照默认的线程工厂的实现就可以,这样你自己创建的线程的名称,优先级等等都是可以按照你自己的规范来。

5.7 拒绝策略(饱和策略)

任务的拒绝策略,也可以叫饱和策略,就是当阻塞队列队满时,剩下提交的任务的处理策略。

JDK 中提供了四种拒绝策略,默认使用的是饱和丢弃策略。

    /**
     * 源码中默认使用的是 AbortPolicy 策略 
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

饱和策略详解:

具体选择哪种策略,需要根据实际的业务场景来考量

6. Spring/Spring Boot 使用线程池

如果大家已经很熟悉能够使用线程池,则可以直接跳过本节。

6.1 Spring 使用线程池

6.1.1 创建 maven 工程,导入相关依赖

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.12.RELEASE</version>
        </dependency>

6.1.2 创建并配置线程池

Spring 有两种方式,一种是使用配置类的形式,一种是在 bean.xml 中配置。我们演示使用配置类的。

创建一个线程池配置类,并配置好 7 大参数。

/**
 * @author 九月长安
 * @version $Id: MyThreadPoolConfig.java, v 0.1 2021-08-03 18:41 九月长安 Exp $
 */
@Configuration
public class MyThreadPoolConfig {

    // 指定注入的 bean 名称
    @Bean(name = "executorService")
    public ExecutorService getThreadPool() {
        return new ThreadPoolExecutor(2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue(),
            Executors.defaultThreadFactory(),
            new AbortPolicy());
    }
}

6.1.3 使用线程池

/**
 * @author 九月长安
 * @version $Id: UserService.java, v 0.1 2021-08-03 11:16 九月长安 Exp $
 */
@Service
public class UserService {

    @Autowired
    private UserDao userDao;

    // 注入线程池
    @Autowired
    private ExecutorService executorService;

    public void getUserInfo() throws ExecutionException, InterruptedException {
        // 提交有返回值的任务
        Future<Person> future = executorService.submit(new Callable<Person>() {
            @Override
            public Person call() throws Exception {
                return userDao.getPerson();
            }
        });
        // 获取返回结果
        Person p = future.get();
        // 打印
        System.out.println(p);
    }

}

打印结果:

至此,您已掌握 Spring 线程池的基本使用。实际开发中,很多任务是可以异步执行的,这些任务使用线程池能够大大地提升速度。例如向用户推送消息,我们没必要去等待全部推送完再返回,我们只需要将执行结果记录一下,过段时间去查询一下执行情况即可。

6.2 Spring Boot 使用线程池

6.2.1 创建一个 Spring Boot 项目

大家可以使用 IDE 创建,也可以使用 Spring 官网提供的初始化向导 地址:https://start.spring.io/。

6.2.2 创建并配置线程池

Spring Boot 线程池创建与配置和 Spring 几乎一样。

@Configuration
public class MyThreadPoolConfig {

    // 指定注入的 bean 名称
    @Bean(name = "executorService")
    public ExecutorService getThreadPool() {
        return new ThreadPoolExecutor(2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue(),
            Executors.defaultThreadFactory(),
            new AbortPolicy());
    }
}

6.2.3 线程池的使用

Spring Boot 线程池使用和 Spring 没有太大的区别,一样是注入然后使用。可参考 6.1 节。详细操作方法可查阅 Java API,地址:

https://tool.oschina.net/apidocs/apidoc?api=jdk-zh

7. 根据设备 CPU 动态配置线程池

追求线程池配置的最佳合理参数,是大家共同的夙愿,我们先来看一看由于配置不合理导致出现问题的实际案例。

7.1 实际案例

案例 1:页面大量产生接口服务降级

原因:没有预估好调用的流量,导致最大核心数设置偏小,大量抛出 RejectedExecutionException,导致队满而抛出异常,从而产生降级。

案例 2:自身作为上游服务,执行时间过长,导致整体服务超时,影响下游服务大量调用失败

原因:阻塞队列设置过长,最大线程数设置太小,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致调用超时失败。

7.2 追求最佳参数配置

那么有没有一个参数是最佳参数配置呢?这个还在不断地讨论和实践中,因为实际的服务器环境和业务要求复杂且多样,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,但是追求完美依然是我们要做的。

美团技术团队针对以上方案,也没有得出一个最佳通用的配置,没有一个通用的公式可以解决这一问题。

7.3 较为常用的配比

其实我们最难确定的就是核心线程和最大线程的配比,那么有没有一些配比是较为常用的呢?其实是有的。

  • CPU 密集型:核心线程数 = CPU 核数 + 1
  • IO 密集型:核心线程数 = CPU 核数 * 2,最大线程数 = CPU 核数/(1- 阻塞系数),阻塞系数:0.8~0.9

例如:8 核,则 8/(1-0.9) = 80,及最大线程数为 80。

7.4 动态化线程池

线程池既然那么重要,而且参数不能最佳适配业务场景,那么能不能设计一个动态化的线程池?例如现在业务负载过大,动态的调整核心线程数,那么是不是就能完美的解决这一问题呢?我们来看一下美团技术团队的实践架构:

个人觉得已经是相当的不错,包含申请,动态调参,监控告警,让线程池始终处于最佳状态。想要设计自己的线程池架构的小伙伴,可以参考此架构设计。

8. 常见面试题精讲

此节希望大家学习完成后时常来温习,做到胸有成竹最好了。

创建线程有哪几种方式?

答:new Thread 类,实现 Runnable 接口,实现 Callable 接口,使用线程池。

使用线程池有什么好处?

答:资源合理分配,提高资源复用,提升执行效率,线程创建执行与任务提交解耦。

线程池 7 大参数有哪些?

答:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列、线程工厂、拒绝策略。

如果核心线程数满了,那么此时提交的任务怎么处理?

如果核心线程数满了,则将任务提交至阻塞队列等待执行,如果阻塞队列也满了,且最大线程数 - 核心线程数 > 0 则创建新的线程执行提交的任务。

线程池的拒绝策略有哪些?

  • AbortPolicy 丢弃任务并抛出异常。
  • DiscardPolicy 丢弃任务,不抛异常。
  • DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
  • CallerRunsPolicy 由调用线程执行该任务。例如:如果是主线程调用线程池,提交任务,则拒绝的任务由主线程执行。

如果让你来设计线程池你会怎样设计?

首先是根据业务场景,判断是 CPU 密集型还是 IO 密集型,不同的类型方案不一样,通常 IO 密集型设置的 CPU 核数较多。其次根据实际访问量,以及部署环境来设定参数。拒绝策略的话,需要看具体业务对任务不能执行的容忍程度。最好设置足够适合的队列长度、核心线程数、最大线程数,尽量避免触发拒绝策略。

相关推荐

ES6中 Promise的使用场景?(es6promise用法例子)

一、介绍Promise,译为承诺,是异步编程的一种解决方案,比传统的解决方案(回调函数)更加合理和更加强大在以往我们如果处理多层异步操作,我们往往会像下面那样编写我们的代码doSomething(f...

JavaScript 对 Promise 并发的处理方法

Promise对象代表一个未来的值,它有三种状态:pending待定,这是Promise的初始状态,它可能成功,也可能失败,前途未卜fulfilled已完成,这是一种成功的状态,此时可以获取...

Promise的九大方法(promise的实例方法)

1、promise.resolv静态方法Promise.resolve(value)可以认为是newPromise方法的语法糖,比如Promise.resolve(42)可以认为是以下代码的语...

360前端一面~面试题解析(360前端开发面试题)

1.组件库按需加载怎么做的,具体打包配了什么-按需加载实现:借助打包工具(如Webpack的require.context或ES模块动态导入),在使用组件时才引入对应的代码。例如在V...

前端面试-Promise 的 finally 怎么实现的?如何在工作中使用?

Promise的finally方法是一个非常有用的工具,它无论Promise是成功(fulfilled)还是失败(rejected)都会执行,且不改变Promise的最终结果。它的实现原...

最简单手写Promise,30行代码理解Promise核心原理和发布订阅模式

看了全网手写Promise的,大部分对于新手还是比较难理解的,其中几个比较难的点:状态还未改变时通过发布订阅模式去收集事件实例化的时候通过调用构造函数里传出来的方法去修改类里面的状态,这个叫Re...

前端分享-Promise可以中途取消啦(promise可以取消吗)

传统Promise就像一台需要手动组装的设备,每次使用都要重新接线。而Promise.withResolvers的出现,相当于给开发者发了一个智能遥控器,可以随时随地控制异步操作。它解决了三大...

手写 Promise(手写输入法 中文)

前言都2020年了,Promise大家肯定都在用了,但是估计很多人对其原理还是一知半解,今天就让我们一起实现一个符合PromiseA+规范的Promise。附PromiseA+规范地址...

什么是 Promise.allSettled()!新手老手都要会?

Promise.allSettled()方法返回一个在所有给定的promise都已经fulfilled或rejected后的promise,并带有一个对象数组,每个对象表示对应的pr...

前端面试-关于Promise解析与高频面试题示范

Promise是啥,直接上图:Promise就是处理异步函数的API,它可以包裹一个异步函数,在异步函数完成时抛出完成状态,让代码结束远古时无限回掉的窘境。配合async/await语法糖,可...

宇宙厂:为什么前端离不开 Promise.withResolvers() ?

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.为什么需要Promise.with...

Promise 新增了一个超实用的 API!

在JavaScript的世界里,Promise一直是处理异步操作的神器。而现在,随着ES2025的发布,Promise又迎来了一个超实用的新成员——Promise.try()!这个新方法简...

一次搞懂 Promise 异步处理(promise 异步顺序执行)

PromisePromise就像这个词的表面意识一样,表示一种承诺、许诺,会在后面给出一个结果,成功或者失败。现在已经成为了主流的异步编程的操作方式,写进了标准里面。状态Promise有且仅有...

Promise 核心机制详解(promise机制的实现原理)

一、Promise的核心状态机Promise本质上是一个状态机,其行为由内部状态严格管控。每个Promise实例在创建时处于Pending(等待)状态,此时异步操作尚未完成。当异步操作成功...

javascript——Promise(js实现promise)

1.PromiseES6开始支持,Promise对象用于一个异步操作的最终完成(包括成功和失败)及结果值的表示。简单说就是处理异步请求的。之所以叫Promise,就是我承诺,如果成功则怎么处理,失败怎...

取消回复欢迎 发表评论: