Java7 ConcurrentHashMap总结分析

ConcurrentHashMap

在JAVA线程安全的并发集合有HashTable,而HashTable
是一种通过在方法上使用Synchronize的简单粗暴的方式进行保证线程的安全,ConcurrentHashMap则是一个高效的并发集合。ConcurrentHashMap不是简单粗暴的在方法上使用Synchronize,而是通过分段(segment) 锁的方式进行处理并发问题。ConcurrentHashMap的锁机制更类似于我们数据库中的行锁,而HashTable则是整张表锁。下图为ConcurrentHashMap的数据结构。

结构.png
取模算法

ConcurrentHashMap获取Segment或者在Segment中获取Entry都是是通过mod求余来确定当前的Segment或者Entry.而在ConcurrentHashMap不是使用“%”的方式来进行求余值,而是通过&位运算的方式求余值。但是这种&位运算求余的方式必须是2n
公式:a%(2n)是等价于a&(2n-1)。

System.out.println("345%16="+(345%16)+" 等价于: 345&(16-1)="+(345&(16-1)));
输出结果:
345%16=9 等价于: 345&(16-1)=9
ConcurrentHashMap初始化
  public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //根据并发级别进行计算Segment的size和位移量
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //计算key&(2n-1)的值和key左移量
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //16384
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1; 
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

我们通过源码的初始化方法进行分析得出:

  • ConcurrentHashMap初始化的时候会传入:
    (1)initialCapacity初始容量,而initialCapacity主要用于计算初始化Segment中的Table的初始化容量。
    (2) loadFactor负载因子,则是用来计算Table的扩展阔值。当Table达到阔值的时候就会进行动态扩展。
    (3) concurrencyLevel并发级别,则是用来确定Segment的大小。默认系统设置16个segment。
  • 初始化的时候默认先计算segmentshift和sgmentMask,而上面已经说过ConcurrentHashMap是通过&运算求余的方式进行确定当前Key值对应的位置。那么segmentShift则是(2n-1),sgmentMask则是key左移的值,ConcurrentHashMap是对key左移sgmentMask后再进行&操作。
  • 默认配置下创建1个Segment,而Segment是不可以进行动态扩容的。但是Segment中的Table根据阔值可以进行动态扩容。
put
  • 创建Segment
  @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        //计算hash值,在hash方法中会对对象中的hashCode进行Hash
        int hash = hash(key);
        //对hash值进行左位移后,求余
        int j = (hash >>> segmentShift) & segmentMask;
       //判断当这个Segment是否为Null,如果为Null创建一个Segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

   @SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        //判断是否存在segament,因为不是线程安全的所以需要多次检测是否存在Segment
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            //初始化Segment中Table的阔值、容量、负载因子
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            //创建Table
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            //判断是否存在segament,因为不是线程安全的所以需要多次检测是否存在Segment
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    //使用CAS方式进行插入Segment
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

通过对源码的分析我们可以看到因为在初始化ConcurrentHashMap的时候只有一个Segment,当计算出key的hash值后,若该Hash值没有对应的Segment的时候会去创建一个Segment,而因为在创建Segment是非加锁的方式,因此在创建Segment的时候会多次判断是否存在Segment,在最后还使用了CAS的方式进行插入到Segment的数组中。

CAS的使用的标准范式,在循环中使用CAS操作。保证CAS一定执行。

  • 插入值
    ``

      final V put(K key, int hash, V value, boolean onlyIfAbsent) {
          //获取重入锁
          HashEntry<K,V> node = tryLock() ? null :
              scanAndLockForPut(key, hash, value);
          V oldValue;
          try {
              HashEntry<K,V>[] tab = table;
              int index = (tab.length - 1) & hash;
              //根据计算的index获取Entry
              HashEntry<K,V> first = entryAt(tab, index);
              //循环所有Entry
              for (HashEntry<K,V> e = first;;) {
                  if (e != null) {
                      K k;
                      //判断值是否存在相同的Key或者Hash值
                      if ((k = e.key) == key ||
                          (e.hash == hash && key.equals(k))) {
                          oldValue = e.value;
                          //判断是否为不存在插入。
                          if (!onlyIfAbsent) {
                              e.value = value;
                              ++modCount;
                          }
                          break;
                      }
                      //迭代下一个
                      e = e.next;
                  }
                  else {
                      if (node != null)
                          node.setNext(first);
                      else
                          node = new HashEntry<K,V>(hash, key, value, first);
                      int c = count + 1;
                      //判断是否达到阔值,
                      if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                          //达到阔值,扩容并且重新计算HASH值
                          rehash(node);
                      else
                          //如果没有达到阔值则添加到node的下一个
                          setEntryAt(tab, index, node);
                      ++modCount;
                      count = c;
                      oldValue = null;
                      break;
                  }
              }
          } finally {
              //释放锁
              unlock();
          }
          return oldValue;
      }
    

``
(1)这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
(2)循环table链,如果存在HashEntry时,则进行判断是否需要更新,如果存相等的Hash和Key,则判断onlyIfAbsent是否为False,若为False则更新当前的值并且跳出循环返回。因为put方法有putIfAbsent和put公共,而putIfAbsent则只有当前值不存在时插入,如果存在则返回旧值,而put则是无论任何时候都会更新旧值,并且将旧值返回。如果不存相等的Hash和Key。判断当前节点是否头节点,如果是则设置为头节点。然后判断是否需要对Table进行扩容,如果需要则从新计算hash值并且将新值增加到链中。如果不需要扩容则直接添加到链中。

get
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

计算 hash 值,找到 segment 数组中的具体位置.根据 hash 找到数组中具体的位置,循环链查找相等的值并且返回

size
 public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

在获取ConcurrentHashMap的size时候,是获取两次size的值判断是否一致,如果不是一致的情况下,再进行加锁操作。所以在高并发写入的情况下避免使用Size方法,以免由于加锁导致性能问题。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容