比AtomicLong更優秀的LongAdder確定不來了解一下嗎?
摘要:進入到這裏說明 cells 正在或者已經初始化過了,執行 caseBase() 方法,通過 CAS操作 來修改 base 的值,如果修改成功則跳出循環,這個 CAS E只有在初始化 Cell數組 的時候,多個線程嘗試 CAS 修改 cellsBusy 加鎖的時候,失敗的線程會走到這個分支,然後直接 CAS 修改 base 數據。主要看a.cas(v = a.value, v + x),接着條件二,說明當前線程hash與數組長度取模計算出的位置的cell有值,此時直接嘗試一次CAS操作,如果成功則退出if條件,失敗則繼續往下執行longAccumulate()方法。
前言
思維導圖.png
文章中所有 高清無碼圖片
在 公衆號號回覆: 圖片666
即可查閱, 可直接關注公衆號: 壹枝花算不算浪漫
最近阿里巴巴發佈了 Java開發手冊(泰山版)
( 公衆號回覆: 開發手冊
可收到 阿里巴巴開發手冊(泰山版 2020.4.22發佈).pdf
),其中第 17條
寫到:
阿里巴巴開發手冊.png
對於 Java項目中
計數統計的一些需求, 如果是 JDK8,推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數)
在大多數項目及開源組件中,計數統計使用最多的仍然還是 AtomicLong
,雖然是阿里巴巴這樣說,但是我們仍然要根據使用場景來決定是否使用 LongAdder
。
今天主要是來講講 LongAdder
的 實現原理 ,還是老方式,通過 圖文 一步步解開 LongAdder
神祕的面紗,通過此篇文章你會了解到:
- 爲什麼AtomicLong在高併發場景下性能急劇下降?
- LongAdder爲什麼快?
- LongAdder實現原理(圖文分析)
- AtomicLong是否可以被遺棄或替換?
本文代碼全部基於JDK 1.8,建議邊看文章邊看源碼更加利於消化
AtomicLong
當我們在進行計數統計的時,通常會使用 AtomicLong
來實現。 AtomicLong
能保證併發情況下計數的準確性,其內部通過 CAS
來解決併發安全性的問題。
AtomicLong實現原理
說到線程安全的計數統計工具類,肯定少不了 Atomic
下的幾個原子類。 AtomicLong
就是 juc包 下重要的 原子類 ,在併發情況下可以對長整形類型數據進行原子操作, 保證併發情況下數據的安全性。
public class AtomicLong extends Number implements java.io.Serializable { public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } public final long decrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; } }
我們在計數的過程中,一般使用 incrementAndGet()
和 decrementAndGet()
進行加一和減一操作,這裏調用了 Unsafe
類中的 getAndAddLong()
方法進行操作。
接着看看 unsafe.getAndAddLong()
方法:
public final class Unsafe { public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); }
這裏直接進行 CAS+自旋 操作更新 AtomicLong
中的 valu
e值,進而保證 value
值的 原子性更新 。
AtomicLong瓶頸分析
如上代碼所示,我們在使用 CAS + 自旋 的過程中,在高併發環境下,N個線程同時進行自旋操作,會出現大量失敗並不斷自旋的情況,此時 AtomicLong
的自旋會成爲瓶頸。
AtomicLong瓶頸分析.png
如上圖所示,高併發場景下 AtomicLong
性能會急劇下降,我們後面也會舉例說明。
那麼高併發下計數的需求有沒有更好的替代方案呢?在 JDK8
中 Doug Lea
大神 新寫了一個 LongAdder
來解決此問題,我們後面來看 LongAdder
是如何優化的。
LongAdder
LongAdder和AtomicLong性能測試
我們說了很多 LongAdder
上性能優於 AtomicLong
,到底是不是呢?一切還是以代碼說話:
/** * Atomic和LongAdder耗時測試 * * @author:一枝花算不算浪漫 * @date:2020-05-12 7:06 */ public class AtomicLongAdderTest { public static void main(String[] args) throws Exception{ testAtomicLongAdder(1, 10000000); testAtomicLongAdder(10, 10000000); testAtomicLongAdder(100, 10000000); } static void testAtomicLongAdder(int threadCount, int times) throws Exception{ System.out.println("threadCount: " + threadCount + ", times: " + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder 耗時:" + (System.currentTimeMillis() - start) + "ms"); System.out.println("threadCount: " + threadCount + ", times: " + times); long atomicStart = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong 耗時:" + (System.currentTimeMillis() - atomicStart) + "ms"); System.out.println("----------------------------------------"); } static void testAtomicLong(int threadCount, int times) throws Exception{ AtomicLong atomicLong = new AtomicLong(); List<Thread> list = Lists.newArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { atomicLong.incrementAndGet(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("AtomicLong value is : " + atomicLong.get()); } static void testLongAdder(int threadCount, int times) throws Exception{ LongAdder longAdder = new LongAdder(); List<Thread> list = Lists.newArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { longAdder.increment(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("LongAdder value is : " + longAdder.longValue()); } }
執行結果:
CAS原理圖.png
這裏可以看到隨着併發的增加, AtomicLong
性能是急劇下降的,耗時是 LongAdder
的數倍。至於原因我們還是接着往後看。
LongAdder爲什麼這麼快
先看下 LongAdder
的操作原理圖:
YUnlDO.png
既然說到 LongAdder
可以顯著提升高併發環境下的性能,那麼它是如何做到的?
1、 設計思想上, LongAdder
採用"分段"的方式降低 CAS
失敗的頻次
這裏先簡單的說下 LongAdder
的思路,後面還會詳述 LongAdder
的原理。
我們知道, AtomicLong
中有個內部變量 value
保存着實際的 long
值,所有的操作都是針對該變量進行。也就是說,高併發環境下, value
變量其實是一個 熱點數據 ,也就是 N個線程競爭一個熱點。
LongAdder
的基本思路就是 分散熱點 ,將 value
值的新增操作分散到一個數組中,不同線程會命中到數組的不同槽中,各個線程只對自己槽中的那個 value
值進行 CAS
操作,這樣熱點就被分散了,衝突的概率就小很多。
LongAdder
有一個全局變量 volatile long base
值,當併發不高的情況下都是通過 CAS
來直接操作 base
值,如果 CAS
失敗,則針對 LongAdder
中的 Cell[]
數組中的 Cell
進行 CA
S操作,減少失敗的概率。
例如當前類中 base = 10
,有三個線程進行 CAS
原子性的 +1操作 , 線程一執行成功,此時base=11 , 線程二、線程三執行失敗後 開始針對於 Cell[]
數組中的 Cell
元素進行 +1操作 ,同樣也是 CAS
操作,此時數組 index=1
和 index=2
中 Cell
的 value
都被設置爲了1.
執行完成後,統計累加數據: sum = 11 + 1 + 1 = 13
,利用 LongAdder
進行累加的操作就執行完了,流程圖如下:
分段加鎖思路.png
如果要獲取真正的 long
值,只要將各個槽中的變量值累加返回。這種分段的做法類似於 JDK7
中 ConcurrentHashMap
的分段鎖。
2、使用Contended註解來消除僞共享
在 LongAdder
的父類 Striped64
中存在一個 volatile Cell[] cells;
數組,其長度是 2 的冪次方 ,每個 Cell
都使用 @Contended
註解進行修飾,而 @Contended
註解可以進行 緩存行填充 ,從而解決 僞共享問題 。僞共享會導致緩存行失效,緩存一致性開銷變大。
@sun.misc.Contended static final class Cell { }
僞共享指的是多個線程同時讀寫同一個緩存行的不同變量時導致的 CPU緩存失效
。儘管這些變量之間沒有任何關係,但由於在主內存中鄰近,存在於同一個緩存行之中,它們的相互覆蓋會導致頻繁的緩存未命中,引發性能下降。這裏對於僞共享我只是提一下概念,並不會深入去講解,大家可以自行查閱一些資料。
解決僞共享的方法一般都是使用 直接填充 ,我們只需要保證不同線程的變量存在於不同的 CacheLine
即可,使用多餘的字節來填充可以做點這一點,這樣就不會出現僞共享問題。例如在 Disruptor隊列
的設計中就有類似設計(可參考我之前的博客文章: Disruptor學習筆記 ):
緩存行填充代碼.png 緩存行填充.png
在 Striped64
類中我們可以看看 Doug Lea
在 Cell
上加的註釋也有說明這一點:
Cell註釋.png
紅框中的翻譯如下:
Cell
類是 AtomicLong
添加了 padded([email protected])
來消除 僞共享 的變種版本。緩存行填充對於大多數原子來說是繁瑣的,因爲它們通常不規則地分散在內存中,因此彼此之間不會有太大的干擾。但是,駐留在數組中的原子對象往往彼此相鄰,因此在沒有這種預防措施的情況下,通常會共享緩存行數據(對性能有巨大的負面影響)。
3、惰性求值
LongAdder
只有在使用 longValue()
獲取當前累加值時纔會真正的去結算計數的數據, longValue()
方法底層就是調用 sum()
方法,對 base
和 Cell數組
的數據累加然後返回,做到數據寫入和讀取分離。
而 AtomicLong
使用 incrementAndGet()
每次都會返回 long
類型的計數值,每次遞增後還會伴隨着數據返回,增加了額外的開銷。
LongAdder實現原理
之前說了, AtomicLong
是多個線程針對單個 熱點值value進行原子操作 。而 LongAdder
是每個線程擁有自己的槽,各個線程一般只對自己槽中的那個值進行 CAS操作
。
比如有三個線程同時對value增加1,那麼value = 1 + 1 + 1 = 3
但是對於LongAdder來說,內部有一個base變量,一個Cell[]數組。
base變量:非競態條件下,直接累加到該變量上
Cell[]數組:競態條件下,累加個各個線程自己的槽Cell[i]中
最終結果的計算是下面這個形式:
value = base +
LongAdder源碼剖析
前面已經用圖分析了 LongAdder
高性能的原理,我們繼續看下 LongAdder
實現的源碼:
public class LongAdder extends Striped64 implements Serializable { public void increment() { add(1L); } public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } }
一般我們進行計數時都會使用 increment()
方法,每次進行 +1操作 , increment()
會直接調用 add()
方法。
變量說明:
- as 表示cells引用
- b 表示獲取的base值
- v 表示 期望值,
- m 表示 cells 數組的長度
- a 表示當前線程命中的cell單元格
條件分析:
條件一:as == null || (m = as.length - 1) < 0
此條件成立說明cells數組未初始化。如果不成立則說明cells數組已經完成初始化,對應的線程需要找到Cell數組中的元素去寫值。
條件一.png
條件二:(a = as[getProbe() & m]) == null
getProbe()獲取當前線程的hash值,m表示cells長度-1,cells長度是2的冪次方數,原因之前也講到過,與數組長度取模可以轉化爲按位與運算,提升計算性能。
當條件成立時說明當前線程通過hash計算出來數組位置處的cell爲空,進一步去執行longAccumulate()方法。如果不成立則說明對應的cell不爲空,下一步將要將x值通過CAS操作添加到cell中。
條件三:!(uncontended = a.cas(v = a.value, v + x)
主要看a.cas(v = a.value, v + x),接着條件二,說明當前線程hash與數組長度取模計算出的位置的cell有值,此時直接嘗試一次CAS操作,如果成功則退出if條件,失敗則繼續往下執行longAccumulate()方法。
條件二/條件三.png
接着往下看核心的 longAccumulate()
方法,代碼很長,後面會一步步分析,先上代碼:
java.util.concurrent.atomic.Striped64.
:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
代碼很長, if else
分支很多,除此看肯定會很頭疼。這裏一點點分析,然後結合畫圖一步步瞭解其中實現原理。
我們首先要清楚執行這個方法的前置條件,它們是或的關係,如上面條件一、二、三
- cells數組沒有初始化
- cells數組已經初始化,但是當前線程對應的cell數據爲空
- cells數組已經初始化, 當前線程對應的cell數據爲空,且CAS操作+1失敗
longAccumulate()方法的入參:
- long x 需要增加的值,一般默認都是1
- LongBinaryOperator fn 默認傳遞的是null
- wasUncontended競爭標識,如果是false則代表有競爭。只有cells初始化之後,並且當前線程CAS競爭修改失敗,纔會是false
然後再看下Striped64中一些變量或者方法的定義:
- base: 類似於AtomicLong中全局的value值。在沒有競爭情況下數據直接累加到base上,或者cells擴容時,也需要將數據寫入到base上
- collide:表示擴容意向,false 一定不會擴容,true可能會擴容。
- cellsBusy:初始化cells或者擴容cells需要獲取鎖, 0:表示無鎖狀態 1:表示其他線程已經持有了鎖
- casCellsBusy(): 通過CAS操作修改cellsBusy的值,CAS成功代表獲取鎖,返回true
- NCPU:當前計算機CPU數量,Cell數組擴容時會使用到
- getProbe(): 獲取當前線程的hash值
- advanceProbe(): 重置當前線程的hash值
接着開始正式解析longAccumulate()源碼:
private static final long PROBE; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
我們上面說過 getProbe()
方法是爲了獲取當前線程的 hash值
,具體實現是通過 UNSAFE.getInt()
實現的, PROBE
是在初始化時候獲取當前線程 threadLocalRandomProbe
的值。
注:Unsafe.getInt()有三個重載方法getInt(Object o, long offset)、getInt(long address) 和getIntVolatile(long address),都是從指定的位置獲取變量的值,只不過第一個的offset是相對於對象o的相對偏移量,第二個address是絕對地址偏移量。如果第一個方法中o爲null是,offset也會被作爲絕對偏移量。第三個則是帶有volatile語義的load讀操作。
如果當前線程的 hash值h=getProbe()爲0,0與任何數取模都是0,會固定到數組第一個位置 ,所以這裏做了優化,使用 ThreadLocalRandom
爲當前線程重新計算一個 hash
值。最後設置 wasUncontended = true
,這裏含義是重新計算了當前線程的 hash
後認爲此次不算是一次競爭。 hash
值被重置就好比一個全新的線程一樣,所以設置了競爭狀態爲 true
。
可以畫圖理解爲:
wasUncontended設置說明.png
接着執行 for循環
,我們可以把 for循環
代碼拆分一下,每個 if條件
算作一個 CASE
來分析:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) } }
如上所示,第一個 if語句代表CASE1
,裏面再有 if判斷
會以 CASE1.1
這種形式來講解,下面接着的 else if
爲 CASE2
, 最後一個爲 CASE3
CASE1執行條件:
if ((as = cells) != null && (n = as.length) > 0) { }
cells數組
不爲空,且數組長度大於0的情況會執行 CASE1
, CASE1
的實現細節代碼較多,放到最後面講解。
CASE2執行條件和實現原理:
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
CASE2
標識 cells數組
還未初始化,因爲判斷 cells == as
,這個代表當前線程到了這裏獲取的 cells
還是之前的一致。我們可以先看這個 case
,最後再回頭看最爲麻煩的 CASE1
實現邏輯。
cellsBusy
上面說了是加鎖的狀態,初始化 cells數組
和擴容的時候都要獲取加鎖的狀態,這個是通過 CAS
來實現的,爲0代表無鎖狀態,爲1代表其他線程已經持有鎖了。 cells==as
代表當前線程持有的數組未進行修改過, casCellsBusy()
通過 CAS操作
去獲取鎖。但是裏面的 if條件
又再次判斷了 cell==as
,這一點是不是很奇怪?通過畫圖來說明下問題:
cells==as雙重判斷說明.png
如果上面條件都執行成功就會執行數組的初始化及賦值操作, Cell[] rs = new Cell[2]
表示數組的長度爲2, rs[h & 1] = new Cell(x)
表示創建一個新的 Cell元素
, value是x值,默認爲1。
h & 1
類似於我們之前 HashMap
或者 ThreadLocal
裏面經常用到的計算散列桶 index
的算法,通常都是 hash & (table.len - 1)
,這裏就不做過多解釋了。 執行完成後直接退出 for循環
。
CASE3執行條件和實現原理:
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
進入到這裏說明 cells
正在或者已經初始化過了,執行 caseBase()
方法,通過 CAS操作
來修改 base
的值,如果修改成功則跳出循環,這個 CAS
E只有在初始化 Cell數組
的時候,多個線程嘗試 CAS
修改 cellsBusy
加鎖的時候,失敗的線程會走到這個分支,然後直接 CAS
修改 base
數據。
CASE1 實現原理:
分析完了 CASE2和CASE3
,我們再折頭回看一下 CASE1
,進入 CASE1
的前提是: cells數組
不爲空,已經完成了初始化賦值操作。
接着還是一點點往下拆分代碼,首先看第一個判斷分支 CASE1.1
:
if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; }
這個if條件中 (a = as[(n - 1) & h]) == null
代表當前線程對應的數組下標位置的 cell
數據爲 null
,代表沒有線程在此處創建 Cell
對象。
接着判斷 cellsBusy==0
,代表當前鎖未被佔用。然後新創建 Cell對象
,接着又判斷了一遍 cellsBusy == 0
,然後執行 casCellsBusy()
嘗試通過CAS操作修改 cellsBusy=1
,加鎖成功後修改擴容意向 collide = false;
for (;;) { if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } if (created) break; continue; }
上面代碼判斷當前線程 hash
後指向的數據位置元素是否爲空,如果爲空則將 cell
數據放入數組中,跳出循環。如果不爲空則繼續循環。
CASE1.1.png
繼續往下看代碼, CASE1.2 :
else if (!wasUncontended) wasUncontended = true; h = advanceProbe(h);
wasUncontended
表示 cells
初始化後,當前線程競爭修改失敗 wasUncontended =false
,這裏只是重新設置了這個值爲 true
,緊接着執行 advanceProbe(h)
重置當前線程的 hash
,重新循環。
接着看CASE1.3:
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
進入 CASE1.3
說明當前線程對應的數組中有了數據,也重置過 hash值
,這時通過CAS操作嘗試對當前數中的 value值
進行累加x操作,x默認爲1,如果 CAS
成功則直接跳出循環。
CASE1.3.png
接着看CASE1.4:
else if (n >= NCPU || cells != as) collide = false;
如果 cells數組
的長度達到了 CPU核心數
,或者 cells
擴容了,設置擴容意向 collide爲false
並通過下面的 h = advanceProbe(h)
方法修改線程的 probe
再重新嘗試
至於這裏爲什麼要提出和 CPU數量
做判斷的問題:每個線程會通過線程對 cells[threadHash%cells.length]
位置的 Cell
對象中的 value
做累加,這樣相當於將線程綁定到了 cells
中的某個 cell
對象上,如果超過 CPU數量
的時候就不再擴容是因爲 CPU
的數量代表了機器處理能力,當超過 CPU
數量時,多出來的 cells
數組元素沒有太大作用。
多線程更新Cell.png
接着看CASE1.5:
else if (!collide) collide = true;
如果擴容意向 collide
是 false
則修改它爲 true
,然後重新計算當前線程的 hash
值繼續循環,在 CASE1.4
中,如果當前數組的長度已經大於了 CPU
的核數,就會再次設置擴容意向 collide=false
,這裏的意義是保證擴容意向爲 false
後不再繼續往後執行 CASE1.6
的擴容操作。
接着看CASE1.6分支:
else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; }
這裏面執行的其實是擴容邏輯,首先是判斷通過 CAS
改變 cellsBusy
來嘗試加鎖,如果 CAS
成功則代表獲取鎖成功,繼續向下執行,判斷當前的 cells
數組和最先賦值的 as
是同一個,代表沒有被其他線程擴容過,然後進行擴容,擴容大小爲之前的容量的兩倍,這裏用的按位左移1位來操作的。
Cell[] rs = new Cell[n << 1];
擴容後再將之前數組的元素拷貝到新數組中,釋放鎖設置 cellsBusy = 0
,設置擴容狀態,然後繼續循環執行。
到了這裏,我們已經分析完了 longAccumulate()
所有的邏輯,邏輯分支挺多,仔細分析看看其實還是挺清晰的,流程圖如下:
流程圖.png
我們再舉一些線程執行的例子裏面場景覆蓋不全,大家可以按照這種模式自己模擬場景分析代碼流程:
多線程執行示例.png
如有問題也請及時指出,我會第一時間更正,不勝感激!
LongAdder的sum方法
當我們最終獲取計數器值時,我們可以使用 LongAdder.longValue()
方法,其內部就是使用 sum
方法來彙總數據的。
java.util.concurrent.atomic.LongAdder.sum()
:
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
實現很簡單,base + ,遍歷 cells
數組中的值,然後累加。
AtomicLong可以棄用了嗎?
看上去 LongAdder
的性能全面超越了 AtomicLong
,而且阿里巴巴開發手冊也提及到 推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀 鎖的重試次數) ,但是我們真的就可以捨棄掉 LongAdder
了嗎?
當然不是,我們需要看場景來使用,如果是併發不太高的系統,使用 AtomicLong
可能會更好一些,而且內存需求也會小一些。
我們看過 sum()
方法後可以知道 LongAdder
在統計的時候如果有併發更新,可能導致統計的數據有誤差。
而在高併發統計計數的場景下,才更適合使用 LongAdder
。
總結
LongAdder
中最核心的思想就是利用空間來換時間,將熱點 value
分散成一個 Cell列表來承接併發的CAS ,以此來提升性能。
LongAdder
的原理及實現都很簡單,但其設計的思想值得我們品味和學習。