前言
前面我们在java多线程相关(1)-synchronized,wait,notify,volatile中简单介绍了最简单的多线程同步的方法;就我个人的理解,多线程同步的目的有两个 1.保证多线程的执行顺序 2.维护多线程可见变量(如volalite变量,或者更高一级的synchronized修饰变量)的安全;我将在本章中尝试写出我个人的理解
线程同步常见示例
CountDownLatch示例
CountDownLatch等待多个线程执行完成再继续执行,被多个线程阻塞,类似于多个线程join
mport java.util.concurrent.CountDownLatch;
public class TestMultiThread1 {
private static final CountDownLatch countDwonLatch = new CountDownLatch(2);
private static class TestCountDownLatchThread extends Thread{
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("TestCountDownLatchThread 子线程开始执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("TestCountDownLatchThread 子线程执行完成");
countDwonLatch.countDown();
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
System.out.println("主线程开始执行");
new TestCountDownLatchThread().start();
new TestCountDownLatchThread().start();
try {
countDwonLatch.await();
System.out.println("主线程等待子线程执行完成");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("主线程继续执行");
}
}
结果为:
主线程开始执行
TestCountDownLatchThread 子线程开始执行
TestCountDownLatchThread 子线程开始执行
TestCountDownLatchThread 子线程执行完成
TestCountDownLatchThread 子线程执行完成
主线程等待子线程执行完成
主线程继续执行
改为countDwonLatch.await(1000, TimeUnit.MILLISECONDS);的后结果为:
主线程开始执行
TestCountDownLatchThread 子线程开始执行
TestCountDownLatchThread 子线程开始执行
主线程等待子线程执行完成
主线程继续执行
TestCountDownLatchThread 子线程执行完成
TestCountDownLatchThread 子线程执行完成
CyclicBarrier示例
CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
private static class TestCyclicBarrierThread extends Thread{
private int worktime;
TestCyclicBarrierThread(int worktime) {
this.worktime = worktime;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("TestCyclicBarrierThread 子线程开始执行");
try {
Thread.sleep(worktime*1000);
System.out.println("TestCyclicBarrierThread 子线程执行"+worktime+"s");
try {
cyclicBarrier.await();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("TestCountDownLatchThread 子线程继续执行");
}
}
public static void main(String[] args) {
for(int i = 0;i<4;i++) {
new TestCyclicBarrierThread(i+1).start();
}
}
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程执行1s
TestCyclicBarrierThread 子线程执行2s
TestCyclicBarrierThread 子线程执行3s
TestCyclicBarrierThread 子线程执行4s
TestCountDownLatchThread 子线程继续执行
TestCountDownLatchThread 子线程继续执行
TestCountDownLatchThread 子线程继续执行
TestCountDownLatchThread 子线程继续执行
将CyclicBarrier对象改为
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程开始执行
TestCyclicBarrierThread 子线程执行1s
TestCyclicBarrierThread 子线程执行2s
TestCyclicBarrierThread 子线程执行3s
TestCyclicBarrierThread 子线程执行4s
TestCountDownLatchThread 子线程做额外操作 //选一个子线程做额外操作
TestCountDownLatchThread 子线程继续执行
TestCountDownLatchThread 子线程继续执行
TestCountDownLatchThread 子线程继续执行
TestCountDownLatchThread 子线程继续执行
Semaphore示例
Semaphore感觉其实和锁有点类似,它一般用于控制对某组资源的访问权限(不是一个,而是一组同类型资源);用一组资源来维护线程的同步
private static final Semaphore semaphore = new Semaphore(5); //机器数
private static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); //获得信号量
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release(); //释放信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
for(int i=0;i<8;i++)
new Worker(i,semaphore).start();
从上面几个示例我们可以看出,java为我们提供了很多种线程同步方式的类,这些都在java.util.concurrent包下面,其示意图可以如图表示
我们可以看到其中的AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的
下面我们就来看看AQS
AQS(线程同步器)
AQS的定义
AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类;大致结构如图
一个volatile资源,以及一个线程队列,维护征用该资源的多个线程;
该资源可以表现任何状态。比如, CountDownLatch表示剩余持有该资源的线程数,Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)等等;这些都是根据AQS自定义出来的
AbstractQueuedSynchronizer类
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
一般来说,独占式和共享式实现一种即可,当然也可以都实现(如ReentrantReadWriteLock,表示读写锁)
AbstractQueuedSynchronizer代码分析
以独占模式为例
1209 /**
1210 * Acquires in exclusive mode, ignoring interrupts. Implemented
1211 * by invoking at least once {@link #tryAcquire},
1212 * returning on success. Otherwise the thread is queued, possibly
1213 * repeatedly blocking and unblocking, invoking {@link
1214 * #tryAcquire} until success. This method can be used
1215 * to implement method {@link Lock#lock}.
1216 *
1217 * @param arg the acquire argument. This value is conveyed to
1218 * {@link #tryAcquire} but is otherwise uninterpreted and
1219 * can represent anything you like.
1220 */
1221 public final void acquire(int arg) {
1222 if (!tryAcquire(arg) &&
1223 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1224 selfInterrupt();
1225 }
函数流程如下:
1.tryAcquire()尝试直接去获取资源,如果成功则直接返回;
2.addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3.acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上
tryAcquire
由各个同步器自己实现,如ReentrantLock中的tryAcquire
213 protected final boolean tryAcquire(int acquires) {
214 return nonfairTryAcquire(acquires);
215 }
126 /**
127 * Performs non-fair tryLock. tryAcquire is implemented in
128 * subclasses, but both need nonfair try for trylock method.
129 */
130 final boolean nonfairTryAcquire(int acquires) {
131 final Thread current = Thread.currentThread();
132 int c = getState();
133 if (c == 0) {
134 if (compareAndSetState(0, acquires)) { //CAS 方式,自旋操作
135 setExclusiveOwnerThread(current); //当前state可用,当前线程得到锁了
136 return true;
137 }
138 }
139 else if (current == getExclusiveOwnerThread()) {
140 int nextc = c + acquires;
141 if (nextc < 0) // overflow
142 throw new Error("Maximum lock count exceeded");
143 setState(nextc); //锁的值+acquires(可重入)
144 return true;
145 }
146 return false;
147 }
addWaiter
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
629 /**
630 * Creates and enqueues node for current thread and given mode.
631 *
632 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
633 * @return the new node
634 */
635 private Node addWaiter(Node mode) {
636 Node node = new Node(mode);
637
638 for (;;) {
639 Node oldTail = tail;
640 if (oldTail != null) {
641 U.putObject(node, Node.PREV, oldTail);
642 if (compareAndSetTail(oldTail, node)) {
643 oldTail.next = node;
644 return node; //node放入队尾并返回
645 }
646 } else {
647 initializeSyncQueue(); //这个线程和mode所创建的node是第一个节点
648 }
649 }
650 }
acquireQueued
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了
881 /**
882 * Acquires in exclusive uninterruptible mode for thread already in
883 * queue. Used by condition wait methods as well as acquire.
884 *
885 * @param node the node
886 * @param arg the acquire argument
887 * @return {@code true} if interrupted while waiting
888 */
889 final boolean acquireQueued(final Node node, int arg) {
890 try {
891 boolean interrupted = false;
892 for (;;) {
893 final Node p = node.predecessor(); //node的前一个节点
894 if (p == head && tryAcquire(arg)) { //前一个为头节点就可以尝试拿state资源了
895 setHead(node); //拿到了,释放头节点
896 p.next = null; // help GC
897 return interrupted;
898 }
//如果自己可以休息了,就进入waiting状态,直到被unpark()
899 if (shouldParkAfterFailedAcquire(p, node) &&
900 parkAndCheckInterrupt())
901 interrupted = true;
902 }
903 } catch (Throwable t) {
904 cancelAcquire(node);
905 throw t;
906 }
907 }
shouldParkAfterFailedAcquire
818 /**
819 * Checks and updates status for a node that failed to acquire.
820 * Returns true if thread should block. This is the main signal
821 * control in all acquire loops. Requires that pred == node.prev.
822 *
823 * @param pred node's predecessor holding status
824 * @param node the node
825 * @return {@code true} if thread should block
826 */
827 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
828 int ws = pred.waitStatus; //拿到前驱的状态
829 if (ws == Node.SIGNAL)
830 /*
831 * This node has already set status asking a release
832 * to signal it, so it can safely park.
833 */
834 return true; //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
835 if (ws > 0) {
836 /*
837 * Predecessor was cancelled. Skip over predecessors and
838 * indicate retry.
839 */
840 do {
841 node.prev = pred = pred.prev; //如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
842 } while (pred.waitStatus > 0);
843 pred.next = node;
844 } else {
845 /*
846 * waitStatus must be 0 or PROPAGATE. Indicate that we
847 * need a signal, but don't park yet. Caller will need to
848 * retry to make sure it cannot acquire before parking.
849 */
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
850 pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
851 }
852 return false;
853 }
parkAndCheckInterrupt
等线程找到安全的等待位置时,就可以安全进入等待状态(区分与阻塞的关系)
862 /**
863 * Convenience method to park and then check if interrupted.
864 *
865 * @return {@code true} if interrupted
866 */
867 private final boolean parkAndCheckInterrupt() {
868 LockSupport.park(this);
869 return Thread.interrupted();
870 }
线程状态转换
阻塞和等待的区别
阻塞:当一个线程试图获取对象锁(非java.util.concurrent库中的锁,即synchronized),而该锁被其他线程持有,则该线程进入阻塞状态。它的特点是使用简单,由JVM调度器来决定唤醒自己,而不需要由另一个线程来显式唤醒自己,不响应中断。
等待:当一个线程等待另一个线程通知调度器一个条件时,该线程进入等待状态。它的特点是需要等待另一个线程显式地唤醒自己,实现灵活,语义更丰富,可响应中断。例如调用:Object.wait()、Thread.join()以及等待Lock或Condition。
参考Thread详解
AQS aquire大致流程如下:
release
唤醒队列的下一个节点代表的线程
1274 /**
1275 * Releases in exclusive mode. Implemented by unblocking one or
1276 * more threads if {@link #tryRelease} returns true.
1277 * This method can be used to implement method {@link Lock#unlock}.
1278 *
1279 * @param arg the release argument. This value is conveyed to
1280 * {@link #tryRelease} but is otherwise uninterpreted and
1281 * can represent anything you like.
1282 * @return the value returned from {@link #tryRelease}
1283 */
1284 public final boolean release(int arg) {
1285 if (tryRelease(arg)) {
1286 Node h = head;
1287 if (h != null && h.waitStatus != 0)
1288 unparkSuccessor(h);
1289 return true;
1290 }
1291 return false;
1292 }
以上是独占的逻辑,共享模式也是类似的,详细可以参考Java并发之AQS详解,这篇文章写的比较清楚
线程安全
我们常见的保护线程安全的几种方式为
1,使用线程安全的类(自定义线程同步器,其实本质还是对各种锁的使用),2,使用锁,3,避免使用和设置成员变量类,保持无状态等;4,使用关键字保证线程安全(例:volatile)等
以 AtomicInteger为例
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;// 初始int大小
// 省略了部分代码...
// 带参数构造函数,可设置初始int大小
public AtomicInteger(int initialValue) {
value = initialValue;
}
// 不带参数构造函数,初始int大小为0
public AtomicInteger() {
}
// 获取当前值
public final int get() {
return value;
}
// 设置值为 newValue
public final void set(int newValue) {
value = newValue;
}
//返回旧值,并设置新值为 newValue
public final int getAndSet(int newValue) {
/**
* 这里使用for循环不断通过CAS操作来设置新值
* CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系
* */
for (;;) {
int current = get();
if (compareAndSet(current, newValue)) //通过自旋,不断比较当前值和理想值,直到可以设置为止
return current;
}
}
// 原子的设置新值为update, expect为期望的当前的值
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 获取当前值current,并设置新值为current+1
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
// 此处省略部分代码,余下的代码大致实现原理都是类似的
}
本质上这是cas(Compare and Swap),是一种乐观锁
CAS 悲观锁与乐观锁
Java在JDK1.5之前都是靠 synchronized关键字保证同步的,这种通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有共享变量的锁,都采用独占的方式来访问这些变量。这就是一种独占锁,独占锁其实就是一种悲观锁,所以可以说 synchronized 是悲观锁。
悲观锁机制存在以下问题:
1. 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
2. 一个线程持有锁会导致其它所有需要此锁的线程挂起。
3. 如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
对比于悲观锁的这些问题,另一个更加有效的锁就是乐观锁。其实乐观锁就是:每次不加锁而是假设没有并发冲突而去完成某项操作,如果因为并发冲突失败就重试,直到成功为止。
CAS是乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试
乐观锁的缺点
1 ABA 问题
如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 “ABA”问题。(比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的)
JDK 1.5 以后的 AtomicStampedReference 类就提供了此种能力(加一个时间戳),其中的 compareAndSet 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
2 循环时间长开销大
自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
3 只能保证一个共享变量的原子操作
CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类把多个共享变量合并成一个共享变量来操作
总结
综上,我们大致了解了java多线程同步,线程安全的一些注意事项;了解aqs(同步器)和cas(乐观锁技术-原子类);这些都被用来定义concurrent包中的基础类;而concurrent包高层类正是基础类的进一步复用
扩展学习-concurrent高层类,如# ConcurrentHashMap