Atomic 原子类
简介
当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值。
通常我们会使用synchronized
来解决这个问题,synchronized
会保证多线程不会同时更新变量。但是synchronized
的开销比较大,执行效率较低。
JDK 1.5
开始提供了java.util.concurrent.atomic
包,这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式。
主要有四类原子更新方式:
- 原子更新基本类型
- 原子更新数组
- 原子更新引用
- 原子更新属性(字段)
Atomic
包里的类基本都是使用Unsafe
实现的包装类。
学习Unsafe
:https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html
原子更新基本类型
AtomicLong
AtomicBoolean
AtomicInteger
AtomicInteger 的方法
方法 | 描述 |
---|---|
public final int get() | 获取当前的值 |
public final void set(int newValue) | 设置为给定值 |
public final void lazySet(int newValue) | 最终设置为给定值,设置后可能读到旧值(unsafe.putOrderedInt) |
public final int getAndSet(int newValue) | 原子方式设置为给定值,并返回旧值(unsafe.getAndSetInt) |
public final boolean compareAndSet(int expect, int update) | 原子方式将该值设置为给定的更新值,如果当前值==预期值(unsafe.compareAndSwapInt) |
public final int getAndIncrement() | 获取当前的值,并自增1 |
public final int getAndDecrement() | 获取当前的值,并自减1 |
public final int getAndAdd(int delta) | 获取当前的值,并加上给定的值 |
public final int incrementAndGet() | 自增1,并获取新值 |
public final int decrementAndGet() | 自减1,并获取新值 |
public final int addAndGet(int delta) | 加上给定的值,并获取新值 |
public final int getAndUpdate(IntUnaryOperator updateFunction) | 获取当前的值,并设置根据函数得到的新值 |
public final int updateAndGet(IntUnaryOperator updateFunction) | 设置根据函数得到的新值,并得到新值 |
使用示例
public class AtomicIntegerTest {
private static final AtomicInteger atomicInteger = new AtomicInteger();
private static final CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
atomicInteger.getAndIncrement();
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println(atomicInteger.get());
}
}
/*
返回值:10
*/
主要源码
主要利用了CAS
的操作、使用volitile
和Unsafe
的native
方法来保证了变量的原子操作。
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
原子更新数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
AtomicIntegerArray 方法
基本类似于AtomicInteger
,会多个参数int i
来确定位置。
方法 | 描述 |
---|---|
public final void set(int i, int newValue) | 设置在位置i 的元素为给定值。 |
public final int getAndSet(int i, int newValue) | 以原子方式设置在位置i 的元素为给定值,并返回原来的值。 |
public final boolean compareAndSet(int i, int expect, int update) | 原子方式将位置i 的元素设置为给定的更新值,如果当前值==预期值 |
使用示例
public class AtomicIntegerArrayTest {
private static final int[] value = {1, 2, 3, 4};
private static final AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
atomicIntegerArray.addAndGet(i, 5);
}
System.out.println(Arrays.toString(value));
System.out.println(atomicIntegerArray.toString());
}
}
/*
[1, 2, 3, 4]
[6, 7, 8, 9]
*/
源码
数组value
通过构造方法传递进去,然后AtomicIntegerArray
会复制一份,所以当AtomicIntegerArray
对其数组内部进行修改时,不会影响传入的数组。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
private final int[] array;
static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
public AtomicIntegerArray(int length) {
array = new int[length];
}
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}
原子更新引用
AtomicReference
AtomicMarkableReference
AtomicStampedReference
AtomicReference 方法
方法 | 描述 |
---|---|
public final V getAndUpdate(UnaryOperator<V> updateFunction) | 根据给定的函数原子更新值,并返回当前值。<br />如果多个线程竞争失败会重试。(compareAndSet) |
public final V updateAndGet(UnaryOperator<V> updateFunction) | 根据给定的函数原子更新值,并返回新值。 |
使用示例
public class AtomicReferenceTest {
private static final AtomicReference<User> atomicReference = new AtomicReference<>();
public static void main(String[] args) {
User user = new User("张三",16);
atomicReference.set(user);
User newUser = new User("李四",18);
atomicReference.compareAndSet(user, newUser);
System.out.println(atomicReference.get());
}
@Data
@AllArgsConstructor
static class User {
private String name;
private int age;
}
}
源码
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile V value;
原子更新属性(字段)
AtomicIntegerFieldUpdater
AtomicReferenceFieldUpdater
AtomicStampedReference
AtomicIntegerFieldUpdater 简单介绍
-
原子更新带有版本号的引用类都是抽象类,每次使用的时候用静态方法
newUpdater
来创建,设置tclass
和fieldName
。@CallerSensitive
:通过使用一种机制来准确地识别此类方法,并允许可靠地发现其调用者,从而替换了现有的手动维护的调用者敏感方法列表,从而提高了JDK
方法处理实现的安全性。jvm
的开发者认为这些方法危险,不希望开发者调用,就把这种危险的方法用@CallerSensitive
修饰,并在jvm
级别检查。
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>(tclass, fieldName, Reflection.getCallerClass());
}
- 更新的字段必须是
public volitile
修饰的,不然会抛出异常。
java.lang.RuntimeException: java.lang.IllegalAccessException: Class xxx.AtomicIntegerFieldUpdateTest can not access a member of class xxx.AtomicIntegerFieldUpdateTest$User with modifiers "private"
java.lang.ExceptionInInitializerError Caused by: java.lang.IllegalArgumentException: Must be volatile type
//源码:
if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException("Must be volatile type");
使用示例
public class AtomicIntegerFieldUpdateTest {
private static final AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
public static void main(String[] args) {
User user = new User("张三",16);
atomicIntegerFieldUpdater.set(user, 21);
System.out.println(atomicIntegerFieldUpdater.get(user));
}
@Data
@AllArgsConstructor
static class User {
private String name;
public volatile int age;
}
}
JDK 8 新增的原子操作类 LongAndder
为了解决高并发下多线程对一个变量CAS
争夺失败后进行自旋而造成的降低开发性能问题,LongAdder
在内部维护多个Cell
元素(一个动态地Cell
数组)来分担对单个变量进行争夺的开销。
LongAdder 的结构
LongAdder
类继承自Striped
,在Striped
内部维护着三个变量。
-
base
:基础值,默认为0。 -
cellsBusy
:状态值,只有0和1,当创建Cell元素、扩容Cell数组或者初始化Cell数组时,使用CAS
操作该变量来保证同时只有一个线程可以进行其中之一的操作。 -
cells
: Table of cells. When non-null, size is a power of 2. 存储所有的Cell
元素,初始值为null,大小是2的次幂。
当前线程访问 Cell 数组里面的那个元素
通过add
方法中getProbe() & m
来确定当前线程使用哪个元素。
getProbe
获取当前线程中变量threadLocalRandomProbe
的值,初始值为0,在longAccumulate
中 初始化。
longAccumulate
是cells
数组被初始化和扩容的地方。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // 初始化 threadLocalRandomProbe
h = getProbe();
wasUncontended = true;
}
// ......
}
如何初始化 Cell 数组
cellsBusy == 0
说明当前cells
数组没有被初始化、扩容或者新建。
初始大小为2。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h = getProbe();
for (; ; ) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// ......
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init) {
break;
}
} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
break; // Fall back on using base
}
}
}
Cell 数组如何扩容
如果当前元素个数没有达到CPU个数并且有冲突则扩容。
将容量扩充为之前的两倍,并复制到新的数组中。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
boolean collide = false; // True if last slot nonempty
for (; ; ) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// 如果当前元素个数没有达到CPU个数并且有冲突则扩容
if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
}
}
处理线程访问分配的 Cell 元素的冲突问题
在longAccumulate
方法中使用advanceProbe(h)
,重新计算当前线程的随机数threadLocalRandomProbe
,以减少下次访问cells
元素的冲突。
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
如何保证线程操作被分配的 Cell 元素的原子性
为了保证变量的可见性将value
声明为volatile
。
使用CAS
保证了value
的原子性。
@sun.misc.Contended
为了避免伪共享。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}