一 成员变量解析
1 transient volatile Node<K,V>[] table
默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。
2 private transient volatile Node<K,V>[] nextTable
默认为null,扩容时新生成的数组,其大小为原数组的两倍。
3 Node
保存key,value及key的hash值的数据结构。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K, V> next;
Node(int hash, K key, V val, Node<K, V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}
4 ForwardingNode
一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
5 static final int MOVED = -1
扩容节点的hash值
6 private transient volatile int sizeCtl
默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
-1 代表table正在初始化
-N 表示有N-1个线程正在进行扩容操作
其余情况:
(1) 如果table未初始化,表示table需要初始化的大小。
(2) 如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。
7 private transient volatile long baseCount
map中元素的长度
二 方法解析
1 实例初始化
实例化ConcurrentHashMap时带参数时,会根据参数调整table的大小,假设参数为100,最终会调整成256,确保table的大小总是2的幂次方,算法如下:
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
2 添加元素操作
(1)初始化tab
前面已经提到过,table初始化操作会延缓到第一次put行为。但是put是可以并发执行的,Doug Lea是如何实现table只初始化一次的?让我们来看看源码的实现:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一个线程发现sizeCtl<0(即sizeCtl=-1),意味着另外的线程执行CAS操作成功,
//当前线程只需要让出cpu时间片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,表示正在进行tab初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// table初始化完成,表示table的容量,默认是table大小的0.75倍
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
sizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。所以执行第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,有且只有一个线程能够修改成功,其它线程通过Thread.yield()让出CPU时间片等待table初始化完成。
(2)put()
假设table已经初始化完成,put操作采用CAS+synchronized实现并发插入或更新操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
// ConcurrentHashMap不允许key或者value为null,这点和HashMap不同
if (key == null || value == null) throw new NullPointerException();
// 对key的hash进行二次hash,减少散列冲突
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果tab为空,则调用initTable()初始化tab
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// tab[ (n - 1) & hash]为空节点,则直接创建一个新的节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// Doug Lea采用Unsafe.getObjectVolatile来获取,也许有人质疑,直接table[index]不可以么,
// 为什么要这么复杂?在java内存模型中,我们已经知道每个线程都有一个工作内存,
// 里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最
// 新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,
// 保证了每次拿到数据都是最新的。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 检查table[i]的节点的hash是否等于MOVED,如果等于MOVED,则检测到正在扩容,帮助其扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else { // 节点f.hash不是MOVED
V oldVal = null;
// 锁定,(hash值相同的链表的头节点)
synchronized (f) {
// 避免多线程竞争,需要重新检查
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
/*
*下面的代码就是先查找链表中是否出现了此key,如果出现,则更新value,跳出循环
*否则将节点加入到链表末尾并跳出循环
*/
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
/*
* 到节点末尾 直接在节点末尾加入新node
*/
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是树节点,则将节点夹入树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果链表长度已经达到临界值8,就需要把链表转换为树结构
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果put是更新已有的key-value,那么返回旧value
if (oldVal != null)
return oldVal;
break;
}
}
}
// 将当前ConcurrentHashMap的元素数量+1
addCount(1L, binCount);
return null;
}
(3)ConcurrentHashMap的三个原子操作
/**
* 获取tab在i位置上的节点
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/**
* 利用CAS方法设置tab在i位置上的节点
* 将tab[i]和c作比较,相等则将tab[i]设置为v
* 否则不做修改
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/**
* 设置节点位置的值,仅在上锁区被调用
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put()方法的大致流程如下:
计算key两次哈希后的hash值
if(tab为空)
对tab进行初始化
计算tab索引index = hash & (tab.size-1)
else if(tab[index]为空)
直接为tab[index]建立新节点,退出循环
else if(tab[index].hash == MOVE,即tab[index]正在进行扩容)
调用helpTransfer(),帮助tab[index]扩容
else
用synchronized锁定和key的hash值相同的链表的头节点
if(tab[index].hash > 0)
通过key和key的hash找到要更新的节点,若没有则在链表末尾添加新的node节点
else if(tab[index]是树节点)
在树种更新节点
增加baseCount的值
2 获取元素
(1)get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果节点e的hash值小于0,说明该节点正在扩容,则调用节点的find()方法找寻
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
(2)find
/**
* Node类的find函数,通过遍历以该节点为首的链表来找寻匹配的值
*/
Node<K, V> find(int h, Object k) {
Node<K, V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
3 扩容
(1)ForwardingNode:
一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
(2)扩容触发时机
- 如果新增节点之后,所在链表的元素个数达到了阈值 8 ,则会调用 treeifyBin 方法把链表转换成红黑树,不过在结构转换之前,会对数组长度进行判断。
如果数组长度n小于阈值 MIN_TREEIFY_CAPACITY ,默认是64,则会调用 tryPresize 方法把数组长度扩大到原来的两倍,并触发 transfer 方法,重新调整节点的位置。 - put方法时,如果当前节点的hash为-1,则调用helpTransfer方法来触发扩容
- addCount方法记录元素个数后检查是否需要进行扩容,当数组元素个数达到阈值时,会触发 transfer 方法,重新调整节点的位置。
(3)transfer
transfer方法代码略长,逐块分析:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
transfer 方法实现了在并发的情况下,高效的从原始组数往新数组中移动元素,假设扩容之前节点的分布如下,这里区分蓝色节点和红色节点,是为了后续更好的分析:
在上图中,第14个槽位插入新节点之后,链表元素个数已经达到了8,且数组长度为16,优先通过扩容来缓解链表过长的问题,实现如下:
- 根据当前数组长度n来新建一个长度为2n的nextTable
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
- 初始化 ForwardingNode 节点,其中保存了新数组 nextTable 的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
- 通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过CAS设置 transferIndex 属性值,并初始化 i 和 bound 值, i 指当前处理的槽位序号, bound 指需要处理的槽位边界,先处理槽位15的节点;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
- 如果索引为15的节点tab[15]为空,则通过CAS插入ForwardingNode 节点表示该节点已经被其他线程处理过
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
- 如果索引为15的节点tab[15]的hash值等于-1,说明其他线程已经处理了该节点(该节点已经转换完毕),则直接跳过,处理索引为14的节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
- 处理槽位14的节点tab[14],是一个链表结构,先定义两个变量节点 ln 和 hn ,分别保存hash值的第X位为0和1的节点(X是由n来确定的,2^X=table.length),具体实现如下:
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
使用 fn&n 可以快速把链表中的元素区分成两类,A类是hash值的第X位为0,B类是hash值的第X位为1,并通过 lastRun 记录最后需要处理的节点,A类和B类节点可以分散到新数组的槽位14和30中,在原数组的槽位14中,蓝色节点第X为0,红色节点第X为1,把链表拉平显示如下:
- 通过遍历链表,记录 runBit 和 lastRun ,分别为1和节点6,所以设置 hn 为节点6, ln 为null;
-
重新遍历链表,以 lastRun 节点为终止条件,根据第X位的值分别构造ln链表和hn链表
ln链:和原来链表相比,顺序已经不一样了
hn链:通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置
4 size计算
(1)addCount
1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,
// 则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 使用CounterCell记录元素个数的变化
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,
// 并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
(2)size()
因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数
三 ConcurrentHashMap的红黑树实现分析
1 什么是红黑树?
红黑树是一种特殊的二叉树,主要用它存储有序的数据,提供高效的数据检索,添加、查找、删除等操作的时间复杂度为O(lgn),每个节点都有一个标识位表示颜色,红色或黑色,有如下5种特性:
(1)每个节点要么红色,要么是黑色
(2)根节点一定是黑色的
(3)每个空叶子节点必须是黑色的
(4)如果一个节点是红色的,那么它的两个子节点必须是黑色的
(5)从一个节点到该节点的子孙节点的所有路径包含相同个数的黑色节点
注意到性质4导致了路径不能有两个毗连的红色节点就足够了。最短的可能路径都是黑色节点,最长的可能路径有交替的红色和黑色节点。因为根据性质5所有最长的路径都有相同数目的黑色节点,这就表明了没有路径能多于任何其他路径的两倍长。
这些约束强制了红黑树的关键性质: 从根到叶子的最长的可能路径不多于最短的可能路径的两倍长。
2 链表转为红黑树
在1.8的实现中,当一个链表中的元素达到8个且tab长度大于等于64,会调用treeifyBin()方法把链表结构转化成红黑树结构,实现如下:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 对tab[index]加锁,生成树节点链
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍历tab[index]中的节点,将节电转化为一条树节点链
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 将TreeNode链表转化为TreeBin(即红黑树)
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}
从上述实现可以看出:并非一开始就创建红黑树结构,如果当前Node数组长度小于阈值MIN_TREEIFY_CAPACITY,默认为64,先通过扩大数组容量为原来的两倍以缓解单个链表元素过大的性能问题。
static final class TreeBin<K,V> extends Node<K,V> {
// 红黑树的根节点
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
// r保存每次插入新节点后的树的根节点
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
// 如果没有根节点,说明是树是空的,将新添加的节点设为根节点,设置为黑色(特性2)
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
// 如果根节点已存在,从根节点开始利用二分法比较带插入节点和树节点的hash值
// 如果x节点的hash小于树节点hash,则从树节点的左子树开始比较,直到左子树为null时将x插入,
// 然后balanceInsertion对红黑树进行调整
// 如果x节点的hash大于树节点hash,则从树节点的右子树开始比较,直到右子树为null时将x插入,
// 然后balanceInsertion对红黑树进行调整
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
// x节点的hash小于树节点hash,dir=-1
if ((ph = p.hash) > h)
dir = -1;
// x节点的hash大于树节点hash,dir=1
else if (ph < h)
dir = 1;
// 如果x的hash值和p的hash值相等,首先判断节点中的key对象的类是否实现了
// Comparable接口,如果实现Comparable接口,则调用compareTo方法比较两者key的大小,
// 但是如果key对象没有实现Comparable接口,或则compareTo方法返回了0,
// 则继续调用tieBreakOrder方法计算dir值
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
// dir小于0,从左子树开始比较;dir大于0,从右子树开始比较
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
// 插入后做平衡调整
r = balanceInsertion(r, x);
break;
}
}
}
}
// 最后设置root为节点全部插入后的根节点
this.root = r;
assert checkInvariants(root);
}
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
TreeNode<K,V> x) {
// x是当前要处理的节点,xp为x的父节点,xpp为xp的父节点,xppl为xpp的左子节点,
// xppr为xpp的右子节点
x.red = true;
for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
// 如果xp为null,则将x设为根节点,返回x,循环退出
if ((xp = x.parent) == null) {
x.red = false;
return x;
}
// 如果xp为黑色或xpp为null,返回root,循环退出
else if (!xp.red || (xpp = xp.parent) == null)
return root;
// 如果xp是父节点的左子节点
if (xp == (xppl = xpp.left)) {
// 如果xp的兄弟节点(父节点的右子节点)是红色,则将xp和其兄弟节点都变为黑色,
// xpp变为红色,并将当前节点设为xpp,继续循环
if ((xppr = xpp.right) != null && xppr.red) {
xppr.red = false;
xp.red = false;
xpp.red = true;
x = xpp;
}
// 如果xp的兄弟节点(父节点的右子节点)为null或颜色是黑色
else {
// 如果x是xp的右子节点,则对xp进行左旋
if (x == xp.right) {
root = rotateLeft(root, x = xp);
xpp = (xp = x.parent) == null ? null : xp.parent;
}
// 如果xp不为null,则设xp为黑色
if (xp != null) {
xp.red = false;
// 如果xpp不为null,则设xpp为红色,对xpp进行右旋
if (xpp != null) {
xpp.red = true;
root = rotateRight(root, xpp);
}
}
}
}
// 如果xp是父节点的右子节点
else {
if (xppl != null && xppl.red) {
xppl.red = false;
xp.red = false;
xpp.red = true;
x = xpp;
}
else {
if (x == xp.left) {
root = rotateRight(root, x = xp);
xpp = (xp = x.parent) == null ? null : xp.parent;
}
if (xp != null) {
xp.red = false;
if (xpp != null) {
xpp.red = true;
root = rotateLeft(root, xpp);
}
}
}
}
}
}
}
由此我们可以看到将Node链表转化为红黑树的大致步骤如下:
- 首先调用treeifyBin(Node<K,V>[] tab, int index) 函数将Node链表依次转化为TreeNode链表
- 将TreeNode链表转化为TreeBin对象,在转化过程中建立红黑树,建立过程如下:
(1)在TreeBin的构造函数中依次遍历TreeNode中的节点并添加到红黑树中;
(2)如果没有根节点,说明是树是空的,将新添加的节点设为根节点,设置为黑色,继续执行(1);
(3)否则,从根节点开始利用二分法比较带插入节点和树节点的hash值。如果x节点的hash小于树节点hash,则从树节点的左子树开始比较,直到左子树为null时将x插入,然后balanceInsertion对红黑树进行调整,继续执行(1);
(4)如果x节点的hash大于树节点hash,则从树节点的右子树开始比较,直到右子树为null时将x插入,然后balanceInsertion对红黑树进行调整,继续执行(1);
(5)如果x的hash值和p的hash值相等,首先判断节点中的key对象的类是否实现了Comparable接口,如果实现Comparable接口,则调用compareTo方法比较两者key的大小,但是如果key对象没有实现Comparable接口,或则compareTo方法返回了0,则继续调用tieBreakOrder方法计算dir值,得到dir值后将x插入,然后balanceInsertion对红黑树进行调整,继续执行(1); - balanceInsertion函数会在插入新节点后检查当前红黑树是否符合上述的5个性质,如果添加新节点打破了部分特性,则通过改变节点颜色或节点旋转(左旋和右旋),具体分析见上面源码注释
3 红黑树锁的实现
当tab[index]为红黑树时,为了控制多线程并发访问,红黑树自己实现了锁。ConcurrentHashMap中红黑树的锁是基于状态位+CAS+LockSupport实现的
// 红黑树根节点
TreeNode<K,V> root;
// TreeNode链表的头结点
volatile TreeNode<K,V> first;
volatile Thread waiter;
// 锁状态
volatile int lockState;
// 获取了写锁的状态
static final int WRITER = 1;
// 等待写锁的状态
static final int WAITER = 2;
static final int READER = 4; // increment value for setting read lock
/**
* Acquires write lock for tree restructuring.
*/
private final void lockRoot() {
// 如果当前状态位0,则修改状态为WRITER
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
// 写锁获取失败,则竞争获取
contendedLock(); // offload to separate method
}
/**
* Releases write lock for tree restructuring.
*/
private final void unlockRoot() {
// 释放锁,将锁状态恢复0
lockState = 0;
}
/**
* Possibly blocks awaiting root lock.
*/
private final void contendedLock() {
boolean waiting = false;
// 自旋进行锁竞争
for (int s;;) {
// 如果当前锁状态为WAITER(等待写锁)
if (((s = lockState) & ~WAITER) == 0) {
// 尝试获取写锁
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
// 写锁获取成功,将等待线程设为null,返回
if (waiting)
waiter = null;
return;
}
}
// 如果当前状态不是等待状态
else if ((s & WAITER) == 0) {
// 设置当前状态为等待状态
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
waiting = true;
waiter = Thread.currentThread();
}
}
else if (waiting)
// 如果处于等待状态,则阻塞该线程
LockSupport.park(this);
}
}
综上我们可以看出获取红黑树获取写锁的流程:
- 通过CAS来设置当前缩状态为写锁状态(尝试获取写锁);
- 如果成功获取写锁,则退出,否则继续3;
- 进入contendedLock中自旋地获取写锁,获取成功则返回,否则线程阻塞,等待其他线程释放写锁;
红黑树释放写锁:
- 设置当前锁状态为0;
4 红黑树节点删除
红黑树删除节点的一共分为两步:
- 在TreeNode链表中删除节点x;
- 在红黑树中找到x的后继节点s,然后将x和s交换位置以及颜色,之后调用balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x)调整红黑树
// p是待删除节点
TreeNode<K,V> next = (TreeNode<K,V>)p.next;
TreeNode<K,V> pred = p.prev; // unlink traversal pointers
TreeNode<K,V> r, rl;
if (pred == null)
first = next;
else
pred.next = next;
if (next != null)
next.prev = pred;
if (first == null) {
root = null;
return true;
}
if ((r = root) == null || r.right == null || // too small
(rl = r.left) == null || rl.left == null)
return true;
上面的代码执行第一步操作,删除TreeNode链表中x节点
lockRoot();
try {
TreeNode<K,V> replacement;
TreeNode<K,V> pl = p.left;
TreeNode<K,V> pr = p.right;
if (pl != null && pr != null) {
TreeNode<K,V> s = pr, sl;
while ((sl = s.left) != null) // find successor
s = sl;
boolean c = s.red; s.red = p.red; p.red = c; // swap colors
TreeNode<K,V> sr = s.right;
TreeNode<K,V> pp = p.parent;
if (s == pr) { // p was s's direct parent
p.parent = s;
s.right = p;
}
else {
TreeNode<K,V> sp = s.parent;
if ((p.parent = sp) != null) {
if (s == sp.left)
sp.left = p;
else
sp.right = p;
}
if ((s.right = pr) != null)
pr.parent = s;
}
p.left = null;
if ((p.right = sr) != null)
sr.parent = p;
if ((s.left = pl) != null)
pl.parent = s;
if ((s.parent = pp) == null)
r = s;
else if (p == pp.left)
pp.left = s;
else
pp.right = s;
if (sr != null)
replacement = sr;
else
replacement = p;
}
else if (pl != null)
replacement = pl;
else if (pr != null)
replacement = pr;
else
replacement = p;
if (replacement != p) {
TreeNode<K,V> pp = replacement.parent = p.parent;
if (pp == null)
r = replacement;
else if (p == pp.left)
pp.left = replacement;
else
pp.right = replacement;
p.left = p.right = p.parent = null;
}
root = (p.red) ? r : balanceDeletion(r, replacement);
if (p == replacement) { // detach pointers
TreeNode<K,V> pp;
if ((pp = p.parent) != null) {
if (p == pp.left)
pp.left = null;
else if (p == pp.right)
pp.right = null;
p.parent = null;
}
}
} finally {
unlockRoot();
}
上述代码执行第二步,在红黑树中找到x的后继节点s,然后将x和s交换位置以及颜色,最后进行红黑树的平衡调整