第8章 - Java 多线程1

第8章 - Java 多线程1

作者:vwFisher
时间:2019-09-04
GitHub代码:https://github.com/vwFisher/JavaBasicGuide

目录



1 多线程

一个线程则是进程中的执行流程,一个进程中可以同时包括多个线程

每个线程也可以得到一小段程序的执行时间,这样一个进程就可以具有多个并发执行的线程,每个线程也可以得到一小段执行的线程。在单线程中,程序代码按调用顺序依次往下执行,如果需要一个进程同时完成多段代码的操作,就需要产生多线程。

其实应用程序的执行是CPU在做着快速的切换完成的,这个切换是随机的

JVM启动时就启动了多个线程,至少有两个线程在执行

  1. 执行main函数的线程(该线程的认为代码都定义在main函数中)
  2. 负责垃圾回收的线程(会调用finalize() 回收,随机顺序) System.gc(); 来回收垃圾

1.1 相关概念

  1. 异步(asynchronous) 和 同步(synchronize) 的区别
  2. 并发(Concurrency) 和 并行(Parallelism)
  3. 临界区:即共享资源,但每次使用只能有一个线程使用它,其他线程如果需要使用就要等待
  4. 阻塞 和 非阻塞
  5. 死锁:A 和 B 互相占用着对方的资源而不释放,导致 A 和 B 都阻塞了
  6. 饥饿:指一/多个线程因为种种原因(如:优先级低等)无法获取资源,导致一致无法执行
  7. 活锁:A 和 B 需要对方资源,而双方都采取谦让策略(主动释放释放),导致资源在 A 和 B 不断跳动,但是 A 和 B 始终不能同事拿到所有资源而正常执行


1.2 并发控制级别

由于临界区的存在,多线程之间的并发必须受到控制,大致分为以下级别

  1. 阻塞(Blocking):synchronized,重入锁,会在执行代码前尝试得到临界区的锁,拿不到就阻塞等待
  2. 无饥饿(Starvation-Free):线程调度总是倾向于优先级高的,如果锁是公平的,满足先来后到,那么所有线程都有机会执行
  3. 无障碍(Obstruction-Free):无障碍是一种最弱的非阻塞调度,自由进出临界区/无竞争时,有限步内完成操作,有竞争时,回滚数据
  4. 无锁(Lock-Free):是无障碍的,保证有一个线程可以在有限步内完成操作,可能引起饥饿
  5. 无等待(Wait-Free):是无锁的,要求所有的线程都必须在有限步内完成,保证无饥饿。典型无等待结构就是RCU(Read-Copy-Update),读数据不加以控制,写数据,写到副本,等待合适时机更新


1.3 Java内存模型特点

  1. 原子性(Atomicity):指一个操作一旦开始,不会被其他线程影响。
  2. 可见性(Visibility):指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道这个修改。引起可见性的问题来源(系统的缓存优化,硬件优化,指令重排等)
  3. 有序性(Ordering):代码是按顺序执行的,指令重排可能会引起多线程(我们认为的代码没有按顺序执行)

指令重排:Java虚拟机和执行系统会对指令进行重排优化,重排遵循一定规则:

  1. 程序顺序原则:一个线程内保证语义的串行性。但没有义务保证多线程间的语义也一致
  2. volatile规则:volatile变量的写,先发生于读,这保证了volatile变量的可见性
  3. 锁规则:解锁(unlock)必须发生在随后的加锁(lock)前
  4. 传递性:A先于B,B先于C,那么A必然先于C
  5. 线程的 start() 方法先于它的每一个动作
  6. 线程的所有操作先于线程的终结( Thread.join() )
  7. 线程的中断( interrupt() )先于被中断线程的代码
  8. 对象的构造函数执行、结束先于 finalize() 方法


1.4 线程概念

线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

  1. 程序:是指令数据及其组织形式的描述。含有指令和数据的文件,被存储在磁盘或其他的数据存储设备中,也就是说程序是静态的代码。
  2. 进程:进程是程序的一次执行过程,是系统运行程序(系统进行资源分配和调度)的基本单位,因此进程是动态的,进程是程序的实体,也是线程的容器。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如:CPU时间,内存空间,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程是进程划分成的更小的运行单位。线程和进程最大的不同在于基本上各进程是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。从另一角度来说,进程属于操作系统的范畴,主要是同一段时间内,可以同时执行一个以上的程序,而线程则是在同一程序内几乎同时执行一个以上的程序段。
  3. 线程: 与进程相似,但线程是一个比进程更小的执行单位,是进程中一个负责程序执行的控制单元(执行路径,程序执行的最小单位)。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。
  4. 多线程: 一个进程中可以多执行路径。开启多个线程是为了同时运行自己要执行的任务

生命周期:

线程生命周期图.png

线程的状态在Thread的内部类State有定义

状态名称 说明
NEW 初始状态。线程被创建,但还没有调用 start()
RUNNABLE 运行状态。这个阶段会在就绪、执行中轮回,得到系统资源就执行,时间片结束后又到就绪。多线程看上去是同时执行,实际上,同一时间点上只有一个线程在执行,只是(系统分配时间片给线程)线程切换地很快。
BLOCKED 阻塞状态。表示线程阻塞于锁,即 RUNNABLE 过程中遇到 synchronized 同步块进入阻塞,此时回暂停执行,直到获取请求的锁
WAITING 等待状态。调用 wait() 方法进入等待,此时会释放占有的资源,同时等到n otify()/notifyAll() 来唤醒线程回到 RUNNABLE
TIME_WAITING 超市等待状态,该状态不同于 WAITING,它是可以在指定时间自行返回的。sleep(time)/wait(time) 进入。但是该方法不会释放占有的资源
TERMINATED 终止状态,执行完毕后,进入TERMINATED状态,线程结束

其他方法:join():调用join()加入线程后,等待目标线程的终止


1.5 Thread类

内部维护属性

private volatile String name;  // 线程名称
private int         priority;  // 优先级
private Thread      threadQ;
private long        eetop;
private boolean     single_step;       // 是否是单步执行
private boolean     daemon = false;    // 是否是守护线程
private boolean     stillborn = false; // 虚拟机状态
private Runnable    target;   // 执行目标(即被执行的Runnable)
private ThreadGroup group;    // 所属线程组
private ClassLoader contextClassLoader;  // 线程的上下文
private AccessControlContext inheritedAccessControlContext; // 继承的请求控制
private static int threadInitNumber;     // 默认线程的自动编号

// 当前线程附属的 ThreadLocal,而 ThreadLocalMap 会被 ThreadLocal 维护)
ThreadLocal.ThreadLocalMap threadLocals = null; 

// 为子线程提供从父线程那里继承的值。
// 在创建子线程时,子线程会接收所有可继承的线程局部变量的初始值,以获得父线程所具有的值。
// 创建一个线程时如果保存了所有 InheritableThreadLocal 对象的值,那么这些值也将自动传递给子线程。
// 如果一个子线程调用 InheritableThreadLocal 的 get() ,那么它将与它的父线程看到同一个对象
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; 

private long stackSize; // 该线程请求的堆栈大小,默认一般都是忽略
private long nativeParkEventPointer; 
private long tid;       // 线程专属ID,但名字可能重复
private static long threadSeqNumber;   // 用来生成Thread ID
private volatile int threadStatus = 0; // 标识线程状态,默认0=NEW

// 中断阻塞器:当线程发生IO中断时,需要在线程被设置为中断状态后调用该对象的interrupt方法
volatile Object parkBlocker; 

private volatile Interruptible blocker;          // 阻塞器锁,主要用于处理阻塞情况
private final Object blockerLock = new Object(); // 阻断锁
public final static int MIN_PRIORITY = 1;        // 最小优先级
public final static int NORM_PRIORITY = 5;       // 线程的优先级中第二的同时也是默认的优先级
public final static int MAX_PRIORITY = 10;       // 最大优先级
private boolean stopBeforeStart;                 // 判断stop是否在Start前
private Throwable throwableFromStop;             // 记录Throwable,在停止之前

构造方法:都是以下4个参数的重载方法

public Thread(ThreadGroup group, Runnable target, String name, long stackSize);
  • group:线程组,默认null
  • target:被调用的目标对象,默认null
  • name:名字,默认:"Thread-" nextThreadNum()
  • stackSize:新线程分配所需堆栈大小,默认0

构造函数,最终调用init()的重载,主要流程如下:获取当前线程 -> 获取安全管理器(如果有,安全检查,权限检查),设置线程组(检查是否允许调用线程修改线程组参数),设置守护标记和优先级。

private void init(ThreadGroup g, 
                  Runnable target, 
                  String name, 
                  long stackSize, 
                  AccessControlContext acc, 
                  boolean inheritThreadLocals)

本地方法

public static native Thread currentThread(); // 获得当前的线程
public static native void yield();           // 使当前线程从执行状态 -> 就绪状态。CPU重新选择线程执行

// 当前正在执行的线程睡眠,休眠结束后 -> 运行状态
public static native void sleep(long millis) throws InterruptedException; 

public final native boolean isAlive(); // 判断线程是否存活

// 当且仅当当前线程在指定的对象上保持监视器锁时,才返回 true。
public static native boolean holdsLock(Object obj); 

private native void start0();        // 开始线程
private native void setPriority0(int newPriority); //设置线程优先级
private native void stop0(Object o); // 停止线程
private native void suspend0();      // 线程挂起(暂停)
private native void resume0();       // 将一个挂起线程复活继续执行
private native void interrupt0();    // 该线程的中断状态将被设置
private native boolean isInterrupted(boolean ClearInterrupted)

方法

private static synchronized int nextThreadNum();  // 获取next线程编号
private static synchronized long nextThreadID()   // 获取next线程ID
public static void sleep(long millis, int nanos)  // millis毫秒,nanos纳秒,1毫秒=1000纳秒

public synchronized void start()  // NEW -> RUNNABLE
public void run()                 // RUNNABLE:就绪 -> 执行
private void exit()               // Run方法执行结束后来结束线程的

public final void stop()  // Deprecated
public final synchronized void stop(Throwable obj)  // Deprecated
private final synchronized void stop1(Throwable th) // 

public void interrupt()              // 中断线程
public boolean isInterrupted()       // 判断是否被中断
public static boolean interrupted()  // 判断是否被中断,并清除当前中断状态
public final void suspend()          // 线程挂起
public final void resume()           // 挂起 -> 唤醒
public final void setPriority(int newPriority) // 设置线程的优先级
public final int getPriority()                 // 线程的优先级
public final void setName(String name)         // 设置线程名称
public final String getName()                  // 获得线程名称
public final ThreadGroup getThreadGroup()      // 获得当前线程的线程组
public static int activeCount()                // 获得当前线程组还在活动中的线程数
public static int enumerate(Thread tarray[])   // 将这个线程组中存活的线程全部复制到预设置好的数组中
public final void join() throws InterruptedException
public final synchronized void join(long millis) throws InterruptedException

// 阻塞当前线程,使其处于等待状态,因为子线程执行的时间可能比主线程执行时间还长,
// 所以join是主线程需要在它执行完后再销毁。当然也可以加参数join(long millis, int nanos),
// 使其等待N秒N毫秒,如果它已经处于join方法,则报InterruptedException 。
public final synchronized void join(long millis, int nanos) throws InterruptedException

public static void dumpStack()          // 将当前线程的堆栈跟踪打印至标准错误流
public final void setDaemon(boolean on) // 设置线程是否是守护线程
public final boolean isDaemon()         // 确认线程是否是守护线程
public final void checkAccess()         // 检查是否允许调用线程修改线程组参数
public String toString()                // 返回该线程的字符串表示形式,包括线程名称、优先级和线程组。
public ClassLoader getContextClassLoader()

// 线程的上下文ClassLoader。默认使用父线程的ClassLoader(用于加载应用程序的类加载器)。
// 着2个方法需要权限,如果有安全管理器,对应权限:获取=getClassLoader,修改=setContextClassLoader
public void setContextClassLoader(ClassLoader cl)

public StackTraceElement[] getStackTrace() // 返回一个表示该线程堆栈转储的堆栈跟踪元素数组

// 返回所有活动线程的堆栈跟踪的一个映射
public static Map<Thread, StackTraceElement[]> getAllStackTraces() 


1.6 Thread基本操作

1.6.1 新建线程

线程的2种实现方式

  1. 继承java.lang.Thread类(Thread也是实现Runable接口)
  2. 实现java.lang.Runnable接口(实现Callable用FutureTask的方式,与继承Runnable一样)

建议:采用第二种方式,实现Runnable好处在于

  1. 将线程的任务从线程的子类中分离出来,可以进行单独封装。按照面向对象的思想将任务的封装成对象。
  2. 避免了java单继承的局限性。

启动:将需要实现的功能代码写入run()中,调用start()来执行线程(JVM会启动线程调用run()方法)。

调用run和调用start有什么区别

  1. 直接调用run, 那和执行普通方法没有任何区别.
  2. 只有在调用start方法, run方法才会作为一个线程方法执行

(basic.multhread.basic.ThreadDemo)

public class ThreadDemo {
    public static void main(String[] args) {
        ThreadTask t1 = new ThreadTask();
        ThreadTask t2 = new ThreadTask();
        t1.start();    // 开启线程,调用run方法。
        t2.start();
        RunableTask runableTask = new RunableTask();
        Thread t3 = new Thread(runableTask);
        Thread t4 = new Thread(runableTask);
        t3.start();
        t4.start();
        System.out.println("Main Over,threadName:" + Thread.currentThread().getName());
    }

    /** 第一种方式,继承Thread */
    static class ThreadTask extends Thread {
        @Override
        public void run() {
            for (int x = 0; x < 2; x++) {
                System.out.println("ThreadTask --> x:" + x + ",name:" + Thread.currentThread().getName());
            }
        }
    }

    /** 第二种方式,实现Runnable */
    static class RunableTask implements Runnable {
        @Override
        public void run() {
            for (int x = 0; x < 2; x++) {
                System.out.println("RunableTask --> x:" + x + ",name:" + Thread.currentThread().getName());
            }
        }
    }
}

1.6.2 终止线程

stop():已过时,因为它是强制终止线程(线程执行一半就结束可能影响数据一致性)。建议自己维护stopFlag记录停止标记来替代stop()。当为true时,while循环执行的线程就停止

(basic.multhread.basic.stop.StopDemo)

public class StopDemo {
    public static void main(String[] args) {
        Runnable runnable = new StopThreadTask();
        Thread thread = new Thread(runnable);
        thread.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ((StopThreadTask) runnable).stopByStopFlag();
    }
    
    static class StopThreadTask implements Runnable {
        private boolean stopFlag = false;
        @Override
        public void run() {
            while (true) {
                if (stopFlag) {
                    System.out.println("StopThreadTask --> Stop by stopFlag");
                    break;
                }
                System.out.println("StopThreadTask run,threadName:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void stopByStopFlag() {
            stopFlag = true;
            System.out.println("StopThreadTask --> stopByStopFlag()");
        }
    }
}

1.6.3 中断线程

(basic.multhread.basic.stop.StopInterruptDemo)

与中断相关的方法:

public void interrupt()             // 中断线程
public boolean isInterrupted()      // 判断是否被中断
public static boolean interrupted() // 判断是否被中断,并清除当前中断状态

当调用 interrupt,调用 IsInterrupted 即为 true。但是目标线程接收到中断命令,不一定就会退出。可以使用该标记在while循环体中控制停止

注:Thread.sleep() 由于中断而抛出 InterruptedException 异常(该线程被中断异常),此时,它会清理中断标记(catch后需要再设置中断标记位)

while (true) {
    if (Thread.currentThread().isInterrupted()) {
        System.out.println("interruptTest --> stop by isInterrupted():" + Thread.currentThread().isInterrupted());
        break;
    }
}

1.6.4 wait(等待) 和 notify(唤醒)

Object 对象中的 wait() 和 notify() / notifyAll()

工作机制:

  1. 当调用 wait(),该线程会进入等待队列(可能有多个线程)
  2. notify 用来环境等待队列中某一个线程
  3. notifyAll() 唤醒等待队列所有线程。 当线程被唤醒后,还需要看是否能拿到需要的资源(进入临界区)

注:wait() 和 sleep() 区别:wait会释放资源、执行权,sleep不会释放资源,释放执行权

(basic.multhread.basic.WaitNotifyDemo)

public class WaitNotifyDemo {
    final static Object object = new Object();

    public static class T1 implements Runnable {
        @Override
        public void run() {
            System.out.println("T1 Run,time:" + System.currentTimeMillis());
            synchronized (object) {
                System.out.println("T1 Start,time:" + System.currentTimeMillis());
                try {
                    System.out.println("T1 wait for object,time:" + System.currentTimeMillis());
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("T1 End,time:" + System.currentTimeMillis());
            }
        }
    }

    public static class T2 implements Runnable {
        @Override
        public void run() {
            System.out.println("T2 Run,time:" + System.currentTimeMillis());
            synchronized (object) {
                System.out.println("T2,time:" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("T2 Start notify one Thread,time:" + System.currentTimeMillis());
                object.notify();
                System.out.println("T2 Sleep End,time:" + System.currentTimeMillis());
            }
        }
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(new T1());
        Thread t2 = new Thread(new T2());
        t1.start();
        t2.start();
    }
}

1.6.5 suspend(挂起) 和 resume(继续执行)

(basic.multhread.basic.supend.SupendResumeDemo, GoodSupendResumeDemo)

public final void suspend() // 线程挂起
public final void resume()  // 挂起 -> 唤醒

2个方法都已过时,不建议使用,suspend挂起线程后,不会释放资源。需要等到 resume 继续执行完毕后才释放。而且 resume 再 supend 之前执行,会引起线程永远在挂起,而无法继续执行。更严重的是它不会释放资源

建议使用 wait()/notify() 或 重入锁的 Condition 来代替

1.6.6 join(等待线程结束) 和 yield(谦让)

(basic.multhread.basic.JoinDemo)

public final void join() throws InterruptedException // -> join(0) 一直等待
public final synchronized void join(long millis) throws InterruptedException
public final synchronized void join(long millis, int nanos) throws InterruptedException

在很多情况下,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后再结束,这个时候就要用到 join() 方法了.

join() 会阻塞当前线程,直到目标线程执行完毕。默认一直等待(0),给出指定时间,当子线程执行超过指定时间没有结束,主线程会继续执行。

而yield(),顾名思义谦让,他会让出CPU时间片且释放资源,由系统调度执行所有线程的某一个线程(注意,yield()后,也可能CPU还是执行当前线程),

1.6.7 volatile 与 Java内存模型(JMM)

(basic.multhread.basic.VolatileDemo)

Java内存模型都是围绕着原子性、有序性、可见性展开的。Java使用了一些特殊的操作或关键字,来告诉虚拟机不能随意变动优化目标指令。如:volatile

volatile 告诉 JVM,这个变量可能会有多个线程来修改,要保证其他线程的可见性问题。但是volatile无法保证一些符合复合操作的原子性(保证可见性,不保证原子性)。也无法替代 synchronized 关键字的作用

volatile能保证的:

  1. 保证可见性:使用 volatile 关键字会强制将修改的值立即写入主存。例如,线程1对val进行修改,线程2的工作内存中的缓存变量var失效
  2. 不保证原子性:例如 i++ 操作,是分为2个步骤,获取 -> 加1操作,然而却有可能获取 -> (其他线程加1) -> 当前线程+1,实际结果是2

最佳实践:

  1. 对变量的写操作,依赖于当前值,例如i++,依赖于i
  2. 该变量没有包含在具有其他变量的不变式中

非原子操作时,volatile 线程不安全

public class VolatileDemo {

    private volatile int count = 0;

    public int getCount() {
        return this.count;
    }

    public void incrementCount() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        this.count++;
    }

    public static void main(String[] args) {
        VolatileDemo demo = new VolatileDemo();
        for (int i = 0; i < 1000; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.incrementCount();
                }
            });
            thread.setName("Thread " + i);
            thread.start();
        }
        while (Thread.activeCount() > 2) {
            Thread.yield();
        }
        System.out.println(demo.getCount());
    }
}

原因如下,因为 i++ 不是原子操作,假设执行流程如下时,就会出现问题

  1. 线程A 读取 count 的值
  2. 线程B 读取 count 的值
  3. 线程B 执行加1操作
  4. 线程A 执行加1操作
  5. 线程A 写入最新 count
  6. 线程B 写入最新 count

1.6.8 线程组

(basic.multhread.basic.ThreadGroupDemo)

可以将同类线程,放置在同一个线程组进行管理。同样ThreadGroup也有stop(),用于停止线程组所有线程

ThreadGroup tg = new ThreadGroup("PrintGroup");
Thread t1 = new Thread(tg, new ThreadGroupTask(), "t1");
Thread t2 = new Thread(tg, new ThreadGroupTask(), "t2");
t1.start();
t2.start();
System.out.println(tg.activeCount()); // 获取线程组里的线程个数
tg.list(); // 打印线程组里的线程信息

1.6.9 daemon(守护线程)

(basic.multhread.basic.DaemonThreadDemo)

守护线程是一个特殊线程。只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作。只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。

守护线程必须在 start() 之前 setDaemon(true),否则只是当作用户线程使用而已

1.6.10 线程的优先级

每个线程都具有各自的优先级,线程的优先级可以在程序中表明该线程的重要性,如果有很多线程处于就绪状态, 系统会根据优先级来决定首先使哪个线程进入运行状态。但这并不意味着低优先级的线程得不到运行,而只是它运行的几率比较小,如垃圾回收线程的优先级就较低

public final static int MIN_PRIORITY = 1; // 最小优先级
public final static int NORM_PRIORITY = 5; // 线程的优先级中第二的同时也是默认的优先级
public final static int MAX_PRIORITY = 10; // 最大优先级

通过如下方法设置,和获取优先级,优先级范围:1~10。不是该范围将抛出IllegalArgumentException异常

public final void setPriority(int newPriority) // 设置线程的优先级
public final int getPriority() // 线程的优先级

1.6.11 synchronized 与 线程安全

并发程序开发一大关注点就是线程安全。关键字synchronized的作用是实现线程间的同步。对同步的代码加锁,每一次执行职能由一个线程可以进入同步块,从而保证线程间的安全性。作用:

  1. 指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁
  2. 直接作用于实例方法:相当于给当前实例加锁,进入同步代码前要获得当前实例的锁
  3. 直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁

synchronized可以保证有序性和原子性

(basic.multhread.basic.SynchronizedDemo)

public class SynchronizedDemo {
    public static class SynchronizedThread implements Runnable {
        static int i = 0;
        public synchronized void increase() {
            i++;
            System.out.println(Thread.currentThread().getName() + " increase(),i:" + i);
        }

        @Override
        public void run() {
            for (int j = 0; j < 5; j++) {
                increase();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("i:" + SynchronizedThread.i);
        Runnable runnable = new SynchronizedThread();
        Thread t1 = new Thread(runnable, "t1");
        Thread t2 = new Thread(runnable, "t2");
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
            System.out.println("i:" + SynchronizedThread.i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}




2 java.util.concurrent - 并发包

2.1 同步控制

synchronized是对同步控制最简单的控制方法。它决定了一个线程是否可以访问临界区资源。wait() 和 notify() 起到了线程等待和通知的作用。而 JDK 还提供其他的更好地方式来替代这些方法

  • 重入锁 - synchronized的功能扩展
  • Condition条件 - 重入锁的好搭档
  • ReadWriteLock - 读写锁
  • CountDownLatch - 计数器(闭锁)
  • CyclicBarrier - 循环栅栏
  • Semaphore - 信号量
  • Exchanger - 交换机
  • LockSupport - 线程阻塞工具类

2.1.1 重入锁 - synchronized的功能扩展

构造函数

public ReentrantLock()
public ReentrantLock(boolean fair) // fair:是否公平

常用方法

public void lock()   // 尝试获得锁,获取不到则睡眠
public void unlock() // 释放锁

// 尝试获取锁,获取不到则睡眠,当有别的线程调用interrupt()打断它,抛出异常
public void lockInterruptibly() throws InterruptedException 

public boolean isHeldByCurrentThread() // 查询当前线程是否保持此锁定 
public boolean isFair()                // 判断Lock是否为公平锁 
public final boolean isLocked()        // 查询lock 是否被任意线程所持有。

// 尝试获得锁,如果成功,返回true,示范返回false,该方法不会等待,立即返回
public boolean tryLock() 

// 在给定时间内尝试获得锁
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException 

重入锁的实现中,主要包含三个要素:

  1. 是原子状态:使用CAS操作来存储当前的状态,判断锁是否已经被别的线程持有
  2. 是等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作
  3. 是阻塞原语park()和unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。参考LockSupport

2.1.1.1 lock()/unlock()

(basic.multhread.concurrent.reentrant.ReentrantLockDemo)

重入锁可以完全替代 synchronized 关键字,重入锁使用 java.util.concurrent.locks.ReentrantLock 实现

lock() 和 unlock() 成对出现,与 synchronized 相比,重入锁有着明显的操作过程,需要我们手动控制,何时加锁,何时释放锁。对于逻辑控制的灵活性要比 synchronized 好。

而为什么叫重入锁呢,因为它可以重复进入,例如如下代码:

ReentrantLockDemo lock = new ReentrantLockDemo();
lock.lock();
lock.lock();
try {
    // 业务逻辑
} finally {
    lock.unlock();
    lock.unlock();
}

当然申请了多少次锁,就需要释放多少次锁。否则会抛出异常

2.1.1.2 中断响应

对于synchronized,如果一个线程在等待锁(要么获得锁继续执行,要么继续等待)。而重入锁可以被中断。

使用 lockInterruptibly() 和 interrupt() 捕获抛出异常,来控制被打断的线程的处理(对自己的已获得的锁进行释放,从而解决死锁)

(basic.multhread.concurrent.reentrant.ReentrantLockInterruptDemo)

public class ReentrantLockInterruptDemo {

    public static class ReentrantLockInterruptRunnable implements Runnable {
        static ReentrantLock lock1 = new ReentrantLock();
        static ReentrantLock lock2 = new ReentrantLock();
        int lockNum;

        /**
         * 方便构造死锁
         * @param lockNum
         */
        public ReentrantLockInterruptRunnable(int lockNum) {
            this.lockNum = lockNum;
        }

        @Override
        public void run() {
            try {
                if (lockNum == 1) {
                    System.out.println(Thread.currentThread().getName() + " 请求lock1,time:" + System.currentTimeMillis());
                    lock1.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) { }
                    System.out.println(Thread.currentThread().getName() + " 请求lock2,time:" + System.currentTimeMillis());
                    lock2.lockInterruptibly();
                    System.out.println(Thread.currentThread().getName() + " 拿到lock2,time:" + System.currentTimeMillis());
                    System.out.println(Thread.currentThread().getName() + " Finish End,time:" + System.currentTimeMillis());
                } else {
                    System.out.println(Thread.currentThread().getName() + " 请求lock2,time:" + System.currentTimeMillis());
                    lock2.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) { }
                    System.out.println(Thread.currentThread().getName() + " 请求lock1,time:" + System.currentTimeMillis());
                    lock1.lockInterruptibly();
                    System.out.println(Thread.currentThread().getName() + " 拿到lock1,time:" + System.currentTimeMillis());
                    System.out.println(Thread.currentThread().getName() + " Finish End,time:" + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " 被打断,time:" + System.currentTimeMillis());
                e.printStackTrace();
            } finally {
                System.out.println(Thread.currentThread().getName() + ",lock1.isHeldByCurrentThread: " + lock1.isHeldByCurrentThread());
                System.out.println(Thread.currentThread().getName() + ",lock2.isHeldByCurrentThread: " + lock2.isHeldByCurrentThread());
                if (lock1.isHeldByCurrentThread()) {
                    System.out.println(Thread.currentThread().getName() + ",执行 lock1.unlock()");
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    System.out.println(Thread.currentThread().getName() + ",执行 lock2.unlock()");
                    lock2.unlock();
                }
                System.out.println(Thread.currentThread().getName() + ",线程退出");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockInterruptRunnable runnable1 = new ReentrantLockInterruptRunnable(1);
        ReentrantLockInterruptRunnable runnable2 = new ReentrantLockInterruptRunnable(2);
        Thread t1 = new Thread(runnable1, "t1");
        Thread t2 = new Thread(runnable2, "t2");
        t1.start();
        t2.start();
        Thread.sleep(2000);
        t2.interrupt();
    }
}

2.1.1.3 锁申请等待时间

除了中断响应(等待外部interrupt通知外),可以使用 tryLock() / tryLock(long timeout, TimeUnit unit) 来尝试在指定时间内获取锁。这样就不会因为获取不到资源而一直等待。可以更好的控制流程,避免死锁的发生

(basic.multhread.concurrent.reentrant.ReentrantTryLockDemo)

public class ReentrantTryLockDemo {
    public static class ReentrantTryLockRunnable implements Runnable {
        static ReentrantLock lock = new ReentrantLock();

        @Override
        public void run() {
            try {
                if (lock.tryLock(1, TimeUnit.SECONDS)) {
                    System.out.println(Thread.currentThread().getName() + " Get lock Success,time:" + System.currentTimeMillis());
                    Thread.sleep(2000);
                } else {
                    System.out.println(Thread.currentThread().getName() + " Get lock Fail,time:" + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(Thread.currentThread().getName() + ",lock.isHeldByCurrentThread: " + lock.isHeldByCurrentThread());
                if (lock.isHeldByCurrentThread()) {
                    System.out.println(Thread.currentThread().getName() + ",执行 lock.unlock()");
                    lock.unlock();
                }
                System.out.println(Thread.currentThread().getName() + ",线程退出");
            }
        }
    }

    public static void main(String[] args) {
        ReentrantTryLockRunnable runnable = new ReentrantTryLockRunnable();
        Thread t1 = new Thread(runnable, "t1");
        Thread t2 = new Thread(runnable, "t2");
        t1.start();
        t2.start();
    }
}

2.1.1.4 公平锁

(basic.multhread.concurrent.reentrant.ReentrantFairLockDemo)

当多个线程等待同一个锁时,系统只会从这个锁的等待队列中随机挑选一个,这是非公平的(synchronized 关键字控制的临界区)。

重入锁允许我们对其公平性进行设置,他有个一个构造函数传入 fair 参数。源码中维护一个 FairSync 和 NonfairSync 对象。FairSync 维护者一个有序队列,相对成本较高,性能有所降低


2.1.2 Condition条件 - 重入锁的好搭档

Condition 接口的出现替代了 Object 中的 wait、notify、notifyAll 方法,将这些监视器方法单独进行了封装, 变成 Condition 监视器对象可以任意锁进行组合

wait()、notify()、notifyAll() 是配合 synchronized 使用

而 Condition 的 await()、signal()、signalAll() 是配合 ReentranLock 使用

常用方法

void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
  1. await() 方法会使当前线程等待,同时释放当前锁,当其他线程中使用 signal() / signalAll 方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。和 Object.wait() 类似
  2. awaitUninterruptibly() 与 await() 类似,不过他不会在等待过程中响应中断
  3. signal() 唤醒一个等待中线程
  4. signalAll() 唤醒所有等待中线程

与 ReentrantLock 的使用,从重入锁中获取新的条件

public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
  1. 当 condition 调用 await() 时要求线程会先释放相关的锁。
  2. 当 condition 调用 signal() 要求线程先获得相关的锁。但不会自动释放相关的锁,所以在调用 signal() 后需要手动释放锁。而被唤醒的线程就会重新尝试获得与之绑定的重入锁,一旦成功获取而继续执行。

(basic.multhread.concurrent.reentrant.ReentrantConditionDemo)

public class ReentrantConditionDemo {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    static class ReentrantConditionRunnable implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 申请锁,time:" + System.currentTimeMillis());
                lock.lock();
                condition.await();
                System.out.println("Thread is going on");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantConditionRunnable runnable = new ReentrantConditionRunnable();
        Thread t1 = new Thread(runnable, "t1");
        t1.start();
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + " 申请锁,time:" + System.currentTimeMillis());
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}


2.1.3 ReadWriteLock - 读写锁

读写锁可以有效地帮助减少锁竞争。对于读场景大于写场景,读写锁的会比内部锁/重入锁性能高很多。读写锁的访问约束情况:

  1. 读-读 不互斥:读读之间不阻塞
  2. 读-写 互斥:读阻塞写,写阻塞读
  3. 写-写 互斥:写写阻塞

下面的例子,可以看出写耗时远远超过读耗时。而读和写一起执行,相互阻塞。

(basic.multhread.concurrent.ReadWriteLockDemo)

public class ReadWriteLockDemo {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            // 模拟读1秒
            Thread.sleep(1000);
            System.out.println("<-- " + Thread.currentThread().getName() + ",read value=" + value + ",time:" + System.currentTimeMillis());
            return value;
        } finally {
            lock.unlock();
        }
    }

    public void handleWrite(Lock lock, int value) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            this.value = value;
            System.out.println("--> " + Thread.currentThread().getName() + ",write value=" + value + ",time:" + System.currentTimeMillis());
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        final ReadWriteLockDemo demo = new ReadWriteLockDemo();
        boolean useReadWriteLock = true;
        Lock useReadLock = useReadWriteLock ? readLock : lock;
        Lock useWriteLock = useReadWriteLock ? writeLock : lock;

        Runnable readRunable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(useReadLock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable writeRunable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWrite(useWriteLock, new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            new Thread(readRunable).start();
        }

        for (int i = 0; i < 10; i++) {
            new Thread(writeRunable).start();
            new Thread(readRunable).start();
        }
    }
}


2.1.4 CountDownLatch - 计数器(闭锁)

API解释:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。个人理解是CountDownLatch让可以让一组线程同时执行,然后在这组线程全部执行完前,可以让另一个线程等待。

用来控制线程等待,让某一个线程等待直到倒计时结束,才开始执行

处理机制:初始化一个值N(相当于一组线程有N个),每个线程调用一次 countDown(),那么 countDownLatch 减1,等所有线程都调用过 countDown(),那么 countDownLatch 值达到0,那么线程从 await() 处接着往下执行。

构造函数

public CountDownLatch(int count) // count:计数个数

常用方法

public void countDown() // 计数-1
public void await() // 等待,直到count=0时,才会执行后面的操作

(basic.multhread.concurrent.CountDownLatchDemo)

public class CountDownLatchDemo {
    private final static int CDL_NUM = 10;
    private static int SHOW_NUM = 1;
    /** 计数器, 用来控制线程 */
    private final static CountDownLatch CDL = new CountDownLatch(CDL_NUM);
    /** 示例工作线程类 */
    private static class WorkingThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < CDL_NUM; i++) {
                new SampleThread().start();
                CDL.countDown();
            }
        }
    }
    private static class SampleThread extends Thread {
        @Override
        public void run() {
            System.out.println("[SampleThread] started");
            try {
                // 会阻塞在这里等待 mCountDownLatch 里的count变为0;
                // 也就是等待另外的WorkingThread调用countDown()
                CDL.await();
            } catch (InterruptedException e) {
            }
            showNum(this);
            System.out.println("[SampleThread] end");
        }
    }

    private synchronized static void showNum(Thread thread) {
        System.out.println("theadName=" + thread.getName() + ",showNum=" + (SHOW_NUM++));
    }

    public static void main(String[] args) {
        WorkingThread workingThread = new WorkingThread();
        workingThread.start();
    }
}


2.1.5 CyclicBarrier - 循环栅栏

一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。图示:

CyclicBarrier图示.png

处理机制:初始化一个值 N(相当于一组线程有 N 个),每个线程调用一次 await(),那么 barrier 加 1,等所有线程都调用过 await(),那么 barrier 值达到初始值 N,所有线程接着往下执行,并将 barrier 值重置为 0,再次循环下一次

与 CountDownLatch 类似,可以实现线程间的计数等待,不同的地方在于:CyclicBarrier 是可以循环使用的

构造函数

public CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties)
  • parties:计数个数
  • barrierAction:计数每次归 0 后执行的动作。

常用方法

public int await() throws InterruptedException, BrokenBarrierException // 返回剩余barrier数

await() 会重新计数,每个线程执行完,计数 -1。可能会抛出 2 个异常

  • InterruptedException:执行时被中断
  • BrokenBarrierException:该循环栅栏破损,可能无法等待所有线程都到齐(无法继续执行)

(basic.multhread.concurrent.CyclicBarrierDemo)

public class CyclicBarrierDemo {
    private static final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
        public void run() { // 每次线程到达栅栏点,此方法都会执行
            System.out.println("\n--------barrier action--------\n");
        }
    });

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new CyclicBarrierDemo().new Worker()).start();
        }
    }

    class Worker implements Runnable {
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "--第一阶段");
                Thread.sleep(Math.round(10000));
                barrier.await(); // 每一次await()都会阻塞,等5个线程都执行到这一步(相当于barrier++操作,加到初始化值5),才继续往下执行
                System.out.println(Thread.currentThread().getName() + "--第二阶段");
                Thread.sleep(Math.round(10000));
                barrier.await(); // 每一次5个线程都到达共同的屏障节点,会执行barrier初始化参数中定义的Runnable.run()
                System.out.println(Thread.currentThread().getName() + "--第三阶段");
                Thread.sleep(Math.round(10000));
                barrier.await();
                System.out.println(Thread.currentThread().getName() + "--第四阶段");
                Thread.sleep(Math.round(10000));
                barrier.await();
                System.out.println(Thread.currentThread().getName() + "--第五阶段");
                Thread.sleep(Math.round(10000));
                barrier.await();
                System.out.println(Thread.currentThread().getName() + "--结束");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}


2.1.6 Semaphore - 信号量

一个计数信号量,主要用于控制多线程对共同资源库访问的限制。信号量为多线程协作提供了更为强大的控制方法(对锁的扩展)。对于 内部锁synchronized / 重入锁ReentrantLock,一次都只允许一个线程访问一个资源。而信号量可以指定多个线程访问某一个资源。是对锁的扩展。

简单的说:定义 N 个数量许可,执行的线程需要申请许可(并且运行完后要手动释放许可),当 N 变为 0 时,所有持有许可的线程同时执行。

构造方法

public Semaphore(int permits) // 信号量的准入数(许可数量)
public Semaphore(int permits, boolean fair) // fair:公平锁

常用方法

public void acquire() throws InterruptedException 
    尝试获取一个准入的许可。若无法获得,则等待
public void acquireUninterruptibly()
    同acquire,但不响应中断
    
public boolean tryAcquire()
    尝试获取准入许可,成功返回true,失败返回false,立即返回,不会等待
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
    指定时间尝试获取准入许可

public void release()  // 释放一个许可

主要用法

(basic.multhread.concurrent.SemaphoreDemo)

public class SemaphoreDemo {
    static final Semaphore semaphore = new Semaphore(2);
    static class SemaphoreRunable implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + ",time:" + System.currentTimeMillis());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        SemaphoreRunable runable = new SemaphoreRunable();

        System.out.println("当前可用许可=" + semaphore.availablePermits());
        for (int i = 0; i < 10; i++) {
            exec.submit(runable);
//            exec.execute(runable);
        }
        System.out.println("当前可用许可=" + semaphore.availablePermits());
        exec.shutdown();
    }
}


2.1.7 Exchanger - 交换机

Exchanger 类表示一种两个线程可以进行互相交换对象的会和点。这种机制图示如下:

Exchanger图示.png

场景:两个线程 A、B,各自有一个数据类型相同的变量 a、b。A 线程往 a 中填数据(生产),B 线程从 b 中取数据(消费)。具体如何让 a、b 在内存发生关联,就由 Exchanger 完成。

与 SynchronousQueue 对比,如果生产需要 5s,消费需要 5s。SynchronousQueue 一次存取需要10s,而 Exchanger 只需要5s。两个线程通过一个 Exchanger 交换对象。可以理解为 SynchronousQueue 的双向形式

传统的 SynchronousQueue 存取需要同步,就是 A 放入需要等待 B 取出,B 取出需要等待 A 放入,在时间上要同步进行。

而 Exchanger 在 B 取出的时候,A 是同步在放入的。即:

  1. A 放入a,a满,然后与 B 交换内存,那 A 就可以操作 b(b空),而 B 可以操作 a;
  2. 等 b 被 A 存满,a 被 B 用完,再交换;
  3. 那 A 又填充 a,B 又消费 b,依次循环。两个内存在一定程度上是同时被操作的,在时间上不需要同步。

构造函数

public class Exchanger<V> {
    public Exchanger() {}
}

常用方法

public V exchange(V x) throws InterruptedException // 交换数据
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

(basic.multhread.concurrent.CyclicBarrierDemo)

public class ExchangerDemo {
    public static void main(String[] args) {
        exchangeSimpleDemo();
        producerConsumerDemo();
    }
    
    static class ExchangerRunnable implements Runnable {
        Exchanger<Object> exchanger = null;
        Object object = null;

        public ExchangerRunnable(Exchanger<Object> exchanger, Object object) {
            this.exchanger = exchanger;
            this.object = object;
        }

        public void run() {
            try {
                Object previous = this.object;
                this.object = this.exchanger.exchange(this.object);
                System.out.println(Thread.currentThread().getName() + " exchanged " + previous + " for " + this.object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 简单的交换例子
     */
    public static void exchangeSimpleDemo() {
        System.out.println("===== 简单交换例子 =====");
        Exchanger<Object> exchanger = new Exchanger<Object>();
        new Thread(new ExchangerRunnable(exchanger, "A")).start();
        new Thread(new ExchangerRunnable(exchanger, "B")).start();
        new Thread(new ExchangerRunnable(exchanger, "C")).start();
        new Thread(new ExchangerRunnable(exchanger, "D")).start();
    }
    
    /**
     * 生产/消费者 例子
     */
    public static void producerConsumerDemo() {
        System.out.println("\n===== 生产 消费 例子 =====");
        new Thread(new ExchangerDemo().new ProducerLoop()).start();
        new Thread(new ExchangerDemo().new ConsumerLoop()).start();
    }
    
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Exchanger<Queue<Integer>> exchanger = new Exchanger<Queue<Integer>>();

    class ProducerLoop implements Runnable {
        private Queue<Integer> pBuffer = new LinkedBlockingQueue<Integer>();
        private final int maxnum = 5;

        @Override
        public void run() {
            try {
                for (;;) {
                    Thread.sleep(500);
                    int num = (int) Math.round(Math.random() * 100);
                    pBuffer.offer(num);
                    System.out.println("--> 生产插入数据:" + num);
                    if (pBuffer.size() == maxnum) {
                        System.out.println(sdf.format(new Date()) + " --> producer交换前,数据:" + pBuffer.toString());
                        pBuffer = exchanger.exchange(pBuffer);
                        System.out.println(sdf.format(new Date()) + " --> producer交换后,数据:" + pBuffer.toString());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    class ConsumerLoop implements Runnable {
        private Queue<Integer> cBuffer = new LinkedBlockingQueue<Integer>();
        @Override
        public void run() {
            try {
                for (;;) {
                    if (cBuffer.size() == 0) {
                        System.out.println("\n" + sdf.format(new Date()) + " <-- consumer交换前,数据:" + cBuffer.toString());
                        cBuffer = exchanger.exchange(cBuffer);
                        System.out.println(sdf.format(new Date()) + " <-- consumer交换后,数据:" + cBuffer.toString());
                    }
                    int num = cBuffer.poll();
                    System.out.println("<-- 消费者,消费数据:" + num);
                    Thread.sleep(800);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


2.1.8 LockSupport - 线程阻塞工具类

LockSupport 是一个线程阻塞工具,它可以在线程内任意位置让线程阻塞。

与 Thread.suspend() 相比,它弥补了由于 resume() 在前发生,导致线程无法继续执行的情况

与 Object.wait() 相比,它不需要获得某个对象的锁,也不会抛出 InterruptExpection 异常

常用方法

public static void park()   // 获取许可并消费,类似suspend()/wait(),挂起。可以阻塞当前线程
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline)
public static void unpark(Thread thread)  // 使许可变成可用,类似resume(),继续执行

park() / unpack() 不像 suspend() / resume(),必须要求 suspend 在 resume 前面

我们不需要保证 pack() 在 unpack() 之后,因为 LockSuppoert 类使用类似信号量的机制。为每一个线程准备了一个许可,如果许可可用立即返回,且消费这个许可(即该许可变成不可用)。否则阻塞等待。

park() 支持中断影响,但是他不会抛出异常。我们需要从 Thread.interrupted() 等方法来获得中断标记

(basic.multhread.concurrent.LockSupportDemo)

public class LockSupportDemo {
    public static Object objectLock = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");

    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            synchronized (objectLock) {
                System.out.println("in " + getName() + ",time:" + System.currentTimeMillis());
                LockSupport.park();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        System.out.println("time:" + System.currentTimeMillis());
        LockSupport.unpark(t1);
        System.out.println("time:" + System.currentTimeMillis());
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}


2.2 线程池 - 线程复用

多线程的软件设计方法确实可以最大限度地发挥现代多核处理器的计算能力,提高生产系统的吞吐量和性能。

线程是一种轻量级工具,但创建和关闭依然需要花费时间,如果每次处理任务都创建,可能就会出现创建销毁大于线程工作所占用时间。其次,线程时需要占用内存空间的,大量的线程会抢占宝贵内存资源,也可能会导致 Out of Memory 异常。即使没有,大量线程回收也会给 GC 带来压力。

为了避免频繁创建/销毁,以及控制线程数量,维护一个线程池,当需要使用线程从线程池中获取。


2.2.1 JDK对线程池的支持

JDK 提供 Executor 框架。其本质就是线程池。均在 java.util.concurrent 包下

Executor类关系图.png
  • ThreadPoolExecutor:代表线程池
  • ExecutorService:执行器 Service
  • ScheduleExecutorService:对 ExecutorService 的扩展,可以定时间执行任务
  • Executors:线程池工厂角色,通过它可以获取指定功能的线程池,提供如下工厂方法:
public static ExecutorService newFixedThreadPool(int nThreads[, ThreadFactory threadFactory])
public static ExecutorService newSingleThreadExecutor([ThreadFactory threadFactory])
public static ExecutorService newCachedThreadPool([ThreadFactory threadFactory]) 
public static ExecutorService newWorkStealingPool(int parallelism)
public static ScheduledExecutorService newSingleThreadScheduledExecutor([ThreadFactory threadFactory])
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize[, ThreadFactory threadFactory])

这里做下分类:

  1. SingleThread:只有一个线程的线程池,任务队列模式,[LinkedBlockingQueue]
  2. FixedThread:固定线程数量的线程池。任务队列模式(当有新任务,有空闲线程就立即执行。没有则放入任务队列,等待线程空闲后执行),[LinkedBlockingQueue]
  3. CacheThread:线程数量可变的线程池。优先使用空闲线程,没有空闲就创建,[SynchronousQueue]

ExecutorService常用方法

void shutdown() 
boolean isShutdown()
boolean isTerminated()

// 提交任务并返回Future,需要获取通过Future结果的
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<?> submit(Runnable task)

void execute(Runnable command) // 直接执行任务,不需要获取结果的

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
  throws InterruptedException, ExecutionException, TimeoutException;

invokeAny():要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。

无法保证返回的是哪个 Callable 的结果 - 只能表明其中一个已执行结束。如果其中一个任务执行结束(或者抛了一个异常),其他 Callable 将被取消。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;

invokeAll():将调用所有 Callable 对象。invokeAll() 返回一系列的 Future 对象,通过它们你可以获取每个 Callable 的执行结果。

记住,一个任务可能会由于一个异常而结束,因此它可能没有 成功。无法通过一个 Future 对象来告知我们是两种结束中的哪一种。

ScheduledExecutorService常用方法

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
                                              long initialDelay, 
                                              long period, 
                                              TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
                                                 long initialDelay, 
                                                 long delay, 
                                                 TimeUnit unit)
  • schedule:在延迟多少时间后,一次性调度任务
  • scheduleAtFixedRate:周期性执行任务,在给定的初始延时initialDelay后,按照周期period执行任务
  • scheduleWithFixedDelay:周期性执行任务,在给定的初始延时initialDelay后,任务完成后,延迟delay再执行下一次任务。

注意如果任务调度出现异常,会导致后面所有调度都停止


2.2.2 JDK线程池的内部实现

Executors的所有工厂方法,最后都指向ThreadPollExecutor构造函数

/**
 * corePoolSize     指定线程池中的线程数量
 * maximunPoolSize  指定线程池中的最大线程数量
 * keepAliveTime    当线程数量 > corePoolSize,多余的空闲线程的存活时间。默认 60
 * unit             keepAliveTime 的单位,默认 TimeUnit.SECONDS
 * workQueue        任务队列,被提交尚未被执行的任务
 * threadFactory    线程工厂,用于创建线程,一般使用默认即可
 * handler          拒绝策略,当任务太多来不及处理,如何拒绝任务
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

对应Exectors的静态方法:

1).SingleThread ==> ThreadPoolExecutor(1, 1, 
                                       0L, TimeUnit.MILLISECONDS, 
                                       new LinkedBlockingQueue<Runnable>(), 
                                       threadFactory);
2).FixedThread ==> ThreadPoolExecutor(nThreads, nThreads, 
                                      0L, TimeUnit.MILLISECONDS, 
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory); 
3).CacheThread ==> ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                      60L, TimeUnit.SECONDS, 
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory); 

实例:

(basic.multhread.concurrent.executor.ExecutorServiceDemo)

public class ExecutorServiceDemo {
    public static void main(String[] args) {
        ExecutorServiceDemo demo = new ExecutorServiceDemo();
        demo.executorServiceDemo();
        demo.executeDemo();
        demo.submitDemo();
        demo.invokeDemo();
        demo.scheduledExecutorServiceDemo();
    }

    /**
     * 首先使用 newFixedThreadPool() 工厂方法创建一个 ExecutorService。这里创建了一个十个线程执行任务的线程池。
     * 然后,将一个 Runnable 接口的匿名实现类传递给 execute() 方法。
     */
    public void executorServiceDemo() {
        int corePoolSize = 5;
        int maxPoolSize = 10;
        long keepAliveTime = 5000;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("自定义名字");
                        // 可以设置守护线程,线程名称,优先级,可以追踪创建的线程数量
                        return thread;
                    }
                },
                /**
                 * 1).AbortPolicy(默认):直接抛出异常,阻止系统正常工作
                 * 2).CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。不会真的丢弃任务,但是,任务提交线程的性能可能会急剧下降
                 * 3).DiscardOledestPloicy:该策略将丢弃最老的一个请求
                 * 4).DiscardPolicy:丢弃无法处理的任务
                 * 也可以自定义实现 RejectedExecutionHandler 来定义拒绝策略。
                 */
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(Thread.currentThread().getName() + "," + r.toString() + " 抛弃");
                    }
                })
        {
            private long time = 0L;
            /** 扩展线程池 */
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                System.out.println(Thread.currentThread().getId() + "准备执行");
                time = System.currentTimeMillis();
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                System.out.println(Thread.currentThread().getId() + "执行完成,use " + (System.currentTimeMillis() - time) + " ms");
            }

            @Override
            protected void terminated() {
                super.terminated();
                System.out.println(Thread.currentThread().getId() + "线程池退出,use " + (System.currentTimeMillis() - time) + " ms");
            }
        };
    }

    /**
     * execute(Runnable):直接执行,与submit对比,execute是个空方法。这将导致 ExecutorService 中的某个线程执行该 Runnable。
     */
    public void executeDemo() {
        System.out.println("===== execute(Runnable) =====");
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Asynchronous task");
            }
        });
        executorService.shutdown();
    }

    /**
     * submit(Runnable):一个 Runnable 实现类,但它返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕。
     * submit(Callable):与上类似。不过Callable的call()方法能够返回一个结果。Runnable.run()不能够返回一个结果。
     */
    public void submitDemo() {
        System.out.println("\n===== submit(Runnable) =====");
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> futureRunnable = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Asynchronous task");
            }
        });
        try {
            System.out.println("是否执行完毕=" + futureRunnable.isDone());
            System.out.println(futureRunnable.get());
            System.out.println("是否执行完毕=" + futureRunnable.isDone());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("\n===== submit(Callable) =====");
        Future<Object> futureCallable = executorService.submit(new Callable<Object>(){
            @Override
            public Object call() throws Exception {
                System.out.println("Asynchronous Callable");
                return "完成的回调结果";
            }
        });
        try {
            System.out.println("是否执行完毕=" + futureCallable.isDone());
            System.out.println("future.get()=" + futureCallable.get());
            System.out.println("是否执行完毕=" + futureCallable.isDone());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * invokeAny():要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。
     *             无法保证返回的是哪个 Callable 的结果 - 只能表明其中一个已执行结束。如果其中一个任务执行结束(或者抛了一个异常),其他 Callable 将被取消。
     * invokeAll():将调用所有 Callable 对象。invokeAll() 返回一系列的 Future 对象,通过它们你可以获取每个 Callable 的执行结果。
     *             记住,一个任务可能会由于一个异常而结束,因此它可能没有 "成功"。无法通过一个 Future 对象来告知我们是两种结束中的哪一种。
     */
    public void invokeDemo() {
        System.out.println("\n===== invokeAny(...) =====");
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Set<Callable<String>> callables = new HashSet<Callable<String>>();
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Task 1";
            }
        });
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Task 2";
            }
        });
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Task 3";
            }
        });
        try {
            String result = executorService.invokeAny(callables);
            System.out.println("result = " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("\n===== invokeAll(...) =====");
        try {
            List<Future<String>> futures = executorService.invokeAll(callables);
            for (Future<String> future : futures) {
                System.out.println("future.get = " + future.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }

    public void scheduledExecutorServiceDemo() {
        System.out.println("\n===== scheduledExecutorServiceDemo =====");
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        // 首先一个内置 5 个线程的 ScheduledExecutorService 被创建。之后一个 Callable 接口的匿名类示例被创建然后传递给 schedule() 方法。后边的俩参数定义了 Callable 将在 2 秒钟之后被执行。
        ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() {
            @Override
            public Object call() throws Exception {
                System.out.println("Executed!");
                return "Called!";
            }
        }, 2, TimeUnit.SECONDS);
    }
}

2.2.2.1 BlockingQueue - 任务队列

它是一个 BlockingQueue 接口的对象,仅用于存放 Runnable 对象。线程池增长策略:

任务提交:
    |-- 线程池里线程数量 < corePoolSize ==> 创建新的线程来执行任务,即时有空闲线程
    |-- 线程池里线程数量 >= corePoolSize
        |-- 队列未满 ==> 放入队列中
        |-- 队列满
            |-- 线程池里线程数量 < maximumPoolSize ==> 创建线程执行任务
            |-- 线程池里线程数量 = maximumPoolSize ==> 执行拒绝策略

线程回收:当线程池数量 > corePoolSize,若线程的空闲时间达到keepAliveTime ==> 关闭空闲线程

BlockingQueue可以有如下队列:

  1. ArrayBlockingQueue(有界的任务队列):遵循线程池增长策略。需要指定队列数量
  2. LinkedBlockingQueue(无界的任务队列):遵循线程池增长策略。可以不指定队列数量,队列增长直到耗尽系统内存
  3. SynchronousQueue(直接提交队列):任务不会存放在队列,遵循线程池增长策略,但是不考虑队列
  4. PriorityBlockingQueue(优先任务队列):带有执行优先级的队列,根据任务自身的优先级顺序先后执行。

2.2.2.2 ThreadFacotry

ThreadFactory是一个接口,它只有一个方法,用来创建线程:

Thread newThread(Runnable r);

自定义线程:可以跟踪线程池何时创建了多少个线程,自定义线程名称、组以及优先级信息,甚至可以设置为守护线程等。

2.2.2.3 RejectedExecutionHandler - 拒绝策略

  1. AbortPolicy (抛异常处理,默认策略):直接抛出异常,阻止系统正常工作
  2. CallerRunsPolicy (直接在调用者线程执行):只要线程池未关闭,该策略直接在调用者线程中执行当前被丢弃的任务。不会真的丢弃任务,但是,任务提交线程的性能可能会急剧下降
  3. DiscardOldestPolicy (丢弃最老的):该策略将丢弃最老的一个请求(即将被执行的一个任务),并尝试再次提交当前任务。
  4. DiscardPolicy (丢弃任务):丢弃无法处理的任务,不予任何处理,如果允许任务丢失

也可以自定义实现 RejectedExecutionHandler 来定义拒绝策略。

2.2.2.4 扩展线程池 - 钩子方法(hook)

ThreadPoolExecutor 提供了 beforeExecute()、afterExecute()、terminated() 三个接口对线程池控制。

2.2.2.5 优化线程池线程数量

一般来说,确定线程池大小需要考虑 CPU数量、内存大小 等因素。参考公式:

Ncpu = CPU数量
Ncpu = 目标CPU的使用率,0 <= Ucpu <= 1
W/C = 等待时间 / 计算时间 的比率

为了保证处理器达到期望的使用率,最优的池的大小等于:

Nthreads = Ncpu * Ncpu * (1 + W/C)

在 Java 中,可以通过如下语句来获取 CPU 核心数

Runtime.getRuntime().availableProcessors();

2.2.2.6 在线程池打印堆栈

可以用装饰模式,增强ThreadPoolExecutor的方法,对异常进行try...catch,就可以做到打印堆栈信息


2.2.3 Fork/Join - 分而治之框架

分而治之 是非常有效地处理大量数据的方法。

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,但是它可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

fork(分叉) 和 join(合并)的原理,包含2个递归进行的步骤。分别是分叉步骤和合并步骤。

1.分叉

通过一定规则将任务分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。

什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

2.合并

一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。

2.2.4 ForkJoinPool

ForkJoinPool 是一个特殊的线程池,它的设计是为了更好的配合 分叉-和-合并 任务分割的工作。

构造方法

public ForkJoinPool()
public ForkJoinPool(int parallelism)
public ForkJoinPool(int parallelism, 
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler, 
                    boolean asyncMode)
                    
private ForkJoinPool(int parallelism, 
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler, 
                     int mode, 
                     String workerNamePrefix)
  • parallelism:并行数量,默认Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()
  • ForkJoinWorkerThreadFactory:new DefaultForkJoinWorkerThreadFactory()
  • asyncMode:
  • mode:
  • workerNamePrefix:
  • UncaughtExceptionHandler:

方法

public <T> T invoke(ForkJoinTask<T> task)
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

大部分方法都是提交ForkJoinTask类。它有2个子类

RecursiveAction:无返回值,代表一个 Action。它可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行。比如写数据到磁盘,然后就退出了

RecursiveTask:有返回值,代表一个任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。可以有几个水平的分割和合并。


2.3 JDK 的并发容器

JDK提供高并发集合容器,都在java.util.concurrent包下。

  1. ConcurrentHashMap:高效并发的 HashMap,可以理解为线程安全的 HashMap
  2. CopyOnWriterArrayList:采用不变模式实现,与 ArrayList 类似,在读多写少的场合,性能远远好于 Vector。采用不变模式实现,写入数据时,重新复制一份
  3. ConcurrentLinkedQueue:高效的并发队列,使用链表。CAS 无锁实现
  4. BlockingQueue:这是一个接口,JDK 内部通过链表、数组等方式实现这个接口,表示阻塞队列,非常适合用于作为数据共享的通道
  5. ConcurrentSkipListMap:跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找

2.3.1 线程安全的 HashMap 和 List

Collections提供的一些方法可以获取线程安全的集合

Collections.synchronizedMap(); // 线程安全的HashMap
Collections.synchronizedList(new LinkedList<String>); // 线程安全的List,可以包装任何List的子类

里面操作用了mutex这个变量进行同步来实现线程安全。但是性能没有ConcurrenthashMap好

2.3.2 ConcurrentMap - 并发Map

java.util.concurrent.ConcurrentMap 接口表示了一个能够对别人的访问(插入和提取)进行并发处理的 java.util.Map。除了从其父接口 java.util.Map 继承来的方法之外还有一些额外的原子性方法。

ConcurrentMap 的实现类有

  1. ConcurrentHashMap
  2. ConcurrentNavigableMap
  3. ConcurrentSkipListMap

2.3.2.1 ConcurrentHashMap:

和 java.util.HashTable 类很相似,但 ConcurrentHashMap 能够提供比 HashTable 更好的并发性能。在你从中读取对象的时候 ConcurrentHashMap 并不会把整个 Map 锁住。

此外,在你向其中写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map。它的内部只是把 Map 中正在被写入的部分进行锁定。

另外一个不同点是,在被遍历的时候,即使是 ConcurrentHashMap 被改动,它也不会抛 ConcurrentModificationException。尽管 Iterator 的设计不是为多个线程的同时使用。

2.3.3.2 ConcurrentNavigableMap

这是一个支持并发访问的 java.util.NavigableMap,它还能让它的子 map 具备并发访问的能力。所谓的 "子 map" 指的是诸如 headMap(),subMap(),tailMap() 之类的方法返回的 map。

NavigableMap 中的方法不再赘述,本小节我们来看一下 ConcurrentNavigableMap 添加的方法。

以下的Map集合持有的其实只是对象的引用,如果你对原始 map 里的元素做了改动,这些改动将影响到子 map 中的元素

  1. headMap(T toKey) 方法返回一个包含了小于给定 toKey 的 key 的子 map。
  2. tailMap(T fromKey) 方法返回一个包含了不小于给定 fromKey 的 key 的子 map。
  3. subMap() 方法返回原始 map 中,键介于 from(包含) 和 to (不包含) 之间的子 map。
  4. descendingKeySet()
  5. descendingMap()
  6. navigableKeySet()

2.3.3.2 ConcurrentSkipListMap - 跳表

跳表是一种可以用来快速查找的数据结构,优点类似于平衡树

本质时同时维护多个链表,且链表时分层的,最底层维护了所有元素,每上一层都是下一层的子集。查找数据时,从最上层开始遍历,如果找不到则定位一个反问,进入下一层查找

这是空间换时间的算法




2.4 BlockingQueue

BlockingQueue即阻塞队列,被阻塞的情况主要有如下两种:

  • 当队列满了的时候进行入队列操作
  • 当队列空了的时候进行出队列操作

2.4.1 BlockingQueue的使用

内部实现:使用 Object[] 数组实现,结合重入锁和Condition实现

阻塞队列主要用在 生产者/消费者 的场景,有线程 P和C,生产者(线程P) 往队列中放数据,消费者(线程C) 从队列中取数据。

  • 生产者put对象,当队列到了容纳临界点,就会发生阻塞,直到消费者task对象。
  • 消费者视图从一个空队列,take数据,那么会进入阻塞,直到生产者put对象

接口方法,阻塞队列一共有四套方法分别用来进行 insert(插入)、remove(移除)、examine(检查),当每套方法对应的操作不能马上执行时会有不同的反应,下面这个表格就分类列出了这些方法:

抛异常 特殊值 阻塞 超时
insert add(o) offer(o) put(o) offer(o, timeout, timeunit)
remove remove(o) poll() take() poll(timeout, timeunit)
examine element() peek()
  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

注意:不能向 BlockingQueue中插入null,否则会报 NullPointerException。

2.4.2 BlockingQueue的实现类

BlockingQueue 只是 java.util.concurrent 包中的一个接口,而在具体使用时,我们用到的是它的实现类,当然这些实现类也位于 java.util.concurrent 包中。在 Java6 中,BlockingQueue 的实现类主要有以下几种:

  1. ArrayBlockingQueue - 数组阻塞队列
  2. DelayQueue - 延迟队列
  3. LinkedBlockingQueue - 链阻塞队列
  4. PriorityBlockingQueue - 具有优先级的阻塞队列
  5. SynchronousQueue - 同步队列

2.4.2.1 ArrayBlockingQueue - 数组阻塞队列

  1. 特点:有边界的阻塞队列,容量有限,在初始化时指定大小,指定后不可改变
  2. 内部实现:数组
  3. 操作数据:先进先出,的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();

2.4.2.2 DelayQueue - 延迟队列

  1. 特点:有序队列
  2. 内部实现:使用 PriorityQueue 和 ReentrantLock 实现
  3. 操作数据:元素到期后才能取出

DelayQueue 中的元素必须实现 java.util.concurrent.Delayed 接口,这个接口的定义非常简单:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

getDelay() 方法的返回值就是队列元素被释放前的保持时间,如果返回 0 或者 一个负值,就意味着该元素已经到期需要被释放,此时 DelayedQueue 会通过其 take() 方法释放此对象。

2.4.2.3 LinkedBlockingQueue - 链阻塞队列

  1. 特点:范围任意(默认 Integer.MAX_VALUE),相比 ArrayBlockingQueue,吞吐量好一点
  2. 内部实现:链表和重入锁实现
  3. 操作数据:与 ArrayBlockingQueue 一样,先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();

2.4.2.4 PriorityBlockingQueue - 具有优先级的阻塞队列

  1. 特点:没有边界
  2. 内部实现:数组和重入锁
  3. 操作数据:元素必须实现 java.lang.Comparable 接口,队列优先级的排序规则是按照这个接口的实现来定义的。我们可以从 PriorityBlockingQueue 获得一个迭代器 Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。

注意:PriorityBlockingQueue 中允许插入 null对象。

2.4.2.5 SynchronousQueue - 同步队列

  1. 特点:仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。


2.4.3 lockingDeque - 阻塞双端队列

BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。

  1. 在不能够插入元素时,它将阻塞住试图插入元素的线程;
  2. 在不能够抽取元素时,它将阻塞住试图抽取的线程。

deque(双端队列, Double Ended Queue)。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

2.4.3.1 BlockingDeque 的使用

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。

如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。

一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。

一个线程生产元素,把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

BlockingDeque 的方法。BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常 特殊值 阻塞 超时
插入(First) addFirst(o) offerFirst(o) putFirst(o) offerFirst (o, timeout, timeunit)
移除(First) removeFirst(o) pollFirst() takeFirst() pollFirst (timeout, timeunit)
检查(First) getFirst() peekFirst()
插入(Last) addLast(o) offerLast(o) putLast(o) offerLast (o, timeout, timeunit)
移除(Last) removeLast(o) pollLast() takeLast() pollLast (timeout, timeunit)
检查(Last) getLast() peekLast()

四组不同的行为方式解释:

  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

2.4.3.2 BlockingDeque的实现类

BlockingDeque 继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。

以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:

BlockingQueue BlockingDeque
add() addLast()
offer() offerLast()
put() putLast()
remove() removeFirst()
poll() pollFirst()
take() takeFirst()
element() getFirst()
peek() peekFirst()

2.4.3.3 链阻塞双端队列 LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。

deque(双端队列, Double Ended Queue)。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

LinkedBlockingDeque 是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。




©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容

  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,610评论 2 17
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,766评论 1 19
  • 线程池ThreadPoolExecutor corepoolsize:核心池的大小,默认情况下,在创建了线程池之后...
    irckwk1阅读 708评论 0 0
  • 在单位莫名的空虚,不知道自己的时间浪费在单位无聊工作的意义。但是不去浪费,又无法获取生存的资本,为了每月的工资而工...
    沈水之南阅读 136评论 0 1
  • 自己刚刚量完血压,有些超高了。作为一名全科大夫,仔妈突然急了,开始埋怨我不爱惜自己身体。 然后她一直强调高血压的危...
    心光宝爸阅读 68评论 0 2