java并发编程(十八)啥是读写锁ReentrantReadWriteLock?

前面我们学习了AQS,ReentrantLock等,现在来学习一下什么是读写锁ReentrantReadWriteLock。

当读操作远远高于写操作时,这时候可以使用【读写锁】让【读-读】可以并发,提高性能。

本文还是基于源码的形式,希望同学们能够以本文为思路,自己跟踪源码一步步的debug进去,加深理解。

一、初识ReentrantReadWriteLock

同样的,先看下其类图:

image.png
  • 实现了读写锁接口ReadWriteLock
  • 有5个内部类,与ReentrantLock相同的是FairSyncNonfairSyncSync,另外不同的是增加两个内部类,都实现了Lock接口:
    • WriteLock
    • ReadLock
  • Sync 增加了两个内部类 :
    • HoldCounter:持有锁的计数器
    • ThreadLocalHoldCounter :维护HoldCounter的ThreadLocal

二、使用案例

通常会维护一个操作数据的容器类,内部应该封装好数据的read和write方法,如下所示:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @description: 数据容器类
 * @author:weirx
 * @date:2022/1/13 15:29
 * @version:3.0
 */
public class DataContainer {

    /**
     * 初始化读锁和写锁
     */
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    protected void read(){
        readLock.lock();
        try {
            System.out.println("获取读锁");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println("释放读锁");
        }
    }

    protected void write(){
        writeLock.lock();
        try {
            System.out.println("获取写锁");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println("释放写锁");
        }
    }
}

简单测试一下,分为读读、读写、写写。

  • 读读:
    public static void main(String[] args) {
        //初始化数据容器
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.read();
        }, "t2").start();
    }

结果,读读不互斥,同时获取读锁,同时释放:

获取读锁
获取读锁
释放读锁
释放读锁
  • 读写:
    public static void main(String[] args) {
        //初始化数据容器
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();
    }

结果,读写互斥,无论是先执行read还是write方法,都会等到读锁或写锁被释放之后,才会获取下一把锁:

获取读锁 -- 第一个执行
释放读锁 -- 第二个执行
获取写锁 -- 第三个执行
释放写锁 -- 第四个执行
  • 写写:
    public static void main(String[] args) {
        //初始化数据容器
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.write();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();
    }

结果,写写互斥,只有第一把写锁释放后,才能获取下一把写锁:

获取写锁
释放写锁
获取写锁
释放写锁

注意:

  • 锁重入时,持有读锁再去获取写锁,会导致写锁一直等待
        protected void read(){
          readLock.lock();
          try {
              System.out.println("获取读锁");
              TimeUnit.SECONDS.sleep(1);
              System.out.println("获取写锁");
              writeLock.lock();
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              readLock.unlock();
              System.out.println("释放读锁");
          }
      }
    
    结果:不会释放
    获取读锁
    获取写锁
    
  • 锁重入时,持有写锁,可以再去获取读锁。
     protected void write(){
          writeLock.lock();
          try {
              System.out.println("获取写锁");
              TimeUnit.SECONDS.sleep(1);
              System.out.println("获取读锁");
              readLock.lock();
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              writeLock.unlock();
              System.out.println("释放写锁");
          }
      }
    
    结果:
    获取写锁
    获取读锁
    释放写锁
    

三、源码分析

我们根据前面的例子,从读锁的获取到释放,从写锁的获取到释放,依次查看源码。

先注意一个事情,读写锁是以不同的位数来区分独占锁和共享锁的状态的:

       /*
         * 读和写分为上行下两个部分,低16位是独占锁状态,高16位是共享锁状态
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 返回以count表示的共享持有数 */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** 返回以count表示的互斥保持数  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

3.1 读锁分析

3.1.1 读锁获取

从 readLock.lock(); 这里进入分析过程:

        /**
        * 获取读锁。
        * 如果写锁没有被另一个线程持有,则获取读锁并立即返回。
        * 如果写锁被另一个线程持有,那么当前线程将被禁用以用于线程调度目的并处于休眠状态,直到获得读锁为止
        */
        public void lock() {
            sync.acquireShared(1);
        }

如上的lock方法,是ReentrantReadWriteLock子类ReadLock的方法,而acquireShared方法是在AQS的子类Syn当中定义的,这个方法尝试以共享的方式获取读锁,失败则进入等待队列, 不断重试,直到获取读锁为止。

    public final void acquireShared(int arg) {
        // 被其他线程持有的话,就走AQS的doAcquireShared
        if (tryAcquireShared(arg) < 0)
            // 获取共享锁,失败加入等待队列,不可中断的获取,直到获取为止
            doAcquireShared(arg);
    }

tryAcquireShared是在ReentrantReadWriteLock当中实现的,我们直接看代码:

        protected final int tryAcquireShared(int unused) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 获取当前锁状态
            int c = getState();
            // 独占锁统计不等于0 且 持有者不是当前线程,就返回 -1 ,换句话说,被其他线程持有
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 共享锁数量
            int r = sharedCount(c);
            // 返回fase才有资格获取读锁
            if (!readerShouldBlock() &&
                // 持有数小于默认值
                r < MAX_COUNT &&
                // CAS 设置锁状态
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 持有共享锁为0
                if (r == 0) {
                    // 第一个持有者是当前线程
                    firstReader = current;
                    // 持有总数是 1 
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 持有锁的是当前线程本身,就把技术 + 1
                    firstReaderHoldCount++;
                } else {
                    // 获取缓存计数
                    HoldCounter rh = cachedHoldCounter;
                    // 如果是null 或者 持有线程的id不是当前线程
                    if (rh == null || rh.tid != getThreadId(current))
                        // 赋值给缓存
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        // rh不是null ,且是当前线程,就把读锁持有者设为缓存中的值
                        readHolds.set(rh);
                    // 将其 + 1
                    rh.count++;
                }
                return 1;
            }
            // 想要获取读锁的线程应该被阻塞,保底工作,处理 CAS 未命中和在 tryAcquireShared 中未处理的重入读取
            return fullTryAcquireShared(current);
        }

从上面的源码我们可以看得出来,写锁和读锁之间是互斥的。

3.1.2 读锁释放

直接看关键部分

    /**
      * 以共享模式释放锁,tryReleaseShared返回true,则释放
      */
    public final boolean releaseShared(int arg) {
        // 释放锁
        if (tryReleaseShared(arg)) {
            // 唤醒队列的下一个线程
            doReleaseShared();
            return true;
        }
        return false;
    }

看看读写锁的tryReleaseShared实现:

        protected final boolean tryReleaseShared(int unused) {
            //。。。省略。。。
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
                    // 计数为 0 才是真正释放
                    return nextc == 0;
            }
        }

如果上述方法释放成功,则走下面AQS继承来的方法:

    private void doReleaseShared() {
        // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
        // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
              // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
              // 防止 unparkSuccessor 被多次执行
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 如果已经是 0 了,改为 -3,用来解决传播性
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

3.2 写锁分析

3.2.1 获取锁

    public final void acquire(int arg) {
        // 尝试获得写锁失败
        if (!tryAcquire(arg) &&
                        // 将当前线程关联到一个 Node 对象上, 模式为独占模式
                        // 进入 AQS 队列阻塞
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) {
            selfInterrupt();
        }
    }

读写锁的上锁方法:tryAcquire

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
           // 获得低 16 位, 代表写锁的 state 计数
            int w = exclusiveCount(c);
            if (c != 0) {
                // 如果写锁是0 或者 当前线程不等于独占线程,获取失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 写锁计数超过低 16 位, 报异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 写锁重入, 获得锁成功
                setState(c + acquires);
                return true;
            }
            // 写锁应该阻塞
            if (writerShouldBlock() ||
                //    更改计数失败  
                !compareAndSetState(c, c + acquires))
                // 获取锁失败
                return false;
            // 设置当前线程独占锁
            setExclusiveOwnerThread(current);
            return true;
        }

3.2.2 释放锁

release:

    public final boolean release(int arg) {
        // 尝试释放写锁成功
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease:

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        // 因为可重入的原因, 写锁计数为 0, 才算释放成功
        boolean free = exclusiveCount(nextc) == 0;
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(nextc);
        return free;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容