概念理解
1.AQS-- 指AbstractQueuedSynchronizer类。
AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。继承AbstractOwnableSynchronizer类
2.AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
(01)独占锁-- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。
(02)共享锁-- 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。
3.CLH队列-- Craig, Landin, and Hagersten lock queue
CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。
CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。
Node是CLH队列的节点,它的几个成员变量:waitStatus/prev/next/thread/nextWaiter
(1)每个Node都会与一个线程对应
(2)prev和next分别代表上一个等待节点和下一个等待节点
(3)waitStatus代表线程等待状态
1->CANCELLED,线程已被取消;
-1->SIGNAL,表示当前线程的后续线程需要被unpark(唤醒);
-2->CONDITION,线程处于Condition休眠状态,等待被唤醒;
-3->PROPAGATE,其他线程获得“共享锁”;
(4)nextWaiter的值为EXCLUSIVE代表“独占锁”,SHARED代表“共享锁”。
4.CAS函数-- Compare And Swap
CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。
源码分析
lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:
finalvoidlock() {
acquire(1);
}
其实很多“锁”的实现,本质上来讲都是因为各种“锁”的类拥有一个继承了AQS的syns成员变量,通过重写tryAcquire()类似的方法来完成个性化的需要,最终都是通过syns来实现锁的机制。
AQS类中acquire方法:
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
依次解析
(1)tryAcquire
(2)addWaiter
(3)acquireQueued
(4)selfInterrupt
(1)ReentrantLock的FairSync中的tryAcquire的实现:
protected final booleantryAcquire(intacquires) {
finalThread current = Thread.currentThread();
intc = getState(); //获取锁的状态
if(c ==0) { //c=0代表“锁没有被任何线程占有”
if(!hasQueuedPredecessors() && //如果是CLH队列中第一个线程,设置“锁”状态,同时设置“锁”的拥有者为当前线程
compareAndSetState(0,acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if(current == getExclusiveOwnerThread()) { //如果“锁”的拥有者已经是当前线程,设置状态即可,可重入
intnextc = c + acquires;
if(nextc <0)
throw newError("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
(1)tryAcquire的作用就是尝试去获取锁。获取成功返回true,失败则返回false。
(2)addWaiter(Node.EXCLUSIVE):
privateNodeaddWaiter(Node mode) {
Node node =newNode(Thread.currentThread(),mode);
// Try the fast path of enq; backup to full enq on failure
Node pred =tail;
if(pred !=null) { //如果CLH队列不为空,添加到队尾
node.prev= pred;
if(compareAndSetTail(pred,node)) {
pred.next= node;
returnnode;
}
}
enq(node); // 自旋。队列为空创建空队列,否者插入队尾
returnnode;
}
(2)addWaiter的作用是,把当前线程加入到CLH队列中。意味着当前线程加入到获取“锁”的等待线程队列中。
(3)acquireQueued():
final booleanacquireQueued(finalNode node, intarg) {
booleanfailed =true;
try{
booleaninterrupted =false; //当前线程在休眠时,有没有被中断过
for(;;) { // 又一个自旋操作
finalNode p = node.predecessor(); //获取前一节点
if(p ==head&& tryAcquire(arg)) { //不断自旋,当发现自己是队列的老二并且头节点=p是“公平锁”的需要!!!p=head 保证了“当前线程”被唤醒是因为被前继节点unpark()
setHead(node);
p.next=null;// help GC
failed =false;
returninterrupted;
}
if(shouldParkAfterFailedAcquire(p,node) && //返回当前线程是否应该被阻塞,当前继节点是SIGNAL状态,意味着当前线程要被unpack唤醒,否者就往前找waitStatus不是CANCEL的线程作为前继节点
parkAndCheckInterrupt()) //通过LockSupport.park()阻塞当前线程,并且返回中断状态 注:thread.interrupted 会返回中断状态并且清空中断标
interrupted =true;
}
}finally{
if(failed)
cancelAcquire(node);
}
}
private static booleanshouldParkAfterFailedAcquire(Node pred,Node node) {
intws = pred.waitStatus;
if(ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if(ws >0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do{
node.prev= pred = pred.prev;
}while(pred.waitStatus>0);
pred.next= node;
}else{
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
}
return false;
}
private final booleanparkAndCheckInterrupt() {
LockSupport.park(this);
returnThread.interrupted(); 注:LockSupport()中的park(),unpack() 和 object中的wait(),notify()作用相似,但是更加轻量级,wait(),notify()必须先通过Synchronized锁获取
}
(3)acquireQueue() 的作用是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有被中断过。
(4)acquireQueued()的parkAndCheckInterrupt()方法清除了中断状态
static voidselfInterrupt() {
Thread.currentThread().interrupt();
}
(4)selfInterrupt()的作用就是当前线程自己产生一个中断。
总结:
先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到"CLH队列"末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
其他:
ReentractLock类的unlock()封装了AQS的release()方法,锁的释放,形式与acquire()类似,这里就不再说了。