Java8異步編程之CompletableFuture源碼解讀
摘要:如果 t 等於 null,那麼直接 執⾏回調⽅法 h,並返回下⼀個 CompletableFuture 對象。@Test public void demo2() throws ExecutionException, InterruptedException { CompletableFuture
【51CTO.com原創稿件】
一、引言
一說到異步任務,很多人上來咔咔新建個線程池。爲了防止線程數量肆虐,一般還會考慮使用單例模式創建線程池,具體使用方法大都如下面的代碼所示:
@Test publicvoiddemo1() throwsExecutionException, InterruptedException { ExecutorServiceexecutorService=Executors.newFixedThreadPool(5); Future<Object>future1=executorService.submit(newCallable<Object>() { @Override publicObjectcall() throwsException { returnThread.currentThread().getName(); } }); System.out.println(future1.get()); executorService.execute(newRunnable() { @Overridepublicvoidrun() { System.out.println(Thread.currentThread().getName()); } }); }
經常使用 JavaScript 的同學相信對於異步回調的用法相當熟悉了,畢竟 JavaScript 擁有“回調地獄”的美譽。
我們大 Java 又開啓了新一輪模仿之旅。
java.util.concurrent 包新增了 CompletableFuture 類可以實現類似 JavaScript 的連續回調。
二、兩種基本用法
先來看下 CompletableFuture 的兩種基本⽤法,代碼如下:
@Test public void index1() throws ExecutionException, InterruptedException { CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()); CompletableFuture completableFuture2 = CompletableFuture.runAsync(() -> Thread.currentThread().getName()); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); }
打印輸出:
ForkJoinPool.commonPool-worker-1 null
初看代碼,第一反應是代碼簡潔。直接調用 CompletableFuture 類的靜態方法,提交任務方法就完事了。但是,隨之而來的疑問就是,異步任務執行的背後是一套什麼邏輯呢?是一對一使用newThread()還是依賴線程池去執行的呢。
三、探索線程池原理
翻閱 CompletableFuture 類的源碼,我們找到答案。關鍵代碼如下:
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1); /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
可以看到 CompletableFuture 類默認使⽤的是 ForkJoinPool.commonPool() ⽅法返回的線程池。當 然啦,前提是 ForkJoinPool 線程池的數量⼤於 1 。否則,則使⽤ CompletableFuture 類⾃定義的 ThreadPerTaskExecutor 線程池。 ThreadPerTaskExecutor 線程池的實現邏輯⾮常簡單,⼀⾏代碼簡單實現了 Executor 接⼝,內部執⾏ 邏輯是⼀條任務對應⼀條線程。代碼如下:
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ static final class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
四、兩種異步接⼝
之前我們使⽤線程池執⾏異步任務時,當不需要任務執⾏完畢後返回結果的,我們都是實現 Runnable 接⼝。⽽當需要實現返回值時,我們使⽤的則是 Callable 接⼝。 同理,使⽤ CompletableFuture 類的靜態⽅法執⾏異步任務時,不需要返回結果的也是實現 Runnable 接⼝。⽽當需要實現返回值時,我們使⽤的則是 Supplier 接⼝。其實,Callable 接⼝和 Supplier 接⼝ 並沒有什麼區別。 接下來,我們來分析⼀下 CompletableFuture 是如何實現異步任務執⾏的。
runAsync
CompletableFuture 執⾏⽆返回值任務的是 runAsync() ⽅法。該⽅法的關鍵執⾏代碼如下:
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { if (f == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); e.execute(new AsyncRun(d, f)); return d; }
可以看到,該⽅法將 Runnable 實例作爲參數封裝⾄ AsyncRun 類。實際上, AsyncRun 類是對 Runnable 接⼝的進⼀步封裝。實際上,AsyncRun 類也是實現了 Runnable 接⼝。觀察下⽅ AsyncRun 類的源碼,可以看到 AsyncRun 類的 run() ⽅法中調⽤了 Runnable 參數的 run() ⽅法。
public void run() { CompletableFuture<Void> d; Runnable f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { f.run(); d.completeNull(); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } }
當提交的任務執⾏完畢後,即 f.run() ⽅法執⾏完畢。調⽤ d.completeNull() ⽅法設置任務執⾏結 果爲空。代碼如下:
/** The encoding of the null value. */ static final AltResult NIL = new AltResult(null); /** Completes with the null value, unless already completed. */ final boolean completeNull() { return UNSAFE.compareAndSwapObject(this, RESULT, null, NIL); }
可以看到,對於任務返回值爲 null 的執⾏結果,被封裝爲 new AltResult(null) 對象。⽽且,還是 調⽤的 CAS 本地⽅法實現了原⼦操作。 爲什麼需要對 null 值進⾏單獨封裝呢?觀察 get() ⽅法的源碼:
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); }
原來原因是便於使⽤ null 值區分異步任務是否執⾏完畢。 如果你對 CAS 不太瞭解的話,可以查閱 compareAndSwapObject ⽅法的四個參數的含義。該⽅法的參 數 RESULT 是什麼呢?查看代碼如下:
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
原來,RESULT 是獲取 CompletableFuture 對象中 result 字段的偏移地址。這個 result 字段⼜是啥 呢?就是任務執⾏完畢後的結果值。代碼如下:
// Either the result or boxed AltResult volatile Object result;
supplyAsync
CompletableFuture 執⾏有返回值任務的是 supplyAsync() ⽅法。該⽅法的關鍵執⾏代碼如下:
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; }
與 AsyncRun 類對 Runnable 接⼝的封裝相同的是,AsyncSupply 類也是對 Runnable 接⼝的 run() ⽅ 法進⾏了⼀層封裝。代碼如下:
public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } }
當異步任務執⾏完畢後,返回結果會經 d.completeValue() ⽅法進⾏封裝。與 d.completeNull() ⽅ 法不同的是,該⽅法具有⼀個參數。代碼如下:
/** Completes with a non-exceptional result, unless already completed. */ final boolean completeValue(T t) { return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t); }
⽆論是類 AsyncRun 還是類 AsyncSupply ,run() ⽅法都會在執⾏結束之際調⽤ CompletableFuture 對象的 postComplete() ⽅法。顧名思義,該⽅法將通知後續回調函數的執⾏。
五、探究回調函數原理
前⾯我們提到了 CompletableFuture 具有連續回調的特性。舉個例⼦:
@Test public void demo2() throws ExecutionException, InterruptedException { CompletableFuture<ArrayList> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return new ArrayList(); }) .whenCompleteAsync((list, throwable) -> { System.out.println(Thread.currentThread().getName()); list.add(1); }) .whenCompleteAsync((list, throwable) -> { System.out.println(Thread.currentThread().getName()); list.add(2); }) .whenCompleteAsync((list, throwable) -> { System.out.println(Thread.currentThread().getName()); list.add(3); }); System.out.println(completableFuture.get()); }
打印輸出:
ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 [1, 2, 3]
上⾯的測試⽅法中,通過 supplyAsync ⽅法提交異步任務,當異步任務運⾏結束,對結果值添加三個回 調函數進⼀步處理。 觀察打印輸出,可以初步得出如下結論:
- 異步任務與回調函數均運⾏在同⼀個線程中。
- 回調函數的調⽤順序與添加回調函數的順序⼀致。
那麼問題來了,CompletableFuture 內部是如何處理連續回調函數的呢?
AsyncSupply
當我們提交異步任務時,等價於向線程池提交 AsyncSupply 對象或者 AsyncRun 對象。觀察這兩個類 的唯⼀構造⽅法都是相同的,代碼如下:
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; }
這就將 AsyncSupply 異步任務與返回給⽤戶的 CompletableFuture 對象進⾏綁定,⽤於在執⾏結束後 回填結果到 CompletableFuture 對象,以及通知後續回調函數的運⾏。
Completion
回調函數均是 Completion 類的⼦類,抽取 Completion 類與⼦類的關鍵代碼:
Completion next; CompletableFuture<V> dep; CompletableFuture<T> src; Function fn;
Completion 類含有 next 字段,很明顯是⼀個鏈表。 Completion 的⼦類含有兩個 CompletableFuture 類型的參數,dep 是新建的、⽤於下⼀步的 CompletableFuture 對象,src 則是引⽤它的 CompletableFuture 對象。
當 Completion 執⾏完回調⽅法後,⼀般會返回 dep 對象,⽤於迭代遍歷。
CompletableFuture
觀察源碼,CompletableFuture 主要包含下⾯兩個參數:
volatile Object result; //結果 volatile Completion stack; //回調⽅法棧
Completion 類型封裝了回調⽅法,但爲什麼要起名爲 stack (棧)呢? 因爲 CompletableFuture 藉助 Completion 的鏈表結構實現了棧。每當調⽤ CompletableFuture 對 象的 whenCompleteAsync() 或其它回調⽅法時,都會新建⼀個 Completion 對象,並壓到棧頂。代碼 如下:
final boolean tryPushStack(Completion c) { Completion h = stack; lazySetNext(c, h); return UNSAFE.compareAndSwapObject(this, STACK, h, c); }
postComplete
回顧上⾯兩種異步任務類的實現,當異步任務執⾏完畢之後,都會調⽤ postComplete() ⽅法通知回調 ⽅法的執⾏。代碼如下:
final void postComplete() { CompletableFuture<?> f = this; Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } }
這段代碼是本⽂的核⼼部分,⼤致邏輯如下:
當異步任務執⾏結束後,CompletableFuture 會查看⾃身是否含有回調⽅法棧,如果含有,會通過 casStack() ⽅法拿出棧頂元素 h ,此時的棧頂是原來棧的第⼆位元素 t。如果 t 等於 null,那麼直接 執⾏回調⽅法 h,並返回下⼀個 CompletableFuture 對象。然後⼀直迭代這個過程。 簡化上述思路,我更想稱其爲通過 Completion 對象實現橋接的 CompletableFuture 鏈表,流程圖如 下:
上⾯的過程是屬於正常情況下的,也就是⼀個 CompletableFuture 對象只提交⼀個回調⽅法的情況。 如果我們使⽤同⼀個 CompletableFuture 對象連續調⽤多次回調⽅法,那麼就會形成 Completion 棧。
你以爲 Completion 棧內元素會依次調⽤,不會的。從代碼中來看,當回調⽅法 t 不等於 null,有兩種 情況:
情況 1:如果當前迭代到的 CompletableFuture 對象是 this (也就是 CompletableFuture 鏈表頭), 會令 h.next = null ,因爲 h.next 也就是 t 通過 CAS 的⽅式壓到了 this 對象的 stack 棧頂。
情況 2:如果當前迭代到的 CompletableFuture 對象 f 不是 this (不是鏈表頭)的話,會將回調函數 h 壓⼊ this (鏈表頭)的 stack 中。然後從鏈表頭再次迭代遍歷。這樣下去,對象 f 中的回調⽅法棧假設 爲 3-2-1,從 f 的棧頂推出再壓⼊ this 的棧頂,順序就變爲了 1-2-3。這時候,情況就變成了第 1 種。
這樣,當回調⽅法 t = h.next 等於 null 或者 f 等於 this 時,都會對棧頂的回調⽅法進⾏調⽤。
簡單來說,就是將擁有多個回調⽅法的 CompletableFuture 對象的多餘的回調⽅法移到到 this 對象的 棧內。
回調⽅法執⾏結束要麼返回下⼀個 CompletableFuture 對象,要麼返回 null 然後⼿動設置爲 f = this, 再次從頭遍歷。
Async
回調函數的執⾏其實分爲兩種,區別在於帶不帶 Async 後綴。例如:
@Test public void demo3() throws ExecutionException, InterruptedException { CompletableFuture<ArrayList> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return new ArrayList(); }) .whenComplete((arrayList, throwable) -> { System.out.println(Thread.currentThread().getName()); arrayList.add(1); }).whenCompleteAsync((arrayList, throwable) -> { System.out.println(Thread.currentThread().getName()); arrayList.add(2); }); System.out.println(completableFuture.get()); }
打印輸出:
ForkJoinPool.commonPool-worker-1 main ForkJoinPool.commonPool-worker-1 [1, 2]
whenComplete() 和 whenCompleteAsync() ⽅法的區別在於是否在⽴即執⾏。源碼如下:
private CompletableFuture<T> uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f) { if (f == null) throw new NullPointerException(); CompletableFuture<T> d = new CompletableFuture<T>(); if (e != null || !d.uniWhenComplete(this, f, null)) { UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; }
兩個⽅法都是調⽤的 uniWhenCompleteStage() ,區別在於參數 Executor e 是否爲 null。從⽽控制是 否調⽤ d.uniWhenComplete() ⽅法,該⽅法會判斷 result 是否爲 null,從⽽嘗試是否⽴即執⾏該回調 ⽅法。若是 supplyAsync() ⽅法提交的異步任務耗時相對⻓⼀些,那麼就不建議使⽤ whenComplete() ⽅法了。此時由 whenComplete() 和 whenCompleteAsync() ⽅法提交的異步任務都會由線程池執⾏。
本章小結
通過本章節的源碼分析,我們明白了 Completion 之所以將自身設置爲鏈表結構,是因爲 CompletableFuture 需要藉助 Completion 的鏈表結構實現棧。也明白了同一個 CompletableFuture 對象如果多次調用回調方法時執行順序會與調用的順序不符合。換言之,一個 CompletableFuture 對象只調用一個回調方法纔是 CompletableFuture 設計的初衷,我們在編程中也可以利用這一特性來保證回調方法的調用順序。
因篇幅有限,本文並沒有分析更多的 CompletableFuture 源碼,感興趣的小夥伴可以自行查看。
六、用法集錦
異常處理
方法:
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
示例:
@Test public void index2() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2 / 0) .exceptionally((e) -> { System.out.println(e.getMessage()); return 0; }); System.out.println(completableFuture.get()); }
輸出:
java.lang.ArithmeticException: / by zero 0
任務完成後對結果的處理
方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
示例:
@Test public void index3() throws ExecutionException, InterruptedException { CompletableFuture<HashMap> completableFuture = CompletableFuture.supplyAsync(() -> new HashMap()) .whenComplete((map, throwable) -> { map.put("key1", "value1"); }); System.out.println(completableFuture.get()); }
輸出:
{key=value}
任務完成後對結果的轉換
方法:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
示例:
@Test public void index4() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2) .thenApply((r) -> r + 1); System.out.println(completableFuture.get()); }
輸出:
- 3
任務完成後對結果的消費
方法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
示例:
@Test public void index5() throws ExecutionException, InterruptedException { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 2) .thenAccept(System.out::println); System.out.println(completableFuture.get()); }
輸出:
2 null
任務的組合(需等待上一個任務完成)
方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
示例:
@Test public void index6() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2) .thenCompose(integer -> CompletableFuture.supplyAsync(() -> integer + 1)); System.out.println(completableFuture.get()); }
輸出:
- 3
任務的組合(不需等待上一步完成)
方法:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
示例:
@Test public void index7() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2) .thenCombine(CompletableFuture.supplyAsync(() -> 1), (x, y) -> x + y); System.out.println(completableFuture.get()); }
輸出:
- 3
消費最先執行完畢的其中一個任務,不返回結果
方法:
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
示例:
@Test public void index8() throws ExecutionException, InterruptedException { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }) .acceptEither(CompletableFuture.supplyAsync(() -> 1), System.out::println); System.out.println(completableFuture.get()); }
輸出:
1 null
消費最先執行完畢的其中一個任務,並返回結果
方法:
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
示例:
@Test public void index9() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }) .applyToEither(CompletableFuture.supplyAsync(() -> 1), x -> x + 10); System.out.println(completableFuture.get()); }
輸出:
- 11
等待所有任務完成
方法:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
示例:
@Test public void index10() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2); CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2); System.out.println("waiting all task finish.."); System.out.println(completableFuture.get()); System.out.println("all task finish"); }
輸出:
waiting all task finish.. null all task finish
返回最先完成的任務結果
方法:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例:
@Test public void index11() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2); CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2); System.out.println(completableFuture.get()); }
輸出:
- 2
【51CTO原創稿件,合作站點轉載請註明原文作者和出處爲51CTO.com】
【責任編輯:龐桂玉 TEL:(010)68476606】