百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Java四种线程池和参数详解(java的四种线程池)

wxin55 2024-11-07 13:13 9 浏览 0 评论

一、四种线程池

Java通过Executors提供四种静态方法来创建线程池

例如:

//创建一个可缓存线程池

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

//执行任务

cachedThreadPool.execute(Runnable command);

如下:

1.newSingleThreadPool 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。

2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

3.newScheduledThreadPool 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。

4.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需求,可灵活回收空闲线程,若无可回收线程,则新建线程。

二、核心参数

四种线程池本质都是创建ThreadPoolExecutor类,ThreadPoolExecutor构造参数如下

1.int corePoolSize 核心线程数量

2.int maximumPoolSize 最大线程数量

3.long keepAliveTime 超过corePoolSize的线程多久不活动别销毁时间

4.TimeUnit unit 时间单位

5.BlockingQueue<Runnable> workQueue 任务队列

6.ThreadFactory threadFactory 线程池工厂

7.RejectedExecutionHandler handler 拒绝策略

三、线程池工厂 ThreadFactory

所在包位置:java.util.concurrent.ThreadFactory

ThreadFactory接口。这个接口是Java自身提供的,用户可以实现它自定义自己的线程启动方式,可以设置线程名称、类型以及优先级等属性。

ThreadFactory vs Default ThreadFactory:

在一个典型的Java ExecutorService程序中,其线程都需要被指定以何种形式运行,如果程序初始化ExecutorService时没有指定ThreadFactory,程序会采用一个默认的ThreadFactory来生成提交线程,但是对于一个严谨对程序来说,定义自己的ThreadFactory永远是个最佳选择。Why??

1.设置更有描述意义的线程名称。如果使用默认的ThreadFactory,它给线程起名字大概规律就是pool-m-thread-n这个样子,如pool-1-thread-1。但是当你分析一个thread dump时,看着这样的名字就很难知道线程的目的。所以使用一个有描述意义的线程名称是分析追踪问题的clue NO.1。

2.设置线程是否是守护线程,默认的ThreadFactory总是提交非守护线程

3.设置线程优先级,默认ThreadFactory总是提交的一般优先级线程

例子:

CustomThreadFactoryBuilder类实现了一种优雅的Builder Mechanism方式去得到一个自定义ThreadFactory实例。ThreadFactory接口中有一个接受Runnable类型参数的方法newThread(Runnable r),你自己的factory逻辑就应该写在这个方法中,去配置线程名称、优先级、守护线程状态等属性。

public class CustomThreadFactoryBuilder {

    private String namePrefix = null;

    private boolean daemon = false;

    private int priority = Thread.NORM_PRIORITY;

    public CustomThreadFactoryBuilder setNamePrefix(String namePrefix) {

        if (namePrefix == null) {

            throw new NullPointerException();

        }

        this.namePrefix = namePrefix;

        return this;

    }

    public CustomThreadFactoryBuilder setDaemon(boolean daemon) {

        this.daemon = daemon;

        return this;

    }

    public CustomThreadFactoryBuilder setPriority(int priority) {

        if (priority < Thread.MIN_PRIORITY){

            throw new IllegalArgumentException(String.format(

                    "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));

        }

        if (priority > Thread.MAX_PRIORITY) {

            throw new IllegalArgumentException(String.format(

                    "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));

        }

        this.priority = priority;

        return this;

    }

    public ThreadFactory build() {

        return build(this);

    }

    private static ThreadFactory build(CustomThreadFactoryBuilder builder) {

        final String namePrefix = builder.namePrefix;

        final Boolean daemon = builder.daemon;

        final Integer priority = builder.priority;

        final AtomicLong count = new AtomicLong(0);

         /*

        return new ThreadFactory() {

            @Override

            public Thread newThread(Runnable runnable) {

                Thread thread = new Thread(runnable);

                if (namePrefix != null) {

                    thread.setName(namePrefix + "-" + count.getAndIncrement());

                }

                if (daemon != null) {

                    thread.setDaemon(daemon);

                }

                if (priority != null) {

                    thread.setPriority(priority);

                }

                return thread;

            }

        };*/

        //jdk8中还是优先使用lamb表达式

        return (Runnable runnable) -> {

            Thread thread = new Thread(runnable);

            if (namePrefix != null) {

                thread.setName(namePrefix + "-" + count.getAndIncrement());

            }

            if (daemon != null) {

                thread.setDaemon(daemon);

            }

                /*

                    thread.setPriority(priority);

                */

            return thread;

        };

    }

}

SimpleTask类实现类Runnable接口,打印出了线程的运行属性(名称,优先级等)。

public class SimpleTask implements Runnable {

    private long sleepTime;

    public SimpleTask(long sleepTime) {

        super();

        this.sleepTime = sleepTime;

    }

    @Override

    public void run() {

        while (true) {

            try {

                System.out.println("Simple task is running on " 

                        + Thread.currentThread().getName() + " with priority " + Thread.currentThread().getPriority());

                Thread.sleep(sleepTime);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}


CustomThreadFactoryDemo类使用我们上面的CustomThreadFactoryBuilder类创建类一个ThreadFactory实例,又使用这个实例获得类一个ExecutoryService,这样所有这个线程池中的线程都会按我们定义好的属性被生成,下面代码中执行类三个SimpleTask。

public class CustomThreadFactoryDemo {

    public static void main(String[] args) {

        ThreadFactory customThreadfactory = new CustomThreadFactoryBuilder()

        .setNamePrefix("DemoPool-Thread").setDaemon(false)

        .setPriority(Thread.MAX_PRIORITY).build();

        ExecutorService executorService = Executors.newFixedThreadPool(3,

        customThreadfactory);

        // Create three simple tasks with 1000 ms sleep time

        SimpleTask simpleTask1 = new SimpleTask(1000);

        SimpleTask simpleTask2 = new SimpleTask(1000);

        SimpleTask simpleTask3 = new SimpleTask(1000);

        // Execute three simple tasks with 1000 ms sleep time

        executorService.execute(simpleTask1);

        executorService.execute(simpleTask2);

        executorService.execute(simpleTask3);

    }

}


输出结果:

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

四、任务队列 BlockingQueue

Tops:BlockingQueue不能够添加null对象,否则会抛出空指针异常。

接口抽象方法

boolean add(E e);

添加元素,添加成功返回true ,添加失败抛出异常 IllegalStateException。

boolean offer(E e);

true:添加元素成功 ; false : 添加元素失败。

void put(E e) throws InterruptedException;

添加元素,直到有空间添加成功才会返回,阻塞方法。

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

true:添加数据成功,false:超时时间到。

E take() throws InterruptedException;

获取队头的元素,阻塞方法,会一直等到有元素获取到才会返回,获取到元素时并将队列中的该元素删除。

E poll(long timeout, TimeUnit unit) throws InterruptedException;

获取队头的元素,阻塞方法,超时时间到则返回null,获取到元素时并将队列中的该元素删除。

int remainingCapacity();

返回理想情况下此队列可以添加的其他元素的数量.

boolean remove(Object o);

移除指定的元素。

boolean contains(Object o);

检查是否包含该元素

int drainTo(Collection<? super E> c);

移除队列中的所有元素并添加到集合c,返回被移除元素的数量。

int drainTo(Collection<? super E> c, int maxElements);

移除队列中maxElements个元素并添加到集合c,返回被移除元素的数量。

实现类

Tops: 所有的实现类都是并发安全的。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

SynchronousQueue

它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

LinkedBlockingDeque

LinkedBlockingDeque就是一个双向队列,任何一端都可以进行元素的出入。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

LinkedBlockingQueue

LinkedBlockingQueue是一个单向队列,只能一端出一端入的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

DelayQueue

是一个支持延时获取元素的无界阻塞队列。内部用 PriorityQueue 实现。

LinkedTransferQueue

PriorityBlockingQueue

PriorityBlockingQueue是带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

五、拒绝策略 RejectedExecutionHandle

在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,针对这些问题java线程池提供了以下几种策略:

1. AbortPolicy

2. DiscardPolicy

3. DiscardOldestPolicy

4. CallerRunsPolicy

5. 自定义策略

AbortPolicy

该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            //不做任何处理,直接抛出异常

            throw new RejectedExecutionException("Task " + r.toString() +

                                                 " rejected from " +

                                                 e.toString());

        }

DiscardPolicy

这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        //就是一个空的方法

        }

DiscardOldestPolicy

这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。

因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

            	//移除队头元素

                e.getQueue().poll();

                //再尝试入队

                e.execute(r);

            }

        }

CallerRunsPolicy

使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                //直接执行run方法

                r.run();

            }

        }


自定义策略

如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。

例如:我定义了我的一个拒绝策略,叫做MyRejectPolicy,里面的逻辑就是打印处理被拒绝的任务内容

public class MyRejectPolicy implements RejectedExecutionHandler{

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        //Sender是我的Runnable类,里面有message字段

        if (r instanceof Sender) {

            Sender sender = (Sender) r;

            //直接打印

            System.out.println(sender.getMessage());

        }

    }

}

Tops:这几种策略没有好坏之分,只是适用不同场景,具体哪种合适根据具体场景和业务需要选择,如果需要特殊处理就自己定义好了。

相关推荐

ES6中 Promise的使用场景?(es6promise用法例子)

一、介绍Promise,译为承诺,是异步编程的一种解决方案,比传统的解决方案(回调函数)更加合理和更加强大在以往我们如果处理多层异步操作,我们往往会像下面那样编写我们的代码doSomething(f...

JavaScript 对 Promise 并发的处理方法

Promise对象代表一个未来的值,它有三种状态:pending待定,这是Promise的初始状态,它可能成功,也可能失败,前途未卜fulfilled已完成,这是一种成功的状态,此时可以获取...

Promise的九大方法(promise的实例方法)

1、promise.resolv静态方法Promise.resolve(value)可以认为是newPromise方法的语法糖,比如Promise.resolve(42)可以认为是以下代码的语...

360前端一面~面试题解析(360前端开发面试题)

1.组件库按需加载怎么做的,具体打包配了什么-按需加载实现:借助打包工具(如Webpack的require.context或ES模块动态导入),在使用组件时才引入对应的代码。例如在V...

前端面试-Promise 的 finally 怎么实现的?如何在工作中使用?

Promise的finally方法是一个非常有用的工具,它无论Promise是成功(fulfilled)还是失败(rejected)都会执行,且不改变Promise的最终结果。它的实现原...

最简单手写Promise,30行代码理解Promise核心原理和发布订阅模式

看了全网手写Promise的,大部分对于新手还是比较难理解的,其中几个比较难的点:状态还未改变时通过发布订阅模式去收集事件实例化的时候通过调用构造函数里传出来的方法去修改类里面的状态,这个叫Re...

前端分享-Promise可以中途取消啦(promise可以取消吗)

传统Promise就像一台需要手动组装的设备,每次使用都要重新接线。而Promise.withResolvers的出现,相当于给开发者发了一个智能遥控器,可以随时随地控制异步操作。它解决了三大...

手写 Promise(手写输入法 中文)

前言都2020年了,Promise大家肯定都在用了,但是估计很多人对其原理还是一知半解,今天就让我们一起实现一个符合PromiseA+规范的Promise。附PromiseA+规范地址...

什么是 Promise.allSettled()!新手老手都要会?

Promise.allSettled()方法返回一个在所有给定的promise都已经fulfilled或rejected后的promise,并带有一个对象数组,每个对象表示对应的pr...

前端面试-关于Promise解析与高频面试题示范

Promise是啥,直接上图:Promise就是处理异步函数的API,它可以包裹一个异步函数,在异步函数完成时抛出完成状态,让代码结束远古时无限回掉的窘境。配合async/await语法糖,可...

宇宙厂:为什么前端离不开 Promise.withResolvers() ?

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.为什么需要Promise.with...

Promise 新增了一个超实用的 API!

在JavaScript的世界里,Promise一直是处理异步操作的神器。而现在,随着ES2025的发布,Promise又迎来了一个超实用的新成员——Promise.try()!这个新方法简...

一次搞懂 Promise 异步处理(promise 异步顺序执行)

PromisePromise就像这个词的表面意识一样,表示一种承诺、许诺,会在后面给出一个结果,成功或者失败。现在已经成为了主流的异步编程的操作方式,写进了标准里面。状态Promise有且仅有...

Promise 核心机制详解(promise机制的实现原理)

一、Promise的核心状态机Promise本质上是一个状态机,其行为由内部状态严格管控。每个Promise实例在创建时处于Pending(等待)状态,此时异步操作尚未完成。当异步操作成功...

javascript——Promise(js实现promise)

1.PromiseES6开始支持,Promise对象用于一个异步操作的最终完成(包括成功和失败)及结果值的表示。简单说就是处理异步请求的。之所以叫Promise,就是我承诺,如果成功则怎么处理,失败怎...

取消回复欢迎 发表评论: