摘要:進入到這裏說明 cells 正在或者已經初始化過了,執行 caseBase() 方法,通過 CAS操作 來修改 base 的值,如果修改成功則跳出循環,這個 CAS E只有在初始化 Cell數組 的時候,多個線程嘗試 CAS 修改 cellsBusy 加鎖的時候,失敗的線程會走到這個分支,然後直接 CAS 修改 base 數據。主要看a.cas(v = a.value, v + x),接着條件二,說明當前線程hash與數組長度取模計算出的位置的cell有值,此時直接嘗試一次CAS操作,如果成功則退出if條件,失敗則繼續往下執行longAccumulate()方法。

前言

思維導圖.png

文章中所有 高清無碼圖片公衆號號回覆: 圖片666 即可查閱, 可直接關注公衆號: 壹枝花算不算浪漫

最近阿里巴巴發佈了 Java開發手冊(泰山版) ( 公衆號回覆: 開發手冊 可收到 阿里巴巴開發手冊(泰山版 2020.4.22發佈).pdf ),其中第 17條 寫到:

阿里巴巴開發手冊.png

對於 Java項目中 計數統計的一些需求, 如果是 JDK8,推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數)

在大多數項目及開源組件中,計數統計使用最多的仍然還是 AtomicLong ,雖然是阿里巴巴這樣說,但是我們仍然要根據使用場景來決定是否使用 LongAdder

今天主要是來講講 LongAdder實現原理 ,還是老方式,通過 圖文 一步步解開 LongAdder 神祕的面紗,通過此篇文章你會了解到:

  • 爲什麼AtomicLong在高併發場景下性能急劇下降?
  • LongAdder爲什麼快?
  • LongAdder實現原理(圖文分析)
  • AtomicLong是否可以被遺棄或替換?

本文代碼全部基於JDK 1.8,建議邊看文章邊看源碼更加利於消化

AtomicLong

當我們在進行計數統計的時,通常會使用 AtomicLong 來實現。 AtomicLong 能保證併發情況下計數的準確性,其內部通過 CAS 來解決併發安全性的問題。

AtomicLong實現原理

說到線程安全的計數統計工具類,肯定少不了 Atomic 下的幾個原子類。 AtomicLong 就是 juc包 下重要的 原子類 ,在併發情況下可以對長整形類型數據進行原子操作, 保證併發情況下數據的安全性。

public class AtomicLong extends Number implements java.io.Serializable {
    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }

    public final long decrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
    }
}

我們在計數的過程中,一般使用 incrementAndGet()decrementAndGet() 進行加一和減一操作,這裏調用了 Unsafe 類中的 getAndAddLong() 方法進行操作。

接着看看 unsafe.getAndAddLong() 方法:

public final class Unsafe {
    public final long getAndAddLong(Object var1, long var2, long var4) {
        long var6;
        do {
            var6 = this.getLongVolatile(var1, var2);
        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;
    }

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
}

這裏直接進行 CAS+自旋 操作更新 AtomicLong 中的 valu e值,進而保證 value 值的 原子性更新

AtomicLong瓶頸分析

如上代碼所示,我們在使用 CAS + 自旋 的過程中,在高併發環境下,N個線程同時進行自旋操作,會出現大量失敗並不斷自旋的情況,此時 AtomicLong 的自旋會成爲瓶頸。

AtomicLong瓶頸分析.png

如上圖所示,高併發場景下 AtomicLong 性能會急劇下降,我們後面也會舉例說明。

那麼高併發下計數的需求有沒有更好的替代方案呢?在 JDK8Doug Lea 大神 新寫了一個 LongAdder 來解決此問題,我們後面來看 LongAdder 是如何優化的。

LongAdder

LongAdder和AtomicLong性能測試

我們說了很多 LongAdder 上性能優於 AtomicLong ,到底是不是呢?一切還是以代碼說話:

/**
 * Atomic和LongAdder耗時測試
 *
 * @author:一枝花算不算浪漫
 * @date:2020-05-12 7:06
 */
public class AtomicLongAdderTest {
    public static void main(String[] args) throws Exception{
        testAtomicLongAdder(1, 10000000);
        testAtomicLongAdder(10, 10000000);
        testAtomicLongAdder(100, 10000000);
    }

    static void testAtomicLongAdder(int threadCount, int times) throws Exception{
        System.out.println("threadCount: " + threadCount + ", times: " + times);
        long start = System.currentTimeMillis();
        testLongAdder(threadCount, times);
        System.out.println("LongAdder 耗時:" + (System.currentTimeMillis() - start) + "ms");
        System.out.println("threadCount: " + threadCount + ", times: " + times);
        long atomicStart = System.currentTimeMillis();
        testAtomicLong(threadCount, times);
        System.out.println("AtomicLong 耗時:" + (System.currentTimeMillis() - atomicStart) + "ms");
        System.out.println("----------------------------------------");
    }

    static void testAtomicLong(int threadCount, int times) throws Exception{
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = Lists.newArrayList();
        for (int i = 0; i < threadCount; i++) {
            list.add(new Thread(() -> {
                for (int j = 0; j < times; j++) {
                    atomicLong.incrementAndGet();
                }
            }));
        }

        for (Thread thread : list) {
            thread.start();
        }

        for (Thread thread : list) {
            thread.join();
        }

        System.out.println("AtomicLong value is : " + atomicLong.get());
    }

    static void testLongAdder(int threadCount, int times) throws Exception{
        LongAdder longAdder = new LongAdder();
        List<Thread> list = Lists.newArrayList();
        for (int i = 0; i < threadCount; i++) {
            list.add(new Thread(() -> {
                for (int j = 0; j < times; j++) {
                    longAdder.increment();
                }
            }));
        }

        for (Thread thread : list) {
            thread.start();
        }

        for (Thread thread : list) {
            thread.join();
        }

        System.out.println("LongAdder value is : " + longAdder.longValue());
    }
}

執行結果:

CAS原理圖.png

這裏可以看到隨着併發的增加, AtomicLong 性能是急劇下降的,耗時是 LongAdder 的數倍。至於原因我們還是接着往後看。

LongAdder爲什麼這麼快

先看下 LongAdder 的操作原理圖:

YUnlDO.png

既然說到 LongAdder 可以顯著提升高併發環境下的性能,那麼它是如何做到的?

1、 設計思想上, LongAdder 採用"分段"的方式降低 CAS 失敗的頻次

這裏先簡單的說下 LongAdder 的思路,後面還會詳述 LongAdder 的原理。

我們知道, AtomicLong 中有個內部變量 value 保存着實際的 long 值,所有的操作都是針對該變量進行。也就是說,高併發環境下, value 變量其實是一個 熱點數據 ,也就是 N個線程競爭一個熱點。

LongAdder 的基本思路就是 分散熱點 ,將 value 值的新增操作分散到一個數組中,不同線程會命中到數組的不同槽中,各個線程只對自己槽中的那個 value 值進行 CAS 操作,這樣熱點就被分散了,衝突的概率就小很多。

LongAdder 有一個全局變量 volatile long base 值,當併發不高的情況下都是通過 CAS 來直接操作 base 值,如果 CAS 失敗,則針對 LongAdder 中的 Cell[] 數組中的 Cell 進行 CA S操作,減少失敗的概率。

例如當前類中 base = 10 ,有三個線程進行 CAS 原子性的 +1操作線程一執行成功,此時base=11線程二、線程三執行失敗後 開始針對於 Cell[] 數組中的 Cell 元素進行 +1操作 ,同樣也是 CAS 操作,此時數組 index=1index=2Cellvalue 都被設置爲了1.

執行完成後,統計累加數據: sum = 11 + 1 + 1 = 13 ,利用 LongAdder 進行累加的操作就執行完了,流程圖如下:

分段加鎖思路.png

如果要獲取真正的 long 值,只要將各個槽中的變量值累加返回。這種分段的做法類似於 JDK7ConcurrentHashMap 的分段鎖。

2、使用Contended註解來消除僞共享

LongAdder 的父類 Striped64 中存在一個 volatile Cell[] cells; 數組,其長度是 2 的冪次方 ,每個 Cell 都使用 @Contended 註解進行修飾,而 @Contended 註解可以進行 緩存行填充 ,從而解決 僞共享問題 。僞共享會導致緩存行失效,緩存一致性開銷變大。

@sun.misc.Contended static final class Cell {

}

僞共享指的是多個線程同時讀寫同一個緩存行的不同變量時導致的 CPU緩存失效 。儘管這些變量之間沒有任何關係,但由於在主內存中鄰近,存在於同一個緩存行之中,它們的相互覆蓋會導致頻繁的緩存未命中,引發性能下降。這裏對於僞共享我只是提一下概念,並不會深入去講解,大家可以自行查閱一些資料。

解決僞共享的方法一般都是使用 直接填充 ,我們只需要保證不同線程的變量存在於不同的 CacheLine 即可,使用多餘的字節來填充可以做點這一點,這樣就不會出現僞共享問題。例如在 Disruptor隊列 的設計中就有類似設計(可參考我之前的博客文章: Disruptor學習筆記 ):

緩存行填充代碼.png 緩存行填充.png

Striped64 類中我們可以看看 Doug LeaCell 上加的註釋也有說明這一點:

Cell註釋.png

紅框中的翻譯如下:

Cell 類是 AtomicLong 添加了 padded([email protected]) 來消除 僞共享 的變種版本。緩存行填充對於大多數原子來說是繁瑣的,因爲它們通常不規則地分散在內存中,因此彼此之間不會有太大的干擾。但是,駐留在數組中的原子對象往往彼此相鄰,因此在沒有這種預防措施的情況下,通常會共享緩存行數據(對性能有巨大的負面影響)。

3、惰性求值

LongAdder 只有在使用 longValue() 獲取當前累加值時纔會真正的去結算計數的數據, longValue() 方法底層就是調用 sum() 方法,對 baseCell數組 的數據累加然後返回,做到數據寫入和讀取分離。

AtomicLong 使用 incrementAndGet() 每次都會返回 long 類型的計數值,每次遞增後還會伴隨着數據返回,增加了額外的開銷。

LongAdder實現原理

之前說了, AtomicLong 是多個線程針對單個 熱點值value進行原子操作 。而 LongAdder 是每個線程擁有自己的槽,各個線程一般只對自己槽中的那個值進行 CAS操作

比如有三個線程同時對value增加1,那麼value = 1 + 1 + 1 = 3

但是對於LongAdder來說,內部有一個base變量,一個Cell[]數組。

base變量:非競態條件下,直接累加到該變量上

Cell[]數組:競態條件下,累加個各個線程自己的槽Cell[i]中

最終結果的計算是下面這個形式:

value = base +

LongAdder源碼剖析

前面已經用圖分析了 LongAdder 高性能的原理,我們繼續看下 LongAdder 實現的源碼:

public class LongAdder extends Striped64 implements Serializable {
    public void increment() {
        add(1L);
    }

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
}

一般我們進行計數時都會使用 increment() 方法,每次進行 +1操作increment() 會直接調用 add() 方法。

變量說明:

  • as 表示cells引用
  • b 表示獲取的base值
  • v 表示 期望值,
  • m 表示 cells 數組的長度
  • a 表示當前線程命中的cell單元格

條件分析:

條件一:as == null || (m = as.length - 1) < 0

此條件成立說明cells數組未初始化。如果不成立則說明cells數組已經完成初始化,對應的線程需要找到Cell數組中的元素去寫值。

條件一.png

條件二:(a = as[getProbe() & m]) == null

getProbe()獲取當前線程的hash值,m表示cells長度-1,cells長度是2的冪次方數,原因之前也講到過,與數組長度取模可以轉化爲按位與運算,提升計算性能。

當條件成立時說明當前線程通過hash計算出來數組位置處的cell爲空,進一步去執行longAccumulate()方法。如果不成立則說明對應的cell不爲空,下一步將要將x值通過CAS操作添加到cell中。

條件三:!(uncontended = a.cas(v = a.value, v + x)

主要看a.cas(v = a.value, v + x),接着條件二,說明當前線程hash與數組長度取模計算出的位置的cell有值,此時直接嘗試一次CAS操作,如果成功則退出if條件,失敗則繼續往下執行longAccumulate()方法。

條件二/條件三.png

接着往下看核心的 longAccumulate() 方法,代碼很長,後面會一步步分析,先上代碼:

java.util.concurrent.atomic.Striped64. :

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current();
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {
                    Cell r = new Cell(x);
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)
                wasUncontended = true;
            else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
            break;                          
    }
}

代碼很長, if else 分支很多,除此看肯定會很頭疼。這裏一點點分析,然後結合畫圖一步步瞭解其中實現原理。

我們首先要清楚執行這個方法的前置條件,它們是或的關係,如上面條件一、二、三

  1. cells數組沒有初始化
  2. cells數組已經初始化,但是當前線程對應的cell數據爲空
  3. cells數組已經初始化, 當前線程對應的cell數據爲空,且CAS操作+1失敗

longAccumulate()方法的入參:

  • long x 需要增加的值,一般默認都是1
  • LongBinaryOperator fn 默認傳遞的是null
  • wasUncontended競爭標識,如果是false則代表有競爭。只有cells初始化之後,並且當前線程CAS競爭修改失敗,纔會是false

然後再看下Striped64中一些變量或者方法的定義:

  • base: 類似於AtomicLong中全局的value值。在沒有競爭情況下數據直接累加到base上,或者cells擴容時,也需要將數據寫入到base上
  • collide:表示擴容意向,false 一定不會擴容,true可能會擴容。
  • cellsBusy:初始化cells或者擴容cells需要獲取鎖, 0:表示無鎖狀態 1:表示其他線程已經持有了鎖
  • casCellsBusy(): 通過CAS操作修改cellsBusy的值,CAS成功代表獲取鎖,返回true
  • NCPU:當前計算機CPU數量,Cell數組擴容時會使用到
  • getProbe(): 獲取當前線程的hash值
  • advanceProbe(): 重置當前線程的hash值

接着開始正式解析longAccumulate()源碼:

private static final long PROBE;

if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current();
    h = getProbe();
    wasUncontended = true;
}

static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

我們上面說過 getProbe() 方法是爲了獲取當前線程的 hash值 ,具體實現是通過 UNSAFE.getInt() 實現的, PROBE 是在初始化時候獲取當前線程 threadLocalRandomProbe 的值。

注:Unsafe.getInt()有三個重載方法getInt(Object o, long offset)、getInt(long address) 和getIntVolatile(long address),都是從指定的位置獲取變量的值,只不過第一個的offset是相對於對象o的相對偏移量,第二個address是絕對地址偏移量。如果第一個方法中o爲null是,offset也會被作爲絕對偏移量。第三個則是帶有volatile語義的load讀操作。

如果當前線程的 hash值h=getProbe()爲0,0與任何數取模都是0,會固定到數組第一個位置 ,所以這裏做了優化,使用 ThreadLocalRandom 爲當前線程重新計算一個 hash 值。最後設置 wasUncontended = true ,這裏含義是重新計算了當前線程的 hash 後認爲此次不算是一次競爭。 hash 值被重置就好比一個全新的線程一樣,所以設置了競爭狀態爲 true

可以畫圖理解爲:

wasUncontended設置說明.png

接着執行 for循環 ,我們可以把 for循環 代碼拆分一下,每個 if條件 算作一個 CASE 來分析:

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {

        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

        }
        else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))

    }
}

如上所示,第一個 if語句代表CASE1 ,裏面再有 if判斷 會以 CASE1.1 這種形式來講解,下面接着的 else ifCASE2 , 最後一個爲 CASE3

CASE1執行條件:

if ((as = cells) != null && (n = as.length) > 0) {

}

cells數組 不爲空,且數組長度大於0的情況會執行 CASE1CASE1 的實現細節代碼較多,放到最後面講解。

CASE2執行條件和實現原理:

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
        try {
            if (cells == as) {
                Cell[] rs = new Cell[2];
                rs[h & 1] = new Cell(x);
                cells = rs;
                init = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (init)
            break;
}

CASE2 標識 cells數組 還未初始化,因爲判斷 cells == as ,這個代表當前線程到了這裏獲取的 cells 還是之前的一致。我們可以先看這個 case ,最後再回頭看最爲麻煩的 CASE1 實現邏輯。

cellsBusy 上面說了是加鎖的狀態,初始化 cells數組 和擴容的時候都要獲取加鎖的狀態,這個是通過 CAS 來實現的,爲0代表無鎖狀態,爲1代表其他線程已經持有鎖了。 cells==as 代表當前線程持有的數組未進行修改過, casCellsBusy() 通過 CAS操作 去獲取鎖。但是裏面的 if條件 又再次判斷了 cell==as ,這一點是不是很奇怪?通過畫圖來說明下問題:

cells==as雙重判斷說明.png

如果上面條件都執行成功就會執行數組的初始化及賦值操作, Cell[] rs = new Cell[2] 表示數組的長度爲2, rs[h & 1] = new Cell(x) 表示創建一個新的 Cell元素value是x值,默認爲1。

h & 1 類似於我們之前 HashMap 或者 ThreadLocal 裏面經常用到的計算散列桶 index 的算法,通常都是 hash & (table.len - 1) ,這裏就不做過多解釋了。 執行完成後直接退出 for循環

CASE3執行條件和實現原理:

else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
    break;

進入到這裏說明 cells 正在或者已經初始化過了,執行 caseBase() 方法,通過 CAS操作 來修改 base 的值,如果修改成功則跳出循環,這個 CAS E只有在初始化 Cell數組 的時候,多個線程嘗試 CAS 修改 cellsBusy 加鎖的時候,失敗的線程會走到這個分支,然後直接 CAS 修改 base 數據。

CASE1 實現原理:

分析完了 CASE2和CASE3 ,我們再折頭回看一下 CASE1 ,進入 CASE1 的前提是: cells數組 不爲空,已經完成了初始化賦值操作。

接着還是一點點往下拆分代碼,首先看第一個判斷分支 CASE1.1

if ((a = as[(n - 1) & h]) == null) {
    if (cellsBusy == 0) {
        Cell r = new Cell(x);
        if (cellsBusy == 0 && casCellsBusy()) {
            boolean created = false;
            try {
                Cell[] rs; int m, j;
                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                    rs[j] = r;
                    created = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (created)
                break;
            continue;
        }
    }
    collide = false;
}

這個if條件中 (a = as[(n - 1) & h]) == null 代表當前線程對應的數組下標位置的 cell 數據爲 null ,代表沒有線程在此處創建 Cell 對象。

接着判斷 cellsBusy==0 ,代表當前鎖未被佔用。然後新創建 Cell對象 ,接着又判斷了一遍 cellsBusy == 0 ,然後執行 casCellsBusy() 嘗試通過CAS操作修改 cellsBusy=1 ,加鎖成功後修改擴容意向 collide = false;

for (;;) {
    if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
        rs[j] = r;
        created = true;
    }

    if (created)
        break;
    continue;
}

上面代碼判斷當前線程 hash 後指向的數據位置元素是否爲空,如果爲空則將 cell 數據放入數組中,跳出循環。如果不爲空則繼續循環。

CASE1.1.png

繼續往下看代碼, CASE1.2

else if (!wasUncontended)
    wasUncontended = true;

h = advanceProbe(h);

wasUncontended 表示 cells 初始化後,當前線程競爭修改失敗 wasUncontended =false ,這裏只是重新設置了這個值爲 true ,緊接着執行 advanceProbe(h) 重置當前線程的 hash ,重新循環。

接着看CASE1.3:

else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
    break;

進入 CASE1.3 說明當前線程對應的數組中有了數據,也重置過 hash值 ,這時通過CAS操作嘗試對當前數中的 value值 進行累加x操作,x默認爲1,如果 CAS 成功則直接跳出循環。

CASE1.3.png

接着看CASE1.4:

else if (n >= NCPU || cells != as)
    collide = false;    

如果 cells數組 的長度達到了 CPU核心數 ,或者 cells 擴容了,設置擴容意向 collide爲false 並通過下面的 h = advanceProbe(h) 方法修改線程的 probe 再重新嘗試

至於這裏爲什麼要提出和 CPU數量 做判斷的問題:每個線程會通過線程對 cells[threadHash%cells.length] 位置的 Cell 對象中的 value 做累加,這樣相當於將線程綁定到了 cells 中的某個 cell 對象上,如果超過 CPU數量 的時候就不再擴容是因爲 CPU 的數量代表了機器處理能力,當超過 CPU 數量時,多出來的 cells 數組元素沒有太大作用。

多線程更新Cell.png

接着看CASE1.5:

 else if (!collide)
   collide = true;

如果擴容意向 collidefalse 則修改它爲 true ,然後重新計算當前線程的 hash 值繼續循環,在 CASE1.4 中,如果當前數組的長度已經大於了 CPU 的核數,就會再次設置擴容意向 collide=false ,這裏的意義是保證擴容意向爲 false 後不再繼續往後執行 CASE1.6 的擴容操作。

接着看CASE1.6分支:

else if (cellsBusy == 0 && casCellsBusy()) {
    try {
        if (cells == as) {
            Cell[] rs = new Cell[n << 1];
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;
        }
    } finally {
        cellsBusy = 0;
    }
    collide = false;
    continue;
}

這裏面執行的其實是擴容邏輯,首先是判斷通過 CAS 改變 cellsBusy 來嘗試加鎖,如果 CAS 成功則代表獲取鎖成功,繼續向下執行,判斷當前的 cells 數組和最先賦值的 as 是同一個,代表沒有被其他線程擴容過,然後進行擴容,擴容大小爲之前的容量的兩倍,這裏用的按位左移1位來操作的。

Cell[] rs = new Cell[n << 1];

擴容後再將之前數組的元素拷貝到新數組中,釋放鎖設置 cellsBusy = 0 ,設置擴容狀態,然後繼續循環執行。

到了這裏,我們已經分析完了 longAccumulate() 所有的邏輯,邏輯分支挺多,仔細分析看看其實還是挺清晰的,流程圖如下:

流程圖.png

我們再舉一些線程執行的例子裏面場景覆蓋不全,大家可以按照這種模式自己模擬場景分析代碼流程:

多線程執行示例.png

如有問題也請及時指出,我會第一時間更正,不勝感激!

LongAdder的sum方法

當我們最終獲取計數器值時,我們可以使用 LongAdder.longValue() 方法,其內部就是使用 sum 方法來彙總數據的。

java.util.concurrent.atomic.LongAdder.sum() :

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

實現很簡單,base + ,遍歷 cells 數組中的值,然後累加。

AtomicLong可以棄用了嗎?

看上去 LongAdder 的性能全面超越了 AtomicLong ,而且阿里巴巴開發手冊也提及到 推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀 鎖的重試次數) ,但是我們真的就可以捨棄掉 LongAdder 了嗎?

當然不是,我們需要看場景來使用,如果是併發不太高的系統,使用 AtomicLong 可能會更好一些,而且內存需求也會小一些。

我們看過 sum() 方法後可以知道 LongAdder 在統計的時候如果有併發更新,可能導致統計的數據有誤差。

而在高併發統計計數的場景下,才更適合使用 LongAdder

總結

LongAdder 中最核心的思想就是利用空間來換時間,將熱點 value 分散成一個 Cell列表來承接併發的CAS ,以此來提升性能。

LongAdder 的原理及實現都很簡單,但其設計的思想值得我們品味和學習。

YDzI1K.png

相關文章