ConcurrentHashMap源码剖析

一、ConcurrentHashMap的存储结构(常识性讲解)

ConcurrentHashMap就是一个安全的HashMap,在JDK1.8中,ConcurrentHashMap的存储结构和HashMap一致

key-value结构存储,put(key,value);

image.png

二、正确的理解ConcurrentHashMap的安全性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.mashibing.public220221;

import java.util.concurrent.ConcurrentHashMap;

/**
* 安全性问题
* 计数操作,统计内容,如果出现过+1,如果没出现,赋值为1
*/
public class A_0301_001_ConcurrentHashMapTest {
private static final ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

public static void main(String[] args) {
// 如果是并发的复合操作,那么就凉凉
Long count = map.get("test");
if (count == null) {
map.put("test", 1L);
} else {
map.put("test", count + 1);
}
// 在ConcurrentMap接口中提供了几个方法
// 1. putIfAbsent(key,value),当map中的key不存在时,就将key-value存储进去,并返回null, 如果存在,什么都不做
// 2. boolean replace(K key, V oldValue, V newValue); 替换时,需要保证存在的key-oldValue完全匹配,才会将newValue替换掉之前的oldValue
// 如果replace成功替换,返回true,否则返回false

while (true) {
Long count = map.get("test");
if (count == null) {
// 现在map中没有这个key
if (map.putIfAbsent("test", 1L) == null) {
// A == null
break;
}
} else {
// 现在map中有这个key,将value + 1
// B == 1
if (map.replace("test", count, count + 1))
break;
}
}
// ConcurrentHashMap是为了保证并发操作map时,map的数据不会被破坏,但是当多线程复合操作map时,ConcurrentHashMap是无法保证安全的
}
}

三、从ConcurrentHashMap的put方法开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key和value如果为null,直接甩异常
if (key == null || value == null) throw new NullPointerException();
// 计算key的hash值,(通过hash值决定Entry存放到数组的哪个索引位置)
int hash = spread(key.hashCode());
// 暂时当做标识,值为0
int binCount = 0;
// 是死循环,开始!!声明临时变量为tab,tab赋值了table,table就是当前HashMap的数组!
for (Node<K,V>[] tab = table;;) {
// 声明了一堆变量
Node<K,V> f; int n, i, fh;
// 如果tab为null,或者tab的长度为0
if (tab == null || (n = tab.length) == 0)
// 进来说明数组没有初始化,开始初始化,ConcurrentHashMap要避免并发初始化时造成的问题,这里深入研究一下
tab = initTable();
// tabAt(数组,索引位置),得到这个数组指定索引位置的值,f就是数组的下标位置的值
// 如果f == null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 进到这,说明索引位置没有值,基于CAS的方式将当前的key-value存储到这个索引位置
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// 如果CAS成功,添加数据成功(添加到了数组上),如果走false,继续上述操作,尝试其他内容
break; // no lock when adding to empty bin
}
// f是经过上述if得到的索引位置的值,当前key-value的hash是否为MOVED,如果相等,证明当前位置正在扩容
else if ((fh = f.hash) == MOVED)
// 如果正在扩容,帮你扩容(构建长度为原来2倍的数组,并且将老数组的值移动到新数组),帮助扩容的操作是迁移数据的操作
tab = helpTransfer(tab, f);
else {
// 第一个判断:数组初始化了么?
// 第二个判断:数组指定的位置有值么?
// 第三个判断:现在正在扩容么?
// 第四个判断:是否需要将数据挂到链表上,或者添加到红黑树中?(出现了Hash冲突(碰撞))
V oldVal = null;
// 加个锁,锁的是f(f是什么?)
synchronized (f) {
// 判断f是否为当前数组指定下标的位置
if (tabAt(tab, i) == f) {
// 如果fh 大于等于 0(判断hash值是否大于等于0),说明是链表
if (fh >= 0) {
// 将标识修改为1
binCount = 1;
// 开始循环,为了将插入的值挂到链表的最后面
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash && // 当前指向的节点和要插入的节点的hash值是否相等
(
(ek = e.key) == key || // 判断指向的节点的key是否等于当前要插入的节点的key
key.equals(ek))) { // 指向的节点的key是否域当前的key相等
// 说明当前是修改操作
oldVal = e.val;
if (!onlyIfAbsent) // onlyIfAbsent如果为true,就什么事都不做
e.val = value; // 修改值
break;
}
// 是正常的添加操作
Node<K,V> pred = e;
// 如果节点的next为null,说明到链表的最后一个节点了,添加到链表的末尾
if ((e = e.next) == null) {
// 将当前值添加到链表的最后一个节点的next指向
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 判断当前位置的桶数据是否是树
else if (f instanceof TreeBin) {
Node<K,V> p;
// 标记修改为2
binCount = 2;
// 调用putTreeVal扔到红黑树
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// 进到if说明是覆盖操作
oldVal = p.val;
if (!onlyIfAbsent) // 根据判断决定,是否修改数据
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

四、ConcurrentHashMap的spread方法

1
2
3
4
5
6
7
8
9
10
11
12
int hash = spread(key.hashCode());
static final int spread(int h) {
// (h ^ (h >>> 16)):HashMap的散列算法 HashMap默认的数组长度是:16
// Entry存放的索引位置 = (数组长度 - 1) & hash
// & HASH_BITS的运算目的是为了保证等到的hash值,一定是正数,因为最高位符号位100%是0
// 因为hash值为负数时,有特殊的含义,
return (h ^ (h >>> 16)) & HASH_BITS;

static final int MOVED = -1; // 当前索引位置的数据正在扩容
static final int TREEBIN = -2; // 当前索引位置下面不是链表,是红黑树
static final int RESERVED = -3; // 当前索引位置被临时占用,compute方法会涉及
}

image.png

五、ConcurrentHashMap的initTable方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
sizeCtl = -1:说明当前ConcurrentHashMap正在初始化!!!
sizeCtl = -N:说明当前ConcurrentHashMap正在扩容!!!
sizeCtl = 0:默认初始值,当前ConcurrentHashMap还没有初始化
sizeCtl > 0:如果已经初始化过了,sizeCtl标识扩容的阈值, 如果没有初始化,sizeCtl标识数组的初始化容量
*/
tab = initTable();
private final Node<K,V>[] initTable() {
// 声明一些变量
Node<K,V>[] tab; int sc;
// tab变量是HashMap的数组, 数组长度为null,或者数组的长度为0,说明数组还没有初始化!
while ((tab = table) == null || tab.length == 0) {
// 将sizeCtl赋值给sc,如果进到if中,说明正在扩容或者正在初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // 线程让一手~~
// 以CAS的方式,尝试将sizeCtl从之前oldValue替换为-1,为了标识我当前ConcurrentHashMap正在初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 开始扩容
try {
// table是不是还是null,或者长度还是0
if ((tab = table) == null || tab.length == 0) {
// 声明n,sc是sizeCtl,默认sizectl为0,但是现在正在初始化,我把sizeCtl改为了-1,但是sc还是0
// sc如果为0,不大于0,所以为DEFAULT_CAPACITY,16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建数组!!!
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将初始化的nt数组赋值给ConcurrentHashMap的table
table = tab = nt;
// 默认sc = 12,为扩容阈值
sc = n - (n >>> 2);
}
} finally {
// 将阈值赋值给sizeCtl,到这初始化完毕
sizeCtl = sc;
}
break;
}
}

六、ConcurrentHashMap的JDK过去和现在对比

一、jdk1.8之前的数据结构:
1.默认情况下会有16个区段 Segment数组 Segment[16]

2.每次每个区段Segment中会保存若干个散列桶,每次散列桶长度扩容成2^n次方的长度。 多个散列桶相连就构成了散列表。

3.存入元素: key带入到hashcode方法众获得hashcode值,然后把hashcode值带入到散列算法中获取segment的下标(区段编号),再根据key带入到定义好的函数中获取Segment对象中散列桶的下标。

如果此位置有元素就构成链表(JDK1.8及以后会形成红黑树),如果没有元素就存入

3.存取的线程安全问题: 如果多个线程操作同一个Segment,则某个线程给此Segment加锁,另一个线程只能阻塞。

同时解决了HashTable的问题,HashTable只能由一个线程操作。 ConcurrentHashMap可以让一个线程操作第一个Segment,另一个线程操作另一个Segment。

4.小矩形块表示散列桶

绿色的Segment表示ConcurrentHashMap集合众 Segment[16]数组里的一个对象。

5.并发问题:

两个线程给不同的区段Segment中添加元素,这种情况可以并发。 所以ConcurrentHashMap可以保证线程安全(多个线程操作同一个Segment,则某个线程给此Segment加锁,另一个线程只能阻塞)并且在一定程度上提交线程并发执行效率。

没有区段了,和HashMap一致了,数组+链表+红黑树 +乐观锁 + synchronized

二、乐观锁和悲观锁:
2.1.悲观锁:执行的某个线程总是悲观的认为,在自己执行期间,总有其它线程与之并发执行,认为会产生安全问题,所以为了保证线程安全,在线程刚开始访问对象数据时,后立即给对象加锁,从而保证线程安全. 成了同步的效果(就是排队执行,第一个线程执行完第二个才开始)

Synchronized就是悲观锁

2.2.乐观锁:

ConcurrentHashmap和HashMap区别:

1.HashMap是 非线程安全的,而HashTabl e和ConcurrentHashmap都是线程安全的

2.HashMap的key 和value均可以为null;而HashTable和Concur rentHashMap的key和value均不可以为null

3.HashTable 和ConcurrentHashMap的区别:保证线程安全的方式不同:

3.1.HashTable是通过给整张散列表加锁的方式来保证线程安全,这种方式保证了线程安全,但是并发执行效率低下。

3.2.ConcurrentHashMap在JDK1.8之前,采用分段锁机制来保证线程安全的,这种方式可以在保证线程安全的同时,一定程度上提高并发执行效率(当多线程并发访问不同的segment时,多线程就是完全并发的,并发执行效率会提高)

3.3.从JDK1.8开始, ConcurrentHashMap数据结构与1.8中的HashMap保持一致,均为数组+链表+红黑树,是通过乐观锁+Synchroni zed来保证线程安全的.当多线程并发向同一个散列桶添加元素时。若散列桶为空,此时触发乐观锁机制,线程会获取到桶中的版本号,在添加节点之前,判断线程中获取的版本号与桶中实际存在的版本号是否一致,若一致,则添加成功,若不一致,则让线程自旋。

若散列桶不为空,此时使用Synchronized来保证线程安全,先访问到的线程会给桶中的头节点加锁,从而保证线程安全。

三、现在的数据结构:
JDK1.8中ConcurrentHashmap保证线程安全的方式:乐观锁+Sysnchronized

多线程并发向同一个散列桶添加元素时若散列桶为空,则触发乐观锁机制,线程获取散列桶中的版本号,在添加元素之前判断线程中的版本号与桶中的版本号是否一致致,添加成功不一致,自旋若散列桶不为空,,则使用synchroinized.先访问到的线程给头结点解

锁,形成链表或红黑树,JDK1.8中ConcrruentHashMap在保证线程安全的同时允许最大程序的多线程并发执行,

ConcrruentHashMap中的乐观锁说明:

HashTable和ConcurrentHashMap保证线程安全的方式

HashTable是通过给整张散列表加锁的方式来保证线程安全的.这种方式可以

保证线程安全,但是并发执行效率极其低下.(同步)

以上保证线程安全的方式中,有-些可以并发执行的操作是得不到并发的,

所以并发执行效率有可提升的空间.

总结

1.7版本
1.1.7版本的ConcurrentHashMap是基于Segment分段实现的
2.每个Segment相对于一个小型的HashMap
3.每个Segment内部会进行扩容,和HashMap的扩容逻辑类似
4.先生成新的数组,然后转移元素到新数组中
5.扩容的判断也是每个Segment内部单独判断的,判断是否超过阈值
1.8版本
1.1.8版本的ConcurrentHashMap不再基于Segment实现2.当某个线程进行put时,如果发现ConcurrentHashMap正在进行扩容那么该线程一起进行扩容
3.如果某个线程put时,发现没有正在进行扩容,则将key-value添加到ConcurrentHashMap中,然后判断是否超过阈值,超过了则进行扩容
4.ConcurrentHashMap是支持多个线程同时扩容的

5.扩容之前也先生成一个新的数组
6.

关于扩容

https://blog.csdn.net/ZOKEKAI/article/details/90051567?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522171258736516800186523395%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=171258736516800186523395&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-90051567-null-null.142^v100^pc_search_result_base2&utm_term=ConcurrentHashMap%E6%89%A9%E5%AE%B9&spm=1018.2226.3001.4187

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
总结:

private transient volatile int sizeCtl;
多线程之间,以volatile的方式读取sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。
通过cas设置sizeCtl属性,告知其他线程ConcurrentHashMap的状态变更。

不同状态,sizeCtl所代表的含义也有所不同:
未初始化:
sizeCtl = 0:表示没有指定初始容量
sizeCtl > 0:表示初始容量
初始化中:
sizeCtl=-1,标记作用,告知其他线程,正在初始化
正常状态:
sizeCtl = 0.75n ,扩容阈值
扩容中:
sizeCtl < 0 : 表示有其他线程正在执行扩容
sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容
sizeCtl = -N 表示,这个高16位表示当前扩容的标志,每次扩容都会生成一个不一样的标志,低16位表示参与扩容的线程数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
扩容流程:
1、根据操作系统的 CPU 核数和集合 length 计算每个核一轮处理桶的个数,最小是16
2、修改 transferIndex 标志位,每个线程领取完任务就减去多少,
比如初始大小是transferIndex = table.length = 64,每个线程领取的桶个数是16
第一个线程领取完任务后transferIndex = 48,也就是说第二个线程这时进来是从第 48 个桶开始处理,再减去16,依次类推,这就是多线程协作处理的原理
3、领取完任务之后就开始处理,如果桶为空就设置为 ForwardingNode ,
如果不为空就加锁拷贝,只有这里用到了 synchronized 关键字来加锁,为了防止拷贝的过程有其他线程在put元素进来。
拷贝完成之后也设置为 ForwardingNode节点。
4、如果某个线程分配的桶处理完了之后,再去申请,发现 transferIndex = 0
这个时候就说明所有的桶都领取完了,但是别的线程领取任务之后有没有处理完并不知道,
该线程会将 sizeCtl 的值减1,然后判断是不是所有线程都退出了,如果还有线程在处理,就退出,
直到最后一个线程处理完,发现 sizeCtl = rs<< RESIZE_STAMP_SHIFT 也就是标识符左移 16 位,
才会将旧数组干掉,用新数组覆盖,并且会重新设置 sizeCtl 为新数组的扩容点。

以上过程总的来说分成两个部分:
1、分配任务:这部分其实很简单,就是把一个大的数组给切分,切分多个小份,然后每个线程处理其中每一小份,
当然可能就只有1个或者几个线程在扩容,那就一轮一轮的处理,一轮处理一份。
2、处理任务:复制部分主要有两点,第一点就是加锁,第二点就是处理完之后置为ForwardingNode来占位标识这个位置被迁移过了。

ForwardingNode用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。

旧版本之前,只能通过分段加锁实现扩容机制。旧版本后,多个线程可以一起参与扩容,想领任务一样,一个线程领一份,然后使用ForwardingNode进行站位