1.解决信号量丢失和假唤醒
public class MyWaitNotify3{
MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;
public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}
public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}
---------------------------------
饥饿和公平
如果一个线程因为 CPU 时间全部被其他线程抢走而得不到 CPU 运行时间,这种状态被称之为“饥饿”。而该线程被“饥饿致死”正是因为它得不到 CPU 运行时间的机会。解决饥饿的方案被称之为“公平性” – 即所有线程均能公平地获得运行机会。
下面是本文讨论的主题:
Java 中导致饥饿的原因:
高优先级线程吞噬所有的低优先级线程的 CPU 时间。
线程被永久堵塞在一个等待进入同步块的状态。
线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的 wait 方法)。
在 Java 中实现公平性方案,需要:
使用锁,而不是同步块。
公平锁。
注意性能方面。
Java 中导致饥饿的原因
在 Java 中,下面三个常见的原因会导致线程饥饿:
高优先级线程吞噬所有的低优先级线程的 CPU 时间。
线程被永久堵塞在一个等待进入同步块的状态,因为其他线程总是能在它之前持续地对该同步块进行访问。
线程在等待一个本身(在其上调用 wait())也处于永久等待完成的对象,因为其他线程总是被持续地获得唤醒。
高优先级线程吞噬所有的低优先级线程的 CPU 时间
你能为每个线程设置独自的线程优先级,优先级越高的线程获得的 CPU 时间越多,线程优先级值设置在 1 到 10 之间,而这些优先级值所表示行为的准确解释则依赖于你的应用运行平台。对大多数应用来说,你最好是不要改变其优先级值。
线程被永久堵塞在一个等待进入同步块的状态
Java 的同步代码区也是一个导致饥饿的因素。Java 的同步代码区对哪个线程允许进入的次序没有任何保障。这就意味着理论上存在一个试图进入该同步区的线程处于被永久堵塞的风险,因为其他线程总是能持续地先于它获得访问,这即是“饥饿”问题,而一个线程被“饥饿致死”正是因为它得不到 CPU 运行时间的机会。
线程在等待一个本身(在其上调用 wait())也处于永久等待完成的对象
如果多个线程处在 wait()方法执行上,而对其调用 notify()不会保证哪一个线程会获得唤醒,任何线程都有可能处于继续等待的状态。因此存在这样一个风险:一个等待线程从来得不到唤醒,因为其他等待线程总是能被获得唤醒。
在 Java 中实现公平性
虽 Java 不可能实现 100% 的公平性,我们依然可以通过同步结构在线程间实现公平性的提高。
首先来学习一段简单的同步态代码:
public class Synchronizer{
public synchronized void doSynchronized(){
//do a lot of work which takes a long time
}
}
如果有一个以上的线程调用 doSynchronized()方法,在第一个获得访问的线程未完成前,其他线程将一直处于阻塞状态,而且在这种多线程被阻塞的场景下,接下来将是哪个线程获得访问是没有保障的。
使用锁方式替代同步块
为了提高等待线程的公平性,我们使用锁方式来替代同步块。
public class Synchronizer{
Lock lock = new Lock();
public void doSynchronized() throws InterruptedException{
this.lock.lock();
//critical section, do a lot of work which takes a long time
this.lock.unlock();
}
}
注意到 doSynchronized()不再声明为 synchronized,而是用 lock.lock()和 lock.unlock()来替代。
下面是用 Lock 类做的一个实现:
public class Lock{
private boolean isLocked = false;
private Thread lockingThread = null;
public synchronized void lock() throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
lockingThread = Thread.currentThread();
}
public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
notify();
}
}
注意到上面对 Lock 的实现,如果存在多线程并发访问 lock(),这些线程将阻塞在对 lock()方法的访问上。另外,如果锁已经锁上(校对注:这里指的是 isLocked 等于 true 时),这些线程将阻塞在 while(isLocked)循环的 wait()调用里面。要记住的是,当线程正在等待进入 lock() 时,可以调用 wait()释放其锁实例对应的同步锁,使得其他多个线程可以进入 lock()方法,并调用 wait()方法。
这回看下 doSynchronized(),你会注意到在 lock()和 unlock()之间的注释:在这两个调用之间的代码将运行很长一段时间。进一步设想,这段代码将长时间运行,和进入 lock()并调用 wait()来比较的话。这意味着大部分时间用在等待进入锁和进入临界区的过程是用在 wait()的等待中,而不是被阻塞在试图进入 lock()方法中。
在早些时候提到过,同步块不会对等待进入的多个线程谁能获得访问做任何保障,同样当调用 notify()时,wait()也不会做保障一定能唤醒线程(至于为什么,请看线程通信)。因此这个版本的 Lock 类和 doSynchronized()那个版本就保障公平性而言,没有任何区别。
但我们能改变这种情况。当前的 Lock 类版本调用自己的 wait()方法,如果每个线程在不同的对象上调用 wait(),那么只有一个线程会在该对象上调用 wait(),Lock 类可以决定哪个对象能对其调用 notify(),因此能做到有效的选择唤醒哪个线程。
公平锁
下面来讲述将上面 Lock 类转变为公平锁 FairLock。你会注意到新的实现和之前的 Lock 类中的同步和 wait()/notify()稍有不同。
准确地说如何从之前的 Lock 类做到公平锁的设计是一个渐进设计的过程,每一步都是在解决上一步的问题而前进的:Nested Monitor Lockout, Slipped Conditions 和 Missed Signals。这些本身的讨论虽已超出本文的范围,但其中每一步的内容都将会专题进行讨论。重要的是,每一个调用 lock()的线程都会进入一个队列,当解锁后,只有队列里的第一个线程被允许锁住 Farlock 实例,所有其它的线程都将处于等待状态,直到他们处于队列头部。
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List waitingThreads =
new ArrayList();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
boolean isLockedForThisThread = true;
synchronized(this){
waitingThreads.add(queueObject);
}
while(isLockedForThisThread){
synchronized(this){
isLockedForThisThread =
isLocked || waitingThreads.get(0) != queueObject;
if(!isLockedForThisThread){
isLocked = true;
waitingThreads.remove(queueObject);
lockingThread = Thread.currentThread();
return;
}
}
try{
queueObject.doWait();
}catch(InterruptedException e){
synchronized(this) { waitingThreads.remove(queueObject); }
throw e;
}
}
}
public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if(waitingThreads.size() > 0){
waitingThreads.get(0).doNotify();
}
}
}
public class QueueObject {
private boolean isNotified = false;
public synchronized void doWait() throws InterruptedException {
while(!isNotified){
this.wait();
}
this.isNotified = false;
}
public synchronized void doNotify() {
this.isNotified = true;
this.notify();
}
public boolean equals(Object o) {
return this == o;
}
}
首先注意到 lock()方法不在声明为 synchronized,取而代之的是对必需同步的代码,在 synchronized 中进行嵌套。
FairLock 新创建了一个 QueueObject 的实例,并对每个调用 lock()的线程进行入队列。调用 unlock()的线程将从队列头部获取 QueueObject,并对其调用 doNotify(),以唤醒在该对象上等待的线程。通过这种方式,在同一时间仅有一个等待线程获得唤醒,而不是所有的等待线程。这也是实现 FairLock 公平性的核心所在。
请注意,在同一个同步块中,锁状态依然被检查和设置,以避免出现滑漏条件。
还需注意到,QueueObject 实际是一个 semaphore。doWait()和 doNotify()方法在 QueueObject 中保存着信号。这样做以避免一个线程在调用 queueObject.doWait()之前被另一个调用 unlock()并随之调用 queueObject.doNotify()的线程重入,从而导致信号丢失。queueObject.doWait()调用放置在 synchronized(this)块之外,以避免被 monitor 嵌套锁死,所以另外的线程可以解锁,只要当没有线程在 lock 方法的 synchronized(this)块中执行即可。
最后,注意到 queueObject.doWait()在 try – catch 块中是怎样调用的。在 InterruptedException 抛出的情况下,线程得以离开 lock(),并需让它从队列中移除。
性能考虑
如果比较 Lock 和 FairLock 类,你会注意到在 FairLock 类中 lock()和 unlock()还有更多需要深入的地方。这些额外的代码会导致 FairLock 的同步机制实现比 Lock 要稍微慢些。究竟存在多少影响,还依赖于应用在 FairLock 临界区执行的时长。执行时长越大,FairLock 带来的负担影响就越小,当然这也和代码执行的频繁度相关。
--------------------------
嵌套管程锁死
嵌套管程锁死类似于死锁, 下面是一个嵌套管程锁死的场景:
线程 1 获得 A 对象的锁。
线程 1 获得对象 B 的锁(同时持有对象 A 的锁)。
线程 1 决定等待另一个线程的信号再继续。
线程 1 调用 B.wait(),从而释放了 B 对象上的锁,但仍然持有对象 A 的锁。
线程 2 需要同时持有对象 A 和对象 B 的锁,才能向线程 1 发信号。
线程 2 无法获得对象 A 上的锁,因为对象 A 上的锁当前正被线程 1 持有。
线程 2 一直被阻塞,等待线程 1 释放对象 A 上的锁。
线程 1 一直阻塞,等待线程 2 的信号,因此,不会释放对象 A 上的锁,
而线程 2 需要对象 A 上的锁才能给线程 1 发信号……
你可以能会说,这是个空想的场景,好吧,让我们来看看下面这个比较挫的 Lock 实现:
//lock implementation with nested monitor lockout problem
public class Lock{
protected MonitorObject monitorObject = new MonitorObject();
protected boolean isLocked = false;
public void lock() throws InterruptedException{
synchronized(this){
while(isLocked){
synchronized(this.monitorObject){
this.monitorObject.wait();
}
}
isLocked = true;
}
}
public void unlock(){
synchronized(this){
this.isLocked = false;
synchronized(this.monitorObject){
this.monitorObject.notify();
}
}
}
}
可以看到,lock()方法首先在”this”上同步,然后在 monitorObject 上同步。如果 isLocked 等于 false,因为线程不会继续调用 monitorObject.wait(),那么一切都没有问题 。但是如果 isLocked 等于 true,调用 lock()方法的线程会在 monitorObject.wait()上阻塞。
这里的问题在于,调用 monitorObject.wait()方法只释放了 monitorObject 上的管程对象,而与”this“关联的管程对象并没有释放。换句话说,这个刚被阻塞的线程仍然持有”this”上的锁。
(校对注:如果一个线程持有这种 Lock 的时候另一个线程执行了 lock 操作)当一个已经持有这种 Lock 的线程想调用 unlock(),就会在 unlock()方法进入 synchronized(this)块时阻塞。这会一直阻塞到在 lock()方法中等待的线程离开 synchronized(this)块。但是,在 unlock 中 isLocked 变为 false,monitorObject.notify()被执行之后,lock()中等待的线程才会离开 synchronized(this)块。
简而言之,在 lock 方法中等待的线程需要其它线程成功调用 unlock 方法来退出 lock 方法,但是,在 lock()方法离开外层同步块之前,没有线程能成功执行 unlock()。
结果就是,任何调用 lock 方法或 unlock 方法的线程都会一直阻塞。这就是嵌套管程锁死。
一个更现实的例子
你可能会说,这么挫的实现方式我怎么可能会做呢?你或许不会在里层的管程对象上调用 wait 或 notify 方法,但完全有可能会在外层的 this 上调。 有很多类似上面例子的情况。例如,如果你准备实现一个公平锁。你可能希望每个线程在它们各自的 QueueObject 上调用 wait(),这样就可以每次唤醒一个线程。
下面是一个比较挫的公平锁实现方式:
//Fair Lock implementation with nested monitor lockout problem
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List waitingThreads =
new ArrayList();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
synchronized(this){
waitingThreads.add(queueObject);
while(isLocked ||
waitingThreads.get(0) != queueObject){
synchronized(queueObject){
try{
queueObject.wait();
}catch(InterruptedException e){
waitingThreads.remove(queueObject);
throw e;
}
}
}
waitingThreads.remove(queueObject);
isLocked = true;
lockingThread = Thread.currentThread();
}
}
public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if(waitingThreads.size() > 0){
QueueObject queueObject = waitingThread.get(0);
synchronized(queueObject){
queueObject.notify();
}
}
}
}
public class QueueObject {}
乍看之下,嗯,很好,但是请注意 lock 方法是怎么调用 queueObject.wait()的,在方法内部有两个 synchronized 块,一个锁定 this,一个嵌在上一个 synchronized 块内部,它锁定的是局部变量 queueObject。
当一个线程调用 queueObject.wait()方法的时候,它仅仅释放的是在 queueObject 对象实例的锁,并没有释放”this”上面的锁。
现在我们还有一个地方需要特别注意, unlock 方法被声明成了 synchronized,这就相当于一个 synchronized(this)块。这就意味着,如果一个线程在 lock()中等待,该线程将持有与 this 关联的管程对象。所有调用 unlock()的线程将会一直保持阻塞,等待着前面那个已经获得 this 锁的线程释放 this 锁,但这永远也发生不了,因为只有某个线程成功地给 lock()中等待的线程发送了信号,this 上的锁才会释放,但只有执行 unlock()方法才会发送这个信号。
因此,上面的公平锁的实现会导致嵌套管程锁死。更好的公平锁实现方式可以参考 Starvation and Fairness。
嵌套管程锁死 VS 死锁
嵌套管程锁死与死锁很像:都是线程最后被一直阻塞着互相等待。
但是两者又不完全相同。在死锁中我们已经对死锁有了个大概的解释,死锁通常是因为两个线程获取锁的顺序不一致造成的,线程 1 锁住 A,等待获取 B,线程 2 已经获取了 B,再等待获取 A。如避免死锁中所说的,死锁可以通过总是以相同的顺序获取锁来避免。
但是发生嵌套管程锁死时锁获取的顺序是一致的。线程 1 获得 A 和 B,然后释放 B,等待线程 2 的信号。线程 2 需要同时获得 A 和 B,才能向线程 1 发送信号。所以,一个线程在等待唤醒,另一个线程在等待想要的锁被释放。
不同点归纳如下:
死锁中,二个线程都在等待对方释放锁。
嵌套管程锁死中,线程 1 持有锁 A,同时等待从线程 2 发来的信号,线程 2 需要锁 A 来发信号给线程 1。
-----------------------------------
Java 中的读/写锁
相比Java 中的锁(Locks in Java)里 Lock 实现,读写锁更复杂一些。假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源。但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写(译者注:也就是说:读-读能共存,读-写不能共存,写-写不能共存)。这就需要一个读/写锁来解决这个问题。
Java5 在 java.util.concurrent 包中已经包含了读写锁。尽管如此,我们还是应该了解其实现背后的原理。
以下是本文的主题
读/写锁的 Java 实现(Read / Write Lock Java Implementation)
读/写锁的重入(Read / Write Lock Reentrance)
读锁重入(Read Reentrance)
写锁重入(Write Reentrance)
读锁升级到写锁(Read to Write Reentrance)
写锁降级到读锁(Write to Read Reentrance)
可重入的 ReadWriteLock 的完整实现(Fully Reentrant ReadWriteLock)
在 finally 中调用 unlock() (Calling unlock() from a finally-clause)
读/写锁的 Java 实现
先让我们对读写访问资源的条件做个概述:
读取没有线程正在做写操作,且没有线程在请求写操作。
写入没有线程正在做读写操作。
如果某个线程想要读取资源,只要没有线程正在对该资源进行写操作且没有线程请求对该资源的写操作即可。我们假设对写操作的请求比对读操作的请求更重要,就要提升写请求的优先级。此外,如果读操作发生的比较频繁,我们又没有提升写操作的优先级,那么就会产生“饥饿”现象。请求写操作的线程会一直阻塞,直到所有的读线程都从 ReadWriteLock 上解锁了。如果一直保证新线程的读操作权限,那么等待写操作的线程就会一直阻塞下去,结果就是发生“饥饿”。因此,只有当没有线程正在锁住 ReadWriteLock 进行写操作,且没有线程请求该锁准备执行写操作时,才能保证读操作继续。
当其它线程没有对共享资源进行读操作或者写操作时,某个线程就有可能获得该共享资源的写锁,进而对共享资源进行写操作。有多少线程请求了写锁以及以何种顺序请求写锁并不重要,除非你想保证写锁请求的公平性。
按照上面的叙述,简单的实现出一个读/写锁,代码如下
public class ReadWriteLock{
private int readers = 0;
private int writers = 0;
private int writeRequests = 0;
public synchronized void lockRead()
throws InterruptedException{
while(writers > 0 || writeRequests > 0){
wait();
}
readers++;
}
public synchronized void unlockRead(){
readers--;
notifyAll();
}
public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;
while(readers > 0 || writers > 0){
wait();
}
writeRequests--;
writers++;
}
public synchronized void unlockWrite()
throws InterruptedException{
writers--;
notifyAll();
}
}
ReadWriteLock 类中,读锁和写锁各有一个获取锁和释放锁的方法。
读锁的实现在 lockRead()中,只要没有线程拥有写锁(writers==0),且没有线程在请求写锁(writeRequests ==0),所有想获得读锁的线程都能成功获取。
写锁的实现在 lockWrite()中,当一个线程想获得写锁的时候,首先会把写锁请求数加 1(writeRequests++),然后再去判断是否能够真能获得写锁,当没有线程持有读锁(readers==0 ),且没有线程持有写锁(writers==0)时就能获得写锁。有多少线程在请求写锁并无关系。
需要注意的是,在两个释放锁的方法(unlockRead,unlockWrite)中,都调用了 notifyAll 方法,而不是 notify。要解释这个原因,我们可以想象下面一种情形:
如果有线程在等待获取读锁,同时又有线程在等待获取写锁。如果这时其中一个等待读锁的线程被 notify 方法唤醒,但因为此时仍有请求写锁的线程存在(writeRequests>0),所以被唤醒的线程会再次进入阻塞状态。然而,等待写锁的线程一个也没被唤醒,就像什么也没发生过一样(译者注:信号丢失现象)。如果用的是 notifyAll 方法,所有的线程都会被唤醒,然后判断能否获得其请求的锁。
用 notifyAll 还有一个好处。如果有多个读线程在等待读锁且没有线程在等待写锁时,调用 unlockWrite()后,所有等待读锁的线程都能立马成功获取读锁 —— 而不是一次只允许一个。
读/写锁的重入
上面实现的读/写锁(ReadWriteLock) 是不可重入的,当一个已经持有写锁的线程再次请求写锁时,就会被阻塞。原因是已经有一个写线程了——就是它自己。此外,考虑下面的例子:
Thread 1 获得了读锁。
Thread 2 请求写锁,但因为 Thread 1 持有了读锁,所以写锁请求被阻塞。
Thread 1 再想请求一次读锁,但因为 Thread 2 处于请求写锁的状态,所以想再次获取读锁也会被阻塞。 上面这种情形使用前面的 ReadWriteLock 就会被锁定——一种类似于死锁的情形。不会再有线程能够成功获取读锁或写锁了。
为了让 ReadWriteLock 可重入,需要对它做一些改进。下面会分别处理读锁的重入和写锁的重入。
读锁重入
为了让 ReadWriteLock 的读锁可重入,我们要先为读锁重入建立规则:
要保证某个线程中的读锁可重入,要么满足获取读锁的条件(没有写或写请求),要么已经持有读锁(不管是否有写请求)。 要确定一个线程是否已经持有读锁,可以用一个 map 来存储已经持有读锁的线程以及对应线程获取读锁的次数,当需要判断某个线程能否获得读锁时,就利用 map 中存储的数据进行判断。下面是方法 lockRead 和 unlockRead 修改后的的代码:
public class ReadWriteLock{
private Map readingThreads =
new HashMap();
private int writers = 0;
private int writeRequests = 0;
public synchronized void lockRead()
throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(! canGrantReadAccess(callingThread)){
wait();
}
readingThreads.put(callingThread,
(getAccessCount(callingThread) + 1));
}
public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
int accessCount = getAccessCount(callingThread);
if(accessCount == 1) {
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, (accessCount -1));
}
notifyAll();
}
private boolean canGrantReadAccess(Thread callingThread){
if(writers > 0) return false;
if(isReader(callingThread) return true;
if(writeRequests > 0) return false;
return true;
}
private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if(accessCount == null) return 0;
return accessCount.intValue();
}
private boolean isReader(Thread callingThread){
return readingThreads.get(callingThread) != null;
}
}
代码中我们可以看到,只有在没有线程拥有写锁的情况下才允许读锁的重入。此外,重入的读锁比写锁优先级高。
写锁重入
仅当一个线程已经持有写锁,才允许写锁重入(再次获得写锁)。下面是方法 lockWrite 和 unlockWrite 修改后的的代码。
public class ReadWriteLock{
private Map readingThreads =
new HashMap();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite()
throws InterruptedException{
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread){
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}
private boolean hasReaders(){
return readingThreads.size() > 0;
}
private boolean isWriter(Thread callingThread){
return writingThread == callingThread;
}
}
注意在确定当前线程是否能够获取写锁的时候,是如何处理的。
读锁升级到写锁
有时,我们希望一个拥有读锁的线程,也能获得写锁。想要允许这样的操作,要求这个线程是唯一一个拥有读锁的线程。writeLock()需要做点改动来达到这个目的:
public class ReadWriteLock{
private Map readingThreads =
new HashMap();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite() throws InterruptedException{
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread){
if(isOnlyReader(callingThread)) return true;
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}
private boolean hasReaders(){
return readingThreads.size() > 0;
}
private boolean isWriter(Thread callingThread){
return writingThread == callingThread;
}
private boolean isOnlyReader(Thread thread){
return readers == 1 && readingThreads.get(callingThread) != null;
}
}
现在 ReadWriteLock 类就可以从读锁升级到写锁了。
写锁降级到读锁
有时拥有写锁的线程也希望得到读锁。如果一个线程拥有了写锁,那么自然其它线程是不可能拥有读锁或写锁了。所以对于一个拥有写锁的线程,再获得读锁,是不会有什么危险的。我们仅仅需要对上面 canGrantReadAccess 方法进行简单地修改:
public class ReadWriteLock{
private boolean canGrantReadAccess(Thread callingThread){
if(isWriter(callingThread)) return true;
if(writingThread != null) return false;
if(isReader(callingThread) return true;
if(writeRequests > 0) return false;
return true;
}
}
可重入的 ReadWriteLock 的完整实现
下面是完整的 ReadWriteLock 实现。为了便于代码的阅读与理解,简单对上面的代码做了重构。重构后的代码如下。
public class ReadWriteLock{
private Map readingThreads =
new HashMap();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
public synchronized void lockRead()
throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(! canGrantReadAccess(callingThread)){
wait();
}
readingThreads.put(callingThread,
(getReadAccessCount(callingThread) + 1));
}
private boolean canGrantReadAccess(Thread callingThread){
if(isWriter(callingThread)) return true;
if(hasWriter()) return false;
if(isReader(callingThread)) return true;
if(hasWriteRequests()) return false;
return true;
}
public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
if(!isReader(callingThread)){
throw new IllegalMonitorStateException(
"Calling Thread does not" +
" hold a read lock on this ReadWriteLock");
}
int accessCount = getReadAccessCount(callingThread);
if(accessCount == 1){
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, (accessCount -1));
}
notifyAll();
}
public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite()
throws InterruptedException{
if(!isWriter(Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling Thread does not" +
" hold the write lock on this ReadWriteLock");
}
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread){
if(isOnlyReader(callingThread)) return true;
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}
private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if(accessCount == null) return 0;
return accessCount.intValue();
}
private boolean hasReaders(){
return readingThreads.size() > 0;
}
private boolean isReader(Thread callingThread){
return readingThreads.get(callingThread) != null;
}
private boolean isOnlyReader(Thread callingThread){
return readingThreads.size() == 1 &&
readingThreads.get(callingThread) != null;
}
private boolean hasWriter(){
return writingThread != null;
}
private boolean isWriter(Thread callingThread){
return writingThread == callingThread;
}
private boolean hasWriteRequests(){
return this.writeRequests > 0;
}
}
在 finally 中调用 unlock()
在利用 ReadWriteLock 来保护临界区时,如果临界区可能抛出异常,在 finally 块中调用 readUnlock()和 writeUnlock()就显得很重要了。这样做是为了保证 ReadWriteLock 能被成功解锁,然后其它线程可以请求到该锁。这里有个例子:
lock.lockWrite();
try{
//do critical section code, which may throw exception
} finally {
lock.unlockWrite();
}
上面这样的代码结构能够保证临界区中抛出异常时 ReadWriteLock 也会被释放。如果 unlockWrite 方法不是在 finally 块中调用的,当临界区抛出了异常时,ReadWriteLock 会一直保持在写锁定状态,就会导致所有调用 lockRead()或 lockWrite()的线程一直阻塞。唯一能够重新解锁 ReadWriteLock 的因素可能就是 ReadWriteLock 是可重入的,当抛出异常时,这个线程后续还可以成功获取这把锁,然后执行临界区以及再次调用 unlockWrite(),这就会再次释放 ReadWriteLock。但是如果该线程后续不再获取这把锁了呢?所以,在 finally 中调用 unlockWrite 对写出健壮代码是很重要的。
-----------------
重入锁死
重入锁死与死锁和嵌套管程锁死非常相似。锁和读写锁两篇文章中都有涉及到重入锁死的问题。
当一个线程重新获取锁,读写锁或其他不可重入的同步器时,就可能发生重入锁死。可重入的意思是线程可以重复获得它已经持有的锁。Java 的 synchronized 块是可重入的。因此下面的代码是没问题的:
(译者注:这里提到的锁都是指的不可重入的锁实现,并不是 Java 类库中的 Lock 与 ReadWriteLock 类)
public class Reentrant{
public synchronized outer(){
inner();
}
public synchronized inner(){
//do something
}
}
注意 outer()和 inner()都声明为 synchronized,这在 Java 中这相当于 synchronized(this)块(译者注:这里两个方法是实例方法,synchronized 的实例方法相当于在 this 上加锁,如果是 static 方法,则不然,更多阅读:哪个对象才是锁?)。如果某个线程调用了 outer(),outer()中的 inner()调用是没问题的,因为两个方法都是在同一个管程对象(即 this)上同步的。如果一个线程持有某个管程对象上的锁,那么它就有权访问所有在该管程对象上同步的块。这就叫可重入。若线程已经持有锁,那么它就可以重复访问所有使用该锁的代码块。
下面这个锁的实现是不可重入的:
public class Lock{
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
}
如果一个线程在两次调用 lock()间没有调用 unlock()方法,那么第二次调用 lock()就会被阻塞,这就出现了重入锁死。
避免重入锁死有两个选择:
编写代码时避免再次获取已经持有的锁
使用可重入锁
至于哪个选择最适合你的项目,得视具体情况而定。可重入锁通常没有不可重入锁那么好的表现,而且实现起来复杂,但这些情况在你的项目中也许算不上什么问题。无论你的项目用锁来实现方便还是不用锁方便,可重入特性都需要根据具体问题具体分析。
-------------------------
mport java.util.concurrent.atomic.AtomicLong;
public class AtomicLong{
private AtomicLong count = new AtomicLong(0);
public void inc(){
boolean updated = false;
while(!updated){
long prevCount = this.count.get();
updated = this.count.compareAndSet(prevCount, prevCount + 1);
}
}
public long count(){
return this.count.get();
}
}
这个版本仅仅是上一个版本的线程安全版本。这一版我们感兴趣的是 inc()方法的实现。inc()方法中不再含有一个同步块。而是被下面这些代码替代:
boolean updated = false;
while(!updated){
long prevCount = this.count.get();
updated = this.count.compareAndSet(prevCount, prevCount + 1);
}
上面这些代码并不是一个原子操作。也就是说,对于两个不同的线程去调用 inc()方法,然后执行 long prevCount = this.count.get()语句,因此获得了这个计数器的上一个 count。但是,上面的代码并没有包含任何的竞态条件。
秘密就在于 while 循环里的第二行代码。compareAndSet()方法调用是一个原子操作。它用一个期望值和 AtomicLong 内部的值去比较,如果这两个值相等,就把 AtomicLong 内部值替换为一个新值。compareAndSet()通常被 CPU 中的 compare-and-swap 指令直接支持。因此,不需要去同步,也不需要去挂起线程。
假设,这个 AtomicLong 的内部值是 20。然后,两个线程去读这个值,都尝试调用 compareAndSet(20, 20 + 1)。尽管 compareAndSet()是一个原子操作,这个方法也会被这两个线程相继执行(某一个时刻只有一个)。
第一个线程会使用期望值 20(这个计数器的上一个值)与 AtomicLong 的内部值进行比较。由于两个值是相等的,AtomicLong 会更新它的内部值至 21(20 + 1 )。变量 updated 被修改为 true,while 循环结束。
现在,第二个线程调用 compareAndSet(20, 20 + 1)。由于 AtomicLong 的内部值不再是 20,这个调用将不会成功。AtomicLong 的值不会再被修改为 21。变量,updated 被修改为 false,线程将会再次在 while 循环外自旋。这段时间,它会读到值 21 并企图把值更新为 22。如果在此期间没有其它线程调用 inc()。第二次迭代将会成功更新 AtomicLong 的内部值到 22。
为什么称它为乐观锁
上一部分展现的代码被称为乐观锁(optimistic locking)。乐观锁区别于传统的锁,有时也被称为悲观锁。传统的锁会使用同步块或其他类型的锁阻塞对临界区域的访问。一个同步块或锁可能会导致线程挂起。
乐观锁允许所有的线程在不发生阻塞的情况下创建一份共享内存的拷贝。这些线程接下来可能会对它们的拷贝进行修改,并企图把它们修改后的版本写回到共享内存中。如果没有其它线程对共享内存做任何修改, CAS 操作就允许线程将它的变化写回到共享内存中去。如果,另一个线程已经修改了共享内存,这个线程将不得不再次获得一个新的拷贝,在新的拷贝上做出修改,并尝试再次把它们写回到共享内存中去。
称之为“乐观锁”的原因就是,线程获得它们想修改的数据的拷贝并做出修改,在乐观的假在此期间没有线程对共享内存做出修改的情况下。当这个乐观假设成立时,这个线程仅仅在无锁的情况下完成共享内存的更新。当这个假设不成立时,线程所做的工作就会被丢弃,但任然不使用锁。
乐观锁使用于共享内存竞用不是非常高的情况。如果共享内存上的内容非常多,仅仅因为更新共享内存失败,就用浪费大量的 CPU 周期用在拷贝和修改上。但是,如果砸共享内存上有大量的内容,无论如何,你都要把你的代码设计的产生的争用更低。
乐观锁是非阻塞的
我们这里提到的乐观锁机制是非阻塞的。如果一个线程获得了一份共享内存的拷贝,当尝试修改时,发生了阻塞,其它线程去访问这块内存区域不会发生阻塞。
对于一个传统的加锁/解锁模式,当一个线程持有一个锁时,其它所有的线程都会一直阻塞直到持有锁的线程再次释放掉这个锁。如果持有锁的这个线程被阻塞在某处,这个锁将很长一段时间不能被释放,甚至可能一直不能被释放。
非阻塞算法是不容易实现的
正确的设计和实现非阻塞算法是不容易的。在尝试设计你的非阻塞算法之前,看一看是否已经有人设计了一种非阻塞算法正满足你的需求。
Java 已经提供了一些非阻塞实现(比如 ConcurrentLinkedQueue),相信在 Java 未来的版本中会带来更多的非阻塞算法的实现。
除了 Java 内置非阻塞数据结构还有很多开源的非阻塞数据结构可以使用。例如,LAMX Disrupter 和 Cliff Click 实现的非阻塞 HashMap。查看我的Java concurrency references page查看更多的资源。
使用非阻塞算法的好处
非阻塞算法和阻塞算法相比有几个好处。下面让我们分别看一下:
选择
非阻塞算法的第一个好处是,给了线程一个选择当它们请求的动作不能够被执行时做些什么。不再是被阻塞在那,请求线程关于做什么有了一个选择。有时候,一个线程什么也不能做。在这种情况下,它可以选择阻塞或自我等待,像这样把 CPU 的使用权让给其它的任务。不过至少给了请求线程一个选择的机会。
在一个单个的 CPU 系统可能会挂起一个不能执行请求动作的线程,这样可以让其它线程获得 CPU 的使用权。不过即使在一个单个的 CPU 系统阻塞可能导致死锁,线程饥饿等并发问题。
没有死锁
非阻塞算法的第二个好处是,一个线程的挂起不能导致其它线程挂起。这也意味着不会发生死锁。两个线程不能互相彼此等待来获得被对方持有的锁。因为线程不会阻塞当它们不能执行它们的请求动作时,它们不能阻塞互相等待。非阻塞算法任然可能产生活锁(live lock),两个线程一直请求一些动作,但一直被告知不能够被执行(因为其他线程的动作)。
没有线程挂起
挂起和恢复一个线程的代价是昂贵的。没错,随着时间的推移,操作系统和线程库已经越来越高效,线程挂起和恢复的成本也不断降低。不过,线程的挂起和户对任然需要付出很高的代价。
无论什么时候,一个线程阻塞,就会被挂起。因此,引起了线程挂起和恢复过载。由于使用非阻塞算法线程不会被挂起,这种过载就不会发生。这就意味着 CPU 有可能花更多时间在执行实际的业务逻辑上而不是上下文切换。
在一个多个 CPU 的系统上,阻塞算法会对阻塞算法产生重要的影响。运行在 CPUA 上的一个线程阻塞等待运行在 CPU B 上的一个线程。这就降低了程序天生就具备的并行水平。当然,CPU A 可以调度其他线程去运行,但是挂起和激活线程(上下文切换)的代价是昂贵的。需要挂起的线程越少越好。
降低线程延迟
在这里我们提到的延迟指的是一个请求产生到线程实际的执行它之间的时间。因为在非阻塞算法中线程不会被挂起,它们就不需要付昂贵的,缓慢的线程激活成本。这就意味着当一个请求执行时可以得到更快的响应,减少它们的响应延迟。
非阻塞算法通常忙等待直到请求动作可以被执行来降低延迟。当然,在一个非阻塞数据数据结构有着很高的线程争用的系统中,CPU 可能在它们忙等待期间停止消耗大量的 CPU 周期。这一点需要牢牢记住。非阻塞算法可能不是最好的选择如果你的数据结构哦有着很高的线程争用。不过,也常常存在通过重构你的程序来达到更低的线程争用。