ConcurrentHashMap 是一个支持并发检索和并发更新的线程安全的HashMap(但不允许空key或value)。不管是在实际工作或者是面试中,ConcurrentHashMap 都是在整个JUC集合框架里出现频率最高的一个类,所以,对ConcurrentHashMap有一个深入的认识对我们自身还是非常重要的。本章我们来从源码层面详细分析 ConcurrentHashMap 的实现(基于JDK 8),希望对大家有所帮助。
1. 概述
ConcurrentHashMap 在JDK 7之前是通过Lock和Segment(分段锁)实现并发安全,JDK 8之后改为CAS+synchronized来保证并发安全(为了序列化兼容,JDK 8的代码中还是保留了Segment的部分代码)。由于笔者没有过多研究过JDK 7的源码,所以我们后面的分析主要针对JDK 8。
首先来看一下ConcurrentHashMap、HashMap和HashTable的区别:
- HashMap 是非线程安全的哈希表,常用于单线程程序中。
- Hashtable 是线程安全的哈希表,由于是通过内置锁 synchronized 来保证线程安全,在资源争用比较高的环境下,Hashtable 的效率比较低。
- ConcurrentHashMap 是一个支持并发操作的线程安全的HashMap,但是他不允许存储空key或value。使用CAS+synchronized来保证并发安全,在并发访问时不需要阻塞线程,所以效率是比Hashtable 要高的。
2. 数据结构
ConcurrentHashMap 是通过Node来存储K-V的,从上图可以看出,它的内部有很多Node节点(在内部封装了一个Node数组-table
),不同的节点有不同的作用。下面我们来详细看一下这几个Node节点类:
-
Node<K, V>: 保存k-v、k的
hash
值和链表的 next 节点引用,其中 V 和 next 用volatile
修饰,保证多线程环境下的可见性。 - TreeNode<K, V>: 红黑树节点类,当链表长度>=8且数组长度>=64时,Node 会转为 TreeNode,但它不是直接转为红黑树,而是把这些 TreeNode 节点放入TreeBin 对象中,由 TreeBin 完成红黑树的封装。
- TreeBin<K, V>: 封装了 TreeNode,红黑树的根节点,也就是说在 ConcurrentHashMap 中红黑树存储的是 TreeBin 对象。
- ForwardingNode<K, V>: 在节点转移时用于连接两个 table(table和nextTable)的节点类。包含一个 nextTable 指针,用于指向下一个table。而且这个节点的 k-v 和 next 指针全部为 null,hash 值为-1。只有在扩容时发挥作用,作为一个占位节点放在 table 中表示当前节点已经被移动。
-
ReservationNode<K,V>: 在
computeIfAbsent
和compute
方法计算时当做一个占位节点,表示当前节点已经被占用,在compute
或computeIfAbsent
的 function 计算完成后插入元素。hash值为-3。
2.1 核心参数
//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//初始容量
private static final int DEFAULT_CAPACITY = 16;
//数组最大容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认并发度,兼容1.7及之前版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加载/扩容因子,实际使用n - (n >>> 2)
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树的节点数阀值
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表的节点数阀值
static final int UNTREEIFY_THRESHOLD = 6;
//当数组长度还未超过64,优先数组的扩容,否则将链表转为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容时任务的最小转移节点数
private static final int MIN_TRANSFER_STRIDE = 16;
//sizeCtl中记录stamp的位数
private static int RESIZE_STAMP_BITS = 16;
//帮助扩容的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//size在sizeCtl中的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//存放Node元素的数组,在第一次插入数据时初始化
transient volatile Node<K,V>[] table;
//一个过渡的table表,只有在扩容的时候才会使用
private transient volatile Node<K,V>[] nextTable;
//基础计数器值(size = baseCount + CounterCell[i].value)
private transient volatile long baseCount;
//控制table初始化和扩容操作
private transient volatile int sizeCtl;
//节点转移时下一个需要转移的table索引
private transient volatile int transferIndex;
//元素变化时用于控制自旋
private transient volatile int cellsBusy;
// 保存table中的每个节点的元素个数 2的幂次方
// size = baseCount + CounterCell[i].value
private transient volatile CounterCell[] counterCells;
table:Node数组,在第一次插入元素的时候初始化,默认初始大小为16,用来存储Node节点数据,扩容时大小总是2的幂次方。
nextTable:默认为null,扩容时生成的新的数组,其大小为原数组的两倍。
sizeCtl :默认为0,用来控制table的初始化和扩容操作,在不同的情况下有不同的涵义:
- -1 代表table正在初始化
- -N 表示有N-1个线程正在进行扩容操作
- 初始化数组或扩容完成后,将
sizeCtl
的值设为0.75*n - 在扩容操作在进行节点转移前,
sizeCtl
改为(hash << RESIZE_STAMP_SHIFT) + 2
,这个值为负数,并且每有一个线程参与扩容操作sizeCtl
就加1
transferIndex:扩容时用到,初始时为table.length
,表示从索引 0 到transferIndex
的节点还未转移 。
counterCells: ConcurrentHashMap的特定计数器,实现方法跟LongAdder
类似。这个计数器的机制避免了在更新时的资源争用,但是如果并发读取太频繁会导致缓存超负荷,为了避免读取太频繁,只有在添加了两个以上节点时才可以尝试扩容操作。在统一hash分配的前提下,发生这种情况的概率在13%左右,也就是说只有大约1/8的put操作才会检查扩容(并且在扩容后会更少)。
hash计算公式:hash = (key.hashCode ^ (key.hashCode >>> 16)) & HASH_BITS
索引计算公式:(table.length-1)&hash
3. 源码解析
3.1 put(K, V)
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//自旋
//f:索引节点; n:tab.length; i:新节点索引 (n - 1) & hash; fh:f.hash
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//索引i节点为空,直接插入
//cas插入节点,成功则跳出循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当前节点处于移动状态-其他线程正在进行节点转移操作
else if ((fh = f.hash) == MOVED)
//帮助转移
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {//check stable
//f.hash>=0,说明f是链表的头结点
if (fh >= 0) {
binCount = 1;//记录链表节点数,用于后面是否转换为红黑树做判断
for (Node<K,V> e = f;; ++binCount) {
K ek;
//key相同 修改
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//到这里说明已经是链表尾,把当前值作为新的节点插入到队尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树节点操作
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//更新新元素个数
addCount(1L, binCount);
return null;
}
说明:在 ConcurrentHashMap 中,put 方法几乎涵盖了所有内部的函数操作。所以,我们将从put函数开始逐步向下分析。
首先说一下put的流程,后面再详细分析每一个流程的具体实现(阅读时请结合源码):
- 计算当前key的hash值,根据hash值计算索引 i
(i=(table.length - 1) & hash)
; - 如果当前
table
为null,说明是第一次进行put操作,调用initTable()
初始化table
; - 如果索引 i 位置的节点 f 为空,则直接把当前值作为新的节点直接插入到索引 i 位置;
- 如果节点 f 的
hash
为-1(f.hash == MOVED(-1)
),说明当前节点处于移动状态(或者说是其他线程正在对 f 节点进行转移/扩容操作),此时调用helpTransfer(tab, f)
帮助转移/扩容; - 如果不属于上述条件,说明已经有元素存储到索引 i 处(即发生了哈希碰撞),此时需要对索引 i 处的节点 f 进行 put or update 操作,首先使用内置锁
synchronized
对节点 f 进行加锁:
- 如果
f.hash>=0
,说明 i 位置是一个链表,并且节点 f 是这个链表的头节点,则对 f 节点进行遍历,此时分两种情况:- 如果链表中某个节点e的key与当前key的hash相同,则对这个节点e的value进行修改操作。
- 如果遍历到链表尾都没有找到与当前key相同的节点,则把当前K-V作为一个新的节点插入到这个链表尾部。
- 如果节点 f 是
TreeBin
节点(f instanceof TreeBin
),说明索引 i 位置的节点是一个红黑树,则调用putTreeVal
方法找到一个已存在的节点进行修改,或者是把当前K-V放入一个新的节点(put or update)。
- 完成插入后,如果索引 i 处是一个链表,并且在插入新的节点后节点数>8,则调用
treeifyBin
把链表转换为红黑树。 - 最后,调用
addCount
更新元素数量
3.1.1 initTable()
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)//其他线程正在进行初始化或转移操作,让出CPU执行时间片,继续自旋
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS设置sizectl为-1 表示当前线程正在进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//0.75*n 设置扩容阈值
}
} finally {
sizeCtl = sc;//初始化sizeCtl=0.75*n
}
break;
}
}
return tab;
}
说明:初始化操作,ConcurrentHashMap的初始化在第一次插入数据的时候(判断table是否为null),注意初始化操作为单线程操作(如果有其他线程正在进行初始化,则调用Thread.yield()让出CPU时间片,自旋等待table初始完成)。
3.1.2 helpTransfer(Node<K,V>[], Node<K,V>)
//帮助其他线程进行转移操作
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//计算操作栈校验码
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)//不需要帮助转移,跳出
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//CAS更新帮助转移的线程数
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
说明: 如果索引到的节点的 hash 为-1,说明当前节点处于移动状态(或者说是其他线程正在对 f 节点进行转移操作。这里主要是靠 ForwardingNode 节点来检测,在transfer
方法中,被转移后的节点会改为ForwardingNode,它是一个占位节点,并且hash=MOVED(-1),也就是说,我们可以通过判断hash是否为MOVED来确定当前节点的状态),此时调用helpTransfer(tab, f)
帮助转移,主要操作就是更新帮助转移的线程数(sizeCtl+1),然后调用transfer
方法进行转移操作,transfer
后面我们会详细分析。
3.1.3 treeifyBin(Node<K,V>[] , index)
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//当数组长度还未超过64,优先数组的扩容,否则将链表转为红黑树
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//两倍扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {//check stable
//hd:节点头
TreeNode<K,V> hd = null, tl = null;
//遍历转换节点
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
说明: 在put操作完成后,如果当前节点为一个链表,并且链表长度>=TREEIFY_THRESHOLD(8),此时就需要调用treeifyBin
方法来把当前链表转为一个红黑树。treeifyBin
主要进行两步操作:
- 如果当前table长度还未超过
MIN_TREEIFY_CAPACITY(64)
,则优先对数组进行扩容操作,容量为原来的2倍(n<<1)。 - 否则就对当前节点进行转换操作(注意这个操作是单线程完成的)。遍历链表节点,把Node转换为TreeNode,然后在通过TreeBin来构造红黑树(红黑树的构造这里就不在详细介绍了)。
3.1.4 tryPresize(int size)
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);//计算一个近似size的2的幂次方数
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//未初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//已达到最大容量
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
//正在进行扩容操作
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
说明: 当table容量不足时,需要对其进行两倍扩容。tryPresize
方法很简单,主要就是用来检查扩容前的必要条件(比如是否超过最大容量),真正的扩容其实也可以叫“节点转移”,主要是通过transfer
方法完成。
3.1.5 transfer(Node<K,V> tab, Node<K,V> nextTab)
//转移或复制节点到新的table
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//转移幅度( tab.length/(NCPU*8) ),最小为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//根据当前数组长度,新建一个两倍长度的数组nextTab
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//初始为table的最后一个索引
}
int nextn = nextTab.length;
//初始化ForwardingNode节点,持有nextTab的引用,在处理完每个节点之后当做占位节点,表示该槽位已经处理过了
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;//节点是否已经处理
boolean finishing = false; // to ensure sweep before committing nextTab
//自旋移动每个节点,从transferIndex开始移动stride个节点到新的table。
//i:当前处理的Node索引;bound:需要处理节点的索引边界
for (int i = 0, bound = 0;;) {
//f:当前处理i位置的node; fh:f.hash
Node<K,V> f; int fh;
//通过while循环获取本次需要移动的节点索引i
while (advance) {
//nextIndex:下一个要处理的节点索引; nextBound:下一个需要处理的节点的索引边界
int nextIndex, nextBound;
if (--i >= bound || finishing)//通过--i控制下一个需要移动的节点
advance = false;
//节点已全部转移
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//transferIndex(初值为最后一个节点的索引),表示从transferIndex开始后面所有的节点都已分配,
//每次线程领取扩容任务后,需要更新transferIndex的值(transferIndex-stride)。
//CAS修改transferIndex,并更新索引边界
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//已完成转移,更新相关属性
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//1.5*n 扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
return;
}
//当前线程已经完成转移,但可能还有其他线程正在进行转移操作
//每个线程完成自己的扩容操作后就对sizeCtl-1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断是否全部任务已经完成,sizeCtl初始值=(rs << RESIZE_STAMP_SHIFT) + 2)
//这里判断如果还有其他线程正在操作,直接返回,否则的话重新初始化i对原tab进行一遍检查然后再提交
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);//i位置节点为空,替换为ForwardingNode节点,用于通知其他线程该位置已经处理
else if ((fh = f.hash) == MOVED)//节点已经被其他线程处理过,继续处理下一个节点
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {//check stable
//处理当前拿到的节点,构建两个node:ln/hn。ln:原位置; hn:i+n位置
Node<K,V> ln, hn;
if (fh >= 0) {//当前为链表节点(fh>=0)
//使用fn&n把原链表中的元素分成两份(fn&n = n or 0)
//在表扩容2倍后,索引i可能发生改变,如果原table长度n=2^x,如果hash的x位为1,此时需要加上x位的值,也就是i+n;
//如果x位为0,索引i不变
int runBit = fh & n; // n or 0
//最后一个与头节点f索引不同的节点
Node<K,V> lastRun = f;
//从索引i的节点开始向后查找最后一个有效节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;//n or 0
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
//把f链表分解为两个链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//在原位置
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
//i+n位置
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//nextTab的i位置插入一个链表
setTabAt(nextTab, i, ln);
//nextTab的i+n位置插入一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
advance = true;
}
/**
* 如果该节点是红黑树结构,则构造树节点lo和hi,遍历红黑树中的节点,同样是根据hash&tab.length算法,
* 把节点分为两类,分别插入索引i和(i+n)位置。
*/
else if (f instanceof TreeBin) {
//转为根结点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;//低位(i)节点和低位尾节点
TreeNode<K,V> hi = null, hiTail = null;//高位(i+n)节点和高位尾节点
int lc = 0, hc = 0;
//从首个节点向后遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//构建树节点
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
//原位置
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
//i+n位置
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
说明: transfer
方法是table扩容的核心实现。由于 ConcurrentHashMap 的扩容是新建一个table,所以主要问题就是如何把旧table的元素转移到新的table上。所以,扩容问题就演变成了“节点转移”问题。首先总结一下需要转移节点(调用transfer)的几个条件:
- 对table进行扩容时
- 在更新元素数目时(addCount方法),元素总数>=sizeCtl(sizeCtl=0.75n,达到扩容阀值),此时也需要扩容
- 在put操作时,发现索引节点正在转移(hash==MOVED),此时需要帮助转移
在进行节点转移之前,首先要做的就是重新初始化sizeCtl
的值(sizeCtl = (hash << RESIZE_STAMP_SHIFT) + 2
),这个值是一个负值,用于标识当前table正在进行转移操作,并且每有一个线程参与转移,sizeCtl
就加1。transfer
执行步骤如下(请结合源码注释阅读):
计算转移幅度
stride
(或者说是当前线程需要转移的节点数),最小为16;创建一个相当于当前 table 两倍容量的 Node 数组,转移完成后用作新的 table;
从
transferIndex
(初始为table.length
,也就是 table 的最后一个节点)开始,依次向前处理stride
个节点。前面介绍过,table 的每个节点都可能是一个链表结构,因为在 put 的时候是根据(table.length-1)&hash
计算出的索引,当插入新值时,如果通过 key 计算出的索引已经存在节点,那么这个新值就放在这个索引位节点的尾部(Node.next)。所以,在进行节点转移后,由于 table.length 变为原来的两倍,所以相应的索引也会改变,这时候就需要对链表进行分割,我们来看一下这个分割算法:
假设当前处理的节点 table[i]=f,并且它是一个链表结构,原table容量为 n=2x,索引计算公式为
i=(n - 1)&hash
。在表扩张后,由于容量 n 变为 2x+1 = 2*2x,所以索引计算就变为i=(2n - 1)&hash
。如果 hash 的 x 位为0,则 hash&(2x-1)=hash,此时 hash&(2x-1) == hash&(2x+1-1),索引位 i 不变;如果 hash 的 x 位为1,则 hash&2x=2x == n,在扩容后 x 变为 x+1,此时的索引需要加上 x 位的值,即 _i=hash&(2x-1) + hash&2x,也就是 i+n。举个栗子:设 n=100000 (25),x=5,hash 为100101(x位是1)。n-1=011111,那么i=hash&(n-1)=000101;扩容后容量变为m=1000000(26),m-1=0111111,那么 i 就变成了 hash&(m-1)=100101,也就是说新的索引位_i = i+n。如果当前节点为红黑树结构,也是利用这个算法进行分割,不同的是,在分割完成之后,如果这两个新的树节点<=6,则调用
untreeify
方法把树结构转为链表结构。
- 最后把操作过的节点都设为 ForwardingNode 节点(hash= MOVED,这样别的线程就可以检测到)。
transfer
操作完成后,table的结构变化如下:
3.1.6 addCount(long,int)
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//counterCells为null,CAS更新baseCount
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);//在线程争用资源时,使用fullAddCount计算更新元素数
return;
}
if (check <= 1)
return;
s = sumCount();//计算元素总数,用于后面的扩容操作
}
if (check >= 0) {
//检查扩容
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//其他线程在进行扩容操作
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
说明: put操作全部完成后,别忘了更新元素数量。addCount
用来更新 ConcurrentHashMap 的元素数,根据所传参数check
决定是否检查扩容,如果需要,调用transfer
方法进行扩容/节点转移。这里面有一个看起来比较复杂的方法fullAddCount
,作用是在线程争用资源时,使用它来计算更新元素数。这个方法的实现类似于LongAdder的add(LongAdder在上面有简单介绍),源码在此就不再详细分析了,有兴趣的同学可以研究下。
4. 总结
到此,ConcurrentHashMap的分析就告一段落了。总的来说源码比较复杂,真正理解它还是需要一些耐心的。重点是它的数据结构和扩容的实现。
ConcurrentHashMap 源码分析到此结束,希望对大家有所帮助,如您发现文章中有不妥的地方,请留言指正,谢谢。