ConcurrentHashMap transfer方法 解析ConcurrentHashMap: transfer方法源码分析(难点)
兴趣使然の草帽路飞 人气:0想了解解析ConcurrentHashMap: transfer方法源码分析(难点)的相关内容吗,兴趣使然の草帽路飞在本文为您仔细讲解ConcurrentHashMap transfer方法的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:ConcurrentHashMap解析,ConcurrentHashMap入门,transfer方法,下面大家一起来学习吧。
- 上一篇文章介绍过
put
方法以及其相关的方法,接下来,本篇就介绍一下transfer这个方法(比较难),最好能动手结合着源码进行分析,并仔细理解前面几篇文章的内容~ - 注:代码分析的注释中的CASE0、CASE1… ,这些并没有直接关联关系,只是为了给每个if逻辑判断加一个标识,方便在其他逻辑判断的地方进行引用而已。
再复习一下并发Map的结构图:
1、transfer方法
transfer
方法的作用是:迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中。该方法迁移数据的流程就是在发生扩容时,从散列表原table中,按照桶位下标,依次将每个桶中的元素(结点、链表、树)迁移到新表nextTable中。
另外还有与其相关的方法helpTransfer
:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
下图为并发扩容流程图,在分析源码前熟悉一下流程:
链表迁移示意图:
下面就开始分析代码把:
/** * 迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中: */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // n 表示扩容之前table数组的长度 // stride 表示分配给线程任务的步长 int n = tab.length, stride; // 这里为了方便分析,根据CUP核数,stride 固定为MIN_TRANSFER_STRIDE 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 控制线程迁移数据的最小步长(桶位的跨度~) stride = MIN_TRANSFER_STRIDE; // subdivide range // ------------------------------------------------------------------------------ // CASE0:nextTab == null // 条件成立:表示当前线程为触发本次扩容的线程,需要做一些扩容准备工作:(在if代码块中) // 条件不成立:表示当前线程是协助扩容的线程... if (nextTab == null) { // initiating try { // 创建一个比扩容之前容量n大一倍的新table @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 赋值nextTab对象给 nextTable ,方便协助扩容线程 拿到新表 nextTable = nextTab; // 记录迁移数据整体位置的一个标记,初始值是原table数组的长度。 // 可以理解为:全局范围内散列表的数据任务处理到哪个桶的位置了 transferIndex = n; } // 表示新数组的长度 int nextn = nextTab.length; // fwd节点,当某个桶位数据迁移处理完毕后,将此桶位设置为fwd节点,其它写线程或读线程看到后,会有不同逻辑。fwd结点指向新表nextTab的引用 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 推进标记(后面讲数据迁移的时候再分析~) boolean advance = true; // 完成标记(后面讲数据迁移的时候再分析~) boolean finishing = false; // to ensure sweep before committing nextTab // i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~) // bound 表示分配给当前线程任务的下界限制。(线程的数据迁移任务先从高位开始迁移,迁移到下界位置结束) int i = 0, bound = 0; // 自旋~ 非常长的一个for循环... for (;;) { // f 桶位的头结点 // fh 头结点的hash Node<K,V> f; int fh; /** * while循环的任务如下:(循环的条件为推进标记advance为true) * 1.给当前线程分配任务区间 * 2.维护当前线程任务进度(i 表示当前处理的桶位) * 3.维护map对象全局范围内的进度 */ // 整个while循环就是在算i的值,过程太复杂,不用太关心 // i的值会从n-1依次递减,感兴趣的可以打下断点就知道了 // 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素 while (advance) { // nextIndex~nextBound分配任务的区间: // 分配任务的开始下标 // 分配任务的结束下标 int nextIndex, nextBound; // ----------------------------------------------------------------------- // CASE1: // 条件一:--i >= bound // 成立:表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位. // 不成立:表示当前线程任务已完成 或者 未分配 if (--i >= bound || finishing) // 推进标记设置为flase advance = false; // ----------------------------------------------------------------------- // CASE2: (nextIndex = transferIndex) <= 0 // transferIndex:可以理解为,全局范围内散列表的数据任务处理到哪个桶的位置了~ // 前置条件:当前线程任务已完成 或 者未分配,即没有经过CASE1 // 条件成立:表示对象全局范围内的桶位都分配完毕了,没有区间可分配了,设置当前线程的i变量为-1 跳出循环,然后执行退出迁移任务相关的程序 // 条件不成立:表示对象全局范围内的桶位尚未分配完毕,还有区间可分配~ else if ((nextIndex = transferIndex) <= 0) { i = -1; // 推进标记设置为flase advance = false; } // ----------------------------------------------------------------------- // CASE3,其实就是移动i的一个过程:CAS更新成功,则i从当前位置-1,再复习一下i的含义: // i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~) // CASE3前置条件(未经过CASE1-2):1、当前线程需要分配任务区间 2.全局范围内还有桶位尚未迁移 // 条件成立:说明给当前线程分配任务成功 // 条件失败:说明分配任务给当前线程失败,应该是和其它线程发生了竞争~ else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } /** 处理线程任务完成后,线程退出transfer方法的逻辑: **/ // ------------------------------------------------------------------------- // CASE4: // 条件一:i < 0 // 成立:表示当前线程未分配到任务,或已完成了任务 // 条件二、三:一般情况下不会成里~ if (i < 0 || i >= n || i + n >= nextn) { // 如果一次遍历完成了 // 也就是整个map所有桶中的元素都迁移完成了 // 保存sizeCtl的变量 int sc; // 扩容结束条件判断 if (finishing) { // 如果全部桶中数据迁移完成了,则替换旧桶数组 // 并设置下一次扩容门槛为新桶数组容量的0.75倍 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } // ------------------------------------------------------------------------- // CASE5:当前线程扩容完成,把并发扩容的线程数-1,该操作基于CAS,修改成功返回true // 条件成立:说明,更新 sizeCtl低16位 -1 操作成功,当前线程可以正常退出了~ // 如果条件不成立:说明CAS更新操作时,与其他线程发生竞争了~ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 条件成立:说明当前线程不是最后一个退出transfer任务的线程 // eg: // 1000 0000 0001 1011 0000 0000 0000 0000 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 扩容完成两边肯定相等 // 正常退出 return; // 完成标记finishing置为true,,finishing为true才会走到上面的if扩容结束条件判断 // 推进标记advance置为true finishing = advance = true; // i重新赋值为n // 这样会再重新遍历一次桶数组,看看是不是都迁移完成了 // 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件 i = n; // recheck before commit } } /** 线程处理一个桶位数据的迁移工作,处理完毕后, 设置advance为true: 表示继续推荐,然后就会回到for,继续循环 **/ // ------------------------------------------------------------------------- // CASE6: // 【CASE6~CASE8】前置条件:当前线程任务尚未处理完,正在进行中! // 条件成立:说明当前桶位未存放数据,只需要将此处设置为fwd节点即可。 else if ((f = tabAt(tab, i)) == null) // 如果桶中无数据,直接放入FWD,标记该桶已迁移: // casTabAt通过CAS的方式去向Node数组指定位置i设置节点值,设置成功返回true,否则返回false // 即:将tab数组下标为i的结点设置为FWD结点 advance = casTabAt(tab, i, null, fwd); // ------------------------------------------------------------------------- // CASE7: (fh = f.hash) == MOVED 如果桶中第一个元素的hash值为MOVED // 条件成立:说明头节f它是ForwardingNode节点, // 则,当前桶位已经迁移过了,当前线程不用再处理了,直接再次更新当前线程任务索引,再次处理下一个桶位 或者 其它操作~ else if ((fh = f.hash) == MOVED) advance = true; // already processed // ------------------------------------------------------------------------- // CASE8: // 前置条件:**当前桶位有数据**,而且node节点 不是 fwd节点,说明这些数据需要迁移。 else { // 锁定该桶并迁移元素:(sync 锁加在当前桶位的头结点) synchronized (f) { // 再次判断当前桶第一个元素是否有修改,(可能出现其它线程抢先一步迁移/修改了元素) // 防止在你加锁头对象之前,当前桶位的头对象被其它写线程修改过,导致你目前加锁对象错误... if (tabAt(tab, i) == f) { // 把一个链表拆分成两个链表(规则按照是桶中各元素的hash与桶大小n进行与操作): // 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中 // 其中低位链表迁移到新桶中的位置相对旧桶不变 // 高位链表迁移到新桶中位置正好是其在旧桶的位置加n // 这也正是为什么扩容时容量在变成两倍的原因 (链表迁移原理参考上面的图) // ln 表示低位链表引用 // hn 表示高位链表引用 Node<K,V> ln, hn; // --------------------------------------------------------------- // CASE9: // 条件成立:表示当前桶位是链表桶位 if (fh >= 0) { // 第一个元素的hash值大于等于0,说明该桶中元素是以链表形式存储的 // 这里与HashMap迁移算法基本类似,唯一不同的是多了一步寻找lastRun // 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表 // 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0 // 则最后后面三个0对应的元素肯定还是在同一个桶中 // 这时lastRun对应的就是倒数第三个节点 // 至于为啥要这样处理,我也没太搞明白 // lastRun机制: // 可以获取出 当前链表 末尾连续高位不变的 node int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 看看最后这几个元素归属于低位链表还是高位链表 // 条件成立:说明lastRun引用的链表为 低位链表,那么就让 ln 指向 低位链表 if (runBit == 0) { ln = lastRun; hn = null; } // 否则,说明lastRun引用的链表为 高位链表,就让 hn 指向 高位链表 else { hn = lastRun; ln = null; } // 遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中 // 循环跳出条件:当前循环结点!=lastRun for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 低位链表的位置不变 setTabAt(nextTab, i, ln); // 高位链表的位置是:原位置 + n setTabAt(nextTab, i + n, hn); // 标记当前桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } // --------------------------------------------------------------- // CASE10: // 条件成立:表示当前桶位是 红黑树 代理结点TreeBin else if (f instanceof TreeBin) { // 如果第一个元素是树节点,也是一样,分化成两颗树 // 也是根据hash&n为0放在低位树中,不为0放在高位树中 // 转换头结点为 treeBin引用 t TreeBin<K,V> t = (TreeBin<K,V>)f; // 低位双向链表 lo 指向低位链表的头 loTail 指向低位链表的尾巴 TreeNode<K,V> lo = null, loTail = null; // 高位双向链表 lo 指向高位链表的头 loTail 指向高位链表的尾巴 TreeNode<K,V> hi = null, hiTail = null; // lc 表示低位链表元素数量 // hc 表示高位链表元素数量 int lc = 0, hc = 0; // 遍历整颗树(TreeBin中的双向链表,从头结点至尾节点),根据hash&n是否为0分化成两颗树: for (Node<K,V> e = t.first; e != null; e = e.next) { // h 表示循环处理当前元素的 hash int h = e.hash; // 使用当前节点 构建出来的新的TreeNode TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // --------------------------------------------------- // CASE11: // 条件成立:表示当前循环节点 属于低位链节点 if ((h & n) == 0) { // 条件成立:说明当前低位链表还没有数据 if ((p.prev = loTail) == null) lo = p; // 说明低位链表已经有数据了,此时当前元素追加到低位链表的末尾就行了 else loTail.next = p; // 将低位链表尾指针指向 p 节点 loTail = p; ++lc; } // --------------------------------------------------- // CASE12: // 条件成立:当前节点属于高位链节点 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果分化的树中元素个数小于等于6,则退化成链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位树的位置不变 setTabAt(nextTab, i, ln); // 高位树的位置是原位置加n setTabAt(nextTab, i + n, hn); // 标记该桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } } } } } }
补充,lastRun机制示意图:
小节:
(1)新桶数组大小是旧桶数组的两倍;
(2)迁移元素先从靠后的桶开始;
(3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;
(4)迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;
(5)低位链表(树)存储在原来的位置;
(6)高们链表(树)存储在原来的位置加n的位置;
(7)迁移元素时会锁住当前桶,也是分段锁的思想;
2、helpTransfer方法
helpTransfer
:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。当前桶元素迁移完成了才去协助迁移其它桶元素!
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { // nextTab 引用的是 fwd.nextTable == map.nextTable 理论上是这样。 // sc 保存map.sizeCtl Node<K,V>[] nextTab; int sc; // CASE0: 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空 // 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素 // 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组 // 条件一:tab != null 恒成立 true // 条件二:(f instanceof ForwardingNode) 恒成立 true // 条件三:((ForwardingNode<K,V>)f).nextTable) != null 恒成立 true if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // 根据前表的长度tab.length去获取扩容唯一标识戳,假设 16 -> 32 扩容:1000 0000 0001 1011 int rs = resizeStamp(tab.length); // 条件一:nextTab == nextTable // 成立:表示当前扩容正在进行中 // 不成立:1.nextTable被设置为Null了,扩容完毕后,会被设为Null // 2.再次出发扩容了...咱们拿到的nextTab 也已经过期了... // 条件二:table == tab // 成立:说明 扩容正在进行中,还未完成 // 不成立:说明扩容已经结束了,扩容结束之后,最后退出的线程 会设置 nextTable 为 table // 条件三:(sc = sizeCtl) < 0 // 成立:说明扩容正在进行中 // 不成立:说明sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束。 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs // true -> 说明当前线程获取到的扩容唯一标识戳 非 本批次扩容 // false -> 说明当前线程获取到的扩容唯一标识戳 是 本批次扩容 // 条件二:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs << 16 ) + 1 // true -> 表示扩容完毕,当前线程不需要再参与进来了 // false -> 扩容还在进行中,当前线程可以参与 // 条件三:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs<<16) + MAX_RESIZERS // true -> 表示当前参与并发扩容的线程达到了最大值 65535 - 1 // false -> 表示当前线程可以参与进来 // 条件四:transferIndex <= 0 // true -> 说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干.. // false -> 还有任务可以分配。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 当前线程帮忙迁移元素 transfer(tab, nextTab); break; } } return nextTab; } return table; }
3、总结
这块代码逻辑很难理解,可以结合着ConcurrentHashMap面试题去巩固一下,重新吸收~也希望大家多多关注的其他内容!
加载全部内容