LongAdder原理及创建使用示例详解
Afu 人气:0LongAdder介绍
1.Atomic原子类
Atomic的原子类内部使用的是CAS原则,CAS是一个乐观锁,但是如果是在高并发的情况下的话,多个线程不断地竞争
CAS的不断的自旋,非常耗CPU.
高并发环境下,value变量其实是一个热点,也就是多个线程竞争一个热点。
这时候就需要使用的 LongAdder 来替代 Atomic类
2.LongAdder原理
LongAdder的原理就是分散热点,将value分散到一个数组中,不同的线程去找自己的对应的Cell进行修改值,
各个线程对Cell进行CAS操作,这样热点就被分散了,冲突的概率小了,性能就提高了.
如果要返回实际的值,返回所有的数组中的值和base值就行.
2.查看LongAdder的add方法
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
一看到这种代码就头皮发麻
简单说明一下这几行代码作用,说多了,你也懒得看
第一个if作用:
如果还没有初始化cells数组,就去修改base值
如果修改Base值失败,说明多个线程对base值修改发生了竞争
第二个if作用:
判断有没有cells有没有值,有的话说明已经初始化过cells数组了
知道对应的桶位添加值或者修改值
我感觉你已经不知道我在说什么了!
反正你就知道,如果有多个线程发生了竞争,就去cells数组中找对应的桶位的cell添加或者修改值即可
3.longAccumulate方法
这里的代码特别的恶心,是能助眠的好代码,建议收藏!
简单说明一下几个核心的代码
3.1.创建Cell数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
Cell数组还没有进行初始化,
创建长度为2的Cell数组
在索引1的位置存储第一个Cell对象
3.2.创建桶位Cell对象
if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } }
m - 1) & h
通过路由寻址公式找到对应的索引位置的桶位为空,然后把创建的Cell对象存储进去
更直白就是说,你要蹲坑了,你要去找坑,如果坑位没有人,你就进去
3.3.扩容Cell数组
else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; }
数组长度不够时,2倍扩容
4.总结
你现在肯定是云里雾里,讲的都是什么东西.
你现在就只需要知道 LongAdder
通过 base 和 Cell数组 换取更高的性能
5.附上源码解析
当然这里你可以跳过了,不用看了
5.1.add方法
public void add(long x) { // as 表示cells引用 // b 表示获取的base值 // v 表示期望值 // m 表示cells数组的长度 // a 表示当前线程命中cell单元格 Cell[] as; long b, v; int m; Cell a; //条件一: --> true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中 // --> false 表示cells还没有初始化过,当前线程所有的数据都被写到base中 //条件二: --> false 表示cas替换值成功 // --> true 表示发生竞争了,可能需要重试 或者 扩容 if ((as = cells) != null || !casBase(b = base, b + x)) { //什么时候会进来? //1.true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中 //2.true cells还没有初始化,但是发生竞争了 // true -> 未竞争 false ->发生竞争 boolean uncontended = true; //as == null || (m = as.length - 1) < 0 看做是一个条件 //条件一:true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,也就是通过第二个条件进来的 // false --> 说明cells已经初始化了,可以去寻找自己的cell进行赋值 //条件二: getProbe() 可以理解为获取当前线程的hash值, m表示cells长度-1 注意:cells的次方数一定是2的次方 // true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持 // false --> 说明当前线程对应的下标cell不为空,下一步需要将 x,添加到cell中 //条件三: true --> 代表cas失败,代表当前线程对应的cell,有竞争 // false --> 表示cas成功 if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //都哪些情况会调用这里的方法? //1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的 //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持 //3.true --> 代表cas失败,代表当前线程对应的cell,有竞争 longAccumulate(x, null, uncontended); } }
5.2.longAccumulate方法
/** * //1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,到时候会进行初始化cell * //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持 * //3.true --> 对应的下标也有cell,但是cas失败,代表当前线程对应的cell,有竞争 */ //wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //表示线程的hash值 int h; //条件成立,说明当前线程还未分配hash值 if ((h = getProbe()) == 0) { //给当前线程分配hash值 ThreadLocalRandom.current(); // force initialization //取出当前线程的hash值赋值给h h = getProbe(); //为什么?默认情况下,可定写入到了cell[0]的位置,不把他当做一次真正的竞争?? wasUncontended = true; } //表示扩容意向, false表示不会扩容 , true有可能会扩容 boolean collide = false; // True if last slot nonempty //自旋 for (;;) { //as 表示cells引用 //a 表示当前线程命中的cell //n 表示cell的数组长度 //v 表示期望值 Cell[] as; Cell a; int n; long v; //情况1:as不为空表示cells已经初始化了,当前线程应该将数据写入到对应的cell中 if ((as = cells) != null && (n = as.length) > 0) { //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持 //3.true --> 代表cas失败,代表当前线程对应的cell,有竞争[重试或者扩容] //情况1.1: true 表示当前线程对应的下标位置的cell为null,需要创建cell if ((a = as[(n - 1) & h]) == null) { //true 表示当前锁未被占用,false-->表示锁被占用 if (cellsBusy == 0) { // Try to attach new Cell //拿x创建cell Cell r = new Cell(x); // Optimistically create //条件一:true 表示当前锁未被占用,false-->表示锁被占用 //条件二:true表示当前线程获取锁成功,false表示当前线程获取锁失败, if (cellsBusy == 0 && casCellsBusy()) { //是否创建成功的标记 boolean created = false; try { // Recheck under lock //rs表示cells引用 //m 表示cells长度 //j 表示当前线程命中的下标 Cell[] rs; int m, j; //条件一,条件二,恒成立的 //rs[j = (m - 1) & h] == null 为了防止其他线程已经初始化话过了,防止当前线程再次修改,覆盖掉原来的cell if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //扩容意向改为false collide = false; } //情况1.2 //wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //情况1.3 :当前线程rehash过后,然后新命中的cell不为空 //true --> 写成功 退出 //false -- > rehash过后命中的cell,也有竞争 这里为重试一次,再重试一次 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //情况1.4. // 条件一: n >= NCPU true --> 扩容意向改为False ,表示不扩容了 false --> 表示cells还可以扩容 //条件二: true --> 表示其他线程已经扩容过了,当前线程rehash之后重试即可 else if (n >= NCPU || cells != as) collide = false; // At max size or stale //默认开始为false 的, 设置为true,需要扩容,但是不一定真的发生扩容 else if (!collide) collide = true; //情况6.真正的扩容逻辑 //条件一: cellsBusy == 0 true --> 表示当前无锁状态,当前线程可以去竞争这把锁 //条件二:casCellsBusy() true --> 表示当前线程获取锁成功,可以执行扩容逻辑 ,false 表示有其他线程在做相关的操作 // 尝试两次之后 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale //n<<1 翻倍 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //释放锁 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //重置当前线程hash值 h = advanceProbe(h); } //情况2:前置条件,cells为进行初始化,as为null //条件一: cellsBusy为true当前未加锁 //条件二: cells == as 为什么? 因为其他线程在你给as赋值之后修改了cells //条件三: true 表示获取锁成功 ,会把cellBusy 设置为 1 , false 表示其他线程在持有这个锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // 为什么还有一次判断呢? //防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据 if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //释放锁 cellsBusy = 0; } if (init) break; } //情况三: //1.当前cellBusy加锁状态,表示其他线程正在初始化cells,所以当前线程累加到base //2.cells被其他线程初始化之后,当前线程需要将数据累加到base else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
加载全部内容