摘要:// 快照類,用來保存當前線程的 TtlThreadLocal 和 ThreadLocal 的快照 private static class Snapshot { final WeakHashMap, Object> ttl2Value。} @Override public Thread newThread(@NonNull Runnable r) { // 調用了 Transmitter 的 clear 方法,在創建子線程前,清除當前線程的值,並保存下來 final Object backup = clear()。

作者 | 姜日遊

杏仁 Java 工程師。今日持續在線的程序玩家。

我們都知道 ThreadLocal 作爲一種多線程處理手段,將數據限制在當前線程中,避免多線程情況下出現錯誤。

一般的使用場景大多會是服務上下文、分佈式日誌跟蹤。

但是在業務代碼中,爲了提高響應速度,將多個複雜、長時間的計算或調用過程異步進行,讓主線程可以先進行其他操作。像我們項目中最常用的就是  CompletableFuture  了,默認會使用預設的  ForkJoin ThreadPool  執行。

這也就引入了一個問題,如果保證 ThreadLocal 的信息能夠傳遞異步線程?通過 ThreadLocal?通過線程池?通過 Runnable 或者 Callable?

有些場景丟了就丟了,比如目前我們的服務上下文傳遞,一般都沒有很嚴謹的處理 ......

但是,如果是分佈式追蹤的場景,丟了就要累慘了。

注:以下代碼僅保留關鍵代碼,其餘無關緊要則忽略

InheritableThreadLocal

InheritableThreadLocal  是 JDK 本身自帶的一種線程傳遞解決方案。顧名思義,由當前線程創建的線程,將會繼承當前線程裏 ThreadLocal 保存的值。

其本質上是 ThreadLocal 的一個子類,通過覆寫父類中創建初始化的相關方法來實現的。我們知道,ThreadLocal 實際上是 Thread 中保存的一個  ThreadLocalMap  類型的屬性搭配使用才能讓廣大 Javaer 直呼真香的,所以 InheritableThreadLocal 也是如此。

public class Thread implements Runnable {
    // 如果單純使用 ThreadLocal,則 Thread 使用該屬性值保存 ThreadLocalMap
    ThreadLocal.ThreadLocalMap threadLocals = null;
        // 否則使用該屬性值
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
  
    private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc) {
          Thread parent = currentThread();

          if (parent.inheritableThreadLocals != null)
              this.inheritableThreadLocals =
                  ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
      }
}

init  方法作爲 Thread 初始化的核心方法,相關 ThreadLocal 代碼已經全部摘出。如我們所見,僅僅就只是這一點改動。在創建線程時,如果當前線程的 inheritableThreadLocals 不爲空,則根據它創建出新的 InheritableThreadLocals 保存到新線程中。

Ps : ThreadLocal 作爲老牌選手,默認都是使用時,直接初始化 Thread 的 threadLocals 屬性。

只有像是 InheritableThreadLocal 這樣的後輩,需要特殊處理一下。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    
    protected T childValue(T parentValue) {
        return parentValue;
    }
  
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

   // Thread 中 ThreadLocalMap 不存在時的初始化動作,需要改爲初始化 inheritableThreadLocals
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

因此,原先 ThreadLocal 會從 Thread 的 threadLocals 獲取 Map,那麼 InheritableThreadLocal 就要從 inheritableThreadLocals 拿了。 childValue  方法用作從父線程中獲取值,可以看到,這邊是直接返回的,如果是複雜對象,就直接傳引用了。當然,繼承覆寫該方法,可以實現淺拷貝、深拷貝等等方式。

缺點

這樣的方式解決了創建線程時的 ThreadLocal 傳值的問題,但不可能一直創建新的線程,那實在耗費資源。因此通用做法是線程複用,比如線程池唄。但是,遞交異步任務是相應的 ThreadLocal 的值就無法傳遞過去了。

我們希望的是,異步線程執行任務的所使用的 ThreadLocal 值,是將任務提交給線程時主線程持有的。即從任務創建時傳遞到任務執行時。

想想,如果我們在創建異步任務時,在任務代碼外獲取當前線程的值臨時保存,再傳遞給執行線程,在真正的任務執行前保存到當前線程即可。對,確實可以,但是麻煩不?每個創建異步任務的地方都要寫。

那就把它封裝到遞交任務的方法中。

RunnableWrapper & CallableWrapper

假設按照服務上下文的場景舉例,目前項目中的執行異步操作的方案是定義一個  AsyncExecutor ,並聲明執行 Supplier 返回 CompletableFuture 的方法。

既然這樣就可以對方法做一些改造,保證上下文的傳遞。

private static ThreadLocal<String> contextHolder = new ThreadLocal<>();

public static <T> CompletableFuture<T> invokeToCompletableFuture(Supplier<T> supplier, String errorMessage) {
    // 第一步
    String context = contextHolder.get();
    Supplier<T> newSupplier = () -> {
         // 第二步
        String origin = contextHolder.get();
        try {
            contextHolder.set(context);
            // 第三步
            return supplier.get();
        } finally {
            // 第四步
            contextHolder.set(origin);
            log.info(origin);
        }
    };
    return CompletableFuture.supplyAsync(newSupplier).exceptionally(e -> {
        throw new ServerErrorException(errorMessage, e);
    });
}
// test code
public static void main(String[] args) throws ExecutionException, InterruptedException {
    contextHolder.set("main");
    log.info(contextHolder.get());
    CompletableFuture<String> context = invokeToCompletableFuture(() -> test.contextHolder.get(), "error");
    log.info(context.get());
}

總得來說,就是在將異步任務派發給線程池時,對其做一下上下文傳遞的處理。

第一步:主線程獲取上下文,傳遞給任務暫存。

1 之後的操作都將是異步執行線程操作的。

第二步:異步執行線程將原有上下文取出,暫時保存。並將主線程傳遞過來的上下文設置。

第三步:執行異步任務

第四步:將原有上下文設置回去。

可以看到一般並不會在異步線程執行完任務之後直接進行  remove 。而是一開始取出原上下文(可能爲 NULL,也可能是線程創建時 InheritableThreadLocal 繼承過來的值。當然後續也會被清除的),並在任務執行結束重新放回。這樣的方式可以說是異步 ThreadLocal 傳遞的標準範式(大佬說的)。

這樣子既起到了顯式清除主線程帶來的上下文,也避免瞭如果線程池的拒絕策略爲  CallerRunsPolicy ,後續處理時上下文丟失的問題。

Supplier 不算是典型例子,更爲典型的應該是 Runnable 和 Callable。不過舉一推三,都是修飾一下,再丟給線程池。

public final class DelegatingContextRunnable implements Runnable {

    private final Runnable delegate;

    private final Optional<String> delegateContext;

    public DelegatingContextRunnable(Runnable delegate,
                                       Optional<String> context) {
        assert delegate != null;
        assert context != null;

        this.delegate = delegate;
        this.delegateContext = context;
    }

    public DelegatingContextRunnable(Runnable delegate) {
        // 修飾原有的任務,並保存當前線程的值
        this(delegate, ContextHolder.get());
    }

    public void run() {
        Optional<String> originalContext = ContextHolder.get();

        try {
            ContextHolder.set(delegateContext);
            delegate.run();
        } finally {
            ContextHolder.set(originalContext);
        }
    }
}

public final void execute(Runnable task) {
  // 遞交給真正的執行線程池前,對任務進行修飾
  executor.execute(wrap(task));
}

protected final Runnable wrap(Runnable task) {
  return new DelegatingContextRunnable(task);
}

後續,使用線程池執行異步任務的時候,事先對任務進行封裝代理即可。

不過,還是比較麻煩。自定義的線程池,需要顯式處理任務。而且更嚴謹的做法,不同業務場景之間的線程池應該是隔離的,以免受到影響,就比如  Hystrix  的線程池。

每一個線程池都要處理就麻煩了。所以換個思路,代理線程池。

DelegaingExecutor

這個就不多說了,實際很簡單,就照搬我們上下文相關類庫。

public class DelegatingContextExecutor implements Executor  {

    private final Executor delegate;


    public DelegatingContextExecutor(Executor delegateExecutor) {
        this.delegate = delegateExecutor;
    }

    public final void execute(Runnable task) {
        delegate.execute(wrap(task));
    }

    protected final Runnable wrap(Runnable task) {
        return new DelegatingContextRunnable(task);
    }

    protected final Executor getDelegateExecutor() {
        return delegate;
    }
}
// 自定義的線程池,用於執行項目中的異步任務
public Executor queryExecutor() {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
    // 封裝服務上下文的線程池修飾
    return new DelegatingContextExecutorService(threadPoolExecutor);
}

問題似乎都解決了,那還有什麼?

對,適用場景不夠通用。上面的做法只針對於指定的 ThreadLocal,其他場景例如鏈路追蹤、應用容器或上層框架跨應用代碼給下層  SDK  傳遞信息(像是契約包  Feign  的執行線程)。

那麼  TransmittableThreadLocal  就是爲了解決通用化場景而設計的。

TransmittableThreadLocal

作爲一個核心代碼不超過一千行的工具框架,實際使用和架構設計都十分簡單。

其使用方法本質上與上述提到的 CallableWrapper 和 DelegatingExecutor 是一樣的,並且爲了方便使用,對外提供了靜態工廠方法或工具類。

public final void execute(Runnable task) {
  executor.execute(TtlCallable.get(task));
}
// 或者
public Executor queryExecutor() {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
    // 封裝服務上下文的線程池修飾
    return TtlExecutors.getTtlExecutorService(threadPoolExecutor);
}

當然,前提是 ThreadLocal 必須使用 TransmittableThreadLocal。至於爲什麼,我們源碼分析時再細細說來。

先看看核心實現類的結構,以 Callable 和 ExecutorService 爲例。

整體主要是三個部分:任務( TtlCallable )、線程池( ExecutorServiceTtlWrapper )、ThreadLocal( TransmittableThreadLocal )。其實對應上述講到的 CallableWrapper、DelegatingExecutor、InheritableThreadLocal。

但是無論是任務和線程池,本身還是依賴於 TransmittableThreadLocal 對於存儲值的管理。

用官方的時序圖直觀展示一下,框架是如何起作用的:

可以看到,從第三步創建完任務,第四步修飾完任務,後續大部分過程都依賴於 TransmittableThreadLocal 或 TransmittableThreadLocal 中聲明的靜態工具類  Transmitter 。Transmitter 主要負責 ThreadLocal 的管理和值的傳遞。

首先看看 TtlCallable。

TtlCallable

該類實際上是 JDK Callable 的一個修飾。類比於,上文講到的 RunnableWrapper,只是爲了臨時保存父線程 ThreadLocal 的值,以便在執行任務之前,賦值到子線程中。

因此,TtlCallable 和 TtlExecutorService 都實現了 TtlWrapper 接口。也許你以爲,該接口是實現修飾的語義,但是它只提供了一個方法,表達了拆修飾的語義:

public interface TtlWrapper<T> extends TtlEnhanced {
    @NonNull
    T unwrap();
}

畢竟核心是修飾,所以該類主要爲了提供修飾的核心抽象,便於框架對其進行判斷和管理。

該方法語義要求,必須返回修飾的源對象或下層對象(畢竟可能修飾了很多層),因此也是空值安全的。null 進來,null 出去。

public final class TtlCallable<V> implements Callable<V>, TtlWrapper<Callable<V>>, TtlEnhanced, TtlAttachments {
    // 保存父線程的 ThreadLocal 快照
    private final AtomicReference<Object> capturedRef;
    // 實際執行任務
    private final Callable<V> callable;
    // 判斷是否執行完,清除任務所保存的 ThreadLocal 快照
    private final boolean releaseTtlValueReferenceAfterCall;

    private TtlCallable(@NonNull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
        // 1.創建時, 從 Transmitter 抓取快照
        this.capturedRef = new AtomicReference<Object>(capture());
        this.callable = callable;
        this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
    }

    @Override
    public V call() throws Exception {
        Object captured = capturedRef.get();
        // 如果 releaseTtlValueReferenceAfterCall 爲 true,則在執行線程取出快照後清除。
        if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after call!");
        }
                // 2.使用 Transmitter 將快照重做到當前執行線程,並將原來的值取出
        Object backup = replay(captured);
        try {
            // 3.執行任務
            return callable.call();
        } finally {
            // 4.Transmitter 重新將原值放回執行線程
            restore(backup);
        }
    }
}

可以看到,從實例化到任務執行的順序,和上文講到的 CallableWrapper 是完全一致的。但是在其之上,提供了更爲完整的特性和線程安全性。

  • releaseTtlValueReferenceAfterCall  的可控,保證了任務執行完,依然被業務代碼持有的場景下,避免 ThreadLocal 快照繼續持有而造成的內存泄漏。畢竟,對於業務方來說,這個東西是我不關心的,無需跟隨任務本身的生命週期。

  • 快照使用  AtomicReference  保存,保證任務誤重用下,清除快照動作的多線程安全性。

上面兩者的合用,相當於期望一個任務只能被執行一次,儘量避免任務重用和繼續持有。

任務重用的間隔之間,可能出現 ThreadLocal 值被修改的情況,那麼後一次任務執行時,快照實際是不準確的。業務場景應該儘量避免這種情況出現纔對。

該類提供了靜態工廠方法,方便業務方創建。

public static <T> TtlCallable<T> get(@Nullable Callable<T> callable) {
    return get(callable, false);
}

@Nullable
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall) {
    return get(callable, releaseTtlValueReferenceAfterCall, false);
}

@Nullable
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {
    if (null == callable) return null;

    if (callable instanceof TtlEnhanced) {
        // avoid redundant decoration, and ensure idempotency
        if (idempotent) return (TtlCallable<T>) callable;
        else throw new IllegalStateException("Already TtlCallable!");
    }
    return new TtlCallable<T>(callable, releaseTtlValueReferenceAfterCall);
}

可以看到,默認工廠方法的  releaseTtlValueReferenceAfterCall  是 false。如果想要使用執行完清除,就要注意方法的使用。

其次,這裏還有一個冪等的參數控制:  idempotent  。如果傳入的 Callable 已經是修飾過的,那麼根據 idempotent 的值,要麼返回原 Callable,要麼報錯。

我覺得這裏有個兩難的點。

我們調用靜態工廠方法期望得到的是調用該方法時 ThreadLocal 的快照。所以理論上,應該無論傳入什麼 Callable,都應該返回一個保存當前本地線程值快照的 TtlCallable。

但是,如果這樣的邏輯下,傳入的是已修飾的類,那麼最後結果就是在任務執行時,會造成外層修飾的快照被內層修飾的覆蓋。實際使用的是之前保存的快照了。

因此默認情況就只能  FastFail

官方並不建議設置 idempotent 爲 true,因爲直接返回原修飾類,本身也就違反靜態工廠方法的語義。所以官方建議: <b>DO NOT</b> set, only when you know why.

ExecutorServiceTtlWrapper

該類並不需要多講,本身與上文的 DelegatingExecutor 一樣。

class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
    private final ExecutorService executorService;

    ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService) {
        super(executorService);
        this.executorService = executorService;
    }

    @NonNull
    @Override
    public <T> Future<T> submit(@NonNull Callable<T> task) {
        return executorService.submit(TtlCallable.get(task));
    }
}

其餘方法都是一樣的做法。

從上文看到,實際 ThreadLocal 的線程傳遞的核心在於 TransmittableThreadLocal 和 Transmitter。

TransmittableThreadLocal

TransmittableThreadLocal 只繼承了 InheritableThreadLocal 和實現了該框架提供的函數接口 TtlCopier。

因此 TransmittableThreadLocal 自身是一個 InheritableThreadLocal,同樣具備了線程創建時傳遞的特性。

其次,從類體系上看,TransmittableThreadLocal 自身是比較簡單的,本質上只是爲了讓框架能夠進行線程傳遞,做了一些小動作而已。

可以看到提供的方法是十分少的,源碼行數總共也纔不超過200行。

首先說一下構造函數。

private final boolean disableIgnoreNullValueSemantics;

public TransmittableThreadLocal() {
    this(false);
}

public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
    this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
}

一共兩個構造函數,有參構造函數允許設置 “是否禁用忽略空值語義”。默認是開啓的,表現行爲是如果是 null 值,那麼 TransmittableThreadLocal 是不會傳遞這個值,並且如果 set null,同時執行 remove 操作。表達的意思就是,“我不要 null,不歸我管。你敢給我,我就再也不管你了“。

這樣設計可能是因爲一開始設計服務於業務,是希望業務不要通過 NULL 來表達任何含義,同時避免 NPE 和優化 GC。但是後來官方考慮到作爲一個基礎服務框架,應該儘量保證完整的語義。畢竟這樣的特性是 JDK 的 ThreadLocal 不兼容的。因此後來,官方爲了保證兼容性,加了控制參數,允許禁用該特性。

TtlCopier

TransmittableThreadLocal 實現了一個類,TtlCopier。顧名思義,該類定義了線程傳遞時,值複製的抽象語義。

public interface TtlCopier<T> {
    T copy(T parentValue);
}

而 TransmittableThreadLocal 的默認實現是與 InheritableThreadLocal 相同的,返回值的引用。

public T copy(T parentValue) {
    return parentValue;
}

同時,該接口也爲業務方留下了擴展點。開發者可以重寫該方法,來定義線程傳遞時,如何進行值的複製。

TransmittableThreadLocal 內部維護了一個非常關鍵的屬性,用來註冊項目中維護的 TransmittableThreadLocal,從而保證 Transmitter 去正確傳遞 ThreadLocal 的值。

private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
        new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
            }

            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
            }
        };

holder 是一個 InheritableThreadLocal,用來保存所有註冊的 TransmittableThreadLocal。父子線程傳遞時,可以直接將父線程的註冊表傳遞過來。使用 InheritableThreadLocal,主要保證了嵌套線程場景下,註冊表的正確傳遞。官方有個 issue 以及爲其 fix 的 release 版本,從 ThreadLocal 改成了 InheritableThreadLocal。嵌入Thread調用的bug

其次,存儲的是  WeakHashMap ,value 都是無意義的 null,並且永遠不會被使用。這樣一來,保證項目使用 TransmittableThreadLocal 的話,不會引入新的內存泄漏問題。其內存泄漏的可能風險,就只完全來自於 InheritableThreadLocal 本身。

@Override
public final T get() {
    T value = super.get();
    if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
    return value;
}

@Override
public final void set(T value) {
    if (!disableIgnoreNullValueSemantics && null == value) {
        // may set null to remove value
        remove();
    } else {
        super.set(value);
        addThisToHolder();
    }
}

@Override
public final void remove() {
    removeThisFromHolder();
    super.remove();
}

@SuppressWarnings("unchecked")
private void addThisToHolder() {
    if (!holder.get().containsKey(this)) {
        holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
    }
}

private void removeThisFromHolder() {
    holder.get().remove(this);
}

get & set 會將當前的 TransmittableThreadLocal 註冊到 holder 中, remove 時,會刪除對應註冊。

可以看到,前文說到的  disableIgnoreNullValueSemantics  的值在 get 和 set 時使用到。默認爲 false 時,ThreadLocal 不會保存 null,holder 不會註冊對應的 TransmittableThreadLocal。

TransmittableThreadLocal 就這樣沒了,可以看到就很簡單。但是,線程傳遞的內容呢,爲什麼沒有?

這是因爲,TransmittableThreadLocal 將線程傳遞的所有工作全部委託給了其靜態內部類 Transmitter。

Transmitter

我們講到 TransmittableThreadLocal 會將有值的對象,註冊到 holder 中,以便 Transmitter 去知道傳遞哪一些實例的值。但是如果這樣,那不是都要修改代碼,將項目中的 ThreadLocal 都改掉嗎?

這當然不可能,因此 Transmitter 承擔了這個任務,允許業務代碼將原有的 ThreadLocal 註冊進來,以方便 Transmitter 來識別和傳遞。

// 註冊 ThreadLocal 的 threadLocalHolder 依然是 WeakHashMap
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
// ThreadLocal 手動註冊時用的鎖
private static final Object threadLocalHolderUpdateLock = new Object();
// 標記 ThreadLocal 的值已清除,類似於設置一個 null
private static final Object threadLocalClearMark = new Object();
// 傳遞 TtlCopier,來確定 threadLocal 傳遞值的方式。默認是 引用傳遞,與 TransmittableThreadLocal 的 copy 一致。
public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier) {
    return registerThreadLocal(threadLocal, copier, false);
}

@SuppressWarnings("unchecked")
public static <T> boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal<T> threadLocal) {
    // 默認是內部定義個 shadowCopier
    return registerThreadLocal(threadLocal, (TtlCopier<T>) shadowCopier, false);
}

public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier, boolean force) {
    // 如果是 TransmittableThreadLocal,則沒有必要再維護了。默認就實現了其的傳遞。
    if (threadLocal instanceof TransmittableThreadLocal) {
        logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!");
        return true;
    }
        
    synchronized (threadLocalHolderUpdateLock) {
        // force 爲 false,則不會更新對應的 copier
        if (!force && threadLocalHolder.containsKey(threadLocal)) return false;
                // copy on write
        WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder);
        newHolder.put((ThreadLocal<Object>) threadLocal, (TtlCopier<Object>) copier);
        threadLocalHolder = newHolder;
        return true;
    }
}

public static <T> boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal<T> threadLocal, boolean force) {
    return registerThreadLocal(threadLocal, (TtlCopier<T>) shadowCopier, force);
}
// 清除 ThreadLocal 的註冊
public static <T> boolean unregisterThreadLocal(@NonNull ThreadLocal<T> threadLocal) {
    if (threadLocal instanceof TransmittableThreadLocal) {
        logger.warning("unregister a TransmittableThreadLocal instance, this is unnecessary!");
        return true;
    }

    synchronized (threadLocalHolderUpdateLock) {
        if (!threadLocalHolder.containsKey(threadLocal)) return false;

        WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder);
        newHolder.remove(threadLocal);
        threadLocalHolder = newHolder;
        return true;
    }
}
// 默認實現的 TtlCopier,直接引用傳遞
private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
    @Override
    public Object copy(Object parentValue) {
        return parentValue;
    }
};

其實我自己有個想不明白的,既然已經用了 threadLocalHolderUpdateLock 做鎖,爲什麼還要用 copy on write?GC 友好?mark 一下。

剩下的部分,就是 Transmitter 怎麼傳遞 ThreadLocal 的值了。

實際就是三個步驟,capture -> reply -> restore,crr。

1.抓取當前線程的值快照

// 快照類,用來保存當前線程的 TtlThreadLocal 和 ThreadLocal 的快照
private static class Snapshot {
    final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;

    private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
        this.ttl2Value = ttl2Value;
        this.threadLocal2Value = threadLocal2Value;
    }
}

public static Object capture() {
    // 抓取快照
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 抓取 TransmittableThreadLocal 的快照
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
    // 從 TransmittableThreadLocal 的 holder 中,遍歷所有有值的 TransmittableThreadLocal,將 TransmittableThreadLocal 取出和值複製到 Map 中。
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
        ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
}

//  抓取註冊的 ThreadLocal。
private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
    // 從 threadLocalHolder 中,遍歷註冊的 ThreadLocal,將 ThreadLocal 和 TtlCopier 取出,將值複製到 Map 中。
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        final TtlCopier<Object> copier = entry.getValue();

        threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
}

2.將快照重做到執行線程

@NonNull
public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

// 重播 TransmittableThreadLocal,並保存執行線程的原值
@NonNull
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  
    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();

        // 遍歷 holder,從 父線程繼承過來的,或者之前註冊進來的
        backup.put(threadLocal, threadLocal.get());

        // clear the TTL values that is not in captured
        // avoid the extra TTL values after replay when run task
        // 清除本次沒有傳遞過來的 ThreadLocal,和對應值。畢竟一是可能會有因爲 InheritableThreadLocal 而傳遞並保留的值。二來保證主線程 set 過的 ThreadLocal,不應該被傳遞過來。明確,其傳遞是由業務代碼控制的,就是明確 set 過值的。
        if (!captured.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // 將 map 中的值,設置到 ThreadLocal 中。
    setTtlValuesTo(captured);

    // TransmittableThreadLocal 的回調方法,在任務執行前執行。
    doExecuteCallback(true);

    return backup;
}

private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
        TransmittableThreadLocal<Object> threadLocal = entry.getKey();
        // set 的同時,也就將 TransmittableThreadLocal 註冊到當前線程的註冊表了。
        threadLocal.set(entry.getValue());
    }
}

private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) {
    final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>();

    for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        backup.put(threadLocal, threadLocal.get());

        final Object value = entry.getValue();
        // 如果值是標記已刪除,則清除
        if (value == threadLocalClearMark) threadLocal.remove();
        else threadLocal.set(value);
    }

    return backup;
}

doExecuteCallback 是 TransmittableThreadLocal 定義的回調方法,保證任務執行前和執行後的回調動作。

isBefore 控制是執行前還是執行後。

內部調用了 beforeExecute 和 afterExecute 方法。默認是不做任何動作。

private static void doExecuteCallback(boolean isBefore) {
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
        try {
            if (isBefore) threadLocal.beforeExecute();
            else threadLocal.afterExecute();
        } catch (Throwable t) {
            // 忽略所有異常,保證任務的執行
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
            }
        }
    }
}
protected void beforeExecute() {
}

protected void afterExecute() {
}

3.恢復備份的原快照

public static void restore(@NonNull Object backup) {
    final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}

private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // call afterExecute callback 任務執行完回調
    doExecuteCallback(false);

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();

        // clear the TTL values that is not in backup
        // avoid the extra TTL values after restore
        // 恢復快照時,清除本次傳遞註冊進來,但是原先不存在的 TransmittableThreadLocal
        if (!backup.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // restore TTL values
    // 恢復快照中的 value 到 TransmittableThreadLocal 中
    setTtlValuesTo(backup);
}

private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
        TransmittableThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}

private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> backup) {
    for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}

對特殊場景以及 Lambda 的支持

Transmitter 定義了幾個特殊場景下以及 Java 8 lambda 表達式的使用。

特殊場景就是指,執行前,清除當前執行線程 ThreadLocal 的值,包括 TtlThreadLocal 和註冊 ThreadLocal 。

像一開始講到的業務代碼喜歡使用 Supplier,所以也對其做了支持。本質是爲了簡化工作。

不過,注意的是,快照的捕獲則需要業務代碼自己完成並傳遞。

public static <R> R runSupplierWithCaptured(@NonNull Object captured, @NonNull Supplier<R> bizLogic) {
    Object backup = replay(captured);
    try {
        return bizLogic.get();
    } finally {
        restore(backup);
    }
}

public static <R> R runSupplierWithClear(@NonNull Supplier<R> bizLogic) {
    Object backup = clear();
    try {
        return bizLogic.get();
    } finally {
        restore(backup);
    }
}

public static <R> R runCallableWithCaptured(@NonNull Object captured, @NonNull Callable<R> bizLogic) throws Exception {
    Object backup = replay(captured);
    try {
        return bizLogic.call();
    } finally {
        restore(backup);
    }
}

public static <R> R runCallableWithClear(@NonNull Callable<R> bizLogic) throws Exception {
    Object backup = clear();
    try {
        return bizLogic.call();
    } finally {
        restore(backup);
    }
}

簡化方法,使用起來也就是:

// 線程A
Object captured = Transmitter.capture();

// 線程B
@Async
String result = runSupplierWithCaptured(captured, () -> {
  
     System.out.println("Hello");
     ...
     return "World";
});

否則只能按照全套流程了:

// 線程A
Object captured = Transmitter.capture();

// 線程B
@Async
String result = runSupplierWithCaptured(captured, () -> {
  
     System.out.println("Hello");
     ...
     return "World";
});
Object backup = Transmitter.replay(captured); // (2)
try {
    System.out.println("Hello");
    // ...
    return "World";
} finally {
    // restore the TransmittableThreadLocal of thread B when replay
    Transmitter.restore(backup); (3)

Clear

上面可以看到,一些方法是做了 clear 操作。

就是不依賴快照的捕獲,將空值的快照信息,傳遞給重做方法執行,就能清除當前執行線程的值,並得到返回原值備份。

public static Object clear() {
    final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();

    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        // threadLocalClearMark 標記爲未被傳遞和註冊,更爲合適,從而避免和 null 混淆。否則無法區分原有就是 null,還是未被註冊
        threadLocal2Value.put(threadLocal, threadLocalClearMark);
    }

    return replay(new Snapshot(ttl2Value, threadLocal2Value));
}

注意

如果注意到 TransmittableThreadLocal 是繼承 InheritableThreadLocal,就應該知道,子線程創建時,值還是會被傳遞過去。這也就可能帶來內存泄漏問題。

所以,同時提供 DisableInheritableThreadFactoryWrapper,以方便業務代碼自定義線程池,禁止值的繼承傳遞。

class DisableInheritableThreadFactoryWrapper implements DisableInheritableThreadFactory {
    private final ThreadFactory threadFactory;

    DisableInheritableThreadFactoryWrapper(@NonNull ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @Override
    public Thread newThread(@NonNull Runnable r) {
        // 調用了 Transmitter 的 clear 方法,在創建子線程前,清除當前線程的值,並保存下來
        final Object backup = clear();
        try {
            return threadFactory.newThread(r);
        } finally {
            // 創建完,再重新恢復。以此,避免了值的繼承傳遞。
            restore(backup);
        }
    }

    @NonNull
    @Override
    public ThreadFactory unwrap() {
        return threadFactory;
    }
}

對於 1.8 特性,還提供了 ForkJoinWorkerThreadFactory 和 TtlForkJoinPoolHelper 等類的支持。

Java Agent 支持

避免代碼改動的話,可以使用 Java Agent,來隱式替換 JDK 的相應類。對於 1.8 的 CompletableFuture 和 Stream,在底層通過對 ForkJoinPool 的支持,也做了透明支持。

總結

到此,TransmittableThreadLocal 的源碼解析就結束了。核心源碼是不是很簡單?但是某些思想和考量還是很值得學習的。

ThreadLocal 的使用,本身類似於全局變量,而且是可修改的。一旦中間過程被修改,就無法保證整體流程的前後一致性。它將是一個隱藏的強依賴,一個可能被忽略、意想不到的坑。(我不承認,我在還原大佬的話。)

應該儘量避免在業務代碼中使用的。 DO NOT use, only when you know why .

嗯,還有加上一句,讓其他人也明白,文檔務必齊全。(說實話,我挺想用英文的,想想算了)。

全文完

以下文章您可能也會感興趣:

我們正在招聘 Java 工程師,歡迎有興趣的同學投遞簡歷到 [email protected]

相關文章