AQS
AQS即AbstractQueuedSynchronizer(抽象队列同步器)。AQS是ReentrantLock
,ReentrantReadWriteLock
,CountDownLatch
和Semaphore
等多线程同步工具的基础。
AQS内部维护了一个双向链表,用于保存因为等待获取锁而阻塞的线程,所有获取同步状态失败的线程会依次追加到链表末尾。
锁的类型
AQS可用来构造各种各样的锁。这些锁主要分为几种类型呢?
- 共享锁/独占锁:共享锁是可以有多个线程持有同一个共享资源的锁。独占锁是同一个共享资源,同一时间只能有一个线程获得锁。
- 公平锁/非公平锁:公平锁是各个需要获取锁的线程,会按照申请锁的顺序来分配优先级。先申请锁的线程先获得锁。非公平锁不遵循这个规则,允许存在“插队”的情况。
- 重入锁/非重入锁:可重入锁就是一个已经获得锁的线程,可以再次获得锁。Java的
synchronized
关键字和ReentrantLock
都是可重入锁。 - 乐观锁/悲观锁:乐观锁和悲观锁实质操作共享资源时的假想状态不同。乐观锁会认为资源没有被其他线程操作,但是在操作的时候需要一定的机制来验证资源确实没有被其他线程操作。例如原子CAS(Compare And Set)操作,首先验证资源和期待值是否相同,然后再把资源修改为目标值。如果资源和期待值不相同,CAS操作会失败。悲观锁则是在操作资源前先占有该资源,不允许它被其他资源访问,然后再操作资源。典型的就是Java的
synchronized
关键字和ReentrantLock
。
CAS操作
CAS即Compare And Set,字面意思是比较并设置。CAS操作是原子性的。
CAS操作用于更新值。首先对比变量原值,如果相同,再设置变量为新的值,中间过程不会被其他线程打断。
CAS操作底层由Unsafe实现。Unsafe为sun公司提供的一系列内存操作native方法的工具包,由于使用了类似C语言指针操作内存的方式,性能很高,但是具有很大的风险。通常来说我们无法在自己的应用中直接使用这个类。
Unsafe提供了如下几个方法:
- compareAndSwapObject(Object o, long offset, Object expected, Object x);
- compareAndSwapInt(Object o, long offset, int expected, int x);
- compareAndSwapLong(Object o, long offset, long expected, long x);
这三个方法针对Object,int和long三种类型提供了CAS功能。
接下来有个问题,这些方法中的offset是如何获取的?
比如我们有一个class:
public class Demo {
private volatile int first;
private volatile int second;
}
获取second这个int变量offset的方法为:
long offset = unsafe.objectFieldOffset(Demo.class.getDeclaredField("second"));
使用这个offset,我们就可以通过Unsafe
等帮助来使用CAS设置second
这个变量的内容了。
compareAndSwapInt(demo, offset, 0, 1);
compareAndSwap
系列方法是有返回值的。如果返回true
说明CAS操作成功,如果为false
说明操作失败,通常原因为目标变量的值和期待的值不相等(有其他线程同时操作这个变量)。
AQS的几个成员变量
下文要提到的3个AQS的成员变量如下:
- head:Node类型,用来表示AQS内部双向链表的头结点。Node会在下一节介绍。
- tail:双向链表的尾节点。
- status:同步状态。根据不同锁的类型可以有不同的含义。例如在
ReentrantLock
中代表某个线程持有锁的次数(重入深度)。Semaphore
代表允许当前可用的并发资源数(permits)等。
Node
Node
是AQS中最关键的内部类,用来代表双向链表的节点。Node中有如下成员变量:
- waitStatus:等待状态。Node类中有5种等待状态的定义,后面会讲它们的含义。
- prev:指向前一个Node的指针。
- next:指向后一个Node的指针。
- thread:Node包装的线程。即队列中排队等待获取锁的线程。
- nextWaiter:下一个等待节点的状态。有两个值:
Node.SHARED
和Node.EXCLUSIVE
分别代表下一个节点需要获取共享锁和独占锁。
Node
的状态有5种:
- SIGNAL:后置节点是阻塞状态,或者将要进入阻塞状态。
- CANCELLED:因为超时或者中断的原因被取消。被取消的节点不会再被阻塞。
- CONDITION:调用ConditionObject的await方法之时,线程被包装为node,waitStatus设置为CONDITION。这个node进入condition队列。
- PROPAGATE:表明需要无条件向后传递唤醒节点动作,只有head节点可能为这个状态。由
doReleaseShared
方法负责设置。 - 0:节点刚入队,或者是准备唤醒head之后的节点时,将head节点状态设置为0。
AQS几个重要的方法
AQS作为一个同步框架,可以衍生出不同的线程同步工具。AQS是一个抽象类,是否能够获取同步状态,唤醒等待线程等操作需要在AQS各个行为不同的子类中分别定义。这些方法最重要的是如下4个:
- tryAcquire():尝试获取独占同步状态。如果获取成功,返回true。否则线程接下来要在AQS队列中排队等待。
- tryRelease():尝试释放独占同步状态。如果需要唤醒后续等待的线程,返回true,否则返回false。
- tryAcquireShared():尝试获取共享同步状态。如果获取失败,返回负数。如果获取同步状态后,其他线程无法再获取共享同步状态,返回0,如果获取成功后其他线程仍可以继续获取共享同步状态,返回正数。
- tryReleaseShared():尝试释放共享同步状态。如果需要唤醒后续等待的线程,返回true,否则返回false。
- isHeldExclusively():同步状态是否被当前线程持有。
除此之外,AQS还提供了几个常用的工具方法供实现类调用。这些方法如下:
- hasQueuedPredecessors():是否有其他线程,或者将要有其他线程排在队列前面。这个方法对于公平锁的实现十分关键。
- apparentlyFirstQueuedIsExclusive():第一个排队的线程是否要获取独占同步状态。该方法在
ReentrantReadWriteLock
中使用。
这些方法的代码在后面用到的时候分析。
独占模式执行过程
获取同步状态
独占模式使用acquire
方法获取同步状态。
方法代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法的逻辑如下所示:
- 执行
tryAcquire
方法。这个方法返回是否能够获取到同步状态。需要在用户实现类中编写相关逻辑。 -
tryAcquire
返回true,acquire方法直接返回,代表获取到同步状态,线程可以继续执行。 - 如果
tryAcquire
方法返回false。当前thread会被封装为Node,加入到等待队列。 - 入队列后执行
acquireQueued
方法,尝试获取同步状态,如果这个节点前置节点的状态为SIGNAL,会进入阻塞状态。如果前置节点不是SIGNAL也不是CANCELLED状态,CAS设置它为SIGNAL状态,然后自己进入阻塞状态。
入队等待操作
addWaiter
方法
addWaiter
将线程包装为一个node,放入AQS队列的tail。
注意:这个地方用到了Node类的构造函数Node(Thread thread, Node mode)
。其中mode决定了同步模式,可选值为Node.EXCLUSIVE
(独占,同时只有一个线程处于同步状态)和Node.SHARED
(可以有多个线程处于同步状态)。
// 对于tryAcquire方法,由于它是独占模式,这里参数传入的是Node.EXCLUSIVE
private Node addWaiter(Node mode) {
// 将当前线程包装为一个Node,使用独占模式: {node.prev=null, node.next=null}
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 下面这一段是快速入队尾的逻辑,如果失败需要自旋入队尾
Node pred = tail;
// 如果tail不为空
if (pred != null) {
// 设置node前置节点为tail节点
node.prev = pred; // {node.prev=tail, node.next=null}
// 使用CAS设置新创建的节点为tail
if (compareAndSetTail(pred, node)) {
// 如果设置成功,设置原来tail节点的下一个节点为这个新添加的node
// 然后返回
pred.next = node;
return node;
}
}
// 如果tail为空,或者是CAS设置tail失败,执行这段逻辑
enq(node);
// 返回新创建的node
return node;
}
enq(node)
方法,该方法负责初始化队列的head和tail,同时将参数node设置为tail。相当于慢速版的设置tail方法。代码如下:
private Node enq(final Node node) {
// 自旋反复尝试
for (;;) {
Node t = tail;
// 如果tail为null,说明AQS的等待队列第一次使用,没有任何node,尚未初始化,执行初始化逻辑
if (t == null) {
// 创建一个空的node,并将其设置为tail和head
// 然后再次执行该循环,进入else分支
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 设置node的前置节点为tail
node.prev = t;
// CAS尝试入队尾
// 如果失败,自旋反复尝试直到成功
if (compareAndSetTail(t, node)) {
// 如果成功,设置原来tail的后置节点为node,并退出循环返回node的前置节点
t.next = node;
return t;
}
}
}
}
用一个例子说明:
如果thread1第一个入队,首先初始化队列:
head:new Node()
tail:new Node()
然后CAS设置thread1所在的node为tail:
head:{id=0, prev=null, next=1, thread=null}
tail: {id=1, prev=0, next=null, thread=thread1}
注意:这里id是为了指代方便故意添加的,实际node没有这个属性。
同理,如果再入队thread2,队列节点依次为:
head:{id=0, prev=null, next=1, thread=null}
{id=1, prev=0, next=2, thread=thread1}
tail: {id=2, prev=1, next=null, thread=thread2}
acquireQueued
方法
acquireQueued
方法负责:
- 如果node位于AQS等待队列head之后(队列中第二个节点为排队阻塞状态,head节点为已得到锁正在运行的节点),尝试获得同步状态(执行
tryAcquire
)。 - 如果获取锁成功,将自己所在node设置为head,原head节点废弃。
- 如果自己不是队列第二个节点(有其他线程在前面排队),需要检查自己是否需要阻塞,如果需要,阻塞该线程。
代码如下:
// 返回当前线程是否被中断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取node的前置节点
final Node p = node.predecessor();
// 如果node的前置节点为head,尝试获取同步状态。head是一个虚节点,真正排队等待的节点位于head之后
if (p == head && tryAcquire(arg)) {
// tryAcquire如果成功,将node设置为head
setHead(node);
// 设置原来head节点的next为null,方便GC
// 原来head节点被移除队列
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果node的前置节点不为head或者tryAcquire失败
// 执行shouldParkAfterFailedAcquire判断是否需要阻塞
// parkAndCheckInterrupt会阻塞当前线程,如果线程被中断,方法返回true,然后interrupted变量被设置为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果中间发生异常,取消获取同步状态
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
方法。
该方法判断node前置节点的waitStatus。有如下3种情况:
- 如果前置节点waitStatus为SIGNAL,返回true,说明thread可以被阻塞。
- 如果前置节点waitStatus大于0,说明节点被cancelled,把该节点从队列中排除。然后从后向前,排除所有连续的状态为cancelled的节点。
- 如果前置节点waitStatus为其他值,设置前置节点状态为SIGNAL。
代码如下所示:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前置节点的状态为SIGNAL,返回true,可以阻塞当前线程
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 如果前置节点的状态为cancelled,则从后向前遍历,找到最近的一个状态不为cancelled的节点
// 并将该节点的下一个节点设计为当前节点。(中间一连串状态为cancelled的节点被移除队列)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// CAS设置前置节点的状态为SIGNAL
// 可能会问这里为何不用确保CAS成功
// 因为执行到这里,方法会返回false,线程不会阻塞
// 外层方法acquireQueued会保持自旋,再次调用此方法
// CAS会再次执行,直到成功
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
下面举例说明下整个步骤。
队列初始状态为:
head: {thread: null, ws: 0}
tail: {thread: thread1, ws: 0}
第一次执行完该方法后,由于前置节点的状态为0,将前置节点的状态设置为SIGNAL,返回false。
现在队列中的节点为:
head: {thread: null, ws: SIGNAL}
tail: {thread: thread1, ws: 0}
由于shouldParkAfterFailedAcquire返回false,acquireQueued方法会重复执行。会再次tryAcquire,若成功,则获取到同步状态,方法返回,线程继续执行。若失败,执行shouldParkAfterFailedAcquire
。这时候当前节点的前置节点状态为SIGNAL,会执行parkAndCheckInterrupt,线程会被阻塞。
parkAndCheckInterrupt
方法负责阻塞线程。如果线程被唤醒,返回线程是否被中断。代码如下:
private final boolean parkAndCheckInterrupt() {
// 当前线程阻塞,直到被unpark,或者被中断时候返回
// 返回线程的中断状态
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park()
方法用于手工阻塞线程的运行。
最后分析下cancelAcquire
的逻辑。
cancelAcquire
方法用于取消节点的排队过程。代码和分析如下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// node为null,返回
if (node == null)
return;
// 清除node的thread变量
node.thread = null;
// Skip cancelled predecessors
// 剔除队列中在该node之前,连续的状态为CANCELLED的node
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取队列中从后向前最近一个状态不为CANCELLED节点的下一个节点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置node状态为CANCELLED
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
// 如果node在队尾,CAS设置对尾为pred(队列中从后向前最近一个状态不为CANCELLED节点)
// 如果CAS成功,再CAS设置tail节点的next节点为null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 如果pred不是head
// 确保pred节点的状态为SIGNAL
// 同时pred的thread不能为null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 队列跳过这个CANCELLED的node
// 设置pred节点的next为node的next节点
compareAndSetNext(pred, predNext, next);
} else {
// 否则(pred为head或者CAS设置pred状态为SIGNAL失败),需要唤醒后面排队的线程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
释放同步状态
release方法
释放同步状态为release方法。代码和解释如下:
public final boolean release(int arg) {
// 如果tryRelease返回true,说明需要唤醒队列中排队的线程
if (tryRelease(arg)) {
Node h = head;
// head不为空并且状态不为0(此时为SIGNAL)之时,唤醒后续节点
// (状态为SIGNAL之后的节点才会被park)
if (h != null && h.waitStatus != 0)
// 唤醒head节点的后置节点
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor方法
unparkSuccessor
方法设置node节点的状态为0,然后向后找到距离node最近的状态不为cancelled的节点,唤醒它的线程。
代码如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 如果是除cancelled外的其他状态,设置head状态为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// s为head的后置节点
Node s = node.next;
// 如果s节点为null或者状态为cancelled
// 从后向前遍历,找到距离head最近状态不为cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒这个节点的线程
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式执行过程
获取同步状态
获取同步状态的入口为acquireShared
方法。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
其中tryAcquireShared
的返回值含义为:
0 获取同步状态成功,且有剩余的状态可以被其它线程获取
- = 0 获取同步状态成功,没有剩余的状态
- < 0 获取同步状态失败
如果tryAcquireShared
返回值小于0,说明说去同步状态失败,执行doAcquireShared
方法。
doAcquireShared
对应独占模式的acquireQueued
方法。逻辑基本类似。
代码如下:
private void doAcquireShared(int arg) {
// 节点加入队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 获取tryAcquireShared返回值
int r = tryAcquireShared(arg);
if (r >= 0) {
// 和独占模式的不同之处,这里需要向后置节点传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
方法用于设置node为head,并且向后续节点传递release事件。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置node为新的head节点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 如果propagate > 0
// 或者原先head节点为null,或者状态不为cancelled之时,都会执行
// 或者目前head节点为null,或者状态不为cancelled之时,也会执行(head可能会在运行行代码之前被修改)
// 这段逻辑为传播过程,唤醒后续的节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果s节点为null(node位于队列尾部),或者后置节点类型为共享节点的话,唤醒后面的节点
if (s == null || s.isShared())
doReleaseShared();
}
}
这里可能有疑问,为什么要传递release事件。以ReentrantReadWriteLock为例,这个lock的read锁是共享的,write锁是独占的。如果write锁释放之后,某个线程获得了read锁,这时候通过传递release事件,后续等待read锁的线程才能够唤醒并继续执行。
doReleaseShared
方法:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 自旋
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果head节点状态为SIGNAL
if (ws == Node.SIGNAL) {
// 使用CAS操作,设置head状态为0,如果失败再次尝试
// (1)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后置节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 设置h节点状态为PROPAGATE,表示下一次acquireShared执行时此节点无条件propagate
// setHeadAndPropagate方法中判断时该node的waitStatus小于0,可以向后propagate
// 共享模式中的doReleaseShared可能会被多个线程同时执行
// 如果一个线程刚执行完(1),另一线程就会遇到ws为0的情况
continue; // loop on failed CAS
}
// 这里用来传播release操作
// setHeadAndPropagate会改变head,如果有线程tryAcquire成功,执行完setHeadAndPropagate方法的setHead方法,head会发生变化,说明此时有线程被唤醒,再次循环doReleaseShared,尽管被唤醒的线程也有可能同时执行doReleaseShared,但这样不会影响release的传播
// 如果head没有改变,说明没有线程被唤醒(后续线程tryAcquireShared返回值不大于0),此时传播行为终止,退出循环
// 也有可能是被唤醒的线程获取到同步状态后尚未执行到setHeadAndPropagate的setHead方法,此时head没有改变。尽管这时候退出循环,但是被唤醒的线程仍有机会执行doReleaseShared。不影响release的传播
if (h == head) // loop if head changed
break;
}
}
释放同步状态
释放同步状态的入口方法为releaseShared
。代码如下:
public final boolean releaseShared(int arg) {
// 如果tryReleaseShared返回true,说明需要唤醒队列中等待的线程
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中doReleaseShared
前面已经分析过,此处不再赘述。
带有时间限制的获取锁的方法
AQS还支持在允许时间内获取同步状态的方法。如果立刻获取成功,或者在允许时间内获取成功返回true,否则在等待最大允许时间之后,返回false。
下面以有限时间内获取独占锁的方法说明下工作原理。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算超时的时刻
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算还剩下多少时间
nanosTimeout = deadline - System.nanoTime();
// 如果超时,返回false
if (nanosTimeout <= 0L)
return false;
// spinForTimeoutThreshold为1000纳秒
// 如果剩下的时间多于spinForTimeoutThreshold,再阻塞当前线程
// 否则不必阻塞,保持自旋即可
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法中使用了spinForTimeoutThreshold
变量来优化。线程的阻塞和唤醒过程是比较耗时的。如果线程只需要“阻塞”等待很小的一段时间,为了保正线程从阻塞状态恢复的速度,不要使用Locksupport.park
。可以让线程自旋,虽然会消耗一定的CPU资源,但是线程一直保持执行状态,跳出自旋状态的耗时要比中阻塞状态中恢复小得多。
带有时间限制的获取共享锁的方法和独占锁的类似,不再赘述。