该包的结构如图
其中有java6新加入的一个类LockSupport,这个类已经在自旋锁在高并发的用处,实现各种锁的最后提到过,也是基于Unsafe类的实现 主要的两个方法是
public static void unpark(Thread thread);
public static void park(Object blocker) {
public static void parkNanos(Object blocker, long nanos) ;
public static void parkUntil(Object blocker, long deadline) ;
public static Object getBlocker(Thread t) ;
public static void park() ;
public static void parkNanos(long nanos);
public static void parkUntil(long deadline);
基本是基于
unsafe.putObject
unsafe.unpark
unsafe.park
来实现的。由于已经分析过。所以这里不再说了。
这个包下的类 具体的实现类只有ReentrantLock和ReentrantReadWriteLock(除去单独讲的LockSupport),其余的都是抽象类和接口,做一下归类
抽象类 AbstractOwnableSynchronizer 、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer
接口 Lock、ReadWriteLock、Condition
抽象类AbstractOwnableSynchronizer 该类是主要定义让线程以独占方式拥有同步器,此类为创建锁和相关同步器提供了基础,类本身不管理或使用此信息 很简单的两个方法setExclusiveOwnerThread(Thread t)设置当前拥有独占访问的线程 和getExclusiveOwnerThread() 返回由 setExclusiveOwnerThread最后设置的线程;如果从未设置,则返回 null。
抽象类AbstractQueuedSynchronizer 继承自AbstractOwnableSynchronizer该类为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架 和是MCSLock的扩展。该类有个属性 private volatile int state;
是The synchronization state.
同步的状态表示。拥有正常的set和get方法后,还有个compareAndSetState
方法,是基于unsafe
类的compareAndSwapInt
来实现的,由此类实现同步。
相似的AbstractQueuedLongSynchronizer类的属性定义是private volatile long state;
可以看出是LONG型的属性值。调用的是unsafe.compareAndSwapLong
,所以二者的区别就基本知道了,其它没啥区别。所以我们只要继续分析AbstractQueuedLongSynchronizer类即可
该类有Node 内部类和ConditionObject 内部类,其中Node类定义了两种模式的node属性,
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
一种是独占的一种是共享的。还定义了四种等待状态
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
- 节点等待状态被取消掉了 值为1;
- 节点等待状态标示 等待获得锁的线程去unparking 通知 值为-1;
- 节点等待状态 为处于条件等待状态 值为-2;
- 节点等待状态为节点状态需要向后传播,一般是释放共享需要传播给其他节点,一般是头节点在doReleaseShared 去确保传播继续下去即使其他操作已经介入了。值为-3;
- 还有值为0的情况 这个表明处于非上述四种状态。
这里还有volatile Node prev;和volatile Node next;
队列的前后节点
这里AbstractQueuedSynchronizer也有 private transient volatile Node tail;和 private transient volatile Node head;
这个是首尾节点 不要混淆
结构图大致为
由前面MCSLock的知识知道
volatile Thread thread;
是标示是否为当前线程获得锁的这里还有个
Node nextWaiter;
这个属性要结合Condition去理解,该节点关联的是等待某条件上的下个节点。因为 条件队列只有是独占模式下才可以被访问到。再先来看一下内部类ConditionObject 该类实现了Condition接口。
该对象内部也维护了一个队列,
private transient Node firstWaiter;
private transient Node lastWaiter;
Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。
JDK官方的解释是
Condition将 Object监视器方法(wait、notify和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized方法和语句的使用,Condition替代了 Object 监视器方法的使用。条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像 Object.wait做的那样。Condition实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition实例,请使用其 newCondition()方法。
提供了
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
方法,await开头的都是和await方法一致,只不过添加了写条件而已,await会在当前lock的队列中持有锁的线程上释放锁资源,并新建Condition节点加到Condition队列尾部,阻塞当前线程。
signal()和signalAll()就是将Condition的头节点移动到Lock等待节点尾部,让其等待再次获取锁。
主要看await方法和signal方法是怎么实现的即可
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
可以看到这里借助了LockSupport.park(this);
方法。其他的方法主要是做各种检测 这里就不说了
signal方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
这里调用了doSignal方法
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
看一下while循环调用到的方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
可以看出还是借助了unsafe和 LockSupport.unpark方法实现锁释放。
看到一篇不错的博文对Condition和AbstractQueuedSynchronizer的阐述,这里借助下他的图
以下是AQS队列和Condition队列的出入结点的示意图,可以通过这几张图看出线程结点在两个队列中的出入关系和条件。
I.初始化状态:AQS等待队列有3个Node,Condition队列有1个Node(也有可能1个都没有)
II.节点1执行Condition.await()
1.将head后移
2.释放节点1的锁并从AQS等待队列中移除
3.将节点1加入到Condition的等待队列中
4.更新lastWaiter为节点1
III.节点2执行signal()操作
5.将firstWaiter后移
6.将节点4移出Condition队列
7.将节点4加入到AQS的等待队列中去
8.更新AQS的等待队列的tail
最后给出官方的Condition例子
package com.alibaba.otter.canal.common;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
再看下AbstractQueuedSynchronizer 留给子类去实现的方法,看一下官方文档的说法
为了将此类用作同步器的基础,需要适当地重新定义以下方法,这是通过使用 getState setState和/或 compareAndSetState方法来检查和/或修改同步状态来实现的
- tryAcquire(int)
- tryRelease(int)
- tryAcquireShared(int)
- tryReleaseShared(int)
- isHeldExclusively()
默认情况下,每个方法都抛出 UnsupportedOperationException。这些方法的实现在内部必须是线程安全的,通常应该很短并且不被阻塞。定义这些方法是使用此类的 唯一 受支持的方式。其他所有方法都被声明为 final
,因为它们无法是各不相同的。
也就是说子类需要自己实现这些方法去构造共享锁和独占锁,怎么实现呢?就是利用getState 、setState、和compareAndSetState这几个AbstractQueuedSynchronizer中已经实现好的方法。
我们看下两个实现类ReentrantLock和ReentrantReadWriteLock,这算是比较典型的两个例子了。一个是可重入锁,是独占锁的方式,ReentrantReadWriteLock是共享锁的实现。完美
ReentrantLock中还实现了公平锁Sync和非公平锁NonfairSync
都是借助继承了AbstractQueuedSynchronizer的内部类Sync来实现 看到实现了tryAcquire和tryRelease方法
其中独占方式的判断
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
这样我们就可以简单的利用ReentrantLock的lock和unlock实现锁啦
再看下ReentrantReadWriteLock 这个类 这个类实现了ReadWriteLock 有两种锁readerLock和writerLock
看下内部类ReadLock 主要的方法都是共享的
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
共享锁时通过计数的方式实现的,最大可以到
static final int SHARED_SHIFT = 16;
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
可以看到这里还涉及到了
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = Thread.currentThread().getId();
}
再看下
主要的方法都是独占的
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
这二者都借助
abstract static class Sync extends AbstractQueuedSynchronizer
还是出自AbstractQueuedSynchronizer
这种读写锁,读共享,写独占可以降低锁的粒度 重入方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock想要获得WriteLock则永远都不要想。 WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能.ReadLock可以被多个线程持有并且在作用时排斥任何的WriteLock,而WriteLock则是完全的互斥。这一特性最为重要,因为对于高读取频率而相对较低写入的数据结构,使用此类锁同步机制则可以提高并发量.不管是ReadLock还是WriteLock都支持Interrupt,语义与ReentrantLock一致。WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常,这种锁的用处很多,缓存方面应该更明显 在网上找到的个demo 这里附上
package com.alibaba.otter.canal.common;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private Map<String, Object> map = new HashMap<String, Object>();//缓存器
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
}
public Object get(String id){
Object value = null;
rwl.readLock().lock();//首先开启读锁,从缓存中去取
try{
value = map.get(id);
if(value == null){ //如果缓存中没有释放读锁,上写锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
if(value == null){
value = "aaa"; //此时可以去数据库中查找,这里简单的模拟一下
}
}finally{
rwl.writeLock().unlock(); //释放写锁
}
rwl.readLock().lock(); //然后再上读锁
}
}finally{
rwl.readLock().unlock(); //最后释放读锁
}
return value;
}
}