Kafka 是一個高度可擴展的分佈式消息系統,在海量數據處理生態中佔據着重要的地位。

數據處理的一個關鍵特性是數據的一致性。具體到 Kafka 的領域中,也就是生產者生產的數據和消費者消費的數據之間一對一的一致性。在各種類型的失敗普遍存在的分佈式系統環境下,保證業務層面一個整體的消息集合被原子的發佈和恰好一次處理,是數據一致性在 Kafka 生態系統的實際要求。

本文介紹了 Kafka 生態中的事務機制的概念和流程。

Kafka 事務機制的概念

Kafka 從 0.11 版本開始支持了事務機制。Kafka 事務機制支持了跨分區的消息原子寫功能。具體來說,Kafka 生產者在同一個事務內提交到多個分區的消息,要麼同時成功,要麼同時失敗。這一保證在生產者運行時出現異常甚至宕機重啓之後仍然成立。

此外,同一個事務內的消息將以生產者發送的順序,唯一地提交到 Kafka 集羣上。也就是說,事務機制從某種層面上保證了消息被恰好一次地提交到 Kafka 集羣。衆所周知,恰好一次送達在分佈式系統中是不可能實現的。這個論斷有一些微妙的名詞重載問題,但大抵沒錯,所有聲稱能夠做到恰好一次處理的系統都在某個地方依賴了冪等性。

Kafka 的事務機制被廣泛用於現實世界中複雜業務需要保證一個業務領域中原子的概念被原子地提交的場景。

例如,一次下單流水包括訂單生成消息和庫存扣減消息,如果這兩個消息在歷史上由兩個主題分管,那麼它們在業務上的原子性就要求 Kafka 要利用事務機制原子地提交到 Kafka 集羣上。

還有,對於複雜的流式處理系統,Kafka 生產者的上游可能是另一個流式處理系統,這個系統可能有着自己的一致性方案。爲了跟上游系統的一致性方案協調,Kafka 就需要提供一個儘可能通用且易於組合的一致性機制,即靈活的事務機制,來幫助實現端到端的一致性。

Kafka 事務機制的流程

分佈式系統的數據一致性是難的。要想理解一個系統提供何種程度的數據一致性保證,以及這樣的保證對應用程序提出了什麼樣的要求,再及在哪些情況下一致性保證會出現什麼方面的回退,細究其一致性機制的實現是必須的。

上面我們提到,事務機制的核心特徵是能跨越多個分區原子地提交消息集合,甚至這些分區從屬於不同的主題。同時,被提交的消息集合中的消息每條僅被提交一次,並保持它們在生產者應用中被生產的順序寫入到 Kafka 集羣的消息日誌中。此外,事務能夠容忍生產者運行時出現異常甚至宕機重啓。

實現事務機制最關鍵的概念就是事務的唯一標識符( TransactionalID ),Kafka 使用 TransactionalID 來關聯進行中的事務。TransactionalID 由用戶提供,這是因爲 Kafka 作爲系統本身無法獨立的識別出宕機前後的兩個不同的進程其實是要同一個邏輯上的事務。

對於同一個生產者應用前後進行的多個事務,TransactionalID 並不需要每次都生成一個新的。這是因爲 Kafka 還實現了 ProducerID 以及 epoch 機制。這個機制在事務機制中的用途主要是用於標識不同的會話,同一個會話 ProducerID 的值相同,但有可能有多個任期。ProducerID 僅在會話切換時改變,而任期會在每次新的事物初始化時被更新。這樣,同一個 TransactionalID 就能作爲跨會話的多個獨立事務的標識。

接下來,我們從一個事務的完整流程出發討論客戶端也就是生產者和消費者,以及服務端也就是 Kafka 集羣在這個流程中扮演了什麼角色,執行了什麼動作。

初始化事務上下文

邏輯上說,事務總是從生產者提起的。生產者通過調用 initTransactions 方法初始化事務上下文。首要做的事情就是找到 Kafka 集羣負責管理當前事務的事務協調者( TransactionCoordinator ),向其申請 ProducerID 資源。初始的 ProducerID 及 epoch 都是未初始化的狀態。

生產者一側的事務管理者( TransactionManager )收到相應的方法調用之後先後發送查找事務協調者的信息和初始化 ProducerID 的信息。事務相關的所有元數據信息都會由客戶端即生產者一側的事務管理者和服務端即 Kafka 集羣的一個 Broker 上的事務協調者交互完成。

一開始,生產者並不知道哪個 Broker 上有自己 TransactionalID 關聯的事務協調者。邏輯上,所有事務相關的需要持久化的數據最終都會寫到一個特殊的主題 __transaction_state 上。這跟前面回答消費位點管理文章中的管理消費者消費位點的特殊主題 __consumer_offsets 構成了目前 Kafka 系統裏唯二的特殊主題。

對於一個生產者或者說被 TransactionalID 唯一標識的事務來說,它的事務協調者就是該事務的元數據最終存儲在 __transaction_state 主題上對應分區的分區首領。對於一個具體的事務來說,它的元數據將被其 TransactionalID 的哈希值的絕對值模分區數的分區所記錄,這也是常見的確定分區的方案。

生產者將查找事務協調者的信息發送到集羣的任意一個 Broker 上,由它計算出實際的事務協調者,獲取對應的節點信息後返回給生產者。這樣,生產者就找到了事務協調者。

隨後,生產者會向事務協調者申請一個 ProducerID 資源,這個資源包括 ProducerID 和對應的 epoch 信息。事務協調者收到對應請求後,將會首先判斷同一個 TransactionalID 下的事務的狀態,以應對好跨會話的事務的管理。

第一步,事務協調者會獲取 TransactionalID 對應的事務元數據信息。前面提到,這些元數據信息將被寫在特殊主題 __transaction_state 上,這也是事務元數據信息對生產者和 Kafka 集羣都容錯的需要。

如果獲取不到元數據信息,那麼就初始化事務元數據信息,包括從獲取一個新的 ProducerID 資源,並將它和 TransactionalID 以及分區編號和其他一些配置信息一起打包持久化。

其中,獲取一個新的 ProducerID 資源需要 ProducerID 管理器從 ZooKeeper 上申請一個 ProducerID 的號段,在逐一的分配出去。申請號段的手段是修改 ZooKeeper 上 /latest_producer_id_block 節點的信息,流程是讀節點上最後一個被申請的 ProducerID 的信息,加上要申請的號段的長度,再更新節點上最後一個被申請的 ProducerID 的信息。由於 ZooKeeper 對節點的更新有版本控制,因此併發的請求將會導致其中若干個請求目標版本失配,並提起重試。ProducerID 的長度是 Long 類型的長度,因此在實際使用過程中幾乎不可能用完,Kafka 對號段資源耗盡的情況拋出致命錯誤並不嘗試恢復。

如果獲取到了相同 TransactionalID 先前的元數據信息,那麼根據事務協調器事務先前的狀態採取不同的行爲。

  1. 如果此時狀態轉移正在進行,直接返回 CONCURRENT_TRANSACTIONS 異常。 注意這裏是事務協調器上正在 發生併發的狀態轉移。 通常來說,併發的狀態轉移應該依次執行,直接返回此異常可避免客戶端即生產者請求超時,而是讓生產者稍後自行重試。 這也是一種樂觀的加策略。

  2. 如果此時狀態爲 PrepareAbort 或 PrepareCommit 則返回 CONCURRENT_TRANSACTIONS 異常。 樣的,此時狀態即將轉換爲終結狀態,無需強行終止先前的事務,否則將會產生無謂的浪費。

  3. 如果此時狀態爲 Dead 或 PrepareEpochFence 或當前 ProducerID 和 epoch 對不上,直接拋出不可重試的異常。 這是由於要麼是 先前的 Prod ucer 且已經被新的 Producer 替代,要麼事務已經超時,無需再次嘗試。

  4. 如果此時狀態爲 Ongoing 則事務協調者會將事務轉移到 PrepareEpochFence 狀態,然後再丟棄當前的事務,並返回 CONCURRENT_TRANSACTIONS 異常。

  5. 如果此時狀態爲 CompleteAbort 或 CompleteCommit 或 Empty 之一那麼先將狀態轉移爲 Empty 然後更新 epoch 值。

經過這麼一連環的操作,Kafka 就將事務執行的上下文初始化好了。

開始一個事務

初始化事務的流程實際上是生產者和對應的事務協調者就事務狀態達成一致,進入到一個可以提起新的事務的狀態。此時,生產者可以通過 beginTransaction 方法開始一個事務操作。這個方法只會將本地事務狀態轉移到 IN_TRANSACTION 狀態,在真正的提交事務中的消息之前,不會有跟 Kafka 集羣的交互。

生產者將自己標記爲開始事務之後,也就是本地事務狀態轉移到事務進行中的狀態之後,就可以開始發送事務中的消息了。

發送事務中的消息

生產者在發送事務中的消息的時候,會將消息對應的分區添加到事務管理器中去,如果這個分區此前沒被添加過,那麼事務管理器會在下一次發送消息之前插入一條 AddPartitionsToTxnRequest 請求來告訴 Kafka 集羣的事務協調者參與事務的分區的信息。事務協調者收到這條信息之後,將會更新事務的元數據,並將元數據持久化到 __transaction_state 中。

對於生產者發送的消息,仍然和一般的消息生產一樣採用 ProduceRequest 請求。除了會在請求中帶上相應的 TransactionalID 信息和屬於事務中的消息的標識符,它跟生產者生產的普通信息別無二致。如果消費者沒有配置讀已提交的隔離級別,那麼這些消息在被 Kafka 集羣接受並持久化到主題分區中時,就已經對消費者可見而且可以被消費了。

事務中的消息的順序性保證也是在發送事務的時候檢查的。

生產者此時已經申請到了一個 ProducerID 資源,當它向一個分區發送消息時,內部會有一個消息管理器爲每個不同的分區維護一個順序編號( SequenceNumber )。相應地,Kafka 集羣也會爲每個 ProducerID 到每個分區的消息生產維護一個順序編號。

ProducerRequest 請求中包含了順序編號信息。如果 Kafka 集羣看到請求的順序編號跟自己的順序編號是連續的,即比自己的順序編號恰好大一,那麼接受這條消息。否則,如果請求的順序編號大一以上,則說明是一個亂序的消息,直接拒絕並拋出異常。如果請求的順序編號相同或更小,則說明是一個重複發送的消息,直接忽略並告訴客戶端是一個重複消息。

提交事務

在一個事務相關的所有消息都發送完畢之後,生產者就可以調用 commitTransaction 方法來提交整個事務了。對於事務中途發生異常的情形,也可以通過調用 abortTransaction 來丟棄整個事務。這兩個操作都是將事務狀態轉移到終結狀態,彼此之間有許多相似點。

無論是提交還是丟棄,生產者都是給事務協調者發送 EndTxnRequest 請求,請求中包含一個字段來判斷是提交還是丟棄。事務協調者在收到這個請求後,首先更新事務狀態到 PrepareAbort 或 PrepareCommit 並更新狀態到 __transaction_state 中。

如果在狀態更新成功前事務協調者宕機,那麼恢復過來的事務協調者將認爲事務在 Ongoing 狀態中,此時生產者由於收不到確認回覆,會重試 EndTxnRequest 請求,並最終更新事務到 PrepareAbort 或 PrepareCommit 狀態。

隨後,根據是提交還是丟棄,分別向事務涉及到的所有分區的分區首領發送事務標誌( TransactionMarker )。

事務標誌是 Kafka 事務機制引入的不同於業務消息的事務控制消息。它的作用主要是標識事務已經完成,這個消息同業務消息一樣能夠被消費者所消費,並且它和事務中的業務消息能夠通過 TransactionalID 關聯起來,從而支持配置了讀已提交特性的消費者忽略尚未提交的事務消息或被丟棄的事務消息。

如果在事務標誌寫到涉及到的所有分區的分區首領之前,事務協調者宕機或者分區首領宕機或網絡分區,新起來的事務協調者或超時後重試的事務協調者會重新向分區首領寫入事務標誌。事務標誌是冪等的,因此不會影響事務提交的結果。這裏我們印證了之前所說的所有聲稱能夠做到恰好一次處理的系統都在某個地方依賴了冪等性。

在當前事務涉及到的所有分區都已經把事務標誌信息持久化到主題分區之後,事務協調者纔會將這個事務的狀態置爲提交或丟棄,並持久化到事務日誌文件中。在這之後,一個 Kafka 事務纔算真正的完成了。事務協調者中緩存的關於當前事務的元數據就可以清理了。

如果在事務協調者回復生產者提交成功之前宕機,在恢復之後生產者再次提交事務時會直接返回事務提交成功。

總的來說,事務的狀態以 __transaction_state 主題上持久化的元數據信息爲準。

超時過期事務

分佈式系統由於天然的網絡阻塞或分區等失敗原因,操作在成功和失敗之外還有超時這第三種狀態。現實中的分佈式系統必須合理地處理超時的狀態,否則永久阻塞或等待在任何實際的業務領域中都是不可接受的。

Kafka 事務機制本身可以配置事務超時,在事務管理者和事務協調者交互的各個過程中都會檢驗事務超時的配置,如果事務已經超時則拋出異常。

但是,在網絡分區的情況下,可能 Kafka 集羣根本就等不到生產者發送的消息。這個時候,Kafka 集羣就需要相應的機制來主動過期。否則永不過期的中間狀態事務在生產者宕機且不可恢復或不再恢復的情況下將逐步積累成存儲垃圾。

Kafka 集羣會週期性的輪詢內存中的事務信息。如果發現進行中的事務最後的狀態更新時間距今已經超過了配置的集羣事務清理時間閾值,則採取丟棄該事務的操作。同時,爲了避免操作過程中併發地收到原 Producer 發來事務更新請求,首先更新事務關聯的 ProducerID 的 epoch 以將原 Producer 的 epoch 隔離掉。換個角度說,也就是以一個新的有效的身份執行丟棄事務操作,以免分不清到底是誰在丟棄事務。

此外,輪詢中還會檢查 TransactionalID 最新的事務信息,如果一個 TransactionalID 最後一個事務距今已經已經超過了配置的集羣 TransactionalID 清理時間閾值,則將該 TransactionalID 對應的元數據信息都進行清理。

上面的討論中還有兩個重要的主題被忽略了。一個是 Kafka 事務機制支持在同一個事務裏進行消息生產和消息消費位點提交,另一個是配置了讀已提交的消費者如何在事務未提交以及丟棄事務時正確的讀取事務中消息。

前者不是特別複雜,只需要將消費位點提交視作一條事務中的消息,和消息生產以及控制消息同等待遇,在提交的時候也被事務標誌所界定即可。

不展開聊是因爲這個特性通常只在僅適用 Kafka 搭建流式處理流水線的場景下有用,尤其是 Kafka Streams 解決方案。

對於組合多個系統的流式處理流水線來說,消息從 Kafka 中消費得到是上游,生產到 Kafka 上是下游,中間是另一個例如 Flink 的流式計算系統。在這種場景下,消費位點的管理和事務地生產消息是兩個可以分開考慮的事情,可以跟其他系統的一致性方案例如 Flink 的 Checkpoint 機制相結合,而不需要非得在同一個事務裏既提交消費位點,又提交新的消息。

後者主要靠 Kafka 集羣在管理消費位點拉取請求的時候,通過隨事務機制的引入新添加的 LastStableOffset 概念來響應配置爲讀已提交的消費者的請求。在事務完成之前不會允許讀已提交的消費者拉取事務中的消息。顯然,這有可能導致消費者拉取新消息時長時間的阻塞。因此在實踐中應當儘量避免長時間的事務。

對於丟棄事務的消息,Kafka 集羣會維護一個丟棄事務的消息的元數據,從而支持消費者同時拉取消息和丟棄事務的消息的元數據,自行比對篩掉丟棄事務的消息。在正常的業務場景裏,丟棄的事務不會太多,從而維護這樣的一份元數據以及讓消費者自行篩選會是一個能夠接受的選擇。

相關文章