前言
回顾一下【多线程与并发】Java并发理论基础 这篇文章中,我们说到,现成安全的方法包含:
互斥同步:Synchronized和ReentranLock
非阻塞同步:CAS,Atomic…
无同步方案:栈封闭,ThreadLocal,可重入代码
本篇文章我们来看一下CAS。
悲观锁和乐观锁更新数据方式
CAS机制是一种数据更新的方式。在讲CAS是什么之前,我们先来了解下再多线程环境下,对共享变量进行数据更新的两种模式:悲观锁模式和乐观锁模式。
-
悲观锁模式:
悲观锁更新方式认为在更新数据的时候大概率会有其他线程去争夺公共资源,所以悲观锁的做法是:第一个获取资源的县城会将资源锁起来,其他未争夺到资源的线程将进入阻塞队列进行排队,当第一个获取资源的线程释放锁后,这些线程才能有机会重新争夺资源。
synchronized是Java中悲观锁的典型实现,它会使没争抢到资源的线程进入阻塞状态,县城在阻塞状态和Runnable状态之间切换效率较低。
-
乐观锁模式:
乐观锁更新方式认为在更新数据的时候其他线程争抢这个共享变量的概率很小,所以更新数据的时候不会对共享数据加锁。但是在正式更新数据之前会检查数据是否被其他线程改变过,如果未被其他线程改变过,就将共享变量变更新成最新值,如果发现共享变量已经被其他线程更新过了,就重试,直到成功为止。
CAS机制就是乐观锁的典型实现。
什么是CAS机制
- CAS的全称为Compare-And-Swap,直译就是对比交换。一种用于在多线程环境下实现同步功能的机制。CAS 操作包含三个操作数 -- 内存位置、预期数值和新值。
- CAS是一条CPU的原子指令,它的实现逻辑是将内存位置处的数值与预期数值相比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何操作。
- 在 Java 中,并没有直接实现 CAS,相关的实现是通过 C++ 内联汇编的形式实现的,Java 代码需通过 JNI 才能调用。Java中的sun.misc.Unsafe类提供了compareAndSwapInt和compareAndSwapLong等几个方法实现CAS。
我们通过上面说的到的三个核心参数进一步认识CAS的操作逻辑:
主内存中存放的共享变量的值:V (一般情况下这个V是内存的地址值,通过这个地址可以访问到内存中的值)
工作内存中共享变量的副本值(预期值):A
-
需要讲共享变量更新到的最新值:B
如上图所示,主存中保存了一个V值,当某个线程需要使用V值时:第一步从主存中读取V值到线程的工作内存A中,第二步计算变为B值,第三步再把B值写回到内存V值中。多个线程公用V值都是如此操作。
CAS的核心是在将B值写入到V之前要比较A值和V值是否相同,如果不同,证明此时V值已经被其他线程修改,那么此时重新将V值赋给A,并重新计算得到B,如果相同,则将B值赋值给V。值得注意的是CAS机制中的这个步骤是原子性的(从指令层面提供原子操作),所以CAS机制可以解决多线程并发编程对共享变量读写的原子性问题。
Java中的CAS机制
- Atomic系列类
- Lock系列类底层实现
- Java1.6以上版本,synchronized转变为重量级锁之前,也会采用CAS机制
CAS实现自旋锁
我们都知道用锁或者synchronized关键字可以实现原子操作,只是加锁或使用 synchronized 关键字带来的性能损耗较大。而CAS可以实现乐观锁,直接利用CPU层面的指令实现原子操作,没有加锁和线程上下文切换的开销,所以性能很高。
CAS是实现自旋锁的基础,CAS利用CPU指令保证了操作的原子性,以达到锁的效果。那为什么叫自旋锁呢,自旋也就是自循环,一般用一个无限循环实现。其实就是在一个无限循环中,执行一个 CAS 操作,当操作成功,返回 true 时,循环结束;当返回 false 时,接着执行循环,继续尝试 CAS 操作,直到返回 true。
CAS问题
上面说到CAS为乐观锁,使用 CAS 解决并发问题通常情况下性能更优。但是CAS也有几个问题:
-
ABA问题
问题:CAS在操作值的时候需要检查值有没有变化,如果没有发生变化则更新。如果一个值原本是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。
解决思路:ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。
从Java 1.5开始提供了AtomicStampedReference类解决ABA问题,用Pair这个内部类实现,包含两个属性,分别代表版本号和引用,在compareAndSet中先对当前引用进行检查,再对版本号标志进行检查,只有全部相等才更新值。
Java中除了AtomicStampedReference能解决ABA问题,还有其他类可以解决吗?答案是有的——AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,标记值有修改。
AtomicStampedReference:带版本戳的原子引用类型,版本戳为int类型。
AtomicMarkableReference:带版本戳的原子引用类型,版本戳为boolean类型。
-
长时间循环开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。在线程之间竞争程度大的时候,如果使用CAS,每次都有很多的线程在竞争,也就是说CAS机制不能更新成功。这种情况下CAS机制会一直重试,这样就会比较耗费CPU。如果线程之间竞争程度小,使用CAS是一个很好的选择;但是如果竞争很大,使用锁可能是个更好的选择。
-
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。
从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
CAS示例
上面说了那么多,下面我们简单看下Java中CAS的使用。
在高并发下如果不使用CAS,多线程同时修改一个变量的值我们需要synchronized加锁(为什么不用Lock,上面我们提到,Lock底层的AQS也是基于CAS进行获取锁的)。
public class Example {
private int i=0;
public synchronized int add(){
return i++;
}
}
底层基于CAS进行更新数据的AtomicInteger,不需要加锁就在多线程并发场景下实现数据的一致性。
public class Example{
private AtomicInteger i = new AtomicInteger(0);
public int add(){
return i.addAndGet(1);
}
}
UnSafe类详解
在并发实现中CAS操作必须具备原子性,而且是硬件级别的原子性,Java被隔离在硬件之上,显然是无法实现,这时为了能直接操作操作系统层面,就需要通过C++编写native方法来扩展实现。
JDK在sun.misc包下提供了一个满足CAS要求的类--Unsafe,它主要提供一些执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。AQS就是使用此类完成硬件级别的原子操作。UnSafe通过JNI调用本地C++代码执行CPU硬件指令集。
在JDK9之后,
sun.misc.Unsafe
被移动到jdk.unsupported
模块中,同时在java.base
模块克隆了一个jdk.internal.misc.Unsafe
类,代替了 JDK8 以前的sun.misc.Unsafe
的功能。jdk.internal
包不开放给开发者调用,完完全全 import 不到。
sun.misc.Unsafe
内部都是委托jdk.internal.misc.Unsafe
来操作的,所以后面功能分析都是基于jdk.internal.misc.Unsafe
的。 而且jdk.internal.misc.Unsafe
提供更全的操作,sun.misc.Unsafe
只开放了部分。
如下图,先来看下Unsafe类总体功能:
下面我们对相关方法和应用场景进行详细介绍。
Unsafe与CAS
从Java5开始引入了对CAS机制的底层的支持,在这之前需要开发人员编写相关的代码才可以实现CAS。在原子变量类Atomic中(例如AtomicInteger、AtomicLong)可以看到CAS操作的代码,在这里的代码都是调用了底层(核心代码调用native修饰的方法)的实现方法。下面我们一起来进入到Unsafe中一探究竟。
正如上面所说sun.misc.Unsafe
内部都是委托 jdk.internal.misc.Unsafe
来操作的,所以下面源码都是来自 jdk.internal.misc.Unsafe
:
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
@HotSpotIntrinsicCandidate
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!weakCompareAndSetLong(o, offset, v, v + delta));
return v;
}
@HotSpotIntrinsicCandidate
public final int getAndSetInt(Object o, long offset, int newValue) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, newValue));
return v;
}
@HotSpotIntrinsicCandidate
public final long getAndSetLong(Object o, long offset, long newValue) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!weakCompareAndSetLong(o, offset, v, newValue));
return v;
}
@HotSpotIntrinsicCandidate
public final Object getAndSetObject(Object o, long offset, Object newValue) {
Object v;
do {
v = getObjectVolatile(o, offset);
} while (!weakCompareAndSetObject(o, offset, v, newValue));
return v;
}
//以weakCompareAndSetInt为例,上面其他方法一样
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,int expected,int x) {
return compareAndSetInt(o, offset, expected, x);
}
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,int expected,int x);
从源码中发现,内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)。
又从Unsafe类中发现,原子操作其实只支持下面三个方法。
public final native boolean compareAndSetInt(Object o, long offset,int expected,int x);
public final native boolean compareAndSetLong(Object o, long offset,long expected,long x);
public final native boolean compareAndSetObject(Object o, long offset,Object expected,Object x);
我们发现Unsafe只提供了3种CAS方法:compareAndSetInt、compareAndSetLong和compareAndSetObject,都是native方法。
Unsafe底层
我们继续往下看看Unsafe的compareAndSet*方法实现CAS操作,他是一个本地方法,位于unsafe.cpp中。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
} UNSAFE_END
可以看到它通过 Atomic::cmpxchg
来实现比较和替换操作。其中参数x是即将更新的值,参数e是原内存的值。
如果是Linux的x86,Atomic::cmpxchg
方法的实现如下:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
而windows的x86的实现如下:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::isMP(); //判断是否是多处理器
_asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
如果是多处理器,为cmpxchg指令添加lock前缀。反之,就省略lock前缀(单处理器会不需要lock前缀提供的内存屏障效果)。这里的lock前缀就是使用了处理器的总线锁(最新的处理器都使用缓存锁代替总线锁来提高性能)。
cmpxchg(void* ptr, int old, int new),如果ptr和old的值一样,则把new写到ptr内存,否则返回ptr的值,整个操作是原子的。在Intel平台下,会用lock cmpxchg来实现,使用lock触发缓存锁,这样另一个线程想访问ptr的内存,就会被block住。
Unsafe其它功能
Unsafe 提供了硬件级别的操作,比如说获取某个属性在内存中的位置,比如说修改对象的字段值。下面我们简单看几个Unsafe提供的其他功能:
//获取给定的 paramField 的内存地址偏移量,这个值对于给定的 field 是唯一的且是固定不变的
public native long staticFieldOffset(Field paramField);
//获取数组第一个元素的偏移地址
public native int arrayBaseOffset(Class paramClass);
//获取数组的转换因子即数组中元素的增量地址的
public native int arrayIndexScale(Class paramClass);
//分配内存
public native long allocateMemory(long paramLong);
//扩充内存
public native long reallocateMemory(long paramLong1, long paramLong2);
//释放内存
public native void freeMemory(long paramLong);
Java中的原子类操作
在JDK1.5版本之前,多行代码的原子性主要通过synchronized关键字进行保证。在JDK1.5版本,Java提供了原子类型专门确保变量操作的原子性。所谓的原子操作类,指的是java.util.concurrent.atomic包下,一系列以Atomic开头的包装类。例如:AtomicBoolean,AtomicInteger,AtomicLong。它们分别用于Boolean,Integer,Long类型的原子性操作。
-
原子更新基本类型:使用原子的方式更新基本类型
- AtomicBoolean: 原子更新布尔类型。
- AtomicInteger: 原子更新整型。
- AtomicLong: 原子更新长整型。
-
原子更新数组:通过原子的方式更新数组里的某个元素
- AtomicIntegerArray: 原子更新整型数组里的元素。
- AtomicLongArray: 原子更新长整型数组里的元素。
- AtomicReferenceArray: 原子更新引用类型数组里的元素。
-
原子更新引用类型:
- AtomicReference: 原子更新引用类型。
- AtomicStampedReference: 原子更新引用类型, 内部使用Pair来存储元素值及其版本号。
- AtomicMarkableReferce: 原子更新带有标记位的引用类型。
-
原子更新字段类:
- AtomicReference: 原子更新引用类型。
- AtomicStampedReference: 原子更新引用类型, 内部使用Pair来存储元素值及其版本号。
- AtomicMarkableReferce: 原子更新带有标记位的引用类型。
-
原子累加器(JDK1.8):AtomicLong和AtomicDouble的升级类型,专门用于数据统计,性能更高。
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
原子类型累加器是JDK1.8引进的并发新技术,它可以看做AtomicLong和AtomicDouble的部分加强类型。低并发、一般的业务场景下AtomicLong是足够了。如果并发量很多,存在大量写多读少的情况,那LongAdder可能更合适,代价是消耗更多的内存空间。
AtomicInteger
这么多原子类,我们就以AtomicInteger为例来了解一下。
如何使用
AtomicInteger常用Api:
public final int get()//获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement()//获取当前的值,并自减
public final int getAndAdd(int delta)//获取当前的值,并加上预期的值
void lazySet(int newValue)://最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的
多线程中使用synchronized让变量自增:
private volatile int count = 0;
// 若要线程安全执行执行 count++,需要加锁
public synchronized void increment() {
count++;
}
public int getCount() {
return count;
}
使用 AtomicInteger:
private AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet();
}
// 使用 AtomicInteger 后,不需要加锁,也可以实现线程安全
public int getCount() {
return count.get();
}
源码解析
public class AtomicInteger extends Number implements java.io.Serializable {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
//用于获取value字段相对当前对象的“起始地址”的偏移量
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;
//返回当前值
public final int get() {
return value;
}
//递增加detla
public final int getAndAdd(int delta) {
//三个参数,1、当前的实例 2、value实例变量的偏移量 3、当前value要加上的数(value+delta)。
return U.getAndAddInt(this, VALUE, delta);
}
//递增加1
public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
...
}
可以看到 AtomicInteger 底层用的是volatile的变量和CAS来进行更改数据的。
- volatile保证线程的可见性,多线程并发时,一个线程修改数据,可以保证其它线程立马看到修改后的值。
- CAS 保证数据更新的原子性。
最后
AtomicStampedReference 如何解决ABA问题?
如何解决?在上面我们已经说过思路了,每次compareAndSwap
后给数据的版本号加1,下次compareAndSwap
的时候不仅比较较数据,也比较版本号,值相同,版本号不同也不能执行成功。Java
中提供了AtomicStampedReference
来解决该问题
public class AtomicTest {
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);
public static void main(String[] args){
first().start();
second().start();
}
private static Thread first() {
return new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread() +",初始值 a = " + atomicStampedRef.getReference());
int stamp = atomicStampedRef.getStamp(); //获取当前标识别
try {
Thread.sleep(1000); //等待1秒 ,以便让干扰线程执行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1,2,stamp,stamp +1); //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
System.out.println("操作线程" + Thread.currentThread() +",CAS操作结果: " + isCASSuccess);
},"主操作线程");
}
private static Thread second() {
return new Thread(() -> {
Thread.yield(); // 确保thread-first 优先执行
atomicStampedRef.compareAndSet(1,2,atomicStampedRef.getStamp(),atomicStampedRef.getStamp() +1);
System.out.println("操作线程" + Thread.currentThread() +",【increment】 ,值 = "+ atomicStampedRef.getReference());
atomicStampedRef.compareAndSet(2,1,atomicStampedRef.getStamp(),atomicStampedRef.getStamp() +1);
System.out.println("操作线程" + Thread.currentThread() +",【decrement】 ,值 = "+ atomicStampedRef.getReference());
},"干扰线程");
}
}
执行结果:
操作线程Thread[主操作线程,5,main],初始值 a = 1
操作线程Thread[干扰线程,5,main],【increment】 ,值 = 2
操作线程Thread[干扰线程,5,main],【decrement】 ,值 = 1
操作线程Thread[主操作线程,5,main],CAS操作结果: false
可以看到上面的执行结果符合我们的预期,当干扰线程将值由1更新为2,再由2更新为1后,主操作线程是能够知道值的改变的。那么
AtomicStampedReference内部是如何操作的呢?
AtomicStampedReference
内部维护了一个 Pair
的数据结构,用volatile
修饰,保证可见性,用于打包数据对象和版本号。
private static class Pair<T> {
final T reference; //维护对象引用
final int stamp; //用于标志版本
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
它的compareAndSet
方法如下:
/**
* expectedReference :更新之前的原始值
* newReference : 将要更新的新值
* expectedStamp : 期待更新的标志版本
* newStamp : 将要更新的标志版本
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 获取当前的(元素值,版本号)对
Pair<V> current = pair;
return
// 引用没变
expectedReference == current.reference &&
// 版本号没变
expectedStamp == current.stamp &&
// 新引用等于旧引用
((newReference == current.reference &&
// 新版本号等于旧版本号
newStamp == current.stamp) ||
// 构造新的Pair对象并CAS更新
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
// 调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
- 首先判断传入的参数是否符合
Pair
的预期,从数据和版本号两个方面来判断,有一个不符合就打回; - 如果传入的参数与
Pair
中的一样,直接返回true
,不用更新; - 使用
casPair
来比较交换当前的Pair
与传入参数构成的Pair
; -
casPair
又调用compareAndSwapObject
来交互Pair
属性。
所以简单来说,AtomicStampedReference
是通过加版本号来解决CAS
的ABA
问题。至于怎么加版本号,因为compareAndSwapObject
只能对比交互一个对象,所以只需要将数据和版本号打包到一个对象里就解决问题了。