首页>>后端>>java->LongAdder 原理

LongAdder 原理

时间:2023-12-07 本站 点击:0

设计思路

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

LongAdder 的内部结构

LongAdder内部有一个base变量,一个Cell[]数组:

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

/** Number of CPUS, to place bound on table size */ // CPU核数,用来决定槽数组的大小 static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * Table of cells. When non‐null, size is a power of 2. */ // 数组槽,大小为2的次幂 transient volatile Cell[] cells; /** * Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. */ /** * 基数,在两种情况下会使用: * 1. 没有遇到并发竞争时,直接使用base累加数值18 * 2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cell s初始化), * 其他竞争失败的线程会讲数值累加到base上 */ transient volatile long base; /** * Spinlock (locked via CAS) used when resizing and/or creating Cells. */ transient volatile int cellsBusy;

定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过  Unsafe来CAS操作它的值:

LongAdder#add 方法

LongAdder#add方法的逻辑如下:

public void add(long x) {    Cell[] as; long b, v; int m; Cell a;    if ((as = cells) != null || !casBase(b = base, b + x)) {        boolean uncontended = true;        if (as == null || (m = as.length - 1) < 0 ||            (a = as[getProbe() & m]) == null ||            !(uncontended = a.cas(v = a.value, v + x)))            longAccumulate(x, null, uncontended);    }}

只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。 如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化  但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。

Striped64#longAccumulate 方法

整个Striped64#longAccumulate的如下:

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {        //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成;        //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况        //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败;        //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败;       //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;        int h;        if ((h = getProbe()) == 0) {            //初始化ThreadLocalRandom;            ThreadLocalRandom.current(); // force initialization            //将h设置为0x9e3779b9            h = getProbe();            //设置未竞争标记为true            wasUncontended = true;        }        //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突         boolean collide = false;         for (;;) {            Cell[] as; Cell a; int n; long v;            //这个主干if有三个分支            //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)            //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)            //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上            //先看主分支一            if ((as = cells) != null && (n = as.length) > 0) {                /**                 *内部小分支一:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m%cells.length]位置上                  */                if ((a = as[(n - 1) & h]) == null) {                    //cellsBusy == 0 代表当前没有线程cells数组做修改                    if (cellsBusy == 0) {                        //将要累加的x值作为初始值创建一个新的Cell对象,                        Cell r = new Cell(x);                         //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁                        if (cellsBusy == 0 && casCellsBusy()) {                            //标记Cell是否创建成功并放入到cells数组被hash的位置上                            boolean created = false;                            try {                                Cell[] rs; int m, j;                                //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null                                if ((rs = cells) != null &&                                    (m = rs.length) > 0 &&                                    rs[j = (m - 1) & h] == null) {                                    //将新的cell设置到该位置                                    rs[j] = r;                                    created = true;                                }                            } finally {                                //去掉锁                                cellsBusy = 0;                            }                            //生成成功,跳出循环                            if (created)                                break;                            //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]已经有其它线程设置了cell了,继续执行循环。                            continue;                        }                    }                   //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false                    collide = false;                /**                 *内部小分支二:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环;                 */                } else if (!wasUncontended)                      //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。                     wasUncontended = true;                /**                *内部小分支三:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出                  */                else if (a.cas(v = a.value, ((fn == null) ? v + x :                                             fn.applyAsLong(v, x))))                    break;                /**                 *内部小分支四:分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值*/                else if (n >= NCPU || cells != as)                    collide = false;                /**                 *内部小分支五:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环                 */                else if (!collide)                    //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支                    collide = true;                /**                 *内部小分支六:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支                 */                else if (cellsBusy == 0 && casCellsBusy()) {                    try {                        //检查cells是否已经被扩容                        if (cells == as) {      // Expand table unless stale                            Cell[] rs = new Cell[n << 1];                            for (int i = 0; i < n; ++i)                                rs[i] = as[i];                            cells = rs;                        }                    } finally {                        cellsBusy = 0;                    }                    collide = false;                    continue;                   // Retry with expanded table                }                //为当前线程重新计算hash值                h = advanceProbe(h);            //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,先尝试获取cellsBusy锁。            }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {                boolean init = false;                try {  // Initialize table                    //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上                    if (cells == as) {                        Cell[] rs = new Cell[2];                        rs[h & 1] = new Cell(x);                        cells = rs;                        init = true;                    }                } finally {                    //解锁                    cellsBusy = 0;                }                //如果init为true说明初始化成功,跳出循环                if (init)                    break;            }            /**             *如果以上操作都失败了,则尝试将值累加到base上;             */            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // Fall back on using base                break;          }    }

LongAdder#sum方法

/** * 返回累加的和,也就是"当前时刻"的计数值 * 注意: 高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值 * 此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加, * 方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值 */public long sum() {    Cell[] as = cells; Cell a;    long sum = base;    if (as != null) {        for (int i = 0; i < as.length; ++i) {            if ((a = as[i]) != null)                sum += a.value;        }    }    return sum;}

由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用sum方法时的原子快照值。

LongAccumulator

LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而 LongAccumulator提供了自定义的函数操作。其构造函数如下:通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator 接收2个long作为参数,并返回1个long)。LongAccumulator内部原理和LongAdder几乎完全一样,都是利用了父类Striped64的longAccumulate方法。

public class LongAccumulatorTest {     public static void main(String[] args) throws InterruptedException {         // 累加 x+y         LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);         ExecutorService executor = Executors.newFixedThreadPool(8);         // 1到9累加         IntStream.range(1, 10).forEach(i -> executor.submit(() ->         accumulator.accumulate(i)));         Thread.sleep(2000);         System.out.println(accumulator.getThenReset());     } }

参考资料

https://blog.csdn.net/jiangtianjiao/article/details/103844801/

原文:https://juejin.cn/post/7103872764984950797


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/18683.html