ReentrantReadWriteLock源码分析

为什么要有读写锁

ReentrantReadWriteLock 适用于读多写少的场景,标识同一时间,可以有多个线程并发读,但是不可以多个线程并发写。由于互斥锁,如:ReentrantLock,上锁后,无论你是读操作还是写操作,它们之间都是互斥的来保证线程安全,但是这样做的话,效率比较低。

使用示例

public class ReentrantReadWriteLockTest {

    public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

        Thread r1 = new Thread(() -> {
            // 上读锁
            readLock.lock();
            try {
                System.out.println("r1 开始读啦!!");
            } finally {
                // 释放读锁
                readLock.unlock();
            }
        });


        Thread r2 = new Thread(() -> {
            // 上读锁
            readLock.lock();
            try {
                System.out.println("r2 开始读啦!!");
            } finally {
                readLock.unlock();
            }
        });

        Thread w1 = new Thread(() -> {
            // 上读锁
            writeLock.lock();
            try {
                System.out.println("w1 开始写啦!!");
            } finally {
                // 释放写锁
                writeLock.unlock();
            }
        });

        r1.start();
        r2.start();
        w1.start();
    }
}

核心思想

  • 基于AQS实现,用state来标识是否获取到锁,获取不到锁,线程交由AQS同步队列进行管理
  • 用32位的state来同时表示读锁和写锁,低16位表示写锁,高16位表示读锁
  • 公平与非公平
  • 写写互斥,读写互斥,读读共享
  • 锁降级

源码分析

构造方法

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * default (nonfair) ordering properties.
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}

从构造方法上看,默认是使用非公平的方式获取锁

获取写锁(互斥锁)

AQS.aquire(int arg) 方法
     /**
     * 获取互斥锁的模版方法
     * 流程:调用子类实现的tryAcquire方法尝试获取锁,若获取锁成功,则终止,若获取不成功,则调用AQS的addWaiter方法,新增节点到AQS的双端同步队列里,
     * 然后调用acquireQueued方法再次进行判断获取锁资源,若还是获取不到锁,则将线程阻塞
     * @param arg
     */
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
ReentrantReadWriteLock. tryAcquire(int arg) 方法
        /**
         * 获取写锁(互斥锁)
         * @param acquires
         * @return true:获取锁成功;false:获取锁失败
         */
        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程对象
            Thread current = Thread.currentThread();
            // 获取当前锁状态(AQS 的 state)
            int c = getState();
            // 获取写锁(互斥锁)数量
            int w = exclusiveCount(c);
            if (c != 0) { // 当前有锁状态(读锁或写锁,或两者都有)
                // (Note: if c != 0 and w == 0 then shared count != 0)
                /**
                 *  重要:
                 *  w==0 : 意味着目前没有线程持有写锁。则:(c != 0 && w == 0) 意味着当前有线程持有读锁,但是没有任何线程持有写锁
                 *  current != getExclusiveOwnerThread(): 意味着当前线程没有持有写锁。
                 *  所以:return false 有两种情况:1、当前有线程持有读锁(理论:不能够进行锁升级,读写互斥)   2、目前有其他线程持有写锁(理论:写写互斥)
                 *  这两种情况,满足其一,都会返回false --> 当前线程进去AQS队列进行等待调度
                 */
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 判断持写锁数量是否超过最大值,超过则报错
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                /**
                 * 走到这里意味着:(w != 0 && current == getExclusiveOwnerThread() && (w + exclusiveCount(acquires) < MAX_COUNT))= true
                 * 即:当前线程本身就持有了写锁,并且写锁数量小于最大值。这里相当于写锁的重入(锁重入)
                 */
                setState(c + acquires);
                return true;
            }
            /**
             * 走到这里意味着:c==0 即 当前属于无锁状态。
             * writerShouldBlock():判断是否需要阻塞当前线程,分公平和非公平两种情况。
             * 公平:看在我前面是否有线程正在阻塞(AQS队列中,是否有节点正在等待唤醒),有:则writerShouldBlock() == true。若无:则为 false,表示当前线程不需要阻塞。
             *      总结:公平的获取写锁的机制为:查看前面是否有线程持有锁(无论是读锁还是写锁)或有线程正在等待锁,若有,直接进入队列等待
             * 非公平:writerShouldBlock()永远返回的是false,即不需要看前面是否有节点正在阻塞等待唤醒,直接上来就cas抢锁
             *      总结:非公平的获取写锁的机制为:
             *
             */
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires)) // CAS抢锁
                return false;
            // 走到这里意味着:线程抢锁(写锁)成功,将exclusiveOwnerThread属性设置为自己,标识我已经获得了写锁,你们其他线程进队列等待去吧
            setExclusiveOwnerThread(current);
            return true;
        }
ReentrantReadWriteLock. addWaiter() 方法
    /**
     * 向队列添加新的节点,从尾巴插入
     * @param mode 新增的节点对象
     * @return 返回新的尾节点
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

/**
     * 向队列添加新的节点,从尾巴插入
     * @param node 新增的节点对象
     * @return 返回新的尾节点
     */
    private Node enq(final Node node) {
        for (;;) { // 自旋,向队列尾部新增节点,直到成功为止
            Node t = tail;
            if (t == null) { // Must initialize
                /**
                 * 若当前的尾节点为空,说明现在还没有任何节点插入到队列内,此时我们对头节点和尾节点进行初始化new Node()。
                 * 因此得出一个结论:头节点在后续的逻辑中,永远不可能为空
                 */
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // 将当前节点设置为新的尾节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

我们可以看到,在调用enq()方法前,程序会先进行cas做节点插入逻辑,但是enq()方法也有同样的插入逻辑,这样做会不会感觉逻辑重复了,是不是多余了?
实际上这是一个优化,大部分情况下是不用走到enq方法里去自旋插入节点的,为了提高性能,避免进入for循环,这里将插入操作提前了。

AQS.acquireQueued(final Node node, int arg) 方法
/**
     * 获取锁资源,获取不到则对线程进行阻塞
     * @param node 已经在队列里的节点
     * @param arg
     * @return
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // 是否被中断标识
            boolean interrupted = false;
            for (;;) { // 自旋
                // 获取前驱节点
                final Node p = node.predecessor();
                /**
                 *  若前驱节点为头节点,那么当前线程此时立刻尝试获取锁
                 *
                 *  问:为什么要在这里获取锁?
                 *  答:因为前驱节点如果是头节点的话,会存在两种情况:
                 *      1. 该头结点是初始化的虚节点,那么当前线程对应的node节点就应该是队列里的第一个"线程节点",那么此时应该直接尝试获取锁,并且将头结点更新为当前节点
                 *      2. 该头节点不是虚节点,是正常的"线程节点",那么此时很有可能头结点释放了锁,那么此时去尝试获取锁就有比较大的概率获取到,并且将头结点更新为当前节点
                 */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 抢锁成功,return false
                    return interrupted;
                }
                // 判断线程是否可以阻塞,可以的话则进行阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果上面的代码,出现异常,那么这里进行兜底处理,会判断failed标识符,若为true,则调用cancel方法将节点失效掉
            if (failed)
                cancelAcquire(node);
        }
    }


 /**
     * 判断当前线程是否可以"安心"的阻塞
     * @param pred
     * @param node
     * @return
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取当前线程的前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            // 若前驱节点的状态为SIGNAL(-1),则直接返回true,表示可以安心的进行阻塞了
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            /**
             * 若前驱节点的状态>0,即为CANCELLED(1)状态,则遍历往前寻找到一个CANCELLED状态节点为止,并将寻找到的节点与本节点进行关联。
             * 关联完后,返回false,由上游程序自旋重新调用本方法进行判断
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            /**
             * 若前驱节点的状态不为SIGNAL(-1),CANCELLED(1)。状态可能为:CONDITION(-2),PROPAGATE(-3)或 初始化(0)状态,
             * 则将前驱节点的状态变成SIGNAL后,由上游程序自选重新调用本方法进行判断
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    * 阻塞当前线程
     * @return 被中断,则返回true,否则 返回false
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
AQS.cancelAcquire方法
    /**
     * 取消节点。
     * 流程:将当前节点Thread置为空-->获取到当前节点往前的第一个有效节点(非CANCELLED状态)-->将当前节点的状态置为CANCELLED(失效)-->将当前节点从队列内清除出去-->唤醒后继节点
     * 虽然在这个方法里,会将当前节点从队列内清除出去,但是在并发情况下,其他线程有可能获取到当前节点的状态为CANCEELED
     * @param node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        // 日常判空
        if (node == null)
            return;

        node.thread = null;

        // 获取前驱节点
        Node pred = node.prev;
        // 从后往前遍历获取到不为CANCELLED状态的前驱节点为止
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 获取前驱节点的下一个节点
        Node predNext = pred.next;

        // 将当前线程节点的状态变为CANCELLED状态
        node.waitStatus = Node.CANCELLED;

        /**
         * 若当前线程节点为队列内的尾节点,则将上面找到的有效的前驱节点作为新的尾节点(CAS操作,将tail变量赋值为pred),成功后tail==pred
         */
        if (node == tail && compareAndSetTail(node, pred)) {
            /**
             * 重点:
             * 由于是双向练表,所以这里需要将新的尾节点(pred)的next节点cas为null。此时的状态为:pred->null pred<-node,当前节点的前驱指针还是指向pred的。
             * 由于此时next-null,所以整条链路的next是不完整的,所以后续遍历查找节点应该从后往前利用pred指针找
             */
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            /**
             * 进到这里意味着:当前的节点不是尾节点 或 上面cas更新尾节点失败(并发引起)
             */
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    // 将上面找到的前驱节点与当前线程的后继节点相连(CAS next指针)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 唤醒后继节点
                unparkSuccessor(node);
            }
            // 当前节点的后继节点指向自己,方便GC
            node.next = node; // help GC
        }
    }

释放写锁(互斥锁)

AQS.release(int arg) 和unparkSuccessor(Node node)方法
    // 释放互斥锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // 唤醒后继节点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 当前释放锁的节点的状态
        int ws = node.waitStatus;
        if (ws < 0)
            // 若当前节点状态不是CANCELLED状态(即:是有效节点),则直接cas,将当前节点状态修改为0(初始化状态)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 获取后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            /**
             * 若当前节点的后继节点为空 或者 后继节点的状态为CANCELLED状态,则从后往前找到第一个状态不为CANCELLED的节点
             * 问:为什么要从后往前找?
             * 答:这是由于在取消节点的cancelAcquire()方法里,我们取消节点时,会有短暂的时刻导致next指针不完整,但是pre指针对整条链路来说是完整的,所以需要从后往前找。详细看cancelAcquire方法
             */
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒上面找到的当前线程的后继有效节点
            LockSupport.unpark(s.thread);
    }


获取读锁(共享锁)

AQS.acquireShared(int arg) 方法
    /**
     * 获取共享锁,若获取不成功,则将线程放到队列里面去等待。
     * 流程:调用子类的tryAcquireShared方法尝试获取锁,若获取锁不成功,则调用doAcquireShared方法,入队等待
     * @param arg
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
ReentrantReadWriteLock.tryAcquireShared(int arg) 方法
        /**
         * 获取读锁(共享锁)
         * @param unused
         * @return
         */
        protected final int tryAcquireShared(int unused) {
            // 获取当前线程对象
            Thread current = Thread.currentThread();
            // 获取当前锁状态
            int c = getState();
            /**
             * exclusiveCount(c) != 0 意味着 当前有线程持有写锁(互斥锁)
             * getExclusiveOwnerThread() != current 意味着 当前持有写锁(互斥锁)的线程不是本线程
             */
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current) // 这个判断意味着,同个线程,获取完写锁后,也可以直接获取读锁,即同个线程同时可以持有写锁和读锁(支持锁降级)
                // 走到这里表示:当前有其他线程持有写锁,那么本线程获取读锁失败,直接返回-1,交由AQS将本线程放入队列内等待唤醒调度
                return -1;
            // 获取读锁数量
            int r = sharedCount(c);
            /**
             * readerShouldBlock() 判断是否需要阻塞当前线程,分公平和非公平两种情况。
             * 公平: 看在我前面是否有线程正在阻塞(AQS队列中,是否有节点正在等待唤醒),有:readerShouldBlock() == true。若无:则为 false,表示当前线程不需要阻塞。
             * 非公平:看我前面是否有线程正在等待着获取写锁(即互斥节点),若有,则readerShouldBlock() == true。若无:则为 false,表示不需要阻塞,可以直接进行cas抢锁
             * 问:非公平方式获取读锁,为什么需要判断队列里面是否有互斥节点(等待获取写锁的线程)?
             * 答:防止写线程饥饿。如果获取读锁不需要判断队列内是否有线程在等待获取写锁的话,那么如果有大量的线程在争抢读锁的情况的话,
             * 写线程将会很久拿不到写锁,将会长久阻塞,不利于写线程的执行。这就是线程饥饿,所以才需要在获取读线程先判断是否有线程正在等待获取写线程。
             * 若有,则读线程也需要进入队列排队等待,若没有,则读线程直接上去CAS抢锁
             */
            if (!readerShouldBlock() &&
                r < MAX_COUNT && // 判断读锁数量是否小于最大值
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 走到这里意味着:目前队列中没有等待的线程(公平) 或 没有等待获取写锁的线程(非公平),并且读锁数量在最大值范围内,且cas获取读锁成功
                if (r == 0) { // 读锁数量为0,即:我是第一个获取到读锁的线程
                    /**
                     * 将当前线程标识为第一个获取读锁的线程。思考:为什么要这样?有什么好处?
                     * 答:提升性能,因为维护ThreadLocal成本比较高
                     */
                    firstReader = current;
                    // 第一个获取读锁的线程对应的锁重入次数
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 进到这里来意味着:程序允许到现在,只有一个线程在一直持有锁,就是当前线程,所以将读锁重入次数++
                    firstReaderHoldCount++;
                } else {
                    /**
                     * 进到这里意味着:已经有其他线程持有读锁了,那么我需要在ThreadLocal里维护自己的读锁重入次数
                     * cachedHoldCounter:为HoldCounter类型对象,始终保存最近(最后)获取锁的线程及其锁重入次数
                     */
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                    /**
                     * 进到这里意味着:
                     * 情况1:cachedHoldCounter == null ,那么此时需要进行更新,更新最后获取锁的线程引用
                     * 情况2:当前标识的最后获取锁的线程对象不是自己本身,那么也需要对其进行更新
                     */
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                    /**
                     * 进到这里意味着:当前标识的最后获取锁的线程是自己本身,并且锁数量被释放完了(rh.count == 0)
                     * 问:为什么需要readHolds.set(rh)?
                     * 答:怎么样才能让程序进入到这里,就是最后一个线程在获取锁后,又将锁给释放掉,然后又重新获取锁。因为释放锁的同时,
                     * 会将count--,当减到0时,会将当前线程在ThreadLocal里的引用给remove掉,所以才需要这里的set操作。
                     * new Thread(()->{
                     *             readLock.lock();
                     *             readLock.unlock();
                     *             readLock.lock();
                     *         }).start();
                     */
                        readHolds.set(rh);
                    rh.count++;
                }
                // 返回抢锁成功
                return 1;
            }
            /**
             * 走到这里意味着:
             * 1、readerShouldBlock() == true 即:判断到当前线程需要进行阻塞。
             *    公平:表明前面有线程等待获取锁(无论是读锁还是写锁)。
             *    非公平:表明前面有写线程正在等待获取写锁(队列内有互斥节点)
             * 2、r < MAX_COUNT 读锁数量超过最大限制
             * 3、CAS抢锁失败
             */
            return fullTryAcquireShared(current);
        }


        /**
         * 一个完整的获取读锁(共享锁)的方法。当调用tryAcquireShared()获取锁没成功,就会调用此方法来获取读锁。
         * 这里解释一下:tryAcquireShared()方法上面讲了获取锁的流程,其实是作者为了性能做的一个优化,一般情况下是不用走到fullTryAcquireShared()这个方法的,
         * 除非在tryAcquireShared()方法中获取锁不成功就会走到这个方法来死循环重试拿锁。
         */
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            // 自旋抢锁
            for (;;) {
                // 获取当前锁状态
                int c = getState();
                // 判断当前有没有线程持有写锁(互斥锁),0表示没有,>0表示有
                if (exclusiveCount(c) != 0) {
                    // 判断当前持有写锁的线程是不是当前线程本身,若不是,则return -1 ,交由AQS管理,让本线程进入队列等待
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) { // 判断当前读线程是否需要被阻塞。
                    // Make sure we're not acquiring read lock reentrantly
                    /**
                     * 进到这里意味着:
                     * 1、公平:判断到队列内有线程正在等待获取锁
                     * 2、非公平:判断到队列内有写线程在等待获取锁
                     */
                    if (firstReader == current) {
                        // 若当前线程是已经获取过读锁的第一个线程,那么啥这里啥也不做,交由最下面的代码进行CAS操作
                        // assert firstReaderHoldCount > 0;
                    } else {
                        // 判断rh==null,为什么需要判断是否为空,是因为有可能同一个线程,在下面CAS抢锁失败后,重新循环抢锁,这时,rh就可能不为空
                        if (rh == null) {
                            // cachedHoldCounter:为HoldCounter类型对象,始终保存最近(最后)获取锁的线程及其锁重入次数。也是一种优化手段,避免维护ThreadLocal。
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                        /**
                         * 走到这里意味着:线程在下面CAS失败后,重新循环从头开始走逻辑,但是此时,当前线程的锁已经被释放了。
                         * 此时直接返回-1,让当前线程进入队列等待
                         */
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                /**
                 * 走到这里意味着:
                 * 情况1:当前持有互斥锁的线程是当前线程本身
                 * 情况2:当前线程判断到不需要阻塞(公平:前面没有任何线程在等待获取锁,那么直接CAS获取锁。非公平:前面没有任何线程在等待获取锁 或 前面没有写线程在队列里等待)
                 * 情况3:当前线程是firstReader线程
                 */
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    // 返回抢锁成功
                    return 1;
                }
            }
        }


AQS.doAcquireShared(int arg) 方法
    /**
     * 获取共享锁,不成功则将线程进行阻塞
     * @param arg
     */
    private void doAcquireShared(int arg) {
        // 向队列新增一个共享节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 自旋
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 若前驱节点为头节点,则尝试获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 若获取锁成功,则设置新的头节点,并结束程序
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 若获取锁失败,或前驱节点不是头节点,阻塞当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                // 如果上面的代码,出现异常,那么这里进行兜底处理,会判断failed标识符,若为true,则调用cancel方法将节点失效掉
                cancelAcquire(node);
        }
    }

释放共享锁

AQS.releaseShared(int arg)
    /**
     * 流程:调用子类tryReleaseShared方法尝试释放锁,释放锁成功后,则调用doReleaseShared方法,唤醒下一个节点进行抢锁
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

ReentrantReadWriteLock.tryReleaseShared(int unused)
       /**
         * 释放共享锁(读锁)
         * @param unused
         * @return
         */
        protected final boolean tryReleaseShared(int unused) {
            // 获取当前线程对象
            Thread current = Thread.currentThread();
            // 判断当前线程是否是firstReader线程
            if (firstReader == current) {
                if (firstReaderHoldCount == 1)
                    // 若本线程的读锁重入次数此时判断已经是为1了,那么本次释放就会将这个线程的读锁全部释放完,那么也需要释放本线程,所以将firstReader引用指向空
                    firstReader = null;
                else
                    // 读锁重入次数--
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                /**
                 * 进到这里意味着:
                 * 情况1:cachedHoldCounter == null ,注意,这里跟获取共享锁的逻辑不同,这里是释放锁,所以不用更新cachedHoldCounter引用
                 * 情况2:当前线程不是cachedHoldCounter线程(不是最后获取锁的线程),那么当前线程的持有锁的重入次数情况需从readHolds(ThreadLocal)里面获取
                 */
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    // 锁重入次数<=1,那么本次释放后,需要将本个线程也释放掉,那么readHolds里面也不需要再维护本线程的持锁重入情况了,所以需要remove掉
                    readHolds.remove();
                    if (count <= 0) // 释放锁太多次,导致益处,则抛异常
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }

            // 自旋 CAS释放锁,即CAS减少state值,直到CAS成功为止
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    // 释放锁成功,判断到锁重入次数为
                    return nextc == 0;
            }
        }
AQS.doReleaseShared()方法
    /**
     * 释放共享锁,唤醒后续等待的线程节点
     */
    private void doReleaseShared() {
        for (;;) {
            // 获取当前的头节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { // 若节点状态为SIGNAL(-1),则cas将状态置为初始化状态(0)
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 上面cas成功,则唤醒下一个节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

锁降级

锁降级:表示同一个线程,获取了写锁后,在释放写锁前,又获取了读锁,我们称这个过程叫做锁降级。通过对tryAcquireShared源码的分析,ReentrantReadWriteLock是允许锁降级的。

小试牛刀

  • 问题1:ReentrantReadWriteLock是如何解决线程饥饿的?
  • 问题2:firstReader和firstReaderHoldCount的作用是什么?
  • 问题3:所有读线程维护自身ThreadLocalHoldCounter,将所有的ThreadLocalHoldCounter里的count相加,是否与state的高16位的值一致?
  • 问题4:什么是锁降级?
  • 问题5:AQS的同步队列为什么是双端队列?

这里只对问题5进行一个回答,其他的问题通过上面的源码分析,都可以找到答案。

AQS的同步队列为什么是双端队列?
我们想象一下如果是单向队列的话,删除一个节点是怎么样的流程?
1、单向链表删除其中一个节点,需要从头遍历到当前节点,并记录我的上一个节点引用,再将上一个节点的next指向我的下一个节点
2、通过源码我们知道,当线程进入队列时,要判断上一个节点的状态是否为signal,若为signal,则当前线程可以安心阻塞
3、双向链表,方便唤醒下一个节点。这里说一下:原本AQS只是单向链表,并且只是维护了一个prev前指针,那么此时你要唤醒你的下一个节点就不好唤醒了,问题就跟删除一个节点一样了。

结论:AQS的同步队列为什么是双端队列?AQS程序是Doug Lea设计,用双向队列只不过是更加符合他的设计,并且更加容易实现罢了。那么方便啥呢?方便你去获取前一个节点的状态,也方便你去唤醒下一个节点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容