ConcurrentHashMap源码剖析
一、ConcurrentHashMap的存储结构(常识性讲解)
ConcurrentHashMap就是一个安全的HashMap,在JDK1.8中,ConcurrentHashMap的存储结构和HashMap一致
key-value结构存储,put(key,value);

二、正确的理解ConcurrentHashMap的安全性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.mashibing.public220221;
import java.util.concurrent.ConcurrentHashMap;
public class A_0301_001_ConcurrentHashMapTest { private static final ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
public static void main(String[] args) { Long count = map.get("test"); if (count == null) { map.put("test", 1L); } else { map.put("test", count + 1); }
while (true) { Long count = map.get("test"); if (count == null) { if (map.putIfAbsent("test", 1L) == null) { break; } } else { if (map.replace("test", count, count + 1)) break; } } } }
|
三、从ConcurrentHashMap的put方法开始
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ( (ek = e.key) == key || key.equals(ek))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
|
四、ConcurrentHashMap的spread方法
1 2 3 4 5 6 7 8 9 10 11 12
| int hash = spread(key.hashCode()); static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS;
static final int MOVED = -1; static final int TREEBIN = -2; static final int RESERVED = -3; }
|

五、ConcurrentHashMap的initTable方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
tab = initTable(); private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } }
|
六、ConcurrentHashMap的JDK过去和现在对比
一、jdk1.8之前的数据结构:
1.默认情况下会有16个区段 Segment数组 Segment[16]
2.每次每个区段Segment中会保存若干个散列桶,每次散列桶长度扩容成2^n次方的长度。 多个散列桶相连就构成了散列表。
3.存入元素: key带入到hashcode方法众获得hashcode值,然后把hashcode值带入到散列算法中获取segment的下标(区段编号),再根据key带入到定义好的函数中获取Segment对象中散列桶的下标。
如果此位置有元素就构成链表(JDK1.8及以后会形成红黑树),如果没有元素就存入
3.存取的线程安全问题: 如果多个线程操作同一个Segment,则某个线程给此Segment加锁,另一个线程只能阻塞。
同时解决了HashTable的问题,HashTable只能由一个线程操作。 ConcurrentHashMap可以让一个线程操作第一个Segment,另一个线程操作另一个Segment。
4.小矩形块表示散列桶
绿色的Segment表示ConcurrentHashMap集合众 Segment[16]数组里的一个对象。
5.并发问题:
两个线程给不同的区段Segment中添加元素,这种情况可以并发。 所以ConcurrentHashMap可以保证线程安全(多个线程操作同一个Segment,则某个线程给此Segment加锁,另一个线程只能阻塞)并且在一定程度上提交线程并发执行效率。
没有区段了,和HashMap一致了,数组+链表+红黑树 +乐观锁 + synchronized
二、乐观锁和悲观锁:
2.1.悲观锁:执行的某个线程总是悲观的认为,在自己执行期间,总有其它线程与之并发执行,认为会产生安全问题,所以为了保证线程安全,在线程刚开始访问对象数据时,后立即给对象加锁,从而保证线程安全. 成了同步的效果(就是排队执行,第一个线程执行完第二个才开始)
Synchronized就是悲观锁
2.2.乐观锁:
ConcurrentHashmap和HashMap区别:
1.HashMap是 非线程安全的,而HashTabl e和ConcurrentHashmap都是线程安全的
2.HashMap的key 和value均可以为null;而HashTable和Concur rentHashMap的key和value均不可以为null
3.HashTable 和ConcurrentHashMap的区别:保证线程安全的方式不同:
3.1.HashTable是通过给整张散列表加锁的方式来保证线程安全,这种方式保证了线程安全,但是并发执行效率低下。
3.2.ConcurrentHashMap在JDK1.8之前,采用分段锁机制来保证线程安全的,这种方式可以在保证线程安全的同时,一定程度上提高并发执行效率(当多线程并发访问不同的segment时,多线程就是完全并发的,并发执行效率会提高)
3.3.从JDK1.8开始, ConcurrentHashMap数据结构与1.8中的HashMap保持一致,均为数组+链表+红黑树,是通过乐观锁+Synchroni zed来保证线程安全的.当多线程并发向同一个散列桶添加元素时。若散列桶为空,此时触发乐观锁机制,线程会获取到桶中的版本号,在添加节点之前,判断线程中获取的版本号与桶中实际存在的版本号是否一致,若一致,则添加成功,若不一致,则让线程自旋。
若散列桶不为空,此时使用Synchronized来保证线程安全,先访问到的线程会给桶中的头节点加锁,从而保证线程安全。
三、现在的数据结构:
JDK1.8中ConcurrentHashmap保证线程安全的方式:乐观锁+Sysnchronized
多线程并发向同一个散列桶添加元素时若散列桶为空,则触发乐观锁机制,线程获取散列桶中的版本号,在添加元素之前判断线程中的版本号与桶中的版本号是否一致致,添加成功不一致,自旋若散列桶不为空,,则使用synchroinized.先访问到的线程给头结点解
锁,形成链表或红黑树,JDK1.8中ConcrruentHashMap在保证线程安全的同时允许最大程序的多线程并发执行,
ConcrruentHashMap中的乐观锁说明:
HashTable和ConcurrentHashMap保证线程安全的方式
HashTable是通过给整张散列表加锁的方式来保证线程安全的.这种方式可以
保证线程安全,但是并发执行效率极其低下.(同步)
以上保证线程安全的方式中,有-些可以并发执行的操作是得不到并发的,
所以并发执行效率有可提升的空间.
总结
1.7版本
1.1.7版本的ConcurrentHashMap是基于Segment分段实现的
2.每个Segment相对于一个小型的HashMap
3.每个Segment内部会进行扩容,和HashMap的扩容逻辑类似
4.先生成新的数组,然后转移元素到新数组中
5.扩容的判断也是每个Segment内部单独判断的,判断是否超过阈值
1.8版本
1.1.8版本的ConcurrentHashMap不再基于Segment实现2.当某个线程进行put时,如果发现ConcurrentHashMap正在进行扩容那么该线程一起进行扩容
3.如果某个线程put时,发现没有正在进行扩容,则将key-value添加到ConcurrentHashMap中,然后判断是否超过阈值,超过了则进行扩容
4.ConcurrentHashMap是支持多个线程同时扩容的
5.扩容之前也先生成一个新的数组
6.
关于扩容
https://blog.csdn.net/ZOKEKAI/article/details/90051567?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522171258736516800186523395%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=171258736516800186523395&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-90051567-null-null.142^v100^pc_search_result_base2&utm_term=ConcurrentHashMap%E6%89%A9%E5%AE%B9&spm=1018.2226.3001.4187
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 总结:
private transient volatile int sizeCtl; 多线程之间,以volatile的方式读取sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。 通过cas设置sizeCtl属性,告知其他线程ConcurrentHashMap的状态变更。
不同状态,sizeCtl所代表的含义也有所不同: 未初始化: sizeCtl = 0:表示没有指定初始容量 sizeCtl > 0:表示初始容量 初始化中: sizeCtl=-1,标记作用,告知其他线程,正在初始化 正常状态: sizeCtl = 0.75n ,扩容阈值 扩容中: sizeCtl < 0 : 表示有其他线程正在执行扩容 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容 sizeCtl = -N 表示,这个高16位表示当前扩容的标志,每次扩容都会生成一个不一样的标志,低16位表示参与扩容的线程数量
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| 扩容流程: 1、根据操作系统的 CPU 核数和集合 length 计算每个核一轮处理桶的个数,最小是16 2、修改 transferIndex 标志位,每个线程领取完任务就减去多少, 比如初始大小是transferIndex = table.length = 64,每个线程领取的桶个数是16, 第一个线程领取完任务后transferIndex = 48,也就是说第二个线程这时进来是从第 48 个桶开始处理,再减去16,依次类推,这就是多线程协作处理的原理 3、领取完任务之后就开始处理,如果桶为空就设置为 ForwardingNode , 如果不为空就加锁拷贝,只有这里用到了 synchronized 关键字来加锁,为了防止拷贝的过程有其他线程在put元素进来。 拷贝完成之后也设置为 ForwardingNode节点。 4、如果某个线程分配的桶处理完了之后,再去申请,发现 transferIndex = 0, 这个时候就说明所有的桶都领取完了,但是别的线程领取任务之后有没有处理完并不知道, 该线程会将 sizeCtl 的值减1,然后判断是不是所有线程都退出了,如果还有线程在处理,就退出, 直到最后一个线程处理完,发现 sizeCtl = rs<< RESIZE_STAMP_SHIFT 也就是标识符左移 16 位, 才会将旧数组干掉,用新数组覆盖,并且会重新设置 sizeCtl 为新数组的扩容点。
以上过程总的来说分成两个部分: 1、分配任务:这部分其实很简单,就是把一个大的数组给切分,切分多个小份,然后每个线程处理其中每一小份, 当然可能就只有1个或者几个线程在扩容,那就一轮一轮的处理,一轮处理一份。 2、处理任务:复制部分主要有两点,第一点就是加锁,第二点就是处理完之后置为ForwardingNode来占位标识这个位置被迁移过了。
ForwardingNode用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
|
旧版本之前,只能通过分段加锁实现扩容机制。旧版本后,多个线程可以一起参与扩容,想领任务一样,一个线程领一份,然后使用ForwardingNode进行站位