ConcurrentHashMap
在JAVA线程安全的并发集合有HashTable,而HashTable
是一种通过在方法上使用Synchronize的简单粗暴的方式进行保证线程的安全,ConcurrentHashMap则是一个高效的并发集合。ConcurrentHashMap不是简单粗暴的在方法上使用Synchronize,而是通过分段(segment) 锁的方式进行处理并发问题。ConcurrentHashMap的锁机制更类似于我们数据库中的行锁,而HashTable则是整张表锁。下图为ConcurrentHashMap的数据结构。
取模算法
ConcurrentHashMap获取Segment或者在Segment中获取Entry都是是通过mod求余来确定当前的Segment或者Entry.而在ConcurrentHashMap不是使用“%”的方式来进行求余值,而是通过&位运算的方式求余值。但是这种&位运算求余的方式必须是2n
公式:a%(2n)是等价于a&(2n-1)。
System.out.println("345%16="+(345%16)+" 等价于: 345&(16-1)="+(345&(16-1)));
输出结果:
345%16=9 等价于: 345&(16-1)=9
ConcurrentHashMap初始化
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//根据并发级别进行计算Segment的size和位移量
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//计算key&(2n-1)的值和key左移量
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//16384
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
我们通过源码的初始化方法进行分析得出:
- ConcurrentHashMap初始化的时候会传入:
(1)initialCapacity初始容量,而initialCapacity主要用于计算初始化Segment中的Table的初始化容量。
(2) loadFactor负载因子,则是用来计算Table的扩展阔值。当Table达到阔值的时候就会进行动态扩展。
(3) concurrencyLevel并发级别,则是用来确定Segment的大小。默认系统设置16个segment。 - 初始化的时候默认先计算segmentshift和sgmentMask,而上面已经说过ConcurrentHashMap是通过&运算求余的方式进行确定当前Key值对应的位置。那么segmentShift则是(2n-1),sgmentMask则是key左移的值,ConcurrentHashMap是对key左移sgmentMask后再进行&操作。
- 默认配置下创建1个Segment,而Segment是不可以进行动态扩容的。但是Segment中的Table根据阔值可以进行动态扩容。
put
- 创建Segment
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//计算hash值,在hash方法中会对对象中的hashCode进行Hash
int hash = hash(key);
//对hash值进行左位移后,求余
int j = (hash >>> segmentShift) & segmentMask;
//判断当这个Segment是否为Null,如果为Null创建一个Segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//判断是否存在segament,因为不是线程安全的所以需要多次检测是否存在Segment
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
//初始化Segment中Table的阔值、容量、负载因子
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
//创建Table
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//判断是否存在segament,因为不是线程安全的所以需要多次检测是否存在Segment
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//使用CAS方式进行插入Segment
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
通过对源码的分析我们可以看到因为在初始化ConcurrentHashMap的时候只有一个Segment,当计算出key的hash值后,若该Hash值没有对应的Segment的时候会去创建一个Segment,而因为在创建Segment是非加锁的方式,因此在创建Segment的时候会多次判断是否存在Segment,在最后还使用了CAS的方式进行插入到Segment的数组中。
CAS的使用的标准范式,在循环中使用CAS操作。保证CAS一定执行。
-
插入值
``final V put(K key, int hash, V value, boolean onlyIfAbsent) { //获取重入锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; //根据计算的index获取Entry HashEntry<K,V> first = entryAt(tab, index); //循环所有Entry for (HashEntry<K,V> e = first;;) { if (e != null) { K k; //判断值是否存在相同的Key或者Hash值 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; //判断是否为不存在插入。 if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //迭代下一个 e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //判断是否达到阔值, if (c > threshold && tab.length < MAXIMUM_CAPACITY) //达到阔值,扩容并且重新计算HASH值 rehash(node); else //如果没有达到阔值则添加到node的下一个 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { //释放锁 unlock(); } return oldValue; }
``
(1)这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
(2)循环table链,如果存在HashEntry时,则进行判断是否需要更新,如果存相等的Hash和Key,则判断onlyIfAbsent是否为False,若为False则更新当前的值并且跳出循环返回。因为put方法有putIfAbsent和put公共,而putIfAbsent则只有当前值不存在时插入,如果存在则返回旧值,而put则是无论任何时候都会更新旧值,并且将旧值返回。如果不存相等的Hash和Key。判断当前节点是否头节点,如果是则设置为头节点。然后判断是否需要对Table进行扩容,如果需要则从新计算hash值并且将新值增加到链中。如果不需要扩容则直接添加到链中。
get
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
计算 hash 值,找到 segment 数组中的具体位置.根据 hash 找到数组中具体的位置,循环链查找相等的值并且返回
size
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
在获取ConcurrentHashMap的size时候,是获取两次size的值判断是否一致,如果不是一致的情况下,再进行加锁操作。所以在高并发写入的情况下避免使用Size方法,以免由于加锁导致性能问题。