Java 并发——一文读懂 ReadWriteLock

ReadWriteLock 背后维护着一对相互关联的锁,一个用于读,一个用于写。读锁可以被多个读线程并发获取,
只要没有写线程。而写锁不支持此种情况。读写锁,读-读能共存,读-写不能共存,写-写不能共存。

自实现

如果某个线程想要读取资源,只要没有线程正在对该资源进行写操作且没有线程请求对该资源的写操作即可。

public class ReadWriteLock {
    private int readers = 0;
    private int writers = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException{
        while(writers > 0 || writeRequests > 0){
            wait();
        }
        readers++;
    }

    public synchronized void unlockRead(){
        readers--;
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;

        while(readers > 0 || writers > 0){
            wait();
        }
        writeRequests--;
        writers++;
    }

    public synchronized void unlockWrite() throws InterruptedException{
        writers--;
        notifyAll();
    }
}

比较简单的一个读写锁实现,其中有一个问题就是如果写操作很频繁,那么读线程可能会产生"饥饿现象"。
需要注意的是,在两个释放锁的方法(unlockRead,unlockWrite)中,都调用了notifyAll方法,
而不是notify。要解释这个原因,我们可以想象下面一种情形:

如果有线程在等待获取读锁,同时又有线程在等待获取写锁。如果这时其中一个等待读锁的线程被
notify() 方法唤醒,但因为此时仍有请求写锁的线程存在(writeRequests>0),所以被
唤醒的线程会再次进入阻塞状态。然而,等待写锁的线程一个也没被唤醒,就像什么也没发生过一样
(信号丢失)。如果用的是 notifyAll() 方法,所有的线程都会被唤醒,然后判断能否获得其
请求的锁。

用 notifyAll() 方法还有一个好处。如果有多个读线程在等待读锁且没有线程在等待写锁时,
调用 unlockWrite() 方法后,所有等待读锁的线程都能立马成功获取读锁,而不是一次只允许一个。

读写锁重入

上面代码不支持读写锁重入的,当一个已经持有写锁的线程再次请求写锁时,就会被阻塞。原因是已经有一个
写线程了——就是它自己。此外,考虑下面的例子:

  1. thread#1 获得了读锁
  2. thread#2 请求写锁,但因为 thread#1 持有了读锁,所以写锁请求被阻塞
  3. thread#1 再想请求一次读锁,但因为 thread#2 处于请求写锁的状态,所以想再次获取读锁也会被阻塞

所以实现读写锁可重入的思路就是:

  • 保存当前获取读锁的线程,并计数。当前读线程之前如果获取到了读锁,则直接重入,否则,
    如果有写线程或有写线程在等待锁,那么获取读锁失败,需要等待
  • 保存当前获取写锁的线程,并计数。如果没有读线程,或没有写线程,那么获取写锁成功。或者同一线程再次
    获取写锁。

ReentrantReadWriteLock

类图

ReentrantReadWriteLock

ReentrantReadWriteLock.Sync

数据结构:

// 读锁和写锁计数常量
// 锁状态逻辑上被划分成两部分无符号整数:
// 地位表示排他锁计数(exclusive(writer) lock)
// 高位表示共享锁计数(shared(reader) lock)
// 共享状态 state 为 int 型,32 位,所以 SHARED_SHIFT = 16,各占一半
static final int SHARED_SHIFT   = 16;
// 65536,二进制:10000000000000000
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 65535,二进制:1111111111111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 第一个获取读锁的线程,即把 shared count 从 0 改变为 1 的线程
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

// 上一个线程获取读锁的计数
private transient HoldCounter cachedHoldCounter;

// 当前线程所持有的可重入读锁的计数
private transient ThreadLocalHoldCounter readHolds;

构建 ReentrantReadWriteLock.Sync 时会进行初始化操作

Sync() {
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // 默认为 0
}

非公平加锁

ReadLock(读锁)

读锁,实现了 Lock 接口,内部依赖对 AQS 的实现 Sync 类。

// ReadLock 类
// 如果写锁没有被其他线程获取,那么立即返回,否则,阻塞
 public void lock() {
    sync.acquireShared(1);
}

// AQS 类
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// ReentrantReadWriteLock.Sync 类
// unused = 1
protected final int tryAcquireShared(int unused) {
    /*
     * 1. 如果有其他线程获取了 写锁,失败
     * 2. 
     */
    Thread current = Thread.currentThread();
    int c = getState();
    
    // exclusiveCount(c) => c & 1111111111111111(Sync.EXCLUSIVE_MASK)
    
    // ① 判断是否有其他线程获取了 写锁。锁降级:如果获取写锁的线程可以获取读锁
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
        
    // sharedCount(c) => c >>> 16(Sync.SHARED_SHIFT) (高 16 位表示共享读)
    int r = sharedCount(c);
    
    // ② 核心
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) { 
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { 
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // ③
    return fullTryAcquireShared(current);
}

// NonfairSync 类
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
// AQS 类
// 如果队列中等待的第一个节点(head 节点不算等待,因为它表示的线程拿到锁了)是 写线程,
// 那么 读线程 应该被 阻塞。目的防止 写线程 产生饥饿现象
final boolean apparentlyFirstQueuedIsExclusive() {
    // 1. 队列未被初始化
    // 1.1 读线程来获取读锁,返回 false
    // 2. 队列被初始化
    // 2.1 队列当前只有一个节点 head,返回 false
    // 2.2 队列不止一个节点 head
    // 2.2.1 第一个等待节点的 nextWaiter 不是 SHARED 模式,返回 true
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

核心逻辑在上述 tryAcquireShared() 方法注释 ② 标示的 if() 判断逻辑,我们一步一步来解析。
if() 判断条件由三部分组成,并且是 && 关系,也就是只要有一部分不满足,就进入注释 ③ 标示的逻辑代码。

  • !readerShouldBlock()
    上述代码里解释了 readerShouldBlock() 方法逻辑。注意这里还对其结果取反了。
  • r < MAX_COUNT
    ReentrantReadWriteLock.Sync 类的数据结构有说 MAX_COUTN 字段。这里表示 读线程 不能超过 65535 个,否则会报错。
  • compareAndSetState(c, c + SHARED_UNIT)
    c 变量是当前 state 的值。如果此时第一个 读线程 进来,c = 0,c + SHARED_UNIT = 65536,为啥是这样?因为高 16 位表示 共享读,并且计算 读线程 的个数是 c >>> 16。
    读线程获取锁,修改 state 值是state += 65536。

当满足上述条件时,表示线程已经拿到读锁了,if() 判断条件的内部逻辑分三步走:

  1. 还没有线程获取读锁,即

     (getState() >>> SHARED_SHIFT) = 0
    

    记录第一个获取读锁的线程,以及它的重入数。

  2. 如果第一步不满足,判断 firstReader 跟当前线程是否同一线程,是,firstReaderHoldCount 增加。

  3. 否则,执行下面逻辑

    HoldCounter rh = cachedHoldCounter; // 当前线程的缓存计数
    // rh 还未被初始化,即没有第二个线程进来获取读锁
    if (rh == null || rh.tid != getThreadId(current))
        // 初始化 cachedHoldCounter
        cachedHoldCounter = rh = readHolds.get();
    else if (rh.count == 0)
        readHolds.set(rh);
    rh.count++;
    

如果注释 ② 的 if() 判断没有满足,即线程不能获取读锁,或已经达到最高读线程数,或 CAS 失败,那么都会进入注释 ③ 的方法 fullTryAcquireShared()。

// ReentrantReadWriteLock.Sync 类
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 死循环
    for (;;) {
        int c = getState();
        
        // 是否有写线程获取多写锁。锁降级
        if (exclusiveCount(c) != 0) {
        
            if (getExclusiveOwnerThread() != current)
                // 有写线程获取到写锁,其他线程来获取读锁,直接返回 -1,进队列
                return -1;
                
        } else if (readerShouldBlock()) { // 没有写锁(写锁可能被释放)
              // head(读线程) -> node1(写线程,等待)
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                
                if (rh.count == 0)
                    return -1;
            }
        }
        
        // ③ 计数设计
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

// AQS 类
// 获取读锁
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    // 获取读锁失败,入队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 循环
        for (;;) {
            // 前继节点
            final Node p = node.predecessor();
            if (p == head) { // 前继节点为 head
                int r = tryAcquireShared(arg);
                if (r >= 0) { // 获取到读锁
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

我们理下进入 fullTryAcquireShared() 方法的各种情况:

  1. 写锁降级,即 thread1 获取到写锁,且准备再获取读锁

    • thread1 获取到读锁之前,线程 B 准备获取写锁,此时 thread1 能正常获取读锁


      队列

      进入注释③ 的代码

    • thread1 获取到读锁之前,没有其他线程准备获取写锁


      Sync
  2. thread1 获取到写锁,thread2 来获取读锁


    Read
WriteLock(写锁)

整个过程同 ReentrantLock类的获取锁逻辑差不多。

小结

  • 写锁可以降级为读锁,读锁不能升级为写锁
  • 同步队列中第一个等待的节点是写线程,则不能获取读锁,需要排队
  • 读线程获取到读锁,需要传递后继节点
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342