全鏈路追蹤必備組件之 TransmittableThreadLocal 詳解
摘要:// 快照類,用來保存當前線程的 TtlThreadLocal 和 ThreadLocal 的快照 private static class Snapshot { final WeakHashMap
作者 | 姜日遊
杏仁 Java 工程師。今日持續在線的程序玩家。
我們都知道 ThreadLocal 作爲一種多線程處理手段,將數據限制在當前線程中,避免多線程情況下出現錯誤。
一般的使用場景大多會是服務上下文、分佈式日誌跟蹤。
但是在業務代碼中,爲了提高響應速度,將多個複雜、長時間的計算或調用過程異步進行,讓主線程可以先進行其他操作。像我們項目中最常用的就是 CompletableFuture
了,默認會使用預設的 ForkJoin ThreadPool
執行。
這也就引入了一個問題,如果保證 ThreadLocal 的信息能夠傳遞異步線程?通過 ThreadLocal?通過線程池?通過 Runnable 或者 Callable?
有些場景丟了就丟了,比如目前我們的服務上下文傳遞,一般都沒有很嚴謹的處理 ......
但是,如果是分佈式追蹤的場景,丟了就要累慘了。
注:以下代碼僅保留關鍵代碼,其餘無關緊要則忽略
InheritableThreadLocal
InheritableThreadLocal
是 JDK 本身自帶的一種線程傳遞解決方案。顧名思義,由當前線程創建的線程,將會繼承當前線程裏 ThreadLocal 保存的值。
其本質上是 ThreadLocal 的一個子類,通過覆寫父類中創建初始化的相關方法來實現的。我們知道,ThreadLocal 實際上是 Thread 中保存的一個 ThreadLocalMap
類型的屬性搭配使用才能讓廣大 Javaer 直呼真香的,所以 InheritableThreadLocal 也是如此。
public class Thread implements Runnable { // 如果單純使用 ThreadLocal,則 Thread 使用該屬性值保存 ThreadLocalMap ThreadLocal.ThreadLocalMap threadLocals = null; // 否則使用該屬性值 ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc) { Thread parent = currentThread(); if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); } }
init
方法作爲 Thread 初始化的核心方法,相關 ThreadLocal 代碼已經全部摘出。如我們所見,僅僅就只是這一點改動。在創建線程時,如果當前線程的 inheritableThreadLocals 不爲空,則根據它創建出新的 InheritableThreadLocals 保存到新線程中。
Ps : ThreadLocal 作爲老牌選手,默認都是使用時,直接初始化 Thread 的 threadLocals 屬性。
只有像是 InheritableThreadLocal 這樣的後輩,需要特殊處理一下。
public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } // Thread 中 ThreadLocalMap 不存在時的初始化動作,需要改爲初始化 inheritableThreadLocals void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
因此,原先 ThreadLocal 會從 Thread 的 threadLocals 獲取 Map,那麼 InheritableThreadLocal 就要從 inheritableThreadLocals 拿了。 childValue
方法用作從父線程中獲取值,可以看到,這邊是直接返回的,如果是複雜對象,就直接傳引用了。當然,繼承覆寫該方法,可以實現淺拷貝、深拷貝等等方式。
缺點
這樣的方式解決了創建線程時的 ThreadLocal 傳值的問題,但不可能一直創建新的線程,那實在耗費資源。因此通用做法是線程複用,比如線程池唄。但是,遞交異步任務是相應的 ThreadLocal 的值就無法傳遞過去了。
我們希望的是,異步線程執行任務的所使用的 ThreadLocal 值,是將任務提交給線程時主線程持有的。即從任務創建時傳遞到任務執行時。
想想,如果我們在創建異步任務時,在任務代碼外獲取當前線程的值臨時保存,再傳遞給執行線程,在真正的任務執行前保存到當前線程即可。對,確實可以,但是麻煩不?每個創建異步任務的地方都要寫。
那就把它封裝到遞交任務的方法中。
RunnableWrapper & CallableWrapper
假設按照服務上下文的場景舉例,目前項目中的執行異步操作的方案是定義一個 AsyncExecutor
,並聲明執行 Supplier 返回 CompletableFuture 的方法。
既然這樣就可以對方法做一些改造,保證上下文的傳遞。
private static ThreadLocal<String> contextHolder = new ThreadLocal<>(); public static <T> CompletableFuture<T> invokeToCompletableFuture(Supplier<T> supplier, String errorMessage) { // 第一步 String context = contextHolder.get(); Supplier<T> newSupplier = () -> { // 第二步 String origin = contextHolder.get(); try { contextHolder.set(context); // 第三步 return supplier.get(); } finally { // 第四步 contextHolder.set(origin); log.info(origin); } }; return CompletableFuture.supplyAsync(newSupplier).exceptionally(e -> { throw new ServerErrorException(errorMessage, e); }); } // test code public static void main(String[] args) throws ExecutionException, InterruptedException { contextHolder.set("main"); log.info(contextHolder.get()); CompletableFuture<String> context = invokeToCompletableFuture(() -> test.contextHolder.get(), "error"); log.info(context.get()); }
總得來說,就是在將異步任務派發給線程池時,對其做一下上下文傳遞的處理。
第一步:主線程獲取上下文,傳遞給任務暫存。
1 之後的操作都將是異步執行線程操作的。
第二步:異步執行線程將原有上下文取出,暫時保存。並將主線程傳遞過來的上下文設置。
第三步:執行異步任務
第四步:將原有上下文設置回去。
可以看到一般並不會在異步線程執行完任務之後直接進行 remove
。而是一開始取出原上下文(可能爲 NULL,也可能是線程創建時 InheritableThreadLocal 繼承過來的值。當然後續也會被清除的),並在任務執行結束重新放回。這樣的方式可以說是異步 ThreadLocal 傳遞的標準範式(大佬說的)。
這樣子既起到了顯式清除主線程帶來的上下文,也避免瞭如果線程池的拒絕策略爲 CallerRunsPolicy
,後續處理時上下文丟失的問題。
Supplier 不算是典型例子,更爲典型的應該是 Runnable 和 Callable。不過舉一推三,都是修飾一下,再丟給線程池。
public final class DelegatingContextRunnable implements Runnable { private final Runnable delegate; private final Optional<String> delegateContext; public DelegatingContextRunnable(Runnable delegate, Optional<String> context) { assert delegate != null; assert context != null; this.delegate = delegate; this.delegateContext = context; } public DelegatingContextRunnable(Runnable delegate) { // 修飾原有的任務,並保存當前線程的值 this(delegate, ContextHolder.get()); } public void run() { Optional<String> originalContext = ContextHolder.get(); try { ContextHolder.set(delegateContext); delegate.run(); } finally { ContextHolder.set(originalContext); } } } public final void execute(Runnable task) { // 遞交給真正的執行線程池前,對任務進行修飾 executor.execute(wrap(task)); } protected final Runnable wrap(Runnable task) { return new DelegatingContextRunnable(task); }
後續,使用線程池執行異步任務的時候,事先對任務進行封裝代理即可。
不過,還是比較麻煩。自定義的線程池,需要顯式處理任務。而且更嚴謹的做法,不同業務場景之間的線程池應該是隔離的,以免受到影響,就比如 Hystrix
的線程池。
每一個線程池都要處理就麻煩了。所以換個思路,代理線程池。
DelegaingExecutor
這個就不多說了,實際很簡單,就照搬我們上下文相關類庫。
public class DelegatingContextExecutor implements Executor { private final Executor delegate; public DelegatingContextExecutor(Executor delegateExecutor) { this.delegate = delegateExecutor; } public final void execute(Runnable task) { delegate.execute(wrap(task)); } protected final Runnable wrap(Runnable task) { return new DelegatingContextRunnable(task); } protected final Executor getDelegateExecutor() { return delegate; } } // 自定義的線程池,用於執行項目中的異步任務 public Executor queryExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(); // 封裝服務上下文的線程池修飾 return new DelegatingContextExecutorService(threadPoolExecutor); }
問題似乎都解決了,那還有什麼?
對,適用場景不夠通用。上面的做法只針對於指定的 ThreadLocal,其他場景例如鏈路追蹤、應用容器或上層框架跨應用代碼給下層 SDK
傳遞信息(像是契約包 Feign
的執行線程)。
那麼 TransmittableThreadLocal
就是爲了解決通用化場景而設計的。
TransmittableThreadLocal
作爲一個核心代碼不超過一千行的工具框架,實際使用和架構設計都十分簡單。
其使用方法本質上與上述提到的 CallableWrapper 和 DelegatingExecutor 是一樣的,並且爲了方便使用,對外提供了靜態工廠方法或工具類。
public final void execute(Runnable task) { executor.execute(TtlCallable.get(task)); } // 或者 public Executor queryExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(); // 封裝服務上下文的線程池修飾 return TtlExecutors.getTtlExecutorService(threadPoolExecutor); }
當然,前提是 ThreadLocal 必須使用 TransmittableThreadLocal。至於爲什麼,我們源碼分析時再細細說來。
先看看核心實現類的結構,以 Callable 和 ExecutorService 爲例。
整體主要是三個部分:任務( TtlCallable
)、線程池( ExecutorServiceTtlWrapper
)、ThreadLocal( TransmittableThreadLocal
)。其實對應上述講到的 CallableWrapper、DelegatingExecutor、InheritableThreadLocal。
但是無論是任務和線程池,本身還是依賴於 TransmittableThreadLocal 對於存儲值的管理。
用官方的時序圖直觀展示一下,框架是如何起作用的:
可以看到,從第三步創建完任務,第四步修飾完任務,後續大部分過程都依賴於 TransmittableThreadLocal 或 TransmittableThreadLocal 中聲明的靜態工具類 Transmitter
。Transmitter 主要負責 ThreadLocal 的管理和值的傳遞。
首先看看 TtlCallable。
TtlCallable
該類實際上是 JDK Callable 的一個修飾。類比於,上文講到的 RunnableWrapper,只是爲了臨時保存父線程 ThreadLocal 的值,以便在執行任務之前,賦值到子線程中。
因此,TtlCallable 和 TtlExecutorService 都實現了 TtlWrapper 接口。也許你以爲,該接口是實現修飾的語義,但是它只提供了一個方法,表達了拆修飾的語義:
public interface TtlWrapper<T> extends TtlEnhanced { @NonNull T unwrap(); }
畢竟核心是修飾,所以該類主要爲了提供修飾的核心抽象,便於框架對其進行判斷和管理。
該方法語義要求,必須返回修飾的源對象或下層對象(畢竟可能修飾了很多層),因此也是空值安全的。null 進來,null 出去。
public final class TtlCallable<V> implements Callable<V>, TtlWrapper<Callable<V>>, TtlEnhanced, TtlAttachments { // 保存父線程的 ThreadLocal 快照 private final AtomicReference<Object> capturedRef; // 實際執行任務 private final Callable<V> callable; // 判斷是否執行完,清除任務所保存的 ThreadLocal 快照 private final boolean releaseTtlValueReferenceAfterCall; private TtlCallable(@NonNull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) { // 1.創建時, 從 Transmitter 抓取快照 this.capturedRef = new AtomicReference<Object>(capture()); this.callable = callable; this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall; } @Override public V call() throws Exception { Object captured = capturedRef.get(); // 如果 releaseTtlValueReferenceAfterCall 爲 true,則在執行線程取出快照後清除。 if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after call!"); } // 2.使用 Transmitter 將快照重做到當前執行線程,並將原來的值取出 Object backup = replay(captured); try { // 3.執行任務 return callable.call(); } finally { // 4.Transmitter 重新將原值放回執行線程 restore(backup); } } }
可以看到,從實例化到任務執行的順序,和上文講到的 CallableWrapper 是完全一致的。但是在其之上,提供了更爲完整的特性和線程安全性。
-
releaseTtlValueReferenceAfterCall
的可控,保證了任務執行完,依然被業務代碼持有的場景下,避免 ThreadLocal 快照繼續持有而造成的內存泄漏。畢竟,對於業務方來說,這個東西是我不關心的,無需跟隨任務本身的生命週期。 -
快照使用
AtomicReference
保存,保證任務誤重用下,清除快照動作的多線程安全性。
上面兩者的合用,相當於期望一個任務只能被執行一次,儘量避免任務重用和繼續持有。
任務重用的間隔之間,可能出現 ThreadLocal 值被修改的情況,那麼後一次任務執行時,快照實際是不準確的。業務場景應該儘量避免這種情況出現纔對。
該類提供了靜態工廠方法,方便業務方創建。
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable) { return get(callable, false); } @Nullable public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall) { return get(callable, releaseTtlValueReferenceAfterCall, false); } @Nullable public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) { if (null == callable) return null; if (callable instanceof TtlEnhanced) { // avoid redundant decoration, and ensure idempotency if (idempotent) return (TtlCallable<T>) callable; else throw new IllegalStateException("Already TtlCallable!"); } return new TtlCallable<T>(callable, releaseTtlValueReferenceAfterCall); }
可以看到,默認工廠方法的 releaseTtlValueReferenceAfterCall
是 false。如果想要使用執行完清除,就要注意方法的使用。
其次,這裏還有一個冪等的參數控制: idempotent
。如果傳入的 Callable 已經是修飾過的,那麼根據 idempotent 的值,要麼返回原 Callable,要麼報錯。
我覺得這裏有個兩難的點。
我們調用靜態工廠方法期望得到的是調用該方法時 ThreadLocal 的快照。所以理論上,應該無論傳入什麼 Callable,都應該返回一個保存當前本地線程值快照的 TtlCallable。
但是,如果這樣的邏輯下,傳入的是已修飾的類,那麼最後結果就是在任務執行時,會造成外層修飾的快照被內層修飾的覆蓋。實際使用的是之前保存的快照了。
因此默認情況就只能 FastFail
。
官方並不建議設置 idempotent 爲 true,因爲直接返回原修飾類,本身也就違反靜態工廠方法的語義。所以官方建議: <b>DO NOT</b> set, only when you know why.
ExecutorServiceTtlWrapper
該類並不需要多講,本身與上文的 DelegatingExecutor 一樣。
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced { private final ExecutorService executorService; ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService) { super(executorService); this.executorService = executorService; } @NonNull @Override public <T> Future<T> submit(@NonNull Callable<T> task) { return executorService.submit(TtlCallable.get(task)); } }
其餘方法都是一樣的做法。
從上文看到,實際 ThreadLocal 的線程傳遞的核心在於 TransmittableThreadLocal 和 Transmitter。
TransmittableThreadLocal
TransmittableThreadLocal 只繼承了 InheritableThreadLocal 和實現了該框架提供的函數接口 TtlCopier。
因此 TransmittableThreadLocal 自身是一個 InheritableThreadLocal,同樣具備了線程創建時傳遞的特性。
其次,從類體系上看,TransmittableThreadLocal 自身是比較簡單的,本質上只是爲了讓框架能夠進行線程傳遞,做了一些小動作而已。
可以看到提供的方法是十分少的,源碼行數總共也纔不超過200行。
首先說一下構造函數。
private final boolean disableIgnoreNullValueSemantics; public TransmittableThreadLocal() { this(false); } public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) { this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics; }
一共兩個構造函數,有參構造函數允許設置 “是否禁用忽略空值語義”。默認是開啓的,表現行爲是如果是 null 值,那麼 TransmittableThreadLocal 是不會傳遞這個值,並且如果 set null,同時執行 remove 操作。表達的意思就是,“我不要 null,不歸我管。你敢給我,我就再也不管你了“。
這樣設計可能是因爲一開始設計服務於業務,是希望業務不要通過 NULL 來表達任何含義,同時避免 NPE 和優化 GC。但是後來官方考慮到作爲一個基礎服務框架,應該儘量保證完整的語義。畢竟這樣的特性是 JDK 的 ThreadLocal 不兼容的。因此後來,官方爲了保證兼容性,加了控制參數,允許禁用該特性。
TtlCopier
TransmittableThreadLocal 實現了一個類,TtlCopier。顧名思義,該類定義了線程傳遞時,值複製的抽象語義。
public interface TtlCopier<T> { T copy(T parentValue); }
而 TransmittableThreadLocal 的默認實現是與 InheritableThreadLocal 相同的,返回值的引用。
public T copy(T parentValue) { return parentValue; }
同時,該接口也爲業務方留下了擴展點。開發者可以重寫該方法,來定義線程傳遞時,如何進行值的複製。
TransmittableThreadLocal 內部維護了一個非常關鍵的屬性,用來註冊項目中維護的 TransmittableThreadLocal,從而保證 Transmitter 去正確傳遞 ThreadLocal 的值。
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); } };
holder 是一個 InheritableThreadLocal,用來保存所有註冊的 TransmittableThreadLocal。父子線程傳遞時,可以直接將父線程的註冊表傳遞過來。使用 InheritableThreadLocal,主要保證了嵌套線程場景下,註冊表的正確傳遞。官方有個 issue 以及爲其 fix 的 release 版本,從 ThreadLocal 改成了 InheritableThreadLocal。嵌入Thread調用的bug
其次,存儲的是 WeakHashMap
,value 都是無意義的 null,並且永遠不會被使用。這樣一來,保證項目使用 TransmittableThreadLocal 的話,不會引入新的內存泄漏問題。其內存泄漏的可能風險,就只完全來自於 InheritableThreadLocal 本身。
@Override public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder(); return value; } @Override public final void set(T value) { if (!disableIgnoreNullValueSemantics && null == value) { // may set null to remove value remove(); } else { super.set(value); addThisToHolder(); } } @Override public final void remove() { removeThisFromHolder(); super.remove(); } @SuppressWarnings("unchecked") private void addThisToHolder() { if (!holder.get().containsKey(this)) { holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value. } } private void removeThisFromHolder() { holder.get().remove(this); }
get & set 會將當前的 TransmittableThreadLocal 註冊到 holder 中, remove 時,會刪除對應註冊。
可以看到,前文說到的 disableIgnoreNullValueSemantics
的值在 get 和 set 時使用到。默認爲 false 時,ThreadLocal 不會保存 null,holder 不會註冊對應的 TransmittableThreadLocal。
TransmittableThreadLocal 就這樣沒了,可以看到就很簡單。但是,線程傳遞的內容呢,爲什麼沒有?
這是因爲,TransmittableThreadLocal 將線程傳遞的所有工作全部委託給了其靜態內部類 Transmitter。
Transmitter
我們講到 TransmittableThreadLocal 會將有值的對象,註冊到 holder 中,以便 Transmitter 去知道傳遞哪一些實例的值。但是如果這樣,那不是都要修改代碼,將項目中的 ThreadLocal 都改掉嗎?
這當然不可能,因此 Transmitter 承擔了這個任務,允許業務代碼將原有的 ThreadLocal 註冊進來,以方便 Transmitter 來識別和傳遞。
// 註冊 ThreadLocal 的 threadLocalHolder 依然是 WeakHashMap private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(); // ThreadLocal 手動註冊時用的鎖 private static final Object threadLocalHolderUpdateLock = new Object(); // 標記 ThreadLocal 的值已清除,類似於設置一個 null private static final Object threadLocalClearMark = new Object(); // 傳遞 TtlCopier,來確定 threadLocal 傳遞值的方式。默認是 引用傳遞,與 TransmittableThreadLocal 的 copy 一致。 public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier) { return registerThreadLocal(threadLocal, copier, false); } @SuppressWarnings("unchecked") public static <T> boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal<T> threadLocal) { // 默認是內部定義個 shadowCopier return registerThreadLocal(threadLocal, (TtlCopier<T>) shadowCopier, false); } public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier, boolean force) { // 如果是 TransmittableThreadLocal,則沒有必要再維護了。默認就實現了其的傳遞。 if (threadLocal instanceof TransmittableThreadLocal) { logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!"); return true; } synchronized (threadLocalHolderUpdateLock) { // force 爲 false,則不會更新對應的 copier if (!force && threadLocalHolder.containsKey(threadLocal)) return false; // copy on write WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder); newHolder.put((ThreadLocal<Object>) threadLocal, (TtlCopier<Object>) copier); threadLocalHolder = newHolder; return true; } } public static <T> boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal<T> threadLocal, boolean force) { return registerThreadLocal(threadLocal, (TtlCopier<T>) shadowCopier, force); } // 清除 ThreadLocal 的註冊 public static <T> boolean unregisterThreadLocal(@NonNull ThreadLocal<T> threadLocal) { if (threadLocal instanceof TransmittableThreadLocal) { logger.warning("unregister a TransmittableThreadLocal instance, this is unnecessary!"); return true; } synchronized (threadLocalHolderUpdateLock) { if (!threadLocalHolder.containsKey(threadLocal)) return false; WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder); newHolder.remove(threadLocal); threadLocalHolder = newHolder; return true; } } // 默認實現的 TtlCopier,直接引用傳遞 private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() { @Override public Object copy(Object parentValue) { return parentValue; } };
其實我自己有個想不明白的,既然已經用了 threadLocalHolderUpdateLock 做鎖,爲什麼還要用 copy on write?GC 友好?mark 一下。
剩下的部分,就是 Transmitter 怎麼傳遞 ThreadLocal 的值了。
實際就是三個步驟,capture -> reply -> restore,crr。
1.抓取當前線程的值快照
// 快照類,用來保存當前線程的 TtlThreadLocal 和 ThreadLocal 的快照 private static class Snapshot { final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value; final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value; private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) { this.ttl2Value = ttl2Value; this.threadLocal2Value = threadLocal2Value; } } public static Object capture() { // 抓取快照 return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } // 抓取 TransmittableThreadLocal 的快照 private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); // 從 TransmittableThreadLocal 的 holder 中,遍歷所有有值的 TransmittableThreadLocal,將 TransmittableThreadLocal 取出和值複製到 Map 中。 for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } // 抓取註冊的 ThreadLocal。 private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() { final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>(); // 從 threadLocalHolder 中,遍歷註冊的 ThreadLocal,將 ThreadLocal 和 TtlCopier 取出,將值複製到 Map 中。 for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); final TtlCopier<Object> copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; }
2.將快照重做到執行線程
@NonNull public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } // 重播 TransmittableThreadLocal,並保存執行線程的原值 @NonNull private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) { WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // 遍歷 holder,從 父線程繼承過來的,或者之前註冊進來的 backup.put(threadLocal, threadLocal.get()); // clear the TTL values that is not in captured // avoid the extra TTL values after replay when run task // 清除本次沒有傳遞過來的 ThreadLocal,和對應值。畢竟一是可能會有因爲 InheritableThreadLocal 而傳遞並保留的值。二來保證主線程 set 過的 ThreadLocal,不應該被傳遞過來。明確,其傳遞是由業務代碼控制的,就是明確 set 過值的。 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 將 map 中的值,設置到 ThreadLocal 中。 setTtlValuesTo(captured); // TransmittableThreadLocal 的回調方法,在任務執行前執行。 doExecuteCallback(true); return backup; } private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) { TransmittableThreadLocal<Object> threadLocal = entry.getKey(); // set 的同時,也就將 TransmittableThreadLocal 註冊到當前線程的註冊表了。 threadLocal.set(entry.getValue()); } } private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) { final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); backup.put(threadLocal, threadLocal.get()); final Object value = entry.getValue(); // 如果值是標記已刪除,則清除 if (value == threadLocalClearMark) threadLocal.remove(); else threadLocal.set(value); } return backup; }
doExecuteCallback 是 TransmittableThreadLocal 定義的回調方法,保證任務執行前和執行後的回調動作。
isBefore 控制是執行前還是執行後。
內部調用了 beforeExecute 和 afterExecute 方法。默認是不做任何動作。
private static void doExecuteCallback(boolean isBefore) { for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { try { if (isBefore) threadLocal.beforeExecute(); else threadLocal.afterExecute(); } catch (Throwable t) { // 忽略所有異常,保證任務的執行 if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t); } } } } protected void beforeExecute() { } protected void afterExecute() { }
3.恢復備份的原快照
public static void restore(@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) { // call afterExecute callback 任務執行完回調 doExecuteCallback(false); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // clear the TTL values that is not in backup // avoid the extra TTL values after restore // 恢復快照時,清除本次傳遞註冊進來,但是原先不存在的 TransmittableThreadLocal if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // restore TTL values // 恢復快照中的 value 到 TransmittableThreadLocal 中 setTtlValuesTo(backup); } private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) { TransmittableThreadLocal<Object> threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> backup) { for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } }
對特殊場景以及 Lambda 的支持
Transmitter 定義了幾個特殊場景下以及 Java 8 lambda 表達式的使用。
特殊場景就是指,執行前,清除當前執行線程 ThreadLocal 的值,包括 TtlThreadLocal 和註冊 ThreadLocal 。
像一開始講到的業務代碼喜歡使用 Supplier,所以也對其做了支持。本質是爲了簡化工作。
不過,注意的是,快照的捕獲則需要業務代碼自己完成並傳遞。
public static <R> R runSupplierWithCaptured(@NonNull Object captured, @NonNull Supplier<R> bizLogic) { Object backup = replay(captured); try { return bizLogic.get(); } finally { restore(backup); } } public static <R> R runSupplierWithClear(@NonNull Supplier<R> bizLogic) { Object backup = clear(); try { return bizLogic.get(); } finally { restore(backup); } } public static <R> R runCallableWithCaptured(@NonNull Object captured, @NonNull Callable<R> bizLogic) throws Exception { Object backup = replay(captured); try { return bizLogic.call(); } finally { restore(backup); } } public static <R> R runCallableWithClear(@NonNull Callable<R> bizLogic) throws Exception { Object backup = clear(); try { return bizLogic.call(); } finally { restore(backup); } }
簡化方法,使用起來也就是:
// 線程A Object captured = Transmitter.capture(); // 線程B @Async String result = runSupplierWithCaptured(captured, () -> { System.out.println("Hello"); ... return "World"; });
否則只能按照全套流程了:
// 線程A Object captured = Transmitter.capture(); // 線程B @Async String result = runSupplierWithCaptured(captured, () -> { System.out.println("Hello"); ... return "World"; }); Object backup = Transmitter.replay(captured); // (2) try { System.out.println("Hello"); // ... return "World"; } finally { // restore the TransmittableThreadLocal of thread B when replay Transmitter.restore(backup); (3)
Clear
上面可以看到,一些方法是做了 clear 操作。
就是不依賴快照的捕獲,將空值的快照信息,傳遞給重做方法執行,就能清除當前執行線程的值,並得到返回原值備份。
public static Object clear() { final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); // threadLocalClearMark 標記爲未被傳遞和註冊,更爲合適,從而避免和 null 混淆。否則無法區分原有就是 null,還是未被註冊 threadLocal2Value.put(threadLocal, threadLocalClearMark); } return replay(new Snapshot(ttl2Value, threadLocal2Value)); }
注意
如果注意到 TransmittableThreadLocal 是繼承 InheritableThreadLocal,就應該知道,子線程創建時,值還是會被傳遞過去。這也就可能帶來內存泄漏問題。
所以,同時提供 DisableInheritableThreadFactoryWrapper,以方便業務代碼自定義線程池,禁止值的繼承傳遞。
class DisableInheritableThreadFactoryWrapper implements DisableInheritableThreadFactory { private final ThreadFactory threadFactory; DisableInheritableThreadFactoryWrapper(@NonNull ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public Thread newThread(@NonNull Runnable r) { // 調用了 Transmitter 的 clear 方法,在創建子線程前,清除當前線程的值,並保存下來 final Object backup = clear(); try { return threadFactory.newThread(r); } finally { // 創建完,再重新恢復。以此,避免了值的繼承傳遞。 restore(backup); } } @NonNull @Override public ThreadFactory unwrap() { return threadFactory; } }
對於 1.8 特性,還提供了 ForkJoinWorkerThreadFactory 和 TtlForkJoinPoolHelper 等類的支持。
Java Agent 支持
避免代碼改動的話,可以使用 Java Agent,來隱式替換 JDK 的相應類。對於 1.8 的 CompletableFuture 和 Stream,在底層通過對 ForkJoinPool 的支持,也做了透明支持。
總結
到此,TransmittableThreadLocal 的源碼解析就結束了。核心源碼是不是很簡單?但是某些思想和考量還是很值得學習的。
ThreadLocal 的使用,本身類似於全局變量,而且是可修改的。一旦中間過程被修改,就無法保證整體流程的前後一致性。它將是一個隱藏的強依賴,一個可能被忽略、意想不到的坑。(我不承認,我在還原大佬的話。)
應該儘量避免在業務代碼中使用的。 DO NOT use, only when you know why .
嗯,還有加上一句,讓其他人也明白,文檔務必齊全。(說實話,我挺想用英文的,想想算了)。
全文完
以下文章您可能也會感興趣:
我們正在招聘 Java 工程師,歡迎有興趣的同學投遞簡歷到 [email protected] 。