title: 再看 ConcurrentHashMap
date: 2021/06/15 09:16
构造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor); // 通过传入初始容量和负载因子计算出 map 的"真实容量"
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size); // 向上取到 2^n
this.sizeCtl = cap;
}
- initialCapacity - 用户传入的初始容量
- loadFactor - 负载因子
- concurrencyLevel - 并发度,也就是会有多少个线程同时操作
代码概述:
- 校验参数
- 判断初始容量和并发度谁大,哪个大选哪个
- 重新计算出 map 的容量;size = (1.0 + 8/0.75) = 11.6666666666.
- 向上取到 2^n;cap = 16
?ConcurrentHashMap 的构造为什么要修改容量呢?
假设用户传入的容量是 8,然后直接用 8 作为容量,但是由于负载因子是 0.75,所以当容量达到 6(8 * 0.75)的时候会进行一次扩容,但是用户既然传入了初始容量 8,那么就代表他可能只会添加 8 个元素,那中间穿插了一次扩容是不是就很不合理呢?所以 ConcurrentHashMap 直接在构造的时候通过负载因子计算出合适的容量。
?构造时传入的并发度有何作用?
ConcurrentHashMap 当出现 hash 碰撞的时候会对计算出来的那个桶(Node)进行加锁(synchronized),他将初始容量赋值为并发度(当并发度大的时候)的目的可能是为了让每个线程都有一个桶进行加锁(每个线程计算到了不同的桶中),避免排队的出现。
ConcurrentHashMap#get()
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 1. 综合高位低位,使其具有更好的 hash 性 2. 保证计算出的 hash 值为正数,因为负数有其他用途
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 从主存中取出数组中指定位置的桶
if ((eh = e.hash) == h) { // 如果头结点的 hash 值等于计算出的 hash,则表示命中,继续进行下一步 equals 校验
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // 如果节点小于 0,则表示1.红黑树节点 2.正在搬迁 3.预约的坑,调用节点的 find 方法进行查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 否则就顺着链表一个一个找了
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
hash 值小于 0 的三个子类
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
// 作用 1. 标识当前桶已经移完,2. 当迁移的时候有人get(key)看到这个标记,就知道去新数组中找了
static final class ForwardingNode<K, V> extends Node<K, V> {
}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K, V> extends Node<K, V> {
}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K, V> extends Node<K, V> {
}
?为什么 ConcurrentHashMap#get() 全程没有加锁?
首先,Node[] 采用了 volatile
修饰,保证了 Node[] 的可见性。
transient volatile Node<K, V>[] table;
但是,我们都知道:
volatile 只保证修饰的变量的可见性,对其内容不保证可见性。
所以我们直接通过 Node[0] 这样取出的 Node 其实还是有可能是缓存中的,所以 ConcurrentHashMap 中采用了 tabAt()
来获取指定的 Node,我们看下它的代码:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); // 从主存中取出指定内存地址的 Node 对象
}
之后再对取出的这个 Node 对象进行操作。
由于 ConcurrentHashMap#get() 取出的是某一时刻的 Node 对象,所以可能会出现取不到刚刚放入的元素的情况。
ConcurrentHashMap#put()
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 1. 综合高位低位,使其具有更好的 hash 性 2. 保证计算出的 hash 值为正数,因为负数有其他用途
int binCount = 0; // 用来计数一个桶上有多少个节点,当达到阈值则进行树化
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 当数组为空则初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 从主存取出指定位置的桶,如果为 null,则通过 cas 替换,成功则结束,失败则自旋
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 如果取出的桶节点是 ForwardingNode,表明正在扩容,则帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 对桶进行加锁
if (tabAt(tab, i) == f) { // 再次取一遍,保证桶没有被修改(指没有被替换成其他类型的节点)
if (fh >= 0) { // 如果节点的 hash 值大于 0,则表示是链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 过程中会通过 binCount 计算节点数量
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 如果 key 相同,则替换
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 挂接到尾结点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 如果是 TreeBin 节点则表示是红黑树结构,不想看
Node<K,V> p;
binCount = 2; // 将 binCount 改为 2,因为已经是树了,不需要再次树化了
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 如果超过阈值则对所在的桶 i 进行树化
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 增加 size 计数
return null;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) // 如果 sizeCtl 小于 0,则放弃 cpu 时间片
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 通过 cas 将 sizeCtl 修改为 -1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 初始化数组,内容全都是 null
table = tab = nt;
sc = n - (n >>> 2); // 计算出下次扩容时的容量
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null || // 如果创建出了 CounterCell[]
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 没有创建出 CounterCell[],如果 cas 修改 baseCount 失败,则走下面逻辑
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null || // 获取当前线程的"探针",& 上数组的大小-1,计算出桶的位置,取出 CounterCell
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 通过 cas 修改 CounterCell 的值,进行累加,如果失败则进入 fullAddCount 逻辑
fullAddCount(x, uncontended); // 创建累加单元数组和cell,累加重试
return;
}
if (check <= 1)
return;
s = sumCount(); // 计算元素个数
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) { // sc 小于 0 则表示正在初始化或者扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0) // 结束条件们
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 如果 cas 成功,则帮助扩容
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) // cas 修改 sizeCtl,扩容
transfer(tab, null);
s = sumCount();
}
}
}
?CounterCell 的注释说参照了 LongAdder,我们来看下
关键域
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁;何时会使用他加锁 1. 创建或扩容 Cell[] 2. 为 Cell[] 中元素赋值
transient volatile int cellsBusy;
Cell内部类
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
add() 代码
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { // Cell 数组不为空 或者 cas 操作 base 失败
boolean uncontended = true; // 表示是否无竞争
if (as == null || (m = as.length - 1) < 0 || // Cell 数组为空
(a = as[getProbe() & m]) == null || // 对应位置的 Cell 为空
!(uncontended = a.cas(v = a.value, v + x))) // cas 操作 Cell 失败
longAccumulate(x, null, uncontended);
}
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) { // 如果当前线程没有"探针"则初始化
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // 表示是否不扩容
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells 不为空
if ((a = as[(n - 1) & h]) == null) { // 如果对应的 cell 为空
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) { // 双重检查,并 cas 加锁
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; // 赋值
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty 插槽现在非空
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail 如果有竞争则切换 Cell,重新进行 cas 操作
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // cas 修改 cell
break;
else if (n >= NCPU || cells != as) // 如果数组的大小超过 CPU 核数,则不扩容,切换 Cell,重新进行 cas 操作
collide = false; // At max size or stale
else if (!collide) // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) { // 扩容操作
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 切换 cell
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 未加锁 && cells 没有被其他线程修改 && 加锁成功
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; // 初始化 cells
rs[h & 1] = new Cell(x); // 为当前线程对应的 cell 赋值,这里没有用 volatile 进行操作是因为 happen-before(下一行)
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 上面两个条件都没进去,尝试 cas 修改 base 的值
break; // Fall back on using base
}
}
这样做真的能提高效率吗?
肯定的啊,要不为啥都推荐用呢。
AtomicLong 是采用的 CAS + 自旋,如果并发量很大的话会有很多线程 cas 失败导致线程空转。
LongAdder 先对 base 进行 cas 操作,当 cas 失败(表示出现并发)则新建 Cell 使用 Cell 进行累加。
缓存行和伪共享是什么鬼?
由于CPU的速度比内存快得多,如果CPU直接读取内存,会导致CPU大部分时间都在划水,于是有了缓存。
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率(局部性原理)。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性(Java 通过 volatile 实现),如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效,这就是伪共享问题。
由于此处是 Cell 数组,所以他在内存中是连续的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象,而此处的 Cell 数组使用了 volatile 修饰,从而需要保证内存的可见性(写要刷入主存,更新其他核心的缓存行),这样问题来了:
- Core-0 要修改 Cell[0]
- Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效。
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
为啥Cell[]的大小不超过CPU核数?
因为同一时刻最多只有CPU核数的线程来更新。