by shihang.mai
0. 前言
大神Doug Lea在类上注释已经有使用例子,这里贴一下
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
1. 数据结构
- StampLock内部会维护一个CLH队列。
每一个节点是一个WNode(Wait Node)
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
}
当队列中有若干个线程等待,就会像下面的图一样
- 锁状态位state
private transient volatile long state;
private static final long ORIGIN = WBIT << 1;
private static final long WBIT = 1L << LG_READERS;
private static final int LG_READERS = 7;
public StampedLock() {
state = ORIGIN;
}
上面是long state的初始化的值,即1<<8( ...0001 0000 0000)
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
上面是加写锁,将state += WBIT(1<<7),即 ...0001 1000 0000
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}
上面是释放写锁,将state = (stamp += WBIT),即 ...0010 0000 0000
这里相当于把释放锁的次数也记录了,记录它的目的是因为整个state 的状态判断都是基于CAS操作的。而普通的CAS操作可能会遇到ABA的问题,如果不记录次数,那么当写锁释放掉,申请到,再释放掉时,我们将无法判断数据是否被写过
这里继续说一下前7位,它是用来记录读线程的数量的,所以它最大记录126个,超过的部分存放在readerOverflow
private static final long RFULL = RBITS - 1L;
private static final long RBITS = WBIT - 1L;
private static final long WBIT = 1L << LG_READERS;
private static final int LG_READERS = 7;
private transient int readerOverflow;
总结一下,如图
2. 写锁的加锁和释放
- 加锁
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
如果CAS设置state失败,表示写锁申请失败,这时,会调用acquireWrite(),而对于这个方法,比较复杂,就简单总结一下就是
- 释放锁
- 恢复state中写锁标志位为0
- 同时增加释放锁次数
- 唤醒后续线程
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}
3. 读锁的加锁和释放
- 加锁
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
队列中没有写锁&&读线程个数没有超过126,直接获得锁,并且读线程数量加1。这里也有一个acquireRead(),简单总结一下
- 释放锁
- state读线程数-1
- 唤醒下一个线程
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
4. 使用不当会打满CPU
public class StampedLockTest {
public static void main(String[] args) throws InterruptedException {
final StampedLock lock = new StampedLock();
Thread t1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 模拟程序阻塞等待其他资源
LockSupport.park();
});
t1.start();
// 保证t1获取写锁
Thread.sleep(100);
Thread t2 = new Thread(() -> {
// 阻塞在悲观读锁
lock.readLock();
});
t2.start();
// 保证t2阻塞在读锁
Thread.sleep(100);
// 中断线程t2,会导致线程t2所在CPU飙升
t2.interrupt();
t2.join();
}
}
原因如下:
- 如果没有中断,那么阻塞在readLock()上的线程在经过几次自旋后,会进入park()等待,一旦进入park()等待,就不会占用CPU了。
- 但是park()这个函数有一个特点,就是一旦线程被中断,park()就会立即返回,也不抛异常。
- 而线程中断标记一直打开着,不停的自选,所以CPU就爆满了。
解决
在StampedLock内部,在park()返回时,需要判断中断标记位,并作出正确的处理,比如,退出,抛异常,或者把中断位给清理一下