Java线程池原理以及自定义线程池

当你需要同时限制应用程序中运行的线程数时,线程池非常有用。 启动新线程会产生性能开销,每个线程也会为其堆栈等分配一些内存。

可以将任务传递给线程池,而不是为每个任务启动并发执行的新线程。 只要线程池有任何空闲线程,任务就会分配给其中一个线程并执行。 在内部,任务被插入到阻塞队列中,池中的线程从该阻塞队列中出队。 当新任务插入队列时,其中一个空闲线程将成功将其出列并执行它。 线程池中的其余空闲线程将被阻塞,等待出列任务。

从上述所知,一个基本的线程池需要具有

  1. 一个存储线程的容器(容器可以使用队列,链表等数据结构),当有任务时,就从容器中拿出一个线程,来执行任务。
  2. 一个存储任务的阻塞队列。(阻塞队列可以控制任务提交的最大数)
  3. 线程池对外暴露一个execute(Runnable task)方法,用以外界向线程池中提交任务。

自定义阻塞队列

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue<T> {

    /**
     *     使用链表实现一个阻塞队列(数据结构定义数据存储和获取方式,所以只要满足这两点,阻塞队列可以用链表,也可以使用数组等来实现)
     */
    private List<T> queue = new LinkedList();
    /**
     * limit用来限制提交任务的最大数,默认10
     */
    private int limit = 10;

    public BlockingQueue(int limit) {
        this.limit = limit;
    }

    /**
     *
     * @param item
     *
     *  enqueue是一个同步方法,当任务到达上限,便会调用wait方法进行阻塞,否则将任务放入队列中,并唤醒dequeue()任务线程
     */
    public synchronized void enqueue(T item){
        while (this.queue.size() == this.limit) {
            this.wait();
        }
        if (this.queue.size() <= limit) {
            this.notifyAll();
        }
        this.queue.add(item);
    }


    /**
     *
     * @return
     *
     *     dequeue也是一个同步方法,当队列中没有任务时便会调用wait方法进入阻塞,当任务到达最大容量是唤醒其他dequeue()线程
     *     ,并出列一个任务。
     */
    public synchronized T dequeue() {
        while (this.queue.size() == 0) {
            this.wait();
        }
        if (this.queue.size() == this.limit) {
            this.notifyAll();
        }

        return this.queue.remove(0);
    }

 public synchronized int size(){
        return queue.size();
    }
}



新建一个线程池线程类,用来执行提交的任务。结构体中传入任务队列,run()方中发现taskQueue有任务时,获取任务并执行,没有任务就阻塞。

public class PoolThread extends Thread {


    private  BlockingQueue taskQueue = null;

    private boolean isStopped = false;

    public PoolThread(BlockingQueue taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run(){
        while(!isStopped() && !Thread.currentThread().isInterrupted()){
            try{
                //从任务队列获取任务并执行
                Runnable runnable = (Runnable) taskQueue.dequeue();
                runnable.run();
            } catch(Exception e){
                isStopped = true;
                break;
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        this.interrupt();
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}

新建线程池类

public interface Service {

    //关闭线程池
    void shutdown();

    //查看线程池是否已经被shutdown
    boolean isShutdown();

  //提交任务到线程池
    void execute(Runnable runnable);
}
import java.util.ArrayDeque;
import java.util.Queue;

public class ThreadPool  implements Service {

    /**
     * 任务队列,用来存储提交的任务
     */
    private BlockingQueue<Runnable> taskQueue = null;

    /**
     * 线程池中存储线程的容器。
     */
    private Queue<PoolThread> threads = new ArrayDeque<PoolThread>();
    
    private boolean isShutdown = false;
    
    public ThreadPool(int initSize, int maxNoOfTasks){
        taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);

        //初始化线程池
        for (int i = 0; i < initSize; i++) {
            threads.add(new PoolThread(taskQueue));
        }
        
        //启动线程池线程
        threads.forEach(thread -> thread.start());
    }
    
     @Override
    public synchronized void execute(Runnable task)  {
        if (this.isStopped){
            throw new IllegalStateException("ThreadPool is stopped");
        }
        //任务入列
        taskQueue.enqueue(task);
    }
    
    @Override
    public synchronized void shutdown(){
        this.isShutdown= true;
        threads.forEach(thread -> thread.doStop());
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }
}

至此,一个简单的线程池便完成。新建一个线程池测试类

import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {

        final ThreadPool threadPool = new ThreadPool(5 , 20);

        //定义20个任务并且提交到线程池
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() ->{
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + " is running add done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while (true){
            System.out.println("---------------------------------");
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

打印每次输出5条记录,共输出4次

---------------------------------
---------------------------------
Thread-3 is running add done
Thread-1 is running add done
Thread-0 is running add done
Thread-4 is running add done
Thread-2 is running add done
---------------------------------
---------------------------------
Thread-2 is running add done
Thread-4 is running add done
Thread-0 is running add done
Thread-1 is running add done
Thread-3 is running add done
---------------------------------
---------------------------------
Thread-0 is running add done
Thread-1 is running add done
Thread-3 is running add done
Thread-2 is running add done
Thread-4 is running add done
---------------------------------
---------------------------------
Thread-2 is running add done
Thread-1 is running add done
Thread-3 is running add done
Thread-0 is running add done
Thread-4 is running add done
---------------------------------

当执行完任务后,使用visualvm工具或jstack命令获取线程快照,可以看到有5个线程池中的线程

"Thread-4" #16 prio=5 os_prio=0 tid=0x00000000207b0000 nid=0x2b7c in Object.wait() [0x000000002141e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-3" #15 prio=5 os_prio=0 tid=0x00000000207ad000 nid=0x56d0 in Object.wait() [0x000000002131f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-2" #14 prio=5 os_prio=0 tid=0x00000000207ab800 nid=0x4cbc in Object.wait() [0x000000002121f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-1" #13 prio=5 os_prio=0 tid=0x00000000207a9800 nid=0x3670 in Object.wait() [0x000000002111f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-0" #12 prio=5 os_prio=0 tid=0x00000000207a9000 nid=0x4d84 in Object.wait() [0x000000002101f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

从线程快照可以看到,线程池的线程名称使用系统默认名称,但在实际编码中通常都会按我们规范定义系统名称,所以我们使用工厂模式对线程的创建进行重构。

使用工厂模式有一下好处

  1. 对象的创建如果比较复杂,需要经过一系列的初始化。使用工厂模式,可以屏蔽这过程。
  2. 把同一类事物归于一个框架之下。比如A和B,他们需要自己定义线程池线程创建,但规定他们都要实现工厂接口,便可以把他们控制在同一框架之下。
  3. 解耦。(只要是不直接创建目标对象,基本上都可以叫解耦或者对修改关闭对扩展开放)

新建线程工厂接口

@FunctionalInterface
public interface ThreadFactory {
    Thread createThread(Runnable runnable);
}

重构后的线程池类如下:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool  implements Service {

    /**
     * 任务队列,用来存储提交的任务
     */
    private BlockingQueue<Runnable> taskQueue = null;

    /**
     * 线程池中存储线程的容器。
     */
    private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();


    /**
     * 默认线程工厂
     */
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

    private boolean isShutdown = false;


    public ThreadPool(int initSize, int maxNoOfTasks){
        taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);

        //初始化线程池
        for (int i = 0; i < initSize; i++) {
            newThread();
        }
    }

    private void newThread(){
        PoolThread poolThread = new PoolThread(taskQueue);
        Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
        ThreadTask threadTask = new ThreadTask(thread , poolThread);
        threads.add(threadTask);
        thread.start();
    }
    /**
     * 工厂模式屏蔽对象创建的过程
     */
    private static class DefaultThreadFactory implements ThreadFactory{

        private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

        private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());

        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
        }
    }

    /**
     * ThreadTask 只是PoolThread和Thread的组合,因为后面关闭线程还需要用到poolThread的doStop方法
     */
    private static class ThreadTask{

        Thread thread;
        PoolThread poolThread;

        public ThreadTask(Thread thread , PoolThread poolThread){
            this.thread = thread;
            this.poolThread = poolThread;
        }
    }

     @Override
    public synchronized void execute(Runnable task) {
        if (this.isShutdown){
            throw new IllegalStateException("ThreadPool is stopped");
        }
        //任务入列
        taskQueue.enqueue(task);
    }

  @Override
    public synchronized void shutdown(){
        this.isShutdown = true;
        threads.forEach(threadTask -> threadTask.poolThread.doStop());
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }
}

运行测试类,结果如下图所示


image.png

dump文件如下所示


image.png

到目前为如果线程任务队列到达上限,便会调用wait方法进行阻塞,我们可以自定义拒接策略,使处理更灵活。

public interface DenyPolicy<T> {

    void reject(T runnable, ThreadPool threadPool);

    //该拒接策略会直接将任务丢弃
    class DiscardDenyPolicy implements DenyPolicy<Runnable>{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            System.out.println(runnable + "do nothing");
        }
    }

    //该拒绝策略会向任务提交者抛出异常
    class AbortDenyPolicy implements DenyPolicy<Runnable>{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            throw new RunnbaleDenyException("The runnbale " + runnable + " will be abort.");
        }
    }

    //该拒绝策略会使用任务在提交者所在的线程中执行任务
    class RunnerDenyPolicy implements DenyPolicy<Runnable>{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            if (!threadPool.isShutdown()){
                runnable.run();
            }
        }
    }
}
public class RunnbaleDenyException extends RuntimeException {

    public RunnbaleDenyException(String message) {
        super(message);
    }
}

  • reject 为拒接方法
  • DiscardDenyPolicy 策略会直接丢弃掉Runnable任务。
  • AbortDenyPolicy 策略会抛出RunnbaleDenyException异常。
  • RunnerDenyPolicy 策略,交给调用者的线程直接运行runnable,而不会被加入到线程池中。

重构阻塞队列,当队列中的值超出最大容量时使用拒接策略。

重构后的阻塞队列

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue<T> {

    /**
     *     使用链表实现一个阻塞队列(数据结构定义数据存储和获取方式,所以只要满足这两点,阻塞队列可以用链表,也可以使用数组等来实现)
     */
    private List<T> queue = new LinkedList();
    /**
     * limit用来限制提交任务的最大数,默认10
     */
    private int limit = 10;


    /**
     * 拒接策略
     */
    private DenyPolicy denyPolicy;

    private ThreadPool threadPool;


    public BlockingQueue(int limit , DenyPolicy denyPolicy , ThreadPool threadPool) {
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
    }

    /**
     *
     * @param item
     *  enqueue是一个同步方法,当任务到达上限,便会调用wait方法进行阻塞,否则将任务放入队列中,并唤醒dequeue()任务线程
     */
    public synchronized void enqueue(T item) {
        //若果队列到达最大容量,调用拒接策略
        if (this.queue.size() >= this.limit) {
            denyPolicy.reject(item , threadPool);
        }
        if (this.queue.size() <= limit) {
            this.notifyAll();
        }
        this.queue.add(item);
    }


    /**
     *
     * @return
     *
     *     dequeue也是一个同步方法,当队列中没有任务时便会调用wait方法进入阻塞,当任务到达最大容量是唤醒其他dequeue()线程
     *     ,并出列一个任务。
     */
    public synchronized T dequeue(){
        while (this.queue.size() == 0) {
            this.wait();
        }
        if (this.queue.size() == this.limit) {
            this.notifyAll();
        }
        return this.queue.remove(0);
    }

 public synchronized int size(){
        return queue.size();
    }
}

线程池类修改如下两点ThreadPool.class

...
public class ThreadPool implements Service{
 /**
     * 默认使用丢弃策略
     */
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();

    public ThreadPool(int noOfThreads , int maxNoOfTasks){
        taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);

        //初始化线程池
        for (int i = 0; i < noOfThreads; i++) {
            newThread();
        }
    }
}
...

运行测试类测试类,可以看到当任务到达最大容量时,就会有任务被抛弃


image.png

目前初始化线程池时,只指定了初始线程数init,并不能很好的管理线程池线程数量。继续对线程池进行扩展。

  • 新增两个控制线程池线程数量的参数。线程池自动扩充时最大的线程池数量max,线程池空闲时需要释放线程但是也要维护一定数量的活跃线程数量或者核心数量core。有了这init , max , core三个参数就能很好的控制线程池中线程数量,三者之间的关系init <= core <= max。
  • 新增参数Keepedalive时间,该时间主要决定线程各个重要参数自动维护的时间间隔。

重构后的线程池类

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool implements Service{

    /**
     * 初始化线程数量
     */
    private final int initSize;

    /**
     *   线程池最大线程数量
     */
    private final int maxSzie;

    /**
     *     线程池核心线程数量
     */
    private final int coreSize;

    /**
     *   当前活跃的线程数量
     */
    private int activeCount;

    private final long keepAliveTime;

    private final TimeUnit timeUnit;

    private  InternalTask internalTask;

    /**
     *     创建线程所需的工厂
     */
    private final ThreadFactory threadFactory;


    /**
     * 任务队列,用来存储提交的任务
     */
    private BlockingQueue<Runnable> taskQueue = null;

    /**
     * 线程池中存储线程的容器。
     */
    private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();


    /**
     * 默认线程工厂
     */
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

    private boolean isShutdown = false;

    /**
     * 默认使用丢弃策略
     */
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();


    public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
        this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
    }

    public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
                      , DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){

        this.initSize = initSize;
        this.maxSzie = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.taskQueue = new  BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;

        init();

    }



    private void init(){
        //初始化线程池
        for (int i = 0; i < initSize; i++) {
            newThread();
        }

        //启动内部维护线程
        internalTask =  new InternalTask();
        internalTask.start();
    }

    private void newThread(){
        PoolThread poolThread = new PoolThread(taskQueue);
        Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
        ThreadTask threadTask = new ThreadTask(thread , poolThread);
        activeCount++;
        threads.add(threadTask);
        thread.start();
    }

    private void removeThread(){
        //从线程池中移除某个线程
        ThreadTask threadTask = threads.remove();
        threadTask.poolThread.stop();
        this.activeCount--;
    }
    /**
     * 工厂模式屏蔽对象创建的过程
     */
    private static class DefaultThreadFactory implements ThreadFactory{

        private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

        private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());

        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
        }
    }

    /**
     * ThreadTask 只是PoolThread和Thread的组合,因为后面关闭线程还需要用到poolThread的doStop方法
     */
    private static class ThreadTask{

        Thread thread;
        PoolThread poolThread;

        public ThreadTask(Thread thread , PoolThread poolThread){
            this.thread = thread;
            this.poolThread = poolThread;
        }
    }


    @Override
    public synchronized void execute(Runnable task)  {
        if (this.isShutdown){
            throw new IllegalStateException("ThreadPool is stopped");
        }
        //任务入列
        taskQueue.enqueue(task);
    }

    @Override
    public int getInitSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.initSize;
    }

    @Override
    public int getMaxSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.maxSzie;
    }

    @Override
    public int getCoreSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.coreSize;
    }

    @Override
    public int getQueueSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return taskQueue.size();
    }

    @Override
    public int getActiveCount() {
        synchronized (this){
            return this.activeCount;
        }
    }

    @Override
    public synchronized void shutdown(){
        this.isShutdown = true;
        threads.forEach(threadTask -> threadTask.poolThread.doStop());
        internalTask.interrupt();
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }



    class InternalTask extends Thread{
        @Override
        public void run() {
            //run方法继承自Thread,主要用于维护线程数量,比如扩容,回收等工作
            while (!isShutdown&&!isInterrupted()){
                try {
                    timeUnit.sleep(keepAliveTime);
                } catch (InterruptedException e) {
                    isShutdown = true;
                    break;
                }
                synchronized (ThreadPool.this){
                    if (isShutdown){
                        break;
                    }
                    //当前队列中任务尚未处理,并且activeCount< coreSize则继续扩容
                    if (taskQueue.size() > 0 && activeCount <coreSize){
                        for (int i = initSize; i < coreSize ; i++){
                            newThread();
                        }
                        //continue的目的在于不想让线程的扩容直接打到maxsize
                        continue;
                    }

                    //当前的队列中有任务尚未处理,并且activeCount < maxSize则继续扩容
                    if (taskQueue.size() > 0 && activeCount < maxSzie){
                        for (int i = coreSize; i < maxSzie ; i++){
                            newThread();
                        }
                    }

                    //如果任务队列中没有任务,则需要回收,回收至coreSize即可
                    if (taskQueue.size() == 0 && activeCount > coreSize ){
                        for (int i = coreSize ; i < activeCount ; i++){
                            removeThread();
                        }
                    }
                }
            }
        }
    }
}

线程池类中主要新增了如下参数

  /**
     * 初始化线程数量
     */
    private final int initSize;

    /**
     *   线程池最大线程数量
     */
    private final int maxSzie;

    /**
     *     线程池核心线程数量
     */
    private final int coreSize;

    /**
     *   当前活跃的线程数量
     */
    private int activeCount;

    private final long keepAliveTime;

    private final TimeUnit timeUnit;

    /**
     *     创建线程所需的工厂
     */
    private final ThreadFactory threadFactory;

   private  InternalTask internalTask;

重写了两个构造函数

 public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
        this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
    }

    public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
                      , DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){

        this.initSize = initSize;
        this.maxSzie = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.taskQueue = new  BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;

        init();
       
    }

新增一个线程类,用于维护内部状态

 class InternalTask extends Thread{
        @Override
        public void run() {
            //run方法继承自Thread,主要用于维护线程数量,比如扩容,回收等工作
            while (!isShutdown&&!isInterrupted()){
                try {
                    timeUnit.sleep(keepAliveTime);
                } catch (InterruptedException e) {
                    isShutdown = true;
                    break;
                }
                synchronized (ThreadPool.this){
                    if (isShutdown){
                        break;
                    }
                    //当前队列中任务尚未处理,并且activeCount< coreSize则继续扩容
                    if (taskQueue.size() > 0 && activeCount <coreSize){
                        for (int i = initSize; i < coreSize ; i++){
                            newThread();
                        }
                        //continue的目的在于不想让线程的扩容直接打到maxsize
                        continue;
                    }

                    //当前的队列中有任务尚未处理,并且activeCount < maxSize则继续扩容
                    if (taskQueue.size() > 0 && activeCount < maxSzie){
                        for (int i = coreSize; i < maxSzie ; i++){
                            newThread();
                        }
                    }

                    //如果任务队列中没有任务,则需要回收,回收至coreSize即可
                    if (taskQueue.size() == 0 && activeCount > coreSize ){
                        for (int i = coreSize ; i < activeCount ; i++){
                            removeThread();
                        }
                    }
                }
            }
        }
    }

以及一系列辅助方法

public interface Service {
.....
  //获取线程池的初始化大小
    int getInitSize();

    //获取线程池最大的线程数
    int getMaxSize();

    //获取线程池核心线程梳理
    int getCoreSize();

    //获取线程池中活跃线程的数量大小
    int getQueueSize();

    //获取线程池中用于缓存任务队列的大小
    int getActiveCount();
.....
}
@Override
    public int getInitSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.initSize;
    }

    @Override
    public int getMaxSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.maxSzie;
    }

    @Override
    public int getCoreSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.coreSize;
    }

    @Override
    public int getQueueSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return taskQueue.size();
    }

    @Override
    public int getActiveCount() {
        synchronized (this){
            return this.activeCount;
        }
    }

执行测试类

import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {

        final ThreadPool threadPool = new ThreadPool(2 , 6 , 4 , 1000);

        //定义20个任务并且提交到线程池
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() ->{
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + " is running add done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while (true){
            System.out.println("getActiveCount: " + threadPool.getActiveCount());
            System.out.println("getQueueSize: " + threadPool.getQueueSize());
            System.out.println("getCoreSize: " + threadPool.getCoreSize());
            System.out.println("getMaxSize: "+ threadPool.getMaxSize());
            System.out.println("======================================");
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

会有如下输出,activeCount数量会增长到与maxSize一直,最后会保持与coreSize相等

getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--4 is running add done
thread-pool--3 is running add done
thread-pool--5 is running add done
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
thread-pool--4 is running add done
thread-pool--5 is running add done
thread-pool-0 is running add done
thread-pool--1 is running add done
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================

到这里,一个功能比较完善的线程池就已经完成了
代码地址: github

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 线程池中有一定数量的工作线程,工作线程会循环从任务队列中获取任务,并执行这个任务。那么怎么去停止这些工作线程呢?这...
    wo883721阅读 1,593评论 0 14
  • 第一部分 来看一下线程池的框架图,如下: 1、Executor任务提交接口与Executors工具类 Execut...
    压抑的内心阅读 4,243评论 1 24
  • 【JAVA 线程】 线程 进程:是一个正在执行中的程序。每一个进程执行都有一个执行顺序。该顺序是一个执行路径,或者...
    Rtia阅读 2,758评论 2 20
  • 感恩路上 文/熙云阁 朵朵云儿漫步在蔚蓝的天空 感谢风儿让她们相聚 羊群悠闲地唱着小曲 感恩大地带给它...
    熙云阁阅读 166评论 0 0
  • 站内优化思路(未来几天会不定时的分享一下我的站内优化详细笔记,今天整理了一下优化思路,觉得有用的朋友可以关注收藏一...
    简小猫阅读 451评论 0 4