说明

ConcurrentHashMap并发容器

  • 采用 cas自旋、while/for死循环、sychronize锁三种方式保证线程竞争下的插入安全,其原理与HashMap并不相同,改动量比较大
  • jdk7采用分段segament的概念,把数组分为几段,每次锁一段达到并发的目的,但是分段会多维护一次hash
  • jdk8采用锁数组的节点Node,将链表或红黑树整个锁定,达到线程安全。jdk8的精髓就在于没有node节点的时候数据的并发插入,它并没有阻塞线程,而是cas重试

源码解析

Jdk8源码

putValue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
final V putVal(K key, V value, boolean onlyIfAbsent) {
//第一部分
//对传入的参数进行合法性判断
if (key == null || value == null) throw new NullPointerException();
//计算键所对应的 hash 值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
//如果哈希表还未初始化,那么初始化它
if (tab == null || (n = tab.length) == 0) {
tab = initTable();

//根据键的 hash 值找到哈希数组相应的索引位置
//如果为空,那么以CAS无锁式向该位置添加一个节点
} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break;
}
//第二部分 检测到桶结点是 ForwardingNode 类型,协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//桶结点是普通的结点,锁住该桶头结点并试图在该链表的尾部添加一个节点
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//向普通的链表中添加元素,无需赘述
if (fh >= 0) {
binCount = 1;
for (Node<K, V> e = f; ; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key, value, null);
break;
}
}
}
//向红黑树中添加元素,TreeBin 结点的hash值为TREEBIN(-2)
else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value))
!= null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功
//binCount == 0 说明 put 操作将一个新节点添加成为某个桶的首节点
if (binCount != 0) {
//链表深度超过 8 转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//oldVal != null 说明此次操作是修改操作
//直接返回旧值即可,无需做下面的扩容边界检查
if (oldVal != null)
return oldVal;
break;
}
}
}
//CAS 式更新baseCount,并判断是否需要扩容
addCount(1L, binCount);
//程序走到这一步说明此次 put 操作是一个添加操作,否则早就 return 返回了
return null;
}

initTable()

核心就是循环判断tab容量
判断是否有其他线程已经开始了,有则放弃时间片进入下一次循环
没有则通过原子类获取执行权并设置sc为-1
初始化数组,初始化阈值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//如果表为空才进行初始化操作
while ((tab = table) == null || tab.length == 0) {
//sizeCtl 小于零说明已经有线程正在进行初始化操作
//当前线程应该放弃 CPU 的使用
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//否则说明还未有线程对表进行初始化,那么本线程就来做这个工作
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//保险起见,再次判断下表是否为空
try {
if ((tab = table) == null || tab.length == 0) {
//sc 大于零说明容量已经初始化了,否则使用默认容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//根据容量构建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算阈值,等效于 n*0.75
sc = n - (n >>> 2);
}
} finally {
//设置阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}

helpTransfer()

  1. 根据 CPU 核心数确定每个线程负责的桶数,默认每个线程16个桶
  2. 创建新数组,长度是原来数组的两倍
  3. 分配好当前线程负责的桶区域 [bound, nextIndex)
  4. 并发迁移,根据链表和红黑树执行不同迁移策略
  5. 迁移完成,设置新的数组和新的扩容阈值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//返回一个 16 位长度的扩容校验标识
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//sizeCtl 如果处于扩容状态的话
//前 16 位是数据校验标识,后 16 位是当前正在扩容的线程总数
//这里判断校验标识是否相等,如果校验符不等或者扩容操作已经完成了,直接退出循环,不用协助它们扩容了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//否则调用 transfer 帮助它们进行扩容
//sc + 1 标识增加了一个线程进行扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
//首先,每个线程进来会先领取自己的任务区间,然后开始 --i 来遍历自己的任务区间,对每个桶进行处理。
//如果遇到桶的头结点是空的,那么使用 ForwardingNode 标识该桶已经被处理完成了。
//如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。
//如果是正常的桶,对桶首节点加锁,正常的迁移即可,迁移结束后依然会将原表的该位置标识位已经处理。
//第一部分
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算单个线程允许处理的最少table桶首节点个数,不能小于 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
//刚开始扩容,初始化 nextTab
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//transferIndex 指向最后一个桶,方便从后向前遍历
transferIndex = n;
}
int nextn = nextTab.length;
//定义 ForwardingNode 用于标记迁移完成的桶
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

扩容图文

多线程开始扩容{}

1

lastrun节点{}

2

链表迁移{}

3

红黑树迁移{}

4

迁移过程中get和put的操作的处理{}

5

并发迁移{}

6

迁移完成{}

7

emm 引用至哪里,有点忘了,抱歉