java.util.concurrent源码阅读 05 ReentrantReadWriteLock

ReentrantReadWriteLock与ReentrantLock

说到ReentrantReadWriteLock, 首先要做的是与ReentrantLock划清界限. 它和后者都是单独的实现, 彼此之间没有继承或实现的关系.
ReentrantLock实现了标准的互斥操作, 也就是一次只能有一个线程持有锁, 也即所谓独占锁的概念. 显然这个特点在一定程度上面减低了吞吐量, 实际上独占锁是一种保守的锁策略, 在这种情况下任何"读/读", "写/读", "写/写"操作都不能同时发生.但是同样需要强调的一个概念是, 锁是有一定的开销的, 当并发比较大的时候, 锁的开销就比较客观了. 所以如果可能的话就尽量少用锁, 非要用锁的话就尝试看能否改造为读写锁.

ReadWriteLock描述的是: 一个资源能够被多个读线程访问, 或者被一个写线程访问, 但是不能同时存在读写线程. 也就是说读写锁使用的场合是一个共享资源被大量读取操作, 而只有少量的写操作(修改数据). ReadWriteLock接口的代码如下:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     * @return the lock used for reading.
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     * @return the lock used for writing.
     */
    Lock writeLock();
}

ReentrantReadWriteLock的特性

ReentrantReadWriteLock有以下几个特性:
1.公平性
非公平锁(默认) 这个和独占锁的非公平性一样, 由于读线程之间没有锁竞争, 所以读操作没有公平性和非公平性, 写操作时, 由于写操作可能立即获取到锁, 所以会推迟一个或多个读操作或者写操作. 因此非公平锁的吞吐量要高于公平锁.
公平锁利用AQS的CLH队列, 释放当前保持的锁(读锁或者写锁)时, 优先为等待时间最长的那个写线程分配写入锁, 当前前提是写线程的等待时间要比所有读线程的等待时间要长. 同样一个线程持有写入锁或者有一个写线程已经在等待了, 那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞, 直到最先的写线程释放锁. 如果读线程的等待时间比写线程的等待时间还有长, 那么一旦上一个写线程释放锁, 这一组读线程将获取锁.
2.重入性
读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁. 当然了只有写线程释放了锁, 读线程才能获取重入锁.
写线程获取写入锁后可以再次获取读取锁, 但是读线程获取读取锁后却不能获取写入锁.
另外读写锁最多支持65535个递归写入锁和65535个递归读取锁.
3.锁降级
写线程获取写入锁后可以获取读取锁, 然后释放写入锁, 这样就从写入锁变成了读取锁, 从而实现锁降级的特性.
4.锁升级
读取锁是不能直接升级为写入锁的. 因为获取一个写入锁需要释放所有读取锁.
5.锁获取中断
读取锁和写入锁都支持获取锁期间被中断. 这个和独占锁一致.
5.条件变量
写入锁提供了条件变量(Condition)的支持, 这个和独占锁一致, 但是读取锁却不允许获取条件变量, 将得到一个UnsupportedOperationException异常.

ReentrantReadWriteLock中的state

在ReentrantLock中该字段用来描述有多少线程获持有锁. 在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数), 在共享锁的时代就是持有锁的数量. ReadWriteLock的读、写锁是相关但是又不一致的, 所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量. 显然现在一个state就不够用了. 于是在ReentrantReadWrilteLock里面将这个字段一分为二, 高位16位表示共享锁的数量, 低位16位表示独占锁的数量(或者重入数量). 2^16-1=65536, 这就是上节中提到的为什么共享锁和独占锁的数量最大只能是65535的原因了.

本文主要分析以下几个过程:

ReentrantReadWriteLock的创建(公平锁/非公平锁)
上锁: lock()(读取锁/写锁)
解锁: unlock()(读取锁/写锁)

ReentrantReadWriteLock的内部结构

ReentrantReadWriteLock的内部主要结构如下:

ReentrantReadWriteLock-->ReadWriteLock
NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
ReadLock/WriteLock-->Lock

注意 对应关系: "子类/实现"-->"父类/接口". 其中, NonfairSync, FairSync,Sync,ReadLock和WriteLock是ReentrantReadWriteLock的内部类.

ReentrantReadWriteLock的创建

ReentrantReadWriteLock的构造函数

public ReentrantReadWriteLock() {
    this(false);
}

/**
 * Creates a new {@code ReentrantReadWriteLock} with
 * the given fairness policy.
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

可以看到与ReentrantLock一样, 锁的主体部分依然是Sync(FairSync/NonFaireSync), 但是增加了readerLock和writerLock两个实例. 参看ReadLock和WriterLock的构造函数可知两个实例共享一个sync实例.

/**
* ReadLock构造函数
*/
protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}
/**
* WriteLock构造函数
*/
protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}

写入锁的获取

主要流程与ReentrantLock类似.

1.持有锁线程数非0(c=getState()不为0), 如果写线程数(w)为0(那么读线程数就不为0)或者独占锁线程(持有锁的线程)不是当前线程就返回失败, 或者写入锁的数量(其实是重入数)大于65535就抛出一个Error异常. 否则进行2.
2.持有锁线程数为0(c=0的情况)时, 如果当前线程需要阻塞(writerShouldBlock, 这是FairSync与NonFairSync的区别)那么就返回失败, 如果增加写线程数失败也返回失败. 否则进行3.
3.设置独占线程(写线程)为当前线程, 返回true.

涉及到的函数:
ReentrantReadWriteLock.WriteLock->lock()

public void lock() {
    sync.acquire(1);
}

AbstractQueuedSynchronizer->acquire(int arg)

public final void acquire(int arg) {
    if (!tryAcquire(arg) && 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

ReentrantReadWriteLock.Sync->tryAcquire(int acquires)(主体函数)

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

接下来看writerShouldBlock在ReentrantReadWriteLock.FairSync与ReentrantReadWriteLock.NonFairSync中的实现.
ReentrantReadWriteLock.NonFairSync->writerShouldBlock()

final boolean writerShouldBlock() {
    return false; // writers can always barge
}

即ReentrantReadWriteLock.NonFairSync在获取写入锁的过程中永远不会阻塞.

ReentrantReadWriteLock.FairSync->writerShouldBlock()

final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

ReentrantReadWriteLock.FairSync的实现与ReentrantLock一致, 如果AQS队列不为空并且当前线程不是在AQS的队列头那么就阻塞线程, 直到队列前面的线程处理完锁逻辑.

读取锁的获取

读取锁的获取过程比写入锁稍显复杂,主要流程如下:
1.如果写线程持有锁(也就是独占锁数量不为0), 并且独占线程不是当前线程, 那么就返回失败. 因为允许写入线程获取锁的同时获取读取锁. 否则进行2.
2.如果读线程请求锁数量达到了65535(包括重入锁), 那么就跑出一个错误Error, 否则进行3
3.如果读线程不用等待(实际上是是否需要公平锁), 并且增加读取锁状态数成功, 那么就返回成功, 否则进行4.
4.fullTryAcquireShared(内部包含一个无限循环, 实际上是过程3的不断尝试直到CAS计数成功或者被写入线程占有锁)

涉及的代码如下:
ReentrantReadWriteLock.ReadLock->lock()

public void lock() {
    sync.acquireShared(1);
}

AbstractQueuedSynchronizer->acquireShared(int arg)

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

ReentrantReadWriteLock.Sync->tryAcquireShared(int unused)

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

ReentrantReadWriteLock.Sync->fullTryAcquireShared(Thread current)

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId()) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

读取锁的释放

读取锁的释放其实就是一个不断尝试的CAS操作, 直到修改状态成功. 前面说过state的高16位描述的共享锁(读取锁)的数量, 所以每次都需要减去2^16, 这样就相当于读取锁数量减1. 实际上SHARED_UNIT=1<<16.

ReentrantReadWriteLock.ReaderLock->unlock()

public  void unlock() {
    sync.releaseShared(1);
}

AbstractQueuedSynchronizer->releaseShared(int arg)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

ReentrantReadWriteLock.Sync->tryReleaseShared(int unused)

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

HoldCounter

HoldCounter保存了当前线程持有共享锁(读取锁)的数量, 包括重入的数量. 那么这个数量就必须和线程绑定在一起.
在Java里面将一个对象和线程绑定在一起, 就只有ThreadLocal才能实现了. 所以毫无疑问HoldCounter就应该是绑定到线程上的一个计数器.

static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = Thread.currentThread().getId();
}

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

小结

使用ReentrantReadWriteLock可以推广到大部分读, 少量写的场景, 因为读线程之间没有竞争, 所以比起sychronzied, 性能好很多.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容