ConcurrentHashMap原理初探
本文将介绍 ConcurrentHashMap 相关原理,具体内容包括:
- 初始化方法
- put 方法
- 如何做到线程安全的 rehash
参考:https://zhuanlan.zhihu.com/p/84390205
1. 初始化
ConcurrentHashMap 不像 ArrayList 那样在构造函数中就进行了数组初始化,ConcurrentHashMap 是只有在第一次调用 put 放入元素的时候才会进行链表数组初始化,并且由于多线程,所以链表数组的初始化方式也要求是一个线程安全的操作,在第一次 put 时如果数组为空或容量为空(这个容量 sizeCtl 也是一个 volatile 修饰,保证其在链表初始化和扩容时在多线程下的可见性)。会调用 initTable 保证在多线程下安全初始化链表
sizeCtl = -1,表示告知其他线程正在进行初始化
// sizeCtl方法也有volatile修饰,保证链表初始化和扩容时在多线程的可见性
private transient volatile int sizeCtl;
// 为了保证容量都是2 ^ n,这里采用移位并加上或运算,让所有位数都变为1,然后才加1
// 假设构造函数指定容量18,那么结果就是离18最近的2的幂是32,那么这个数组初始化容量是32
// 假设构造函数指定容量小于16,那么结果就是默认的初始化容量16
// 输入的数二进制为k位,那么经过下面操作到达n |= n >>> 16后就是k个1,最后n+1就是k+1位,最高位为1,低位都是0,保证是2的幂
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 指定容量参数进行初始化,ConcurretHashMap和HashMap不会像ArrayList那样在构造函数中就进行初始化链表数组
// 只有等到第一次put元素时才会进行初始化,第一次只是将sizeCtl进行赋值,为此ConcurrentHashMap需要将初始表的操作也要是一个线程安全操作
// 通过tableSizeFor计算得出的容量大小为 如果用户指定容量是小于16那么就是16,如果用户指定容量是大于16,那么就采用最靠近该数的2的幂的数
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl小于0,说明另外的正在执行CAS成功,正在进行初始化,那么该线程让出CPU时间片
if ((sc = sizeCtl) < 0) // 将sizeCtl赋值给sc,是赋值一份
// 调用yield方法会让这个线程从执行状态(running)进入到就绪状态(ready),
// 让出时间片后,下次CPU时间片调度还是从可执行态中集合中挑选,由于CPU时间片调度的算法不同,该线程还是可能再次得到CPU时间片执行的
Thread.yield(); // lost initialization race; just spin
// 这里也就是尝试将成员变量sizeCtl值更新为-1,具体是通过内存偏移量读取判断是否一致更新的
// 比较当前这个对象内存偏移SIZECTL个位置的值其实就是一开始读取sc同一个位置,是否我预先读到期望值一致sc,一致就将-1赋给该位置,也就是sc读到就会为-1,表示有人cas成功返回true
// 比较sc的值与当前的sizeCtl值是否一致,一致就将sizeCtl修改为-1,表示当前正在有人进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// double-check
// cas成功后,再次检查该table是否为空,如果不为空就直接返回,不存在多次new链表数组的操作
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 如果没有指定容量,就采用默认容量16进行数组初始化
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
总结:
initTable 会保证多线程下初始化链表数组正常,是通过 CAS + 自旋的操作来进行的 1.查询 sizeCtl 值,如果小于 0 说明有线程正在初始化,就 yield 让出时间片,从 running 状态变为 ready 就绪状态 2.如果当前值 sizeCtl 大于 0,就尝试去 CAS 比较 sizeCtl 值是不是一开始读到的值,然后将 sizeCtl 更新为-1 3.更新 sizeCtl 为-1 成功后,再次检查数组是否为空,如果不为空就说明初始化过了就直接返回。如果为空就就行数组初始化
为什么这里需要 double-check,而之前看过的代码 cas 都是不需要 double-check 的?
原因在于,之前的代码 cas 修改的就是需要修改的值,比如我修改一个数为 B,原数为 A,那么我先读出 B,然后比对是不是 A,是 A 就更新为 B,读取和修改该值是一个原子性操作,而且这个原子性操作结束后就已经到达了我想要达到的目的了,我的目的就是将这个值修改成功;
但是这里是 new table[],如何利用 compareAndSwapObject 的话,compareAndSwapObject 是根据内存偏移量找到该元素的,但是你还没见链表数组时,是不知道这个链表数组有多大的,所以知道的偏移量,但不知道什么地方结束;
所以 compareAndSwap 都是修改已经存在的东西,而不是新建一个东西,无法在 compareAndSwap 中 new 出来一个新的东西,所以这里只有将 sizeCtl 一个已经存在的东西当做一个天然的标志位将其设置为-1,其他人读取就无法进入,等待别人也读取正确进入,double-checktable 已经不为空了,那么自然就退出了
2. put 方法
put 方法是一个 for 循环自旋操作 1.首先查看链表数组是否初始化,如果未初始化就调用 initTable(又是一个 CAS+自旋操作),初始化链表数组(也是一个线程安全操作) 2.计算 hash 值定位到数组某个位置,如果该位置为空,就 cas 自旋将改为为新建的 Node,如果失败就重试 3.如果该位置不为空,就通过 synchronized 将该位置头结点对象锁住,然后再次检查 double-check 头结点还是不是之前读到的头结点,这一步主要是防止 put 过程中其他线程 rehash 改变了该 Node
如果不是 f 就会退出 synchronized 代码块重试
如果是之前读到的 f,就沿着链表或数组查找
- 找到就更新相同的 key 就更新其 value 退出自旋操作;
- 找不到就新建 Node 结点,新建结点检查链表长度是否超过 8,超过 8 就转为红黑树退出自旋操作
4.检查数组占用是否超过 0.75 * capacity 阈值,就 rehash 操作
put 方法通过锁住头结点的方式,锁住成功后再次检查是否为一开始的头结点,如果不是就重新读取重试;如果是就沿着链表查看该 key,存在更新;不存在插入;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 注意此处是for循环:失败就会重试
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果没有在构造函数指定容量的话,那么构造函数结束链表数组还是为null的,这种情况只有在第一次put时才会初始化容量为你指定容量的链表数组
// 如果没有指定容量就采用16的默认容量
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 为空:for自旋 + cas向该空位置设置为一个头结点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// synchronized锁住该头结点然后进行操作
synchronized (f) {
// double-check
// 锁住后再次检查是否为之前读到的头结点,防止rehash的影响,因为rehash后头结点发生变化
if (tabAt(tab, i) == f) {
// fh=-1表示正在扩容
if (fh >= 0) { // 说明是链表
binCount = 1;
// 顺着链表向下找
for (Node<K,V> e = f;; ++binCount) { // 沿着链表找的过程会进行计数
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) // 找到就更新退出synchronized代码块
e.val = value;
break;
}
Node<K,V> pred = e; // 没有就
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null); // 顺着向下找,如果找到空了,就在链表最后一个结点新添加一个结点
break;
}
}
}
else if (f instanceof TreeBin) { // 如果是红黑树的话,就利用红黑树插入和更新的方法
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果链表长度大于等于8,就转为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break; // 退出外层while自旋循环
}
}
}
addCount(1L, binCount); // 此处都是占了一个新位置的,如果超过负载因子会进行扩容rehash
return null;
}
3. 线程安全的 rehash
ConcurrentHashMap 不同于 HashMap,HashMap 扩容时 Node 是不会新建,而是取下每个 Node 链接到新数组中某个位置;而 ConcurrentHashMap 是通过在 rehash 原链表数组某个位置时,将链表数组的头结点锁住,然后该链表或者红黑树遍历到的每个 Node 都是根据其值新建一个新 Node 出来,这样可以在 rehash 的时候还可以 get,等待全部 hash 完后才将 nextTable 替换 table
- cas 比对更新当前的 sizeCtl 是否为读到的值,如果是就+1 成功才有资格进入到 tranfer 进行 rehash 代码,防止多个线程 new 出多个 hash 表处理(通过 sizeCtl 控制,使扩容过程中不会 new 出多个新 hash 表来。)
- 首先 new 一个新的 hash 表(nextTable)出来,大小是原来的 2 倍。后面的 rehash 都是针对这个新的 hash 表操作,不涉及原 hash 表(table)
- 然后会对原 hash 表(table)中的每个链表进行 rehash,此时会尝试获取头节点的锁。这一步就保证了在 rehash 的过程中不能对这个链表执行 put 操作。
- 最后,将所有键值对重新 rehash 到新表(nextTable)中后,用 nextTable 将 table 替换。这就避免了 HashMap 中 get 和扩容并发时,可能 get 到 null 的问题。
在整个过程中,共享变量的存储和读取全部通过 volatile 或 CAS 的方式,保证了线程安全。
本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。