请选择 进入手机版 | 继续访问电脑版
搜索
房产
装修
汽车
婚嫁
健康
理财
旅游
美食
跳蚤
二手房
租房
招聘
二手车
教育
茶座
我要买房
买东西
装修家居
交友
职场
生活
网购
亲子
情感
龙城车友
找美食
谈婚论嫁
美女
兴趣
八卦
宠物
手机

嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎

[复制链接]
查看: 15|回复: 0

2万

主题

2万

帖子

7万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
76529
发表于 2020-9-16 06:39 | 显示全部楼层 |阅读模式
前言

上篇文章介绍了 HashMap 源码后,在博客平台广受好评,让本来己经不打算更新这个系列的我,仿佛被打了一顿鸡血。真的,被读者认可的感觉,就是这么奇妙。
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094214940-171419077

然后,有读者希望我能出一版 ConcurrentHashMap 的解析。所以,今天的这篇文章,我准备讲述一下 ConcurrentHashMap 分别在JDK1.7和 JDK1.8 的源码。文章较长,建议小伙伴们可以先收藏再看哦~
说一下为什么我要把源码解析写的这么详细吧。一方面,可以记录下当时自己的思考过程,也方便后续自己复习翻阅;另一方面,记录下来还能够帮助看到文章的小伙伴加深对源码的理解,简直是一举两得的事情。
更正错误

另外,上一篇文章,有个错误点,却没有读者给我指正出来。o(╥﹏╥)o 。因此,我只能自己在此更正一下。见下面截图,
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094215370-1993170344

put 方法,在新值替换旧值那里,应该是只有一种情况的,e 不包括新值。图中的方框也标注出来了。因为,判断 e=p.next==null , 然后新的节点是赋值给 p.next 了,并没有赋值给 e,此时 e 依旧是空的。所以 e!=null,代表当前的 e 是已经存在的旧值。
文章编写过程,难免出现作者考虑不周的地方,如果有朋友发现有错误的地方,还请不吝赐教,指正出来。知错能改,善莫大焉,对于技术,我们应该怀有一颗严谨的心态~
文章目录

这篇文章,我打算从以下几个方面来讲。
1)多线程下的 HashMap 有什么问题?
2)怎样保证线程安全,为什么选用 ConcurrentHashMap?
3)ConcurrentHashMap 1.7 源码解析

  • 底层存储结构
  • 常用变量
  • 构造函数
  • put() 方法
  • ensureSegment() 方法
  • scanAndLockForPut() 方法
  • rehash() 扩容机制
  • get() 获取元素方法
  • remove() 方法
  • size() 方法是怎么统计元素个数的
4)ConcurrentHashMap 1.8 源码解析

  • put()方法详解
  • initTable()初始化表
  • addCount()方法
  • fullAddCount()方法
  • transfer()是怎样扩容和迁移元素的
  • helpTransfer()方法帮助迁移元素
多线程下 HashMap 有什么问题?

在上一篇文章中,已经讲解了 HashMap 1.7 死循环的成因,也正因为如此,我们才说 HashMap 在多线程下是不安全的。但是,在JDK1.8 的 HashMap 改为采用尾插法,已经不存在死循环的问题了,为什么也会线程不安全呢?
我们以 put 方法为例(1.8),
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094215905-1030254601

假如现在有两个线程都执行到了上图中的划线处。当线程一判断为空之后,CPU 时间片到了,被挂起。线程二也执行到此处判断为空,继续执行下一句,创建了一个新节点,插入到此下标位置。然后,线程一解挂,同样认为此下标的元素为空,因此也创建了一个新节点放在此下标处,因此造成了元素的覆盖。
所以,可以看到不管是 JDK1.7 还是 1.8 的 HashMap 都存在线程安全的问题。那么,在多线程环境下,应该怎样去保证线程安全呢?
怎样保证线程安全,为什么选用 ConcurrentHashMap?

首先,你可能想到,在多线程环境下用 Hashtable 来解决线程安全的问题。这样确实是可以的,但是同样的它也有缺点,我们看下最常用的 put 方法和 get 方法。
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094216171-81358215

我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094216398-1409623903

可以看到,不管是往 map 里边添加元素还是获取元素,都会用 synchronized 关键字加锁。当有多个元素之前存在资源竞争时,只能有一个线程可以获取到锁,操作资源。更不能忍的是,一个简单的读取操作,互相之间又不影响,为什么也不能同时进行呢?
所以,hashtable 的缺点显而易见,它不管是 get 还是 put 操作,都是锁住了整个 table,效率低下,因此 并不适合高并发场景。
也许,你还会想起来一个集合工具类 Collections,生成一个SynchronizedMap。其实,它和 Hashtable 差不多,同样的原因,锁住整张表,效率低下。
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094216773-85859626

所以,思考一下,既然锁住整张表的话,并发效率低下,那我把整张表分成 N 个部分,并使元素尽量均匀的分布到每个部分中,分别给他们加锁,互相之间并不影响,这种方式岂不是更好 。这就是在 JDK1.7 中 ConcurrentHashMap 采用的方案,被叫做锁分段技术,每个部分就是一个 Segment(段)。
但是,在JDK1.8中,完全重构了,采用的是 Synchronized + CAS ,把锁的粒度进一步降低,而放弃了 Segment 分段。(此时的 Synchronized 已经升级了,效率得到了很大提升,锁升级可以了解一下)
ConcurrentHashMap 1.7 源码解析

我们看下在 JDK1.7中 ConcurrentHashMap 是怎么实现的。墙裂建议,在本文之前了解一下多线程的基本知识,如JMM内存模型,volatile关键字作用,CAS和自旋,ReentranLock重入锁。
底层存储结构

在 JDK1.7中,本质上还是采用链表+数组的形式存储键值对的。但是,为了提高并发,把原来的整个 table 划分为 n 个 Segment 。所以,从整体来看,它是一个由 Segment 组成的数组。然后,每个 Segment 里边是由 HashEntry 组成的数组,每个 HashEntry之间又可以形成链表。我们可以把每个 Segment 看成是一个小的 HashMap,其内部结构和 HashMap 是一模一样的。
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094217079-1521632360

当对某个 Segment 加锁时,如图中 Segment2,并不会影响到其他 Segment 的读写。每个 Segment 内部自己操作自己的数据。这样一来,我们要做的就是尽可能的让元素均匀的分布在不同的 Segment中。最理想的状态是,所有执行的线程操作的元素都是不同的 Segment,这样就可以降低锁的竞争。
废话了这么多,还是来看底层源码吧,因为所有的思想都在代码里体现。借用 Linus的一句话,“No BB . Show me the code ” (改编版,哈哈)
常用变量

先看下 1.7 中常用的变量和内部类都有哪些,这有助于我们了解 ConcurrentHashMap 的整体结构。
  1. //默认初始化容量,这个和 HashMap中的容量是一个概念,表示的是整个 Map的容量static final int DEFAULT_INITIAL_CAPACITY = 16;//默认加载因子static final float DEFAULT_LOAD_FACTOR = 0.75f;//默认的并发级别,这个参数决定了 Segment 数组的长度static final int DEFAULT_CONCURRENCY_LEVEL = 16;//最大的容量static final int MAXIMUM_CAPACITY = 1  0) || initialCapacity < 0 || concurrencyLevel  MAX_SEGMENTS)                concurrencyLevel = MAX_SEGMENTS;        // Find power-of-two sizes best matching arguments        //偏移量,是为了对hash值做位移操作,计算元素所在的Segment下标,put方法详讲        int sshift = 0;        //用于设定最终Segment数组的长度,必须是2的n次幂        int ssize = 1;        //这里就是计算 sshift 和 ssize 值的过程  (1)         while (ssize < concurrencyLevel) {                ++sshift;                ssize  segmentShift) & segmentMask;        //这里是用Unsafe类的原子操作找到Segment数组中j下标的 Segment 对象        if ((s = (Segment)UNSAFE.getObject          // nonvolatile; recheck                 (segments, (j  segmentShift) & segmentMask  //计算 HashEntry 数组下标 (tab.length - 1) & hash
复制代码
思考一下,为什么它们的算法不一样呢? 计算 Segment 数组下标是用的 hash值高几位(这里以高 4 位为例)和掩码做与运算,而计算 HashEntry 数组下标是直接用的 hash 值和数组长度减1做与运算。
我的理解是,这是为了尽量避免当前 hash 值计算出来的 Segment 数组下标和计算出来的 HashEntry 数组下标趋于相同。简单说,就是为了避免分配到同一个 Segment 中的元素扎堆现象,即避免它们都被分配到同一条链表上,导致链表过长。同时,也是为了减少并发。下面做一个运算,帮助理解一下(假设不用高 4 位运算,而是正常情况都用低位做运算)。
  1. //我们以并发级别16,HashEntry数组容量 4 为例,则它们参与运算的掩码分别为 15 和 3//hash值0110 1101 0110 1111 0110 1110 0010 0010//segmentMask = 15   ,标记为 (1)0000 0000 0000 0000 0000 0000 0000 1111//tab.length - 1 = 3     ,标记为 (2)0000 0000 0000 0000 0000 0000 0000 0011//用 hash 分别和 15 ,3 做与运算,会发现得到的结果是一样,都是十进制 2.//这表明,当前 hash值被分配到下标为 2 的 Segment 中,同时,被分配到下标为 2 的 HashEntry 数组中//现在若有另外一个 hash 值 h2,和第一个hash值,高位不同,但是低4位相同,1010 1101 0110 1111 0110 1110 0010 0010//我们会发现,最后它也会被分配到下标为 2 的 Segment 和 HashEntry 数组,就会和第一个元素形成链表。//所以,为了避免这种扎堆现象,让元素尽量均匀分配,就让 hash 的高 4 位和 (1)处做与 运算,而用低位和 (2)处做与运算//这样计算后,它们所在的Segment下标分别为 6(0110), 10(1010),即使它们在HashEntry数组中的下标都为 2(0010),也无所谓//因为它们并不在一个 Segment 中,也就不会在同一个 HashEntry 数组中,更不会形成链表。//更重要的是,它们不会有并发,因为在各自不同的 Segment 自己操作自己的加锁解锁,互不影响
复制代码
可能有的小伙伴就会打岔了,那如果两个 hash 值,低位和高位都相同,怎么办呢。如果是这样,我只能说,这个 hash 算法也太烂了吧。(这里的 hash 算法也会尽量避免这种情况,当然只是减少几率,并不能杜绝)
我有个大胆的想法,这里的高低位不同的计算方式,是不是后边 1.8 HashMap 让 hash 高低位做异或运算的引子呢?不得而知。。
put 方法比较简单,只要能看懂 HashMap 中的 put 方法,这里也没问题。主要是它调用的子方法比较复杂,下边一个一个讲解。
ensureSegment()方法

回到 Map的 put 方法,判断 j 下标的 Segment为空后,则需要调用此方法,初始化一个 Segment 对象,以确保拿到的对象一定是不为空的,否则无法执行s.put了。
  1. //k为 (hash >>> segmentShift) & segmentMask 算法计算出来的值private Segment ensureSegment(int k) {        final Segment[] ss = this.segments;        //u代表 k 的偏移量,用于通过 UNSAFE 获取主内存最新的实际 K 值        long u = (k  MAX_SCAN_RETRIES) {                        lock();                        break;                }                //3.若 retries 的值为偶数,并且从内存中再次获取到最新的头节点,判断若不等于first                //则说明有其他线程修改了当前下标位置的头结点,于是需要更新头结点信息。                else if ((retries & 1) == 0 &&                                 (f = entryForHash(this, hash)) != first) {                        //更新头结点信息,并把重试次数重置为 -1,继续下一次循环,从最新的头结点遍历当前链表。                        e = first = f; // re-traverse if entry changed                        retries = -1;                }        }        return node;}
复制代码
这个方法逻辑比较复杂,会一直循环尝试获取锁,若获取成功,则返回。否则的话,每次循环时,都会同时遍历当前链表。若遍历完了一次,还没找到和key相等的节点,就会预先创建一个节点。注意,这里只是预测性的创建一个新节点,也有可能在这之前,就已经获取锁成功了。
同时,当重试次每偶数次时,就会检查一次当前最新的头结点是否被改变。因为若有变化的话,还需要从最新的头结点开始遍历链表。
还有一种情况,就是循环次数达到了最大限制,则停止循环,用阻塞的方式去获取锁。这时,也就停止了遍历链表的动作,当前线程也不会再做其他预热(warm up)的事情。
关于为什么预测性的创建新节点,源码中原话是这样的:
Since traversal speed doesn't matter, we might as well help warm up the associated code and accesses as well.
解释一下就是,因为遍历速度无所谓,所以,我们可以预先(warm up)做一些相关联代码的准备工作。这里相关联代码,指的就是循环中,在获取锁成功或者调用 lock 方法之前做的这些事情,当然也包括创建新节点。
在put 方法中可以看到,有一句是判断 node 是否为空,若创建了,就直接头插。否则的话,它也会自己创建这个新节点。
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询 1714084-20200421094217689-1640532123

scanAndLockForPut 这个方法可以确保返回时,当前线程一定是获取到锁的状态。
rehash()方法

当 put 方法时,发现元素个数超过了阈值,则会扩容。需要注意的是,每个Segment只管它自己的扩容,互相之间并不影响。换句话说,可以出现这个 Segment的长度为2,另一个Segment的长度为4的情况(只要是2的n次幂)。
  1. //node为创建的新节点private void rehash(HashEntry node) {        //当前Segment中的旧表        HashEntry[] oldTable = table;        //旧的容量        int oldCapacity = oldTable.length;        //新容量为旧容量的2倍        int newCapacity = oldCapacity >> segmentShift) & segmentMask) >> 16)) & HASH_BITS;  HASH_BITS = 0x7fffffff;        // 0x7fffffff ,二进制为 0111 1111 1111 1111 1111 1111 1111 1111 。        //所以,hash值除了做了高低位异或运算,还多了一步,保证最高位的 1 个 bit 位总是0。        //这里,我并没有明白它的意图,仅仅是保证计算出来的hash值不超过 Integer 最大值,且不为负数吗。        //同 HashMap 的hash 方法对比一下,会发现连源码注释都是相同的,并没有多说明其它的。        //我个人认为意义不大,因为最后 hash 是为了和 capacity -1 做与运算,而 capacity 最大值为 1= TREEIFY_THRESHOLD)                                        treeifyBin(tab, i);                                //把旧节点值返回                                if (oldVal != null)                                        return oldVal;                                break;                        }                }        }        //给元素个数加 1,并有可能会触发扩容,比较复杂,稍后细讲        addCount(1L, binCount);        return null;}
复制代码
initTable()方法

先看下当数组为空时,是怎么初始化表的。
  1. private final Node[] initTable() {        Node[] tab; int sc;        //循环判断表是否为空,直到初始化成功为止。        while ((tab = table) == null || tab.length == 0) {                //sizeCtl 这个值有很多情况,默认值为0,                //当为 -1 时,说明有其它线程正在对表进行初始化操作                //当表初始化成功后,又会把它设置为扩容阈值                //当为一个小于 -1 的负数,用来表示当前有几个线程正在帮助扩容(后边细讲)                if ((sc = sizeCtl) < 0)                        //若 sc 小于0,其实在这里就是-1,因为此时表是空的,不会发生扩容,sc只能为正数或者-1                        //因此,当前线程放弃 CPU 时间片,只是自旋。                        Thread.yield(); // lost initialization race; just spin                //通过 CAS 把 sc 的值设置为-1,表明当前线程正在进行表的初始化,其它失败的线程就会自旋                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                        try {                                //重新检查一下表是否为空                                if ((tab = table) == null || tab.length == 0) {                                        //如果sc大于0,则为sc,否则返回默认容量 16。                                        //当调用有参构造创建 Map 时,sc的值是大于0的。                                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                                        @SuppressWarnings("unchecked")                                        //创建数组                                        Node[] nt = (Node[])new Node[n];                                        table = tab = nt;                                        //n减去 1/4 n ,即为 0.75n ,表示扩容阈值                                        sc = n - (n >>> 2);                                }                        } finally {                                //更新 sizeCtl 为扩容阈值                                sizeCtl = sc;                        }                        //若当前线程初始化表成功,则跳出循环。其它自旋的线程因为判断数组不为空,也会停止自旋                        break;                }        }        return tab;}
复制代码
addCount()方法

若 put 方法元素插入成功之后,则会调用此方法,传入参数为 addCount(1L, binCount)。这个方法的目的很简单,就是把整个 table 的元素个数加 1 。但是,实现比较难。
我们先思考一下,如果让我们自己去实现这样的统计元素个数,怎么实现?
类比 1.8 的 HashMap ,我们可以搞一个 size 变量来存储个数统计。但是,这是在多线程环境下,需要考虑并发的问题。因此,可以把 size 设置为 volatile 的,保证可见性,然后通过 CAS 乐观锁来自增 1。
这样虽然也可以实现。但是,设想一下现在有非常多的线程,都在同一时间操作这个 size 变量,将会造成特别严重的竞争。所以,基于此,这里做了更好的优化。让这些竞争的线程,分散到不同的对象里边,单独操作它自己的数据(计数变量),用这样的方式尽量降低竞争。到最后需要统计 size 的时候,再把所有对象里边的计数相加就可以了。
上边提到的 size ,在此用 baseCount 表示。分散到的对象用 CounterCell 表示,对象里边的计数变量用 value 表示。注意这里的变量都是 volatile 修饰的。
当需要修改元素数量时,线程会先去 CAS 修改 baseCount 加1,若成功即返回。若失败,则线程被分配到某个 CounterCell ,然后操作 value 加1。若成功,则返回。否则,给当前线程重新分配一个 CounterCell,再尝试给 value 加1。(这里简略的说,实际更复杂)
CounterCell 会组成一个数组,也会涉及到扩容问题。所以,先画一个示意图帮助理解一下。
我的关键词 嘿嘿,我就知道面试官接下来要问我 ConcurrentHashMap 底层原理了,看我怎  新闻咨询

[code]//线程被分配到的格子@sun.misc.Contended static final class CounterCell {        //此格子内记录的 value 值    volatile long value;    CounterCell(long x) { value = x; }}//用来存储线程和线程生成的随机数的对应关系static final int getProbe() {        return UNSAFE.getInt(Thread.currentThread(), PROBE);}// x为1,check代表链表上的元素个数private final void addCount(long x, int check) {        CounterCell[] as; long b, s;        //此处要进入if有两种情况        //1.数组不为空,说明数组已经被创建好了。        //2.若数组为空,说明数组还未创建,很有可能竞争的线程非常少,因此就直接 CAS 操作 baseCount        //若 CAS 成功,则方法跳转到 (2)处,若失败,则需要考虑给当前线程分配一个格子(指CounterCell对象)        if ((as = counterCells) != null ||                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {                CounterCell a; long v; int m;                //字面意思,是无竞争,这里先标记为 true,表示还没有产生线程竞争                boolean uncontended = true;                //这里有三种情况,会进入 fullAddCount 方法                //1.若数组为空,进方法 (1)                //2.ThreadLocalRandom.getProbe() 方法会给当前线程生成一个随机数(可以简单的认为也是一个hash值)                //然后用随机数与数组长度取模,计算它所在的格子。若当前线程所分配到的格子为空,进方法 (1)。                //3.若数组不为空,且线程所在格子不为空,则尝试 CAS 修改此格子对应的 value 值加1。                //若修改成功,则跳转到 (3),若失败,则把 uncontended 值设为 fasle,说明产生了竞争,然后进方法 (1)                if (as == null || (m = as.length - 1) < 0 ||                        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||                        !(uncontended =                          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {                        //方法(1), 这个方法的目的是让当前线程一定把 1 加成功。情况更多,更复杂,稍后讲。                        fullAddCount(x, uncontended);                        return;                }                //(3)能走到这,说明数组不为空,且修改 baseCount失败,                //且线程被分配到的格子不为空,且修改 value 成功。                //但是这里没明白为什么小于等于1,就直接返回了,这里我怀疑之前的方法漏掉了binCount=0的情况。                //而且此处若返回了,后边怎么判断扩容?(存疑)                if (check = 0) {                Node[] tab, nt; int n, sc;                //若元素个数达到扩容阈值,且tab不为空,且tab数组长度小于最大容量                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&                           (n = tab.length) < MAXIMUM_CAPACITY) {                        //这里假设数组长度n就为16,这个方法返回的是一个固定值,用于当做一个扩容的校验标识                        //可以跳转到最后,看详细计算过程,0000 0000 0000 0000 1000 0000 0001 1011                        int rs = resizeStamp(n);                        //若sc小于0,说明正在扩容                        if (sc < 0) {                            //sc的结构类似这样,1000 0000 0001 1011 0000 0000 0000 0001                                //sc的高16位是数据校验标识,低16位代表当前有几个线程正在帮助扩容,RESIZE_STAMP_SHIFT=16                                //因此判断校验标识是否相等,不相等则退出循环                                //sc == rs + 1,sc == rs + MAX_RESIZERS 这两个应该是用来判断扩容是否已经完成,但是计算方法存疑                                //感兴趣的可以看这个地址,应该是一个 bug ,                                // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427                                //nextTable=null 说明需要扩容的新数组还未创建完成                                //transferIndex这个参数小于等于0,说明已经不需要其它线程帮助扩容了,                                //但是并不说明已经扩容完成,因为有可能还有线程正在迁移元素。稍后扩容细讲就明白了。                                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                                        transferIndex

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

技术支持:迪恩网络科技公司  Powered by Discuz! X3.2
快速回复 返回顶部 返回列表