程序员社区

Java7-JUC-ConcurrentHashMap

1.ConcurrentHashMap简介

        其实出现ConcurrentHashMap对标的就是HashTable的低效性,故此采用ConcurrentHashMap采用分段锁代替对象锁的机制,细化了锁粒度(数据库表锁和行锁的区别)。未来会不会考虑用COW机制去做呢?

1.1 HashTable分析

       Hashtable之所以效率低下主要是因为其实现使用了synchronized关键字对put以及get等操作进行加锁,而synchronized关键字加锁是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作时,锁住了整个Hash表,线程竞争激烈的情况下,HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。因此,在JDK1.5~1.7版本,Java使用了分段锁机制实现ConcurrentHashMap. 

1.2 ConcurrentHashMap分析

      简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个HashTable(区别是Synchronized和Lock锁实现同步机制不同);这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可是实现多线程put操作。接下来,本文将详细分析JDK1.7版本中ConcurrentHashMap的实现原理。

2 ConcurrentHashMap数据结构

Java7-JUC-ConcurrentHashMap插图
盗图

Java7-JUC-ConcurrentHashMap插图1

        ConcurrentHashMap类结构如上图所示。由图可知,在ConcurrentHashMap中,定义了一个Segment<K, V>[]数组来将Hash表实现分段存储,从而实现分段加锁;

        Segment元素则与HashMap结构类似,其包含了一个HashEntry数组,用来存储Key/Value对。Segment继承了ReetrantLock,表示Segment是一个可重入锁,因此ConcurrentHashMap通过可重入锁对每个分段进行加锁。

        HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

Java7-JUC-ConcurrentHashMap插图2

3 ConcurrentHashMap源码分析

3.1 构造器初始化

       JDK1.7的ConcurrentHashMap的初始化主要分为两个部分:一是初始化ConcurrentHashMap,即初始化segments数组、segmentShift段偏移量和segmentMask段掩码等;然后则是初始化每个segment分段。接下来,我们将分别介绍这两部分初始化。ConcurrentHashMap包含多个构造函数,而所有的构造函数最终都调用了如下的构造函数:

 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {

        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

                throw new IllegalArgumentException();

         if (concurrencyLevel > MAX_SEGMENTS) 最大65535

               concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments

            int sshift = 0; 移位次数

            int ssize = 1; segment的数组大小初始值

            取大于传入并发数的最小2^n次方数

            while (ssize < concurrencyLevel) {

                ++sshift;

                ssize <<= 1;

            }

          this.segmentShift = 32 - sshift; 计算位移次数 与32做减法 得出常量值

          this.segmentMask = ssize - 1; 去 2^n次幂 - 1 作为对应是HashMap的table大小,充分利用空间

         if (initialCapacity > MAXIMUM_CAPACITY) 超值处理

            initialCapacity = MAXIMUM_CAPACITY;

        int c = initialCapacity / ssize;(计算段segment内部HashEntry个数)

        if (c * ssize < initialCapacity)  如果多出来的不足size的则+1处理,防止HashEntry丢失不足

            ++c;

        int cap = MIN_SEGMENT_TABLE_CAPACITY; 赋值每个段segment的容量

        while (cap < c) 

            cap <<= 1; 当最小段容量不足计划生成的段容量时候,则直接进行扩展2倍,直到大于为止

        系统创建第一个Segment段,作为其他Segment段的初始化的参考标准。

        Segment<K,V> s0 =

            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),

                            (HashEntry<K,V>[])new HashEntry[cap]);

        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 初始化段容量

        UNSAFE.putOrderedObject(ss, SBASE, s0); // 放入第一个数组下标为0的位置

        this.segments = ss;

    }

     由代码可知,该构造函数需要传入三个参数:initialCapacity、loadFactor、concurrencyLevel。

主要参数介绍

      concurrencyLevel主要用来初始化segments(存放K,V的HashMap的数组)、segmentShift(定位辅助求数组下标)和segmentMask(求数组下标)等

      initialCapacity(作为分段的总容量被除数)和loadFactor(计算每个段的Threshold扩容阈值)则主要用来初始化每个Segment分段

其他参数介绍

        根据ConcurrentHashMap的构造方法可知,在初始化时创建了两个中间变量ssize和sshift,它们都是通过concurrencyLevel计算得到的。

       ssize:表示了segments数组的长度,为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方,所以在初始化时通过循环计算出一个大于或等于concurrencyLevel的最小的2的N次方值来作为数组长度;容器里锁个数也是16,为segmentMask计算值进行铺垫依据。

       sshift:表示了计算ssize时进行移位操作的次数:ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移动4次,所以sshift等于4。因为ssize的最大长度为65536,所以sshift最大值为16。

      segmentShift:用于定位参与散列运算的位数,其等于32减去sshift,使用32是因为ConcurrentHashMap的hash()方法返回的最大数是32位的。取值范围 [16,32],最大为32

      segmentMask:是散列运算的掩码,等于ssize减去1,所以掩码的二进制各位都为1,segmentMask最大值为65535(ssize-1)。

      由于segmentShift和segmentMask与数组坐标定位和增加Hash散列性相关,因此之后还会对此进行分析。

3.2 初始化Segment分段

        ConcurrentHashMap通过initialCapacity和loadFactor来初始化每个Segment. 在初始化Segment时,也定义了一个中间变量cap,其等于initialCapacity除以ssize的倍数c,如果c大于1,则取大于等于c的2的N次方,cap表示Segment中HashEntry数组的长度;loadFactor表示了Segment的加载因子,通过cap*loadFactor获得每个Segment的阈值threshold.

       默认情况下,initialCapacity等于16,loadFactor等于0.75,concurrencyLevel等于16。由于采用了Segment分段锁机制实现一个高效的同步,那么首先则需要通过hash散列算法计算key的hash值,从而定位其所在的Segment. 因此,首先需要了解ConcurrentHashMap中hash()函数的实现。

private int hash(Object k) {

        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {

            return sun.misc.Hashing.stringHash32((String) k);

        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,

        // using variant of single-word Wang/Jenkins hash.

        h += (h <<  15) ^ 0xffffcd7d;

        h ^= (h >>> 10);

        h += (h <<  3);

        h ^= (h >>>  6);

        h += (h <<  2) + (h << 14);

        return h ^ (h >>> 16);

    }

        通过hash()函数可知,首先通过计算一个随机的hashSeed减少String类型的key值的hash冲突;然后利用Wang/Jenkins hash算法对key的hash值进行再hash计算。通过这两种方式都是为了减少散列冲突,从而提高效率。因为如果散列的质量太差,元素分布不均,那么使用Segment分段加锁也就没有意义了。

    private Segment<K,V> segmentForHash(int h) {

        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

        return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);

    }

       再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。我做了一个测试,不通过再哈希而直接执行哈希计算。

System.out.println(Integer.parseInt("0001111",2) &15);

System.out.println(Integer.parseInt("0011111",2) &15);

System.out.println(Integer.parseInt("0111111",2) &15);

System.out.println(Integer.parseInt("1111111",2) &15);

      计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每个四位用竖线分隔。

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000

0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

       可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。ConcurrentHashMap通过以下哈希算法定位到segment。

        默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中,(hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有再发生冲突。

3.3 ConcurrentHashMap的操作

        在介绍ConcurrentHashMap的操作之前,首先需要介绍一下Unsafe类,因为在JDK1.7新版本中是通过Unsafe类的方法实现锁操作的。Unsafe类是一个保护类,一般应用程序很少用到,但其在一些框架中经常用到,如JDK、Netty、Spring等框架。

       Unsafe类提供了一些硬件级别的原子操作,其在JDK1.7和JDK1.8中的ConcurrentHashMap都有用到,但其用法却不同,在此只介绍在JDK1.7中用到的几个方法:

      arrayBaseOffset(Class class):获取数组第一个元素的偏移地址。

      arrayIndexScale(Class class):获取数组中元素的增量地址。

      getObjectVolatile(Object obj, long offset):获取obj对象中offset偏移地址对应的Object型field属性值,支持Volatile读内存语义。

3.3.1 get

        JDK1.7的ConcurrentHashMap的get操作是不加锁的,因为在每个Segment中定义的HashEntry数组和在每个HashEntry中定义的value和next HashEntry节点都是volatile类型的,volatile类型的变量可以保证其在多线程之间的可见性,因此可以被多个线程同时读,从而不用加锁。而其get操作步骤也比较简单,定位Segment –> 定位HashEntry –> 通过getObjectVolatile()方法获取指定偏移量上的HashEntry –> 通过循环遍历链表获取对应值。

    定位Segment:(((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE

    定位HashEntry:(((tab.length - 1) & h)) << TSHIFT) + TBASE

3.3.2 put

ConcurrentHashMap的put方法就要比get方法复杂的多,其实现源码如下:

    public V put(K key, V value) {

        Segment<K,V> s;

        if (value == null)

            throw new NullPointerException();

        int hash = hash(key);

        int j = (hash >>> segmentShift) & segmentMask;

        if ((s = (Segment<K,V>)UNSAFE.getObject          // non volatile; recheck

            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment

            s = ensureSegment(j);

        return s.put(key, hash, value, false);

    }

         同样的,put方法首先也会通过hash算法定位到对应的Segment,此时,如果获取到的Segment为空,则调用ensureSegment()方法;否则,直接调用查询到的Segment的put方法插入值,注意此处并没有用getObjectVolatile()方法读,而是在ensureSegment()中再用volatile读操作,这样可以在查询segments不为空的时候避免使用volatile读,提高效率

        在ensureSegment()方法中,首先使用getObjectVolatile()读取对应Segment,如果还是为空,则以segments[0]为原型创建一个Segment对象,并将这个对象设置为对应的Segment值并返回。

       在Segment的put方法中,首先需要调用tryLock()方法获取锁,然后通过hash算法定位到对应的HashEntry,然后遍历整个链表,如果查到key值,则直接插入元素即可;而如果没有查询到对应的key,则需要调用rehash()方法对Segment中保存的table进行扩容,扩容为原来的2倍,并在扩容之后插入对应的元素。插入一个key/value对后,需要将统计Segment中元素个数的count属性加1。最后,插入成功之后,需要使用unLock()释放锁。

      由于put方法需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。

       是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量的阈值(threshold),如果超过阈值并且,数组进行扩容。值的一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经达到容量的阈值,如果到达了就进行扩容,但是有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

       如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

3.3.3 size

      ConcurrentHashMap的size操作的实现方法也非常巧妙,一开始并不对Segment加锁,而是直接尝试将所有的Segment元素中的count相加,这样执行两次,然后将两次的结果对比,如果两次结果相等则直接返回;而如果两次结果不同,则再将所有Segment加锁,然后再执行统计得到对应的size值。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Java7-JUC-ConcurrentHashMap

一个分享Java & Python知识的社区