非内置锁存在的意义
synchronized
关键字提供了一套非常完整的java内置锁实现,简单易用通过块语句控制锁的范围,而且性能不低,隐藏了偏向锁,轻量、重量锁等复杂的概念,对程序员非常友好。
那么为什么还要存在一套独立于内置锁的实现呢,这里我们指的就是java.util.concurrent.locks
下面的这套锁。
- 首先,内置锁的范围只能位于块语句内,不能交叉或者跨越不同块(方法)。
什么是交叉呢?假想一个场景来说明,我们有一台设备,需要在移动中接入沿路不同的信号基站,接入时必须持有这个基站的某个信道的锁,但是呢要求连接不断开,也就是先拿到下一个基站信道的锁,连接成功后,才能释放上一个信道的锁,这个时候就需要锁的范围能够存在交集,但又不完全重合。
- 其次,内置锁的锁定过程不能被中断,只有下一个等待锁的线程真正获取了锁之后,你的代码才重新取得控制权。
- 再次,内置锁不能做尝试,一旦执行到同步块,就只有两种结果,要么得到,要么进入等待。
尝试锁的意义,在于有些情况下竞争不一定非要取得成功,比如在做并行计算时,假如线程的个数已经超过了当前待计算资源的个数,那么与其频繁发生阻塞和竞争,倒不如休眠或者处理其他事情,效率反而更高
- 最后,每个内置锁都仅有一个条件等待对象(ConditionObject),也就是这个实例自身,只有在它自己的同步块内,才能调用wait和notify。
于是Doug Lea大师的这套锁实现,就针对了上述的几种场景,提供了解决方案,而且在实现的同时,还是保持了高度的可扩展性。
搭建可重入锁的框架
想要扩展内置锁,首先要支持可重入,下面来看看重入的概念:
允许同一个线程,连续多次获取同一把锁
举例来说,上一篇文章《形形色色的锁》中提到的几种自旋锁实现,都不可重入,而内置锁和locks包下的这些锁,都是可重入锁。
要实现锁,首先就要明确定义锁具备的一些基本操作原语
获取
释放
首先获取和释放,独立为两种操作,就允许这两者发生在不同的代码块甚至函数中(虽然并不推荐这么做)。
获取(允许中断)
允许中断的获取,直接利用了线程的interrupt标记,实现了锁的可打断,即获取前若已interrupt,则终止动作,获取中发现被中断,同样终止退出。
尝试获取
尝试获取,实现了非阻塞的获取锁的能力,要么立即获取,要么失败立即返回,把是否轮询或者其他进一步操作的自由,留给了调用者。
创建条件(condition)
创建条件独立成为一种操作后,就不再束缚于object提供的wait和notify机制,可以创建任意多个条件对象,然后统统交由一把锁来控制,可以高效实现一对多的事件分发。
源码中的Lock接口
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
condition接口 (除了多提供带时间参数的重载版本await方法,功能上与Object的wait并无差别)
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
AbstractQueuedSynchronizer
concurrent库中有大量的锁实现,其中一部分是继承自Lock接口比如ReentrantLock、ReentrantReadWriteLock,也有些自称派系的Semaphore、CountDownLatch,不过看了源码就发现,并非每种锁的实现都在重复造轮子,因为他们的基础行为有着非常多的共同点,比如等待队列, 独占获取,共享获取等。
这份抽象,就是AbstractQueuedSynchronizer,下面简称AQS
AQS具备以下几种特性
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
为了能够充分理解AQS,首先要引入一个支持的工具类,LockSupport,这里我会花费较大篇幅,深入和有对比地来介绍java虚拟机是如何实现这个类的功能。
public class LockSupport {
...
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
...
}
LockSupport存在的目的是为锁操作提供一种阻塞等待的机制,它的代码不多,但是存在的意义却非常重要。试想,阻塞和恢复运行,在java中是多么想当然的事情,Object.wait()和早些版本的Thread.suspend()方法都可以实现这种功能。那为什么还要费周折来实现这么一个LockSupport呢?
Thread的暂停恢复操作很早就被deprecated掉了,因为有可能招致死锁,那么能否借用旧版sdk suspend()的实现机制来代替park?不行,因为park方法提供的是类似信号量的操作,也就是说,如果先unpark,则下一次park会立即返回,那么可以理解LockSupport等同于初始值为0,最大值为1的信号量。但改变信号量和线程阻塞是两个操作,如果要合二为一,又要使用锁,这里本身就是要在java层提供一个有别于synchronized的锁,总不能再去引用人家吧。所以在native层直接合并成一步原子操作是比较合适的。
同样的原因,Object.wait()首先就必须在自己的同步块中进行,那也必须引用到内置锁,所以这里park/unpark就必须完整地实现一套自己的阻塞机制。下面来探究一下这三种阻塞机制的差别:
- Thread.suspend() 和 Thread.resume()
- Object.wait() 和 Object.notify()
- LockSupport.park() 和 LockSupport.unpark()
在介绍源码前,先来看下java虚拟机的线程实现框架
java线程各个接口的逻辑其实是一层一层地分发下去执行的,调用链看起来很冗长,不过研究一下就能看出来jvm的大部分功能,都是依照这个思路,将逻辑分解和派发下去的,直到最后和平台相关的操作派发给具体平台实现层来完成,比如在windows上创建线程是用win32API的 CreateThread(), 而在linux上就是pthread_create()
native层线程类继承关系:
Thread.suspend()
openjdk/jdk/source/share/native/java/lang/Thread.c
...
static JNINativeMethod methods[] = {
...
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
...
};
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
...
openjdk/hotspot/src/share/vm/prims/jvm.cpp
...
JVM_ENTRY(void, JVM_SuspendThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_SuspendThread");
oop java_thread = JNIHandles::resolve_non_null(jthread);
JavaThread* receiver = java_lang_Thread::thread(java_thread);
if (receiver != NULL) {
{
MutexLockerEx ml(receiver->SR_lock(), Mutex::_no_safepoint_check_flag);
if (receiver->is_external_suspend()) {
return;
}
if (receiver->is_exiting()) { // thread is in the process of exiting
return;
}
receiver->set_external_suspend();
}
receiver->java_suspend();
}
JVM_END
...
openjdk/hotspot/src/share/vm/runtime/thread.cpp
void JavaThread::java_suspend() {
{ MutexLocker mu(Threads_lock);
if (!Threads::includes(this) || is_exiting() || this->threadObj() == NULL) {
return;
}
}
{ MutexLockerEx ml(SR_lock(), Mutex::_no_safepoint_check_flag);
if (!is_external_suspend()) {
// a racing resume has cancelled us; bail out now
return;
}
// suspend is done
uint32_t debug_bits = 0;
if (is_ext_suspend_completed(false /* !called_by_wait */,
SuspendRetryDelay, &debug_bits) ) {
return;
}
}
VM_ForceSafepoint vm_suspend;
VMThread::execute(&vm_suspend);
}
// Part II of external suspension.
// A JavaThread self suspends when it detects a pending external suspend
// request. This is usually on transitions. It is also done in places
// where continuing to the next transition would surprise the caller,
// e.g., monitor entry.
//
// Returns the number of times that the thread self-suspended.
//
// Note: DO NOT call java_suspend_self() when you just want to block current
// thread. java_suspend_self() is the second stage of cooperative
// suspension for external suspend requests and should only be used
// to complete an external suspend request.
//
int JavaThread::java_suspend_self() {
...
while (is_external_suspend()) {
ret++;
this->set_ext_suspended();
// _ext_suspended flag is cleared by java_resume()
while (is_ext_suspended()) {
this->SR_lock()->wait(Mutex::_no_safepoint_check_flag);
}
}
return ret;
}
openjdk/hotspot/src/share/vm/runtime/mutex.cpp
bool Monitor::wait(bool no_safepoint_check, long timeout, bool as_suspend_equivalent) {
Thread * const Self = Thread::current() ;
...
int Monitor::IWait (Thread * Self, jlong timo) {
...
for (;;) {
if (ESelf->Notified) break ;
int err = ParkCommon (ESelf, timo) ;
if (err == OS_TIMEOUT || (NativeMonitorFlags & 1)) break ;
}
...
return WasOnWaitSet != 0 ; // return true IFF timeout
}
static int ParkCommon (ParkEvent * ev, jlong timo) {
// Diagnostic support - periodically unwedge blocked threads
intx nmt = NativeMonitorTimeout ;
if (nmt > 0 && (nmt < timo || timo <= 0)) {
timo = nmt ;
}
int err = OS_OK ;
if (0 == timo) {
ev->park() ;
} else {
err = ev->park(timo) ;
}
return err ;
}
openjdk/hotspot/src/os/linux/vm/os_linux.cpp
void os::PlatformEvent::park() { // AKA "down()"
...
if (v == 0) {
...
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
...
}
}
结论很清楚,最终一步一步调用进入了pthread_cond_wait,也就是利用了linux pthread的锁(其他平台版本也有各自的实现),进入了阻塞状态,而条件锁能够阻塞最终一定是通过系统调用,随后将当前该线程移出调度。但是这个过程是怎么从第一步java_suspend()调用到java_suspend_self()中去了呢?
我们注意到:在JVM_SuspendThread函数中,set_external_suspend()就已经被调用了,也就是说调用java_suspend()前,这个标记就已经置位了,接下来就等着这个标记被检查就行了,我们看看源码的注释怎么解释的:
openjdk/hotspot/src/share/vm/runtime/thread.hpp
// The external_suspend
// flag is checked by has_special_runtime_exit_condition() and java thread
// will self-suspend when handle_special_runtime_exit_condition() is
// called. Most uses of the _thread_blocked state in JavaThreads are
// considered the same as being externally suspended; if the blocking
// condition lifts, the JavaThread will self-suspend. Other places
// where VM checks for external_suspend include:
// + mutex granting (do not enter monitors when thread is suspended)
// + state transitions from _thread_in_native
//
// In general, java_suspend() does not wait for an external suspend
// request to complete. When it returns, the only guarantee is that
// the _external_suspend field is true.
总结一下,有三种时机会检查这个标记位:
- has_special_runtime_exit_condition()调用时
- 要进入monitors的时候,如果此时已经是suspend,就不用再进了
- javaThread从native状态切换回java状态的时候
其中,1和3会调用java_suspend_self()
openjdk/hotspot/src/share/vm/runtime/thread.hpp
bool has_special_runtime_exit_condition() {
return (_special_runtime_exit_condition != _no_async_condition) ||
is_external_suspend() || is_deopt_suspend();
}
openjdk/hotspot/src/share/vm/runtime/thread.cpp
void JavaThread::handle_special_runtime_exit_condition(bool check_asyncs) {
bool do_self_suspend = is_external_suspend_with_lock();
if (do_self_suspend && (!AllowJNIEnvProxy || this == JavaThread::current())) {
frame_anchor()->make_walkable(this);
java_suspend_self();
}
if (check_asyncs) {
check_and_handle_async_exceptions();
}
}
void JavaThread::check_safepoint_and_suspend_for_native_trans(JavaThread *thread) {
JavaThread *curJT = JavaThread::current();
bool do_self_suspend = thread->is_external_suspend();
...
if (do_self_suspend && (!AllowJNIEnvProxy || curJT == thread)) {
JavaThreadState state = thread->thread_state();
thread->set_thread_state(_thread_blocked);
thread->java_suspend_self();
thread->set_thread_state(state);
...
}
}
if (SafepointSynchronize::do_call_back()) {
// If we are safepointing, then block the caller which may not be
// the same as the target thread (see above).
SafepointSynchronize::block(curJT);
}
...
}
那么有哪些地方会调用handle_special_runtime_exit_condition()呢,比如在SafepointSynchronize::block中,或者在JavaCallWrapper::JavaCallWrapper调用被包装的java方法前,或者在javaThread状态相关的类析构的时候。而check_safepoint_and_suspend_for_native_trans()则会在SharedRuntime::generate_native_wrapper或者InterpreterGenerator::generate_native_entry中调用,总而言之,就是不论调用的是java方法还是native方法,总会有机会检查这个_external_suspend标记并调用第二步的java_suspend_self()。因为本文侧重于锁和线程,更多的关于java/native方法调用过程,有机会再单独写文章分析之。
SafePoint
我们注意到java_suspend()方法中有这么一段:
openjdk/hotspot/src/share/vm/runtime/thread.cpp
void JavaThread::java_suspend() {
...
VM_ForceSafepoint vm_suspend;
VMThread::execute(&vm_suspend);
...
}
这里引入一个概念SafePoint,援引openJdk官方的解释:
A point during program execution at which all GC roots are known and all heap object contents are consistent. From a global point of view, all threads must block at a safepoint before the GC can run.
通俗解释SafePoint原本是为了方便GC而在字节码中或者编译成二进制的指令中插入的一些特殊操作:
- 对于解释执行,这个插入的操作就是去检查当前线程是否处于SafePoint同步状态,如果是就进入阻塞。
- 对于已经AOT的代码比如经过HotSpot优化过的部分,这个操作就是访问一个特殊内存位置造成SIGSEGV,然后因为JVM已经自己捕获了这个信号,所以就有机会检查,是否是因为SafePoint而进入,进而执行阻塞。
SafePoint出现的位置主要有:
- 循环的末尾 (防止大循环的时候一直不进入safepoint,而其他线程在等待它进入safepoint)
- 方法返回前
- 调用方法的call之后
- 抛出异常的位置
openjdk/hotspot/src/share/vm/runtime/vm_operations.hpp
// dummy vm op, evaluated just to force a safepoint
class VM_ForceSafepoint: public VM_Operation {
public:
VM_ForceSafepoint() {}
void doit() {}
VMOp_Type type() const { return VMOp_ForceSafepoint; }
};
在openJDK的源码vmThread.cpp以及整个工程中,都没有找到对于这个ForceSafepoint操作的处理相关代码,而且它的doit()函数也没有其他的实现。如果按照注释的字面意思理解,就是强制当前这个线程进入SafePoint,也就是说,暂停不是随随便便什么时刻都可以发生的,只有处在SafePoint才不至于影响到其他活动(譬如内存分配或者GC)。
Object.wait()
openjdk/jdk/src/share/native/java/lang/Object.c
static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
JNIEXPORT void JNICALL
Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls,
methods, sizeof(methods)/sizeof(methods[0]));
}
openjdk/hotspot/src/share/vm/prims/jvm.cpp
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
...
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
monitor->wait(millis, true, THREAD);
...
}
openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
...
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
}
...
}
到了这里就能看懂了,最后又走到os_linux.cpp的park()方法里面去了,为什么Object.wait()这么简单,不想Thread.suspend()那样呢?那是因为,能够进入wait,说明已经进入了synchronized的临界区域,很多工作就可以省掉了。
LockSupport.park()
最后来看看LockSupport.park()的实现:
openjdk/jdk/src/share/classes/sun/misc/unsafe.java
public final class Unsafe {
...
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);
...
}
openjdk/hotspot/src/share/vm/prims/unsafe.cpp
static JNINativeMethod methods_18[] = {
...
{CC"park", CC"(ZJ)V", FN_PTR(Unsafe_Park)},
{CC"unpark", CC"("OBJ")V", FN_PTR(Unsafe_Unpark)}
...
}
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
if (event.should_commit()) {
oop obj = thread->current_park_blocker();
event.set_klass((obj != NULL) ? obj->klass() : NULL);
event.set_timeout(time);
event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
event.commit();
}
UNSAFE_END
openjdk/hotspot/src/share/vm/runtime/park.hpp
class Parker : public os::PlatformParker {
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
openjdk/hotspot/src/os/linux/vm/os_linux.cpp
void Parker::park(bool isAbsolute, jlong time) {
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
JavaThread *jt = (JavaThread *)thread;
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
ThreadBlockInVM tbivm(jt);
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
OrderAccess::fence();
return;
}
...
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
...
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
status = pthread_mutex_unlock(_mutex);
} else {
status = pthread_mutex_unlock(_mutex);
status = pthread_cond_signal (&_cond[_cur_index]);
}
} else {
pthread_mutex_unlock(_mutex);
}
} else {
pthread_mutex_unlock(_mutex);
}
}
最后没有悬念的也是调用了pthread_cond_wait(),但是仔细观察,就可以看出这里通过pthread的接口,封装了一个上限为1资源,也就是_count,通过这样的操作,就构成了一个条件锁,这也是我们在java世界构建一套有别于内置锁的锁实现的基石。
比较完了三者,我们由要回到主题,继续讲解AQS了。
再回顾一下AQS的几个要素:
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
现在我们已经有了一个完全从底层独立实现的条件锁,它支持了阻塞等待,并且可以被中断,看起来剩下的主要工作量,就是构建等待队列了。
阻塞等待队列
我们理清楚一下AQS在java层扮演的角色,它其实就是一个java层的条件锁,但和LockSupport提供的基础条件锁语义不同的是,它能派生出N多条件,每个条件都可以有自己的等待队列,然后锁本身也有等待队列,也就是说,既可以拿它来构造一个普通的带队列的锁,也可以构造出支持多条件的带队列的锁。那么公平性其实就体现在队列上,可以插队就不公平,不让插队,就是公平的。
这里要指出一点,即使是公平队列,也只能保证在队列中的线程按顺序获取锁,但是并不能保证两个线程同时进入队列时,先请求的一定排在前面,同样的,没有队列的时候,两个线程同时来获取锁,这时谁能拿到也是没有保证的,仅取决于当时cpu的调度情况。
大概是处于性能考虑,这个队列的操作是无锁的,入队和出队都是基于CAS和自旋重试,下面来看看AQS是如何定义获取锁的动作的
获取锁的流程:
进入等待队列的流程:
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
出队的过程,由释放操作来完成:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
释放的过程相对简单些,不过注意下这了s.waitStatus有可能是CANCELED = 1,此时就必须寻找队列中下一个还没有被cancel的节点来释放。这里我们通过unpark的调用位置,也能看出,很有可能是和获取锁的park动作并发的,也就是谁先谁后不确定,有可能在竞争者进入队列后但是park前,当前锁的持有者就调用了unpark。不过根据前面的分析,因为park操作包含了一个信号量在里面,所以即使先调用了unpark,也没关系,park会立即返回让其立即重新参与竞争。
但是这里总感觉少了点什么?对了,好像漏掉了设置队列head,此时被唤醒的线程应该是被设置为head不是吗?
恩,还真不是,我们再来分析下这个模型,这个链表显然是带头结点的,也就是头结点不过是一个空结构,用来指示第一个节点的位置,通过传递头结点,就可以完成dequeue操作,最上面队列示意图中紫色的当前持有锁的线程,其实本身是不在这个等待队列里的,但它可以是头结点。有点晕吗?我们还是从release()调用结束来分析,被唤醒的第一个线程,此时拿到锁了吗?并没有,因为它还没来得及tryAcquire(),在具体的实现里,通常是要调用setExclusiveOwnerThread(Thread) 这个AQS父类的方法把自己设置进去才算真正拿到锁。所以,从这个线程被park的地方接着看就明白了,它会在唤醒后的下一个循环,把自己设置为头结点:
final boolean acquireQueued(final Node node, int arg) {
...
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
...
}
共享/独占
我们刚才讨论的acquire和release操作,都是针对互斥锁的,那什么是共享锁呢,直接拿ReentrantReadWriteLock为例,它允许多个读线程共享一个锁资源,但是同时又和写线程互斥,也就好比一本书,写好之后,多少个人围着看都没问题,但是一旦作者需要修改或重写,那等大家看完后,这本书就收起来只有作者可以访问了。(当然如果有一个以上的作者,那他们之间也必须轮流的写而不能同时获取访问权。)
如何实现共享锁呢,还是来看AQS源码
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先,这里定义了共享获取的原语tryAcquireShared,用以区分互斥锁
- addWaiter入队时传入的是Node.SHARED而不是Node.EXCLUSIVE
- 等待结束后,不仅把字节置为头结点,还得看看下一个节点是否也是共享获取,如果是把他唤醒
这里第3点是为了后续的请求共享锁线程,第2点是为了给第3点铺路,那么只有第1点才是最关键的,决定着共享获取和互斥获取的差别。那看看ReentrantReadWriteLock是如何实现tryAcquireShared()的
java.util.concurrent.locks.ReentrantReadWriteLock
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
这里加入了两个概念exclusiveCount和sharedCount,独占和共享计数,用来表示当前锁的状态,显然有了这两种状态,共享和独占获取就能知道自己应该是等待还是立即拿到锁。
独占获取实现:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
通过对state字段的按位拆分,读写锁拥有了可以标记当前是属于共享读还是互斥写状态的能力,那为什么必须复用state字段而不能新增字段呢?还是和之前分析LockSupport同样的理由,如果代表资源的字段有多个,那么就无法通过一次CAS来完成赋值,那就起码是两次,于是又要用到锁来把这两个操作一起保护起来,而这里恰是构建锁代码的一部分,蛋和鸡的问题不是么。
公平/非公平
公平性在代码上主要依赖AQS的抽象方法tryAcquire的具体实现来保证,如同上面已经分析过的,这里的公平性,仅仅能保证在已经存在等待的情况下,队列前面的线程能够优先获取锁,但是并不能保证两个同时去争抢的线程,先来的一定先拿到锁或者排在队列的前面(java代码的非原子性问题导致)
以ReentrantLock的代码为例,它是通过派生了AQS类来定义共享和非共享两种行为
那么区别公平和非公平的关键在于,调用公平锁的线程不是直接上去就抢,而是先礼貌的看下有没有人在排队,如果有,就自觉排在最后一个,很像香港人对吧:)
java.util.concurrent.locks.ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
...
}
static final class NonfairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可重入 & 允许中断
最后这两点特性就比较简单了,可重入功能实现关键点在于每次获取锁都调用了setExclusiveOwnerThread(Thread),这样能够知道当前获取锁的是哪个线程,就很容易做到可重入。(不要搞糊涂了,AQS中代表资源的是state字段,而不是这个Thread,不然就只能表示0和1,也支持不了上面说的共享锁了)
对于中断的支持也不复杂,因为我们分析过park方法本身是可以支持中断的,那么只需要在park被中断后作出对应项的响应即可:
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 注意:这里不再是记录标记为而是直接抛
// 中断异常来跳出循环
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结
1. 可重入锁框架,相比于内置锁最大的区别:
- 范围可重叠
- 获取锁过程允许中断和失败
- 支持多个条件队列
- 可以扩展出读写锁、CountDownLatch、Semphore等多种灵活的形式
2. LockSupport
它从native层为我们提供了一套独立于内置锁的完整的带信号量的阻塞机制,是能够实现有别于内置锁的另外一套锁的必要条件
3. AQS
AQS利用了LockSupport的阻塞机制,在其上构建了一系列的接口
acquire()
acquireInterruptibly()
acquireShared()
acquireSharedInterruptibly()
以及对应的release和其它接口
如果从抽象角度来说,这些接口或者实现定义了满足前面讨论过的五种属性的对应语义(semantic),这些语义才是实现各种各样的锁的基石。这也符合分层次设计的思想,避免了在每个具体的锁实现中都考虑如何细致而又正确地去完成这些动作。
补充
1. 关于原子性的理解
我们反复提到,没办法保证真正意义上的先来先得,就是因为原子性。我们知道java的一行代码经过虚拟机的解释,可以转化成多条字节码指令,而一条字节码指令又可能分解成多条汇编指令来完成(有些伪指令可能会包含多个指令),从这个角度上讲,先到先得那就必须是先执行到锁方法的第一条汇编指令的那个线程应该优先得到锁,至少在java层这是不可能的。
既然获取锁包含了这么多的前置操作,那么如果我们直接用CAS操作,来代替锁会怎样。其实这么做也存在隐患,并非所有场景都适用,比如很有名的ABA问题,我们来看一个案例:
现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,然后希望用CAS将栈顶替换为B:
head.compareAndSet(A,B);
在T1执行上面这条指令之前,线程T2介入,将A、B出栈,再pushD、C、A,此时堆栈结构如下图,而对象B此时处于游离状态:
此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B,但实际上B.next为null,所以此时的情况变为:
其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,平白无故就把C、D丢掉了,这是严重的逻辑错误。
怎么解决呢? 方法有很多,可以参考ObjectMonitor的限制出队个数的方式或者参考AtomicStampedReference的实现,这里不再展开讨论了。
2. 重入锁的性能
测试场景:MacOS + java 1.8 + 赋值1000万次
测试结果:
单线程,无冲突
锁类型 | 耗时(ms) | 相对增幅(相对第一名) |
---|---|---|
synchronized | 166 | +0% |
writelock | 190 | +14% |
readlock | 216 | +30% |
在无冲突的情况下,内置锁的速度是最快的,这是因为偏向锁机制的效率更高,至于如何做到这点的,可以参考下面的链接资料。
而写锁比读锁快一丢丢,也许和非竞争条件下,执行的代码量有关,当然这点差别绝对不是告诉我们优先用写锁,还是应该看读写的频率来定。
双线程,有冲突
锁类型 | 耗时(ms) | 相对增幅(相对第一名) |
---|---|---|
synchronized | 448 | +0% |
1 readlock + 1 writelock | 1038 | +132% |
2 writelock | 1137 | +154% |
3. 内存屏障
如果你有仔细留意分析LockSupport的过程,肯定注意到了这句话OrderAccess::fence();
为什么它总是在一些很重要的操作之后,调用一下?它其实是一个内存屏障功能,也叫做memory barier或者memory fence,能够保证从另一个CPU来看屏障的两边的所有指令都是正确的程序顺序,而保持程序顺序的外部可见性;其次它们可以实现内存数据可见性,确保内存数据会同步到CPU缓存子系统。我们知道编译器有个功能叫OoO(Out-of-order execution),会为了更好利用缓存而交换一些指定的顺序,在java中则可能是字节码,在java1.5之前,volatile就仅仅做到了可见性而没有支持内存屏障,导致二次判空方式实现的单例,仍存在重复创建的风险,之后的jdk版本,volatile都同时涵盖了可见性和内存屏障双重含义,也就是给volatile变量赋值的时候,这句代码后面会被自动插入一条内存屏障,来组织编译器优化执行顺序。所以不是非要解决可见性才能使用volatile.
内存屏障 vs CAS
貌似这两个功能在一定程度上有些交集,因为CAS本身是可以保证执行完赋值操作一定完成的。那么就有必要来比较下这两者的性能差异,通常前者的性能消耗是在若干个CPU时钟周期,而一次硬件CAS则最起码要数百个时钟周期,锁的话开销就更大了。那么我们要做的,就是选择既可以满足我们需求,又开销最小的一种操作即可。
关于java内存和缓存的一致性以及volatile的更多细节,在下一篇《形形色色的锁3》中再详细讨论
参考资料,但写这篇文章不一定都用上了
openjdk 1.8 source
偏向锁
Synchronization Public2
cpu缓存