一止长渊

ConcurrentHashMap原理初探

N 人看过
字数:2.8k字 | 预计阅读时长:11分钟

本文将介绍 ConcurrentHashMap 相关原理,具体内容包括:

  • 初始化方法
  • put 方法
  • 如何做到线程安全的 rehash

参考:https://zhuanlan.zhihu.com/p/84390205

1. 初始化

ConcurrentHashMap 不像 ArrayList 那样在构造函数中就进行了数组初始化,ConcurrentHashMap 是只有在第一次调用 put 放入元素的时候才会进行链表数组初始化,并且由于多线程,所以链表数组的初始化方式也要求是一个线程安全的操作,在第一次 put 时如果数组为空或容量为空(这个容量 sizeCtl 也是一个 volatile 修饰,保证其在链表初始化和扩容时在多线程下的可见性)。会调用 initTable 保证在多线程下安全初始化链表
sizeCtl = -1,表示告知其他线程正在进行初始化

    // sizeCtl方法也有volatile修饰,保证链表初始化和扩容时在多线程的可见性
    private transient volatile int sizeCtl;

    // 为了保证容量都是2 ^ n,这里采用移位并加上或运算,让所有位数都变为1,然后才加1
    // 假设构造函数指定容量18,那么结果就是离18最近的2的幂是32,那么这个数组初始化容量是32
    // 假设构造函数指定容量小于16,那么结果就是默认的初始化容量16
    // 输入的数二进制为k位,那么经过下面操作到达n |= n >>> 16后就是k个1,最后n+1就是k+1位,最高位为1,低位都是0,保证是2的幂
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }


    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // 指定容量参数进行初始化,ConcurretHashMap和HashMap不会像ArrayList那样在构造函数中就进行初始化链表数组
        // 只有等到第一次put元素时才会进行初始化,第一次只是将sizeCtl进行赋值,为此ConcurrentHashMap需要将初始表的操作也要是一个线程安全操作
        // 通过tableSizeFor计算得出的容量大小为 如果用户指定容量是小于16那么就是16,如果用户指定容量是大于16,那么就采用最靠近该数的2的幂的数
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 如果sizeCtl小于0,说明另外的正在执行CAS成功,正在进行初始化,那么该线程让出CPU时间片
            if ((sc = sizeCtl) < 0) // 将sizeCtl赋值给sc,是赋值一份
                // 调用yield方法会让这个线程从执行状态(running)进入到就绪状态(ready),
                // 让出时间片后,下次CPU时间片调度还是从可执行态中集合中挑选,由于CPU时间片调度的算法不同,该线程还是可能再次得到CPU时间片执行的
                Thread.yield(); // lost initialization race; just spin
            // 这里也就是尝试将成员变量sizeCtl值更新为-1,具体是通过内存偏移量读取判断是否一致更新的
            // 比较当前这个对象内存偏移SIZECTL个位置的值其实就是一开始读取sc同一个位置,是否我预先读到期望值一致sc,一致就将-1赋给该位置,也就是sc读到就会为-1,表示有人cas成功返回true
            // 比较sc的值与当前的sizeCtl值是否一致,一致就将sizeCtl修改为-1,表示当前正在有人进行初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // double-check
                    // cas成功后,再次检查该table是否为空,如果不为空就直接返回,不存在多次new链表数组的操作
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 如果没有指定容量,就采用默认容量16进行数组初始化
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

总结:
initTable 会保证多线程下初始化链表数组正常,是通过 CAS + 自旋的操作来进行的 1.查询 sizeCtl 值,如果小于 0 说明有线程正在初始化,就 yield 让出时间片,从 running 状态变为 ready 就绪状态 2.如果当前值 sizeCtl 大于 0,就尝试去 CAS 比较 sizeCtl 值是不是一开始读到的值,然后将 sizeCtl 更新为-1 3.更新 sizeCtl 为-1 成功后,再次检查数组是否为空,如果不为空就说明初始化过了就直接返回。如果为空就就行数组初始化

为什么这里需要 double-check,而之前看过的代码 cas 都是不需要 double-check 的?

原因在于,之前的代码 cas 修改的就是需要修改的值,比如我修改一个数为 B,原数为 A,那么我先读出 B,然后比对是不是 A,是 A 就更新为 B,读取和修改该值是一个原子性操作,而且这个原子性操作结束后就已经到达了我想要达到的目的了,我的目的就是将这个值修改成功;
但是这里是 new table[],如何利用 compareAndSwapObject 的话,compareAndSwapObject 是根据内存偏移量找到该元素的,但是你还没见链表数组时,是不知道这个链表数组有多大的,所以知道的偏移量,但不知道什么地方结束;
所以 compareAndSwap 都是修改已经存在的东西,而不是新建一个东西,无法在 compareAndSwap 中 new 出来一个新的东西,所以这里只有将 sizeCtl 一个已经存在的东西当做一个天然的标志位将其设置为-1,其他人读取就无法进入,等待别人也读取正确进入,double-checktable 已经不为空了,那么自然就退出了

2. put 方法

put 方法是一个 for 循环自旋操作 1.首先查看链表数组是否初始化,如果未初始化就调用 initTable(又是一个 CAS+自旋操作),初始化链表数组(也是一个线程安全操作) 2.计算 hash 值定位到数组某个位置,如果该位置为空,就 cas 自旋将改为为新建的 Node,如果失败就重试 3.如果该位置不为空,就通过 synchronized 将该位置头结点对象锁住,然后再次检查 double-check 头结点还是不是之前读到的头结点,这一步主要是防止 put 过程中其他线程 rehash 改变了该 Node

  • 如果不是 f 就会退出 synchronized 代码块重试

  • 如果是之前读到的 f,就沿着链表或数组查找

    • 找到就更新相同的 key 就更新其 value 退出自旋操作;
    • 找不到就新建 Node 结点,新建结点检查链表长度是否超过 8,超过 8 就转为红黑树退出自旋操作

    4.检查数组占用是否超过 0.75 * capacity 阈值,就 rehash 操作

put 方法通过锁住头结点的方式,锁住成功后再次检查是否为一开始的头结点,如果不是就重新读取重试;如果是就沿着链表查看该 key,存在更新;不存在插入;

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 注意此处是for循环:失败就会重试
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果没有在构造函数指定容量的话,那么构造函数结束链表数组还是为null的,这种情况只有在第一次put时才会初始化容量为你指定容量的链表数组
            // 如果没有指定容量就采用16的默认容量
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 为空:for自旋 + cas向该空位置设置为一个头结点
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // synchronized锁住该头结点然后进行操作
                synchronized (f) {
                    // double-check
                    // 锁住后再次检查是否为之前读到的头结点,防止rehash的影响,因为rehash后头结点发生变化
                    if (tabAt(tab, i) == f) {
                        // fh=-1表示正在扩容
                        if (fh >= 0) { // 说明是链表
                            binCount = 1;
                            // 顺着链表向下找
                            for (Node<K,V> e = f;; ++binCount) { // 沿着链表找的过程会进行计数
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) // 找到就更新退出synchronized代码块
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e; // 没有就
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null); // 顺着向下找,如果找到空了,就在链表最后一个结点新添加一个结点
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 如果是红黑树的话,就利用红黑树插入和更新的方法
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 如果链表长度大于等于8,就转为红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break; // 退出外层while自旋循环
                }
            }
        }
        addCount(1L, binCount); // 此处都是占了一个新位置的,如果超过负载因子会进行扩容rehash
        return null;
    }

3. 线程安全的 rehash

ConcurrentHashMap 不同于 HashMap,HashMap 扩容时 Node 是不会新建,而是取下每个 Node 链接到新数组中某个位置;而 ConcurrentHashMap 是通过在 rehash 原链表数组某个位置时,将链表数组的头结点锁住,然后该链表或者红黑树遍历到的每个 Node 都是根据其值新建一个新 Node 出来,这样可以在 rehash 的时候还可以 get,等待全部 hash 完后才将 nextTable 替换 table

  1. cas 比对更新当前的 sizeCtl 是否为读到的值,如果是就+1 成功才有资格进入到 tranfer 进行 rehash 代码,防止多个线程 new 出多个 hash 表处理(通过 sizeCtl 控制,使扩容过程中不会 new 出多个新 hash 表来。)
  2. 首先 new 一个新的 hash 表(nextTable)出来,大小是原来的 2 倍。后面的 rehash 都是针对这个新的 hash 表操作,不涉及原 hash 表(table)
  3. 然后会对原 hash 表(table)中的每个链表进行 rehash,此时会尝试获取头节点的锁。这一步就保证了在 rehash 的过程中不能对这个链表执行 put 操作。
  4. 最后,将所有键值对重新 rehash 到新表(nextTable)中后,用 nextTable 将 table 替换。这就避免了 HashMap 中 get 和扩容并发时,可能 get 到 null 的问题。

在整个过程中,共享变量的存储和读取全部通过 volatile 或 CAS 的方式,保证了线程安全。

本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。