引言
HashMap是非线程安全的,而HashTable是线程安全的,但是HashTable实现同步的方法比较暴力,即在所有方法体上添加synchronized关键字,相当于所有读写线程均去读取一把锁,效率比较低下。
另外一种同步Map的方法是使用Collections工具类:
Collections.synchronized(new HashMap()
该种方法与HashTable实现方式类似,也是通过锁住整表来实现同步的。
而ConcurrentHashMap则避免了上述两种Map同步方式锁住全表的问题。
众所周知,HashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段,而ConcurrentHashMap加锁的时候根据散列值锁住了散列值锁对应的那段,因此提高了并发性能。ConcurrentHashMap也增加了对常用复合操作的支持,比如"若没有则添加":putIfAbsent(),替换:replace()。这2个操作都是原子操作。
ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。
再次声明,本文介绍的ConcurrentHashMap是JDK1.7版本的,JDK1.8对ConcurrentHashMap做了较多改进,后面会专门写一篇文章介绍。
ConcurrentHashMap的内部结构
可以看到,ConcurrentHashMap内部采用了一种叫Segment的数据结构,很明显它就是一个数组table(哈希桶),数据的元素就是大家熟悉的HashEntry(哈希链)。
简单来讲,就是ConcurrentHashMap比HashMap多了一次hash过程,第1次hash定位到Segment,第2次hash定位到HashEntry,然后链表搜索找到指定节点。
该种实现方式的缺点是hash过程比普通的HashMap要长,但是优点也很明显,在进行写操作时,只需锁住写元素所在的Segment即可,其他Segment无需加锁,提高了并发读写的效率。
Segment
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor;
}
可以看出,Segment继承了ReentrantLock并实现了序列化接口,说明Segment的锁是可重入的。
- count: Segment中元素的数量,由volatile修饰,支持内存可见性;
- modCount: 对table的大小造成影响的操作的数量(比如put或者remove操作);
- threshold:扩容阈值;
- table:链表数组,数组中的每一个元素代表了一个链表的头部;
- loadFactor: 负载因子。
可以发现,Segment的数据结构与普通的HashMap基本类似,只是通过继承ReentrantLock可实现加锁操作。
HashEntry
Segment中的元素是以HashEntry的形式存放在链表数组中的,其结构与普通HashMap的HashEntry基本一致,不同的是Segment的HashEntry,其value由volatile修饰,以支持内存可见性,即写操作对其他读线程即时可见。
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
看完数据结构,接着看初始化过程:
ConcurrentHashMap的初始化
直接跟源码:
// initialCapacity: 初始容量
// loadFactor: 负载因子
// concurrencyLevel: ConcurrentHashMap内部的Segment的数量
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 若concurrencyLevel大于MAX_SEGMENTS,则concurrencyLevel=MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 求解concurrencyLevel与2的几次方最近(天花板方向)
// 如concurrencyLevel=5 则天花板方向上离2^3=8最近,则sshift=3,ssize=8
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segmentShift和segmentMask主要用于元素的hash
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
// 可以看到,实际segment的数量为ssize
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
// 若initialCapacity / ssize不整除,则将c=c+1
if (c * ssize < initialCapacity)
++c;
int cap = 1;
// cap为每个segment的初始容量,其值为离c天花板方向最近的2^n
// 例:c为5,cap则为2^3=8;c为12,cap则为2^4=16
while (cap < c)
cap <<= 1;
// 创建Segment
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
需要注意的是,concurrencyLevel一经指定,便不能再次改变,原因也很简单,简化元素增多时的rehash过程,若Segment的数量也随元素的增加而进行扩容,则需要进行两次rehash,需要处理全部元素,效率较低。
随着元素的增加,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。
ConcurrentHashMap的get操作
public V get(Object key) {
// 首先计算key的hash值
int hash = hash(key.hashCode());
// segmentFor(hash): 定位到key在哪个segment
// 调用segment的get(key, hash)获取到指定key的value
return segmentFor(hash).get(key, hash);
}
接着跟进segmentFor(hash):
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
重新回顾一下,ConcurrentHashMap初始化时,确定的2个参数:
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
其中,ssize = 2^sshift,ssize即为Segment的数量。
我们知道:
// 通过位运算简化求余操作
hash % (2^n) = hash & (2^n -1 )
可以看出,ConcurrentHashMap的第1次hash没有直接采用key的hash值来进行求余操作,而是采用hash值的高sshift位来进行求余操作的。
而hash值的高sshift位为:
hash >>> (32-sshift) = hash >>> segmentShift
至此,第1次hash操作就完成了。
下面,我们继续看Segment的get(key, hash)方法,即第2次hash过程:
V get(Object key, int hash) {
if (count != 0) { // read-volatile
// 取得链表的头部,就是第2次hash过程
HashEntry<K,V> e = getFirst(hash);
// 链表搜索,直到hash与key均相等时,返回节点的value
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
首先对count做了非零判断,前边讲解Segment的数据结构时,指出count是volatile修饰的,put、remove等操作会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。
getFirst(hash)完成了第2次hash过程,跟进去:
HashEntry<K,V> getFirst(int hash) {
// 获取Segment的数组结构
HashEntry<K,V>[] tab = table;
// 第2次hash过程,确定key位于哪一个HashEntry
return tab[hash & (tab.length - 1)];
}
可以看出,第2次hash与第1次hash基本类似,只不过直接用的hash值与Segment的数组大小进行求余,而没有采取hash值高n位的方式。
ConcurrentHashMap的put操作
put操作也涉及2次hash定位过程,但是比get操作多了是否扩容、rehash等过程。
put操作的第1次hash与get类似,不再赘述,主要看如何将元素put到Segment中。
V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加锁
lock();
try {
int c = count;
// 对c进行+1操作,获取新的数组容量
// 如果新的数组容量高于阈值,则先进行扩容操作
if (c++ > threshold) // ensure capacity
rehash();
// 获取Segment的数组table
HashEntry<K,V>[] tab = table;
// 确定在数组中的位置index,即第2次hash过程
int index = hash & (tab.length - 1);
// 获取index位置的头结点
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
// 沿链表遍历,直到找到与元素key或者hash值相同的节点
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
// 若key或者hash值相同的节点存在,则进行更新操作
if (e != null) {
// value也是volatile修饰的,所以内存即时可见
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
// 若key或者hash值相同的节点不存在,则新建节点,追加到当前链表的头部(头插法)
else {
oldValue = null;
// 更新modCount的值,记录对table的大小造成影响的操作的数量
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
// 更新count的值,内存即时可见
count = c; // write-volatile
}
// 返回旧值
return oldValue;
} finally {
// 释放锁,与synchronize关键字不同,Lock必须显示释放
unlock();
}
}
可以看到,Segment的put过程与普通的HashMap基本类似。
ConcurrentHashMap的remove操作
与put操作基本类似,首先根据hash值的高sshift位与Segment的数量ssize求余定位到具体的Segment,然后在Segment上执行具体的remove操作。
下面我们看一下Segment如何实现remove操作的。
话不多说,直接怼源码:
V remove(Object key, int hash, Object value) {
// 加锁,除了读取操作,其他操作均需要加锁
lock();
try {
// 计算新的Segment元素数量
int c = count - 1;
// 获取Segment的数组table
HashEntry<K,V>[] tab = table;
// 第2次hash,确定在table的哪个位置
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
// 沿链表遍历,直到找到与元素key或者hash值相同的节点
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// 更新modCount值
++modCount;
HashEntry<K,V> newFirst = e.next;
// 将待删除元素的前面的元素全部复制一遍,然后头插到链表上去
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
tab[index] = newFirst;
// 更新新的Segment元素数量,内存即时可见
count = c; // write-volatile
}
}
// 返回旧值
return oldValue;
} finally {
// 释放锁
unlock();
}
}
由于,HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去。
如:
// 原有链表:
1-->2-->3-->4-->5
// 删除节点3,新的链表为:
2-->1-->4-->5
至此,JDK1.7版本的ConcurrentHashMap的基础方法介绍完毕,其他方法的实现方式,读者可自行阅读源码获取,再次不做过多阐述。
参考文献: