程序员社区

再看 ConcurrentHashMap


title: 再看 ConcurrentHashMap
date: 2021/06/15 09:16


构造方法

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);   // 通过传入初始容量和负载因子计算出 map 的"真实容量"
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size); // 向上取到 2^n
    this.sizeCtl = cap;
}
  • initialCapacity - 用户传入的初始容量
  • loadFactor - 负载因子
  • concurrencyLevel - 并发度,也就是会有多少个线程同时操作

代码概述:

  1. 校验参数
  2. 判断初始容量和并发度谁大,哪个大选哪个
  3. 重新计算出 map 的容量;size = (1.0 + 8/0.75) = 11.6666666666.
  4. 向上取到 2^n;cap = 16

?ConcurrentHashMap 的构造为什么要修改容量呢?

假设用户传入的容量是 8,然后直接用 8 作为容量,但是由于负载因子是 0.75,所以当容量达到 6(8 * 0.75)的时候会进行一次扩容,但是用户既然传入了初始容量 8,那么就代表他可能只会添加 8 个元素,那中间穿插了一次扩容是不是就很不合理呢?所以 ConcurrentHashMap 直接在构造的时候通过负载因子计算出合适的容量。

?构造时传入的并发度有何作用?

ConcurrentHashMap 当出现 hash 碰撞的时候会对计算出来的那个桶(Node)进行加锁(synchronized),他将初始容量赋值为并发度(当并发度大的时候)的目的可能是为了让每个线程都有一个桶进行加锁(每个线程计算到了不同的桶中),避免排队的出现。

ConcurrentHashMap#get()

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); // 1. 综合高位低位,使其具有更好的 hash 性 2. 保证计算出的 hash 值为正数,因为负数有其他用途
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {    // 从主存中取出数组中指定位置的桶
        if ((eh = e.hash) == h) {   // 如果头结点的 hash 值等于计算出的 hash,则表示命中,继续进行下一步 equals 校验
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)    // 如果节点小于 0,则表示1.红黑树节点 2.正在搬迁 3.预约的坑,调用节点的 find 方法进行查找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {  // 否则就顺着链表一个一个找了
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

hash 值小于 0 的三个子类

/*
 * Encodings for Node hash fields. See above for explanation.
 */
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations

// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
// 作用 1. 标识当前桶已经移完,2. 当迁移的时候有人get(key)看到这个标记,就知道去新数组中找了
static final class ForwardingNode<K, V> extends Node<K, V> {
}

// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K, V> extends Node<K, V> {
}

// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K, V> extends Node<K, V> {
}

?为什么 ConcurrentHashMap#get() 全程没有加锁?

首先,Node[] 采用了 volatile 修饰,保证了 Node[] 的可见性。

transient volatile Node<K, V>[] table;

但是,我们都知道:

volatile 只保证修饰的变量的可见性,对其内容不保证可见性。

所以我们直接通过 Node[0] 这样取出的 Node 其实还是有可能是缓存中的,所以 ConcurrentHashMap 中采用了 tabAt() 来获取指定的 Node,我们看下它的代码:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);   // 从主存中取出指定内存地址的 Node 对象
}

之后再对取出的这个 Node 对象进行操作。

由于 ConcurrentHashMap#get() 取出的是某一时刻的 Node 对象,所以可能会出现取不到刚刚放入的元素的情况。

ConcurrentHashMap#put()

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // 1. 综合高位低位,使其具有更好的 hash 性 2. 保证计算出的 hash 值为正数,因为负数有其他用途
    int binCount = 0;   // 用来计数一个桶上有多少个节点,当达到阈值则进行树化
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  // 当数组为空则初始化
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    // 从主存取出指定位置的桶,如果为 null,则通过 cas 替换,成功则结束,失败则自旋
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)    // 如果取出的桶节点是 ForwardingNode,表明正在扩容,则帮助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {  // 对桶进行加锁
                if (tabAt(tab, i) == f) {   // 再次取一遍,保证桶没有被修改(指没有被替换成其他类型的节点)
                    if (fh >= 0) {  // 如果节点的 hash 值大于 0,则表示是链表结构
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {    // 过程中会通过 binCount 计算节点数量
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) { // 如果 key 相同,则替换
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 挂接到尾结点
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {    // 如果是 TreeBin 节点则表示是红黑树结构,不想看
                        Node<K,V> p;
                        binCount = 2;   // 将 binCount 改为 2,因为已经是树了,不需要再次树化了
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); // 如果超过阈值则对所在的桶 i 进行树化
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 增加 size 计数
    return null;
}

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0) // 如果 sizeCtl 小于 0,则放弃 cpu 时间片
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  // 通过 cas 将 sizeCtl 修改为 -1
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 初始化数组,内容全都是 null
                    table = tab = nt;
                    sc = n - (n >>> 2); // 计算出下次扩容时的容量
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||  // 如果创建出了 CounterCell[]
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 没有创建出 CounterCell[],如果 cas 修改 baseCount 失败,则走下面逻辑
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||   // 获取当前线程的"探针",& 上数组的大小-1,计算出桶的位置,取出 CounterCell
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {    // 通过 cas 修改 CounterCell 的值,进行累加,如果失败则进入 fullAddCount 逻辑
            fullAddCount(x, uncontended);   // 创建累加单元数组和cell,累加重试
            return;
        }
        if (check <= 1)
            return;
        s = sumCount(); // 计算元素个数
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {   // sc 小于 0 则表示正在初始化或者扩容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0) // 结束条件们
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 如果 cas 成功,则帮助扩容
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))   // cas 修改 sizeCtl,扩容
                transfer(tab, null);
            s = sumCount();
        }
    }
}

?CounterCell 的注释说参照了 LongAdder,我们来看下

关键域

// 累加单元数组, 懒惰初始化 
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域 
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁;何时会使用他加锁 1. 创建或扩容 Cell[] 2. 为 Cell[] 中元素赋值
transient volatile int cellsBusy;

Cell内部类

// 防止缓存行伪共享 
@sun.misc.Contended 
static final class Cell {
    volatile long value; 
    Cell(long x) { value = x; }

    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值 
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next); 
    }
    // 省略不重要代码
}

add() 代码

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {    // Cell 数组不为空 或者 cas 操作 base 失败
        boolean uncontended = true; // 表示是否无竞争
        if (as == null || (m = as.length - 1) < 0 ||    // Cell 数组为空
            (a = as[getProbe() & m]) == null || // 对应位置的 Cell 为空
            !(uncontended = a.cas(v = a.value, v + x))) // cas 操作 Cell 失败
            longAccumulate(x, null, uncontended);
    }
}

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {    // 如果当前线程没有"探针"则初始化
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // 表示是否不扩容
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {  // cells 不为空
            if ((a = as[(n - 1) & h]) == null) {    // 如果对应的 cell 为空
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) { // 双重检查,并 cas 加锁
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;  // 赋值
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty 插槽现在非空
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail 如果有竞争则切换 Cell,重新进行 cas 操作
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))    // cas 修改 cell
                break;
            else if (n >= NCPU || cells != as)  // 如果数组的大小超过 CPU 核数,则不扩容,切换 Cell,重新进行 cas 操作
                collide = false;            // At max size or stale
            else if (!collide)  // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {    // 扩容操作
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);    // 切换 cell
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 未加锁 && cells 没有被其他线程修改 && 加锁成功
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];    // 初始化 cells
                    rs[h & 1] = new Cell(x);    // 为当前线程对应的 cell 赋值,这里没有用 volatile 进行操作是因为 happen-before(下一行)
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x)))) // 上面两个条件都没进去,尝试 cas 修改 base 的值
            break;                          // Fall back on using base
    }
}

这样做真的能提高效率吗?

肯定的啊,要不为啥都推荐用呢。

AtomicLong 是采用的 CAS + 自旋,如果并发量很大的话会有很多线程 cas 失败导致线程空转。

LongAdder 先对 base 进行 cas 操作,当 cas 失败(表示出现并发)则新建 Cell 使用 Cell 进行累加。

缓存行和伪共享是什么鬼?

由于CPU的速度比内存快得多,如果CPU直接读取内存,会导致CPU大部分时间都在划水,于是有了缓存

再看 ConcurrentHashMap插图

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率(局部性原理)。

缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)

缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中

CPU 要保证数据的一致性(Java 通过 volatile 实现),如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效,这就是伪共享问题。

再看 ConcurrentHashMap插图1

由于此处是 Cell 数组,所以他在内存中是连续的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象,而此处的 Cell 数组使用了 volatile 修饰,从而需要保证内存的可见性(写要刷入主存,更新其他核心的缓存行),这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效。

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

再看 ConcurrentHashMap插图2

为啥Cell[]的大小不超过CPU核数?

因为同一时刻最多只有CPU核数的线程来更新。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 再看 ConcurrentHashMap

一个分享Java & Python知识的社区