摘要:ThreadLocal是并发场景下用来解决变量共享问题的类,它能使原本线程间共享的对象进行线程隔离,即一个对象只对一个线程可见。但由于过度设计,比如使用弱引用和哈希碰撞,导致理解难度大、使用成本高,反而成为故障高发点,容易出现内存泄漏、脏数据、共享对象更新等问题。
关键字:ThreadLocal、ThreadLcoalMap、HashCode、1640531527、AtomicInteger、CAS、ABA问题。
一、基础实验
package threadlocal;
import root.Log;
public class ThreadLocalVar {
private static final String TAG = "ThreadLocalVar";
public static void main(String[] args) {
ThreadLocal<String> value01 = new ThreadLocal<>();
value01.set("hello world! in 01");
Log.i(TAG, Thread.currentThread().getName() + ": " + value01.get());
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
ThreadLocal<String> value02 = new ThreadLocal<>();
value02.set("hello world! in 02");
Log.i(TAG, Thread.currentThread().getName() + ": " + value01.get());
Log.i(TAG, Thread.currentThread().getName() + ": " + value02.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
[ThreadLocalVar] main: hello world! in 01
[ThreadLocalVar] Thread-0: null
[ThreadLocalVar] Thread-0: hello world! in 02
结论:
- 不同的线程不可以共享变量。例如上述<code>value01</code>在子线程中不可见。
- 通过使用<code>ThreadLocal</code>为键,在当前线程中存储一个对象。
二、Q&T&A
通过阅读源码部分,可以知道每个线程都会存储一个<code>ThreadLocalMap</code>,来维护当前线程的<code>ThreadLocal</code>对象。
1. <code>main</code>方法主线程的ThreadLocalMap
Q1:通过<code>main</code>方法启动的线程,其是否有初始化<code>ThreadLocalMap</code>?
T1:测试代码
- 在<code>main</code>线程实例化一个<code>ThreadLocal</code>对象,查看其维护的<code>threadLocalHashCode</code>字段值。
public static void main(String[] args) {
ThreadLocal<String> valueContainer = new ThreadLocal<>();
try {
Class<?> threadLocalClz = Class.forName("java.lang.ThreadLocal");
Field localHashFiled = threadLocalClz.getDeclaredField("threadLocalHashCode");
localHashFiled.setAccessible(true);
int a = (int)(localHashFiled.get(valueContainer));
Log.i(TAG, "hash code: " + a);
} catch (Exception e) {
e.printStackTrace();
}
}
// 结果:[TAG] hash code: 1253254570
- 继续设计实验,测试得到:第7次创建<code>ThreadLocal</code>对象时,得到这个值。说明每个<code>main</code>方法中用户自定义逻辑执行之前,已经创建好了<code>ThradLocalMap</code>。这里测不准到底创建了多少个<code>ThreadLocal</code>变量。
public class Custom {
private static final String TAG = "Custom";
public static void main(String[] args) {
for (int i = 0; i < 15; i++) {
int hashCOde = new ValueContainer().get();
Log.i(TAG, i + ": " + hashCOde);
}
}
}
class ValueContainer{
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
public int get(){
return threadLocalHashCode;
}
}
- 通过后面对<code>Thread.currentThread()</code>的测试,使用<code>new Thread().start();</code>新建立线程时得到和<code>main</code>不一样的结论。在新线程中,<code>map</code>并不会初始化。
A1:打印结果
反复 hashCode & 15
0 7 14 5 12 3 10 1 8 15 6 13 4 11 2 9
point value
0 null
7 class java.lang.ref.SoftReference
14 class java.lang.ref.SoftReference
5 class [Ljava.lang.Object
12 null
3 null
10 custom use
2. 进一步对<code>Thread.currentThread()</code>进行反射操作的测试。
Q2:Q1所测试得到的结论是否能够进一步被证实?
T2:设计实验,通过反射,拿到<code>ThraedLocal.ThreadLocalMap.Entry</code>实例,然后通过哈希值取出value。
public static void main(String[] args) {
ThreadLocal<String> valueContainer0 = new ThreadLocal<String>();
valueContainer0.set("hello world");
ThreadLocal<String> valueContainer = new ThreadLocal<String>();
valueContainer.set("hello");
try {
Class<?> threadClz = Class.forName("java.lang.Thread");
Field mapFiled = threadClz.getDeclaredField("threadLocals");
mapFiled.setAccessible(true);
Object maps = mapFiled.get(Thread.currentThread());
// Log.i(TAG, "class type: " + maps.getClass());
Class<?> threadLocalMapClz = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
Field tableFiled = threadLocalMapClz.getDeclaredField("table");
tableFiled.setAccessible(true);
Object[] table = (Object[])tableFiled.get(maps);
Class<?> threadLocalMapEntryClz = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry");
Field valueFiled = threadLocalMapEntryClz.getDeclaredField("value");
valueFiled.setAccessible(true);
for (int i = 0; i < table.length; i++) {
// Log.i(TAG, i + ": " + table[i]);
if (table[i] != null) {
Object value = valueFiled.get(table[i]);
Log.i(TAG, "type: " + value.getClass());
Log.i(TAG, i + " value: " + value.toString());
System.out.println();
}
}
Class<?> threadLocalClz = Class.forName("java.lang.ThreadLocal");
Field localHashFiled = threadLocalClz.getDeclaredField("threadLocalHashCode");
localHashFiled.setAccessible(true);
int hashCode = (int)(localHashFiled.get(valueContainer));
Log.i(TAG, "hash code: " + hashCode);
int i = hashCode & (table.length - 1);
Log.i(TAG, "i =: " + i);
} catch (Exception e) {
e.printStackTrace();
}
}
A2:打印结果
[TAG] type: class java.lang.String
[TAG] 1 value: hello
[TAG] type: class [Ljava.lang.Object;
[TAG] 5 value: [Ljava.lang.Object;@7ea987ac
[TAG] type: class java.lang.ref.SoftReference
[TAG] 7 value: java.lang.ref.SoftReference@12a3a380
[TAG] type: class java.lang.String
[TAG] 10 value: hello world
[TAG] type: class java.lang.ref.SoftReference
[TAG] 14 value: java.lang.ref.SoftReference@29453f44
[TAG] hash code: -1401181199
[TAG] i =: 1
3. 对Q1的补充
Q3:当使用<code>new Thread().start();</code>时,如果新建立一个<code>ThreadLocal</code>,此时的map是什么样子的?
T3:实验代码
public class Sample_ThreadLocal {
private static final String TAG = "TAG";
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
ThreadLocal<String> valueContainer = new ThreadLocal<String>();
valueContainer.set("hello");
// 反射代码
}
}).start();
}
}
A3:打印结果
[TAG] current size: 1
[TAG] current threshold: 10
[TAG] current table.length: 16
[TAG] type: class java.lang.String
[TAG] 10 value: hello
[TAG] hash code: 1253254570
[TAG] i =: 10
三、线程安全的<code>AtomicInteger</code>
1. ABA问题。
CAS乐观锁机制确实能够提升吞吐,并保证一致性,但在极端情况下可能会出现ABA问题。
(1). 场景一:库存数量
- 并发1(上):获取出数据的初始值是A,后续计划实施CAS乐观锁,期望数据仍是A的时候,修改才能成功
- 并发2:将数据修改成B
- 并发3:将数据修改回A
- 并发1(下):CAS乐观锁,检测发现初始值还是A,进行数据修改
上述并发环境下,并发1在修改数据时,虽然还是A,但已经不是初始条件的A了。中间发生了A变B,B又变A的变化,此A已经非彼A,数据却成功修改,可能导致错误,这就是CAS引发的所谓的ABA问题。
(2). 场景二:堆栈实现
有如下一个堆栈,
- 并发1(上):获取出数据的初始值是A,后续计划实施CAS乐观锁,期望数据仍是A的时候,修改才能成功
- 并发2:将A出栈
- 并发3:将B出栈
- 并发1(下):CAS乐观锁,检测发现初始值还是A,进行数据修改
(3). 分析
ABA问题导致的原因,是CAS过程中只简单进行了“值”的校验,再有些情况下,“值”相同不会引入错误的业务逻辑(例如库存),有些情况下,“值”虽然相同,却已经不是原来的数据了。
(4). Java中的解决方案
- <code>AtomicStampedReference</code>:内部维护了对象值和版本号,在创建<code>AtomicStampedReference</code>对象时,需要传入初始值和初始版本号, 当<code>AtomicStampedReference</code>设置对象值时,对象值以及状态戳都必须满足期望值,写入才会成功。
- <code>AtomicMarkableReference </code>:<code>AtomicStampedReference</code>可以给引用加上版本号,追踪引用的整个变化过程,如:A -> B -> C -> D - > A,通过<code>AtomicStampedReference</code>,我们可以知道,引用变量中途被更改了3次 但是,有时候,我们并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了<code>AtomicMarkableReference </code>,<code>AtomicMarkableReference </code>的唯一区别就是不再用int标识引用,而是使用boolean变量——表示引用变量是否被更改过。
2. CAS在<code>AtomicInteger</code>中的应用
AtomicInteger.class:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
Unsafe.class:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
其中<code>compareAndSwapInt</code>详解:
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
// 此方法是Java的native方法,并不由Java语言实现。
// 方法的作用是,读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较。
// 相等就把x值赋值给offset位置的值。方法返回true。
// 不相等,就取消赋值,方法返回false。
四、源码解读
1. 场景一:一个新的线程使用<code>ThreadLocal</code>的<code>set</code>和<code>get</code>方法
void main(){
// 1. 使用 无参的构造方法初始化ThreadLocal对象。
ThreadLocal<String> threadLocal = new ThreadLocal<>();
threadLocal.set("hello world!");
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
// 2. 上面的可以不看,直接走到了这里 T:当前线程 V:值
createMap(t, value);
}
void createMap(Thread t, T firstValue) {
// 3. 在createMap方法中给当前线程的map赋了初值
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap.class{
private static final int INITIAL_CAPACITY = 16;
private Entry[] table;
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 4. 新建了一个数组map。长度是固定值 = 16
table = new Entry[INITIAL_CAPACITY];
// 5. 计算 当前local的哈希值 与上 0...0 1111 1111
// 5.2疑问:初始化操作只会在这里,所以i = 0
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
}
Thread.class{
ThreadLocal.ThreadLocalMap threadLocals = null;
}
ThreadLocal.class{
// 5.1 local的哈希值赋初值
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
// 5.2 疑问:阅读源码可知,getAndAdd返回的值是AtomicInteger原来的值,那么初始化的时候就是返回0;
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
}
// 6. get方法的初始化使用和上述过程类似。不再赘述。
2. 场景二:一个一个线程已经拥有了一个map实例对象并使用local存储了value
(1). <code>get</code>。
ThreadLocal.class {
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
// 1. 此时会使用map的getEntry来获取value
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 如果是初始化的情况则会直接返回通过重写initValue时回调的value
return setInitialValue();
}
}
ThreadLocalMap.class {
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
}
上述代码不难理解。此时我们考虑当一个线程已经初始化了map后,如何<code>set</code>和<code>get</code>
(2). <code>set</code>。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
// 1. 此时会调用map的set方法。
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap.class{
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
// 2.1 新建的时候 e == null。
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
// 2.2 当ThreadLocal被回收了怎么办?
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 3. 新建一个Entry用来存储value
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 5. 扩容处理
rehash();
}
// 参数
// i:最新添加的Entry的下标
// n:当前存储了多少个Entry
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
// 4. e.get()是创建Entry传递的ThreadLocal
if (e != null && e.get() == null) {
// 4.1 当ThreadLocal被回收了
n = len;
removed = true;
// 4.2 释放老旧的Entry
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
}
五、其他细节
考虑到<code>Entry</code>继承了<code>WeakReference</code>,关于它以及它父类的实现,还有很多的细节需要注意。
- 每次<code>set</code>之后都会检查清除stale的<code>Entry</code>,并监测是否需要触发扩容。
- 当<code>ThreadLocalMap.size >= threshold</code>时,hash表就会触发扩容。
- 神奇的魔数<code>1640531527</code>,还需要再算一下数学原理。(连续生成的哈希码之间的差异(增量值),将隐式顺序线程本地id转换为几乎最佳分布的乘法哈希值,这些不同的哈希值最终生成一个2的幂次方的哈希表。)