1. 简介
在之前写了HashMap源码解析介绍了HashMap
这个数据结构,可惜它并不是线程安全的,在多线程情况下最好还是使用ConCurrentHashMap
。在JDK8中它使用了CAS+synchronized实现来保证线程安全,而在JDK7中它使用了分段锁的实现方式来保证线程安全。下面我来剖析下它源码:
2. 实现
类继承结构
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {}
属性
// Node数组最大大小
private static final int MAXIMUM_CAPACITY = 1 << 30;
// Node数组最大大小
private static final int DEFAULT_CAPACITY = 16;
// 最大数组大小,只在toArray方法中使用
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转换为红黑树链表长度的阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树转换为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 链表转换为红黑树数组大小的阈值
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 帮助扩容的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED = -1; // forwarding结点的Hash值
static final int TREEBIN = -2; // 树根结点的Hash值
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 获取可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Node结点的数组
transient volatile Node<K,V>[] table;
//
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
// < -1表示有-(sizeCtl + 1)个线程在扩容
// == -1表示数组在初始化
// 大于0表示并且数组为初始化表示要初始化数组的大小,否则表示扩容的阈值
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;
结点类型
链表结点类型
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// 使用volatile修饰来保证变量在多线程下的可见性
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
// 在以当前结点开始的链表中寻找含有k这个键的结点
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
红黑树结点类型
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
// 找出键为k的结点
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
// 省略代码
}
}
// 用于存放红黑树的结构
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first; // TreeNode根结点
volatile Thread waiter; // 等待线程
volatile int lockState; // 锁状态
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
static int tieBreakOrder(Object a, Object b) {
int d;
if (a == null || b == null ||
(d = a.getClass().getName().
compareTo(b.getClass().getName())) == 0)
d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
-1 : 1);
return d;
}
//省略后面大部分代码
构造函数
在构造函数中并不会创建Node数组,而是延迟到第一次put方法时才创建Node数组
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// 键值对数量 / 数组大小 <= 加载因子,根据initialCapacity与加载因子求出数组大小
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 如果数组大小最大等于数组大小则设置数组初始大小为MAXIMUM_CAPACITY,
// 否则调用tableSizeFor函数求出一个不小于size的第一个2的n次幂的正整数
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// 设置要初始化的数组大小
this.sizeCtl = cap;
}
// 求出一个不小于c的第一个2的n次幂的正整数
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
散列函数
通过高16位与低16位的异或运算得出新的散列值以减少碰撞几率
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
put方法
//插入键值对,如果键已存在,默认更改其值
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 键与值不得为空
if (key == null || value == null) throw new NullPointerException();
// 计算出键的散列值
int hash = spread(key.hashCode());
// 指定数组位置上结点的个数
int binCount = 0;
// 通过使用循环直到添加成功退出循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组未初始化,则调用initTable方法初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 否则计算出键所在的桶,如果桶为空则调用casTabAt方法尝试在该位置添加结点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 添加结点成功退出循环,否则表示有其他线程也尝试在此位置插入结点,出现冲突
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}// 如果该位置不为空,并且该结点Hash值为MOVED表示当前正在扩容,调用helpTransfer方法帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 使用synchronized关键字对该头结点进行加锁,只有获取头结点的锁的线程才有资格对该桶的链表或数组添加结点
synchronized (f) {
// 再一次判断加锁的结点是否为头结点
if (tabAt(tab, i) == f) {
// 如果头结点Hash值大于等于0则表示该桶存放的是链表,遍历链表添加结点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到该结点
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 判断是否替换旧值
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
} // 否则判断头结点是否为为TreeBin的实例,如果是则该桶存放的是一棵红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用putTreeVal方法添加结点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
// 判断是否替换旧值
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果binCount不等于0则表示添加结点或替换新值成功
if (binCount != 0) {
// 如果结点数量大于等于8,将链表树形化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果值不为空直接返回旧值
if (oldVal != null)
return oldVal;
break;
}
}
}
// 添加新结点成功,调用addCount方法判断是否需要扩容
addCount(1L, binCount);
return null;
}
initTable方法
这个方法是在调用put方法时数组未初始化时调用的,用于创建初始数组
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 使用循环来保证初始化成功
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl小于0表示数组在初始化或者扩容
if ((sc = sizeCtl) < 0)
// 调用Thread.yield方法尝试让出CPU给其他相同优先级的线程
Thread.yield(); // lost initialization race; just spin
// 否则使用CAS更新SIZECTL为-1,表示要初始化数组
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再一次判断数组是否未初始化
if ((tab = table) == null || tab.length == 0) {
// 若在构造对象时未给出初始化大小则使用默认大小16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 设置下次扩容阈值为当前数组大小的0.75倍
sc = n - (n >>> 2);
}
} finally {
// 无论是否初始化成功,更新sizeCtl的值
sizeCtl = sc;
}
break;
}
}
return tab;
}
helpTransfer方法
当发现数组正在扩容时调用helpTransfer方法帮助数组转移结点到新数组
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}