concurrentHashMap解析这篇文章就够了

实现原理

ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

image

从图中可以看到,ConcurrentHashMap内部分为很多个Segment,每一个Segment拥有一把锁,然后每个Segment(继承ReentrantLock)下面包含很多个HashEntry列表数组。对于一个key,需要经过三次(为什么要hash三次下文会详细讲解)hash操作,才能最终定位这个元素的位置,这三次hash分别为:

对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);

将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素的放在哪个Segment;

将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),通过h3能够确定该元素放置在哪个HashEntry。

初始化

构造函数的源码如下:

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {

    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

    throw new IllegalArgumentException();

    if (concurrencyLevel > MAX_SEGMENTS)

     concurrencyLevel = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments

    int sshift = 0;

    int ssize = 1;

    while (ssize < concurrencyLevel) {

        ++sshift;

        ssize <<= 1;

    }

    this.segmentShift = 32 - sshift;

    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)

    initialCapacity = MAXIMUM_CAPACITY;

    int c = initialCapacity / ssize;

    if (c * ssize < initialCapacity)

    ++c;

    int cap = MIN_SEGMENT_TABLE_CAPACITY;

    while (cap < c)

    cap <<= 1;

// create segments and segments[0]

Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);

Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

this.segments = ss;

}

传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个。

initialCapacity表示新创建的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;

loadFactor表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需要rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;

concurrencyLevel表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel,因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因。

初始化的一些动作:

验证参数的合法性,如果不合法,直接抛出异常。

concurrencyLevel也就是Segment的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,如果超过这个值,设置为这个值。

然后使用循环找到大于等于concurrencyLevel的第一个2的n次方的数ssize,这个数就是Segment数组的大小,并记录一共向左按位移动的次数sshift,并令segmentShift = 32 - sshift,并且segmentMask的值等于ssize - 1,segmentMask的各个二进制位都为1,目的是之后可以通过key的hash值与这个值做&运算确定Segment的索引。

检查给的容量值是否大于允许的最大容量值,如果大于该值,设置为该值。最大容量值为static final int MAXIMUM_CAPACITY = 1 << 30;。

然后计算每个Segment平均应该放置多少个元素,这个值c是向上取整的值。比如初始容量为15,Segment个数为4,则每个Segment平均需要放置4个元素。

最后创建一个Segment实例,将其当做Segment数组的第一个元素。

put操作,put操作的源码如下:


public V put(K key, V value) {

    Segment<K,V> s;

    if (value == null)

    throw new NullPointerException();

    int hash = hash(key);

    int j = (hash >>> segmentShift) & segmentMask;

    if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck

    (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment

    s = ensureSegment(j);

    return s.put(key, hash, value, false);

}

操作步骤如下:

判断value是否为null,如果为null,直接抛出异常。

key通过一次hash运算得到一个hash值。(这个hash运算下文详说)

将得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。

在初始化的时候我们说过segmentShift的值等于32-sshift,例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:

0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。

使用Unsafe的方式从Segment数组中获取该索引对应的Segment对象。

向这个Segment对象中put值,这个put操作也基本是一样的步骤(通过&运算获取HashEntry的索引,然后set)。

get操作,get操作的源码如下:

public V get(Object key) {

    Segment<K,V> s; // manually integrate access methods to reduce overhead

    HashEntry<K,V>[] tab;

    int h = hash(key);

    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {

    for (HashEntry<K,V> e = (HashEntry<K,V>)UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) {
          K k;
          if ((k = e.key) == key || (e.hash == h && key.equals(k)))
              return e.value;
          }
    }
    return null;
}

操作步骤为:

和put操作一样,先通过key进行两次hash确定应该去哪个Segment中取数据。

使用Unsafe获取对应的Segment,然后再进行一次&运算得到HashEntry链表的位置,然后从链表头开始遍历整个链表(因为Hash可能会有碰撞,所以用一个链表保存),如果找到对应的key,则返回对应的value值,如果链表遍历完都没有找到对应的key,则说明Map中不包含该key,返回null。

size操作

size操作与put和get操作最大的区别在于,size操作需要遍历所有的Segment才能算出整个Map的大小,而put和get都只关心一个Segment。假设我们当前遍历的Segment为SA,那么在遍历SA过程中其他的Segment比如SB可能会被修改,于是这一次运算出来的size值可能并不是Map当前的真正大小。所以一个比较简单的办法就是计算Map大小的时候所有的Segment都Lock住,不能更新(包含put,remove等等)数据,计算完之后再Unlock。这是普通人能够想到的方案,但是牛逼的作者还有一个更好的Idea:先给3次机会,不lock所有的Segment,遍历所有Segment,累加各个Segment的大小得到整个Map的大小,如果某相邻的两次计算获取的所有Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。如果这三次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segment加锁,再遍历所有Segment计算Map大小,最后再解锁所有Segment。源代码如下:

public int size() {

// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.

final Segment<K,V>[] segments = this.segments;

int size;

boolean overflow; // true if size overflows 32 bits

long sum; // sum of modCounts

long last = 0L; // previous sum

int retries = -1; // first iteration isn't retry

try {
    for (;;) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            ensureSegment(j).lock(); // force creation
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
              Segment<K,V> seg = segmentAt(segments, j);
              if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0) overflow = true;
              }
        }
        if (sum == last)  break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock();
    }
}
return overflow ? Integer.MAX_VALUE : size;
}

举个例子:

一个Map有4个Segment,标记为S1,S2,S3,S4,现在我们要获取Map的size。计算过程是这样的:第一次计算,不对S1,S2,S3,S4加锁,遍历所有的Segment,假设每个Segment的大小分别为1,2,3,4,更新操作次数分别为:2,2,3,1,则这次计算可以得到Map的总大小为1+2+3+4=10,总共更新操作次数为2+2+3+1=8;第二次计算,不对S1,S2,S3,S4加锁,遍历所有Segment,假设这次每个Segment的大小变成了2,2,3,4,更新次数分别为3,2,3,1,因为两次计算得到的Map更新次数不一致(第一次是8,第二次是9)则可以断定这段时间Map数据被更新,则此时应该再试一次;第三次计算,不对S1,S2,S3,S4加锁,遍历所有Segment,假设每个Segment的更新操作次数还是为3,2,3,1,则因为第二次计算和第三次计算得到的Map的更新操作的次数是一致的,就能说明第二次计算和第三次计算这段时间内Map数据没有被更新,此时可以直接返回第三次计算得到的Map的大小。最坏的情况:第三次计算得到的数据更新次数和第二次也不一样,则只能先对所有Segment加锁再计算最后解锁。

关于hash

术语英文解释

哈希算法hash algorithm是一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为哈希值。

哈希表hash table根据设定的哈希函数H(key)和处理冲突方法将一组关键字映象到一个有限的地址区间上,并以关键字在地址区间中的象作为记录在表中的存储位置,这种表称为哈希表或散列,所得存储位置称为哈希地址或散列地址。

hash的源代码:


int h = hashSeed;

if ((0 != h) && (k instanceof String)) {

return sun.misc.Hashing.stringHash32((String) k);

}

h ^= k.hashCode();

// Spread bits to regularize both segment and index locations,

// using variant of single-word Wang/Jenkins hash.

h += (h << 15) ^ 0xffffcd7d;

h ^= (h >>> 10);

h += (h << 3);

h ^= (h >>> 6);

h += (h << 2) + (h << 14);

return h ^ (h >>> 16);

}

这里用到了Wang/Jenkins hash算法的变种,主要的目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。

举个简单的例子:

System.out.println(Integer.parseInt("0001111", 2) & 15);

System.out.println(Integer.parseInt("0011111", 2) & 15);

System.out.println(Integer.parseInt("0111111", 2) & 15);

System.out.println(Integer.parseInt("1111111", 2) & 15);

这些数字得到的hash值都是一样的,全是15,所以如果不进行第一次预hash,发生冲突的几率还是很大的,但是如果我们先把上例中的二进制数字使用hash()函数先进行一次预hash,得到的结果是这样的:

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000

0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

可以看到每一位的数据都散开了,并且ConcurrentHashMap中是使用预hash值的高位参与运算的。比如之前说的先将hash值向右按位移动28位,再与15做&运算,得到的结果都别为:4,15,7,8,没有冲突!

注意事项

ConcurrentHashMap中的key和value值都不能为null。

ConcurrentHashMap的整个操作过程中大量使用了Unsafe类来获取Segment/HashEntry,这里Unsafe的主要作用是提供原子操作。Unsafe这个类比较恐怖,破坏力极强,一般场景不建议使用,如果有兴趣可以到这里做详细的了解Java中鲜为人知的特性

ConcurrentHashMap是线程安全的类并不能保证使用了ConcurrentHashMap的操作都是线程安全的!

参考:深入分析ConcurrentHashMap

注:本文章在csdn上也有,都是本人书写,请勿转载!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343