前面我们学习了AQS,ReentrantLock等,现在来学习一下什么是读写锁ReentrantReadWriteLock。
当读操作远远高于写操作时,这时候可以使用【读写锁】让【读-读】可以并发,提高性能。
本文还是基于源码的形式,希望同学们能够以本文为思路,自己跟踪源码一步步的debug进去,加深理解。
一、初识ReentrantReadWriteLock
同样的,先看下其类图:
- 实现了读写锁接口
ReadWriteLock
- 有5个内部类,与ReentrantLock相同的是
FairSync
、NonfairSync
和Sync
,另外不同的是增加两个内部类,都实现了Lock接口:WriteLock
ReadLock
- Sync 增加了两个内部类 :
-
HoldCounter
:持有锁的计数器 -
ThreadLocalHoldCounter
:维护HoldCounter的ThreadLocal
-
二、使用案例
通常会维护一个操作数据的容器类,内部应该封装好数据的read和write方法,如下所示:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @description: 数据容器类
* @author:weirx
* @date:2022/1/13 15:29
* @version:3.0
*/
public class DataContainer {
/**
* 初始化读锁和写锁
*/
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
protected void read(){
readLock.lock();
try {
System.out.println("获取读锁");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
System.out.println("释放读锁");
}
}
protected void write(){
writeLock.lock();
try {
System.out.println("获取写锁");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
System.out.println("释放写锁");
}
}
}
简单测试一下,分为读读、读写、写写。
- 读读:
public static void main(String[] args) {
//初始化数据容器
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
结果,读读不互斥,同时获取读锁,同时释放:
获取读锁
获取读锁
释放读锁
释放读锁
- 读写:
public static void main(String[] args) {
//初始化数据容器
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
结果,读写互斥,无论是先执行read还是write方法,都会等到读锁或写锁被释放之后,才会获取下一把锁:
获取读锁 -- 第一个执行
释放读锁 -- 第二个执行
获取写锁 -- 第三个执行
释放写锁 -- 第四个执行
- 写写:
public static void main(String[] args) {
//初始化数据容器
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.write();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
结果,写写互斥,只有第一把写锁释放后,才能获取下一把写锁:
获取写锁
释放写锁
获取写锁
释放写锁
注意:
- 锁重入时,持有读锁再去获取写锁,会导致写锁一直等待
结果:不会释放protected void read(){ readLock.lock(); try { System.out.println("获取读锁"); TimeUnit.SECONDS.sleep(1); System.out.println("获取写锁"); writeLock.lock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { readLock.unlock(); System.out.println("释放读锁"); } }
获取读锁 获取写锁
- 锁重入时,持有写锁,可以再去获取读锁。
结果:protected void write(){ writeLock.lock(); try { System.out.println("获取写锁"); TimeUnit.SECONDS.sleep(1); System.out.println("获取读锁"); readLock.lock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { writeLock.unlock(); System.out.println("释放写锁"); } }
获取写锁 获取读锁 释放写锁
三、源码分析
我们根据前面的例子,从读锁的获取到释放,从写锁的获取到释放,依次查看源码。
先注意一个事情,读写锁是以不同的位数来区分独占锁和共享锁的状态的:
/*
* 读和写分为上行下两个部分,低16位是独占锁状态,高16位是共享锁状态
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回以count表示的共享持有数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回以count表示的互斥保持数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
3.1 读锁分析
3.1.1 读锁获取
从 readLock.lock(); 这里进入分析过程:
/**
* 获取读锁。
* 如果写锁没有被另一个线程持有,则获取读锁并立即返回。
* 如果写锁被另一个线程持有,那么当前线程将被禁用以用于线程调度目的并处于休眠状态,直到获得读锁为止
*/
public void lock() {
sync.acquireShared(1);
}
如上的lock方法,是ReentrantReadWriteLock子类ReadLock的方法,而acquireShared方法是在AQS的子类Syn当中定义的,这个方法尝试以共享的方式获取读锁,失败则进入等待队列, 不断重试,直到获取读锁为止。
public final void acquireShared(int arg) {
// 被其他线程持有的话,就走AQS的doAcquireShared
if (tryAcquireShared(arg) < 0)
// 获取共享锁,失败加入等待队列,不可中断的获取,直到获取为止
doAcquireShared(arg);
}
tryAcquireShared是在ReentrantReadWriteLock当中实现的,我们直接看代码:
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取当前锁状态
int c = getState();
// 独占锁统计不等于0 且 持有者不是当前线程,就返回 -1 ,换句话说,被其他线程持有
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 共享锁数量
int r = sharedCount(c);
// 返回fase才有资格获取读锁
if (!readerShouldBlock() &&
// 持有数小于默认值
r < MAX_COUNT &&
// CAS 设置锁状态
compareAndSetState(c, c + SHARED_UNIT)) {
// 持有共享锁为0
if (r == 0) {
// 第一个持有者是当前线程
firstReader = current;
// 持有总数是 1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 持有锁的是当前线程本身,就把技术 + 1
firstReaderHoldCount++;
} else {
// 获取缓存计数
HoldCounter rh = cachedHoldCounter;
// 如果是null 或者 持有线程的id不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 赋值给缓存
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// rh不是null ,且是当前线程,就把读锁持有者设为缓存中的值
readHolds.set(rh);
// 将其 + 1
rh.count++;
}
return 1;
}
// 想要获取读锁的线程应该被阻塞,保底工作,处理 CAS 未命中和在 tryAcquireShared 中未处理的重入读取
return fullTryAcquireShared(current);
}
从上面的源码我们可以看得出来,写锁和读锁之间是互斥的。
3.1.2 读锁释放
直接看关键部分
/**
* 以共享模式释放锁,tryReleaseShared返回true,则释放
*/
public final boolean releaseShared(int arg) {
// 释放锁
if (tryReleaseShared(arg)) {
// 唤醒队列的下一个线程
doReleaseShared();
return true;
}
return false;
}
看看读写锁的tryReleaseShared实现:
protected final boolean tryReleaseShared(int unused) {
//。。。省略。。。
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
// 计数为 0 才是真正释放
return nextc == 0;
}
}
如果上述方法释放成功,则走下面AQS继承来的方法:
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
// 防止 unparkSuccessor 被多次执行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3.2 写锁分析
3.2.1 获取锁
public final void acquire(int arg) {
// 尝试获得写锁失败
if (!tryAcquire(arg) &&
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
// 进入 AQS 队列阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) {
selfInterrupt();
}
}
读写锁的上锁方法:tryAcquire
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获得低 16 位, 代表写锁的 state 计数
int w = exclusiveCount(c);
if (c != 0) {
// 如果写锁是0 或者 当前线程不等于独占线程,获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 写锁计数超过低 16 位, 报异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入, 获得锁成功
setState(c + acquires);
return true;
}
// 写锁应该阻塞
if (writerShouldBlock() ||
// 更改计数失败
!compareAndSetState(c, c + acquires))
// 获取锁失败
return false;
// 设置当前线程独占锁
setExclusiveOwnerThread(current);
return true;
}
3.2.2 释放锁
release:
public final boolean release(int arg) {
// 尝试释放写锁成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因为可重入的原因, 写锁计数为 0, 才算释放成功
boolean free = exclusiveCount(nextc) == 0;
if (free) {
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}