近日常有同學來問我如何閱讀代碼,關於這個問題的一般性答案我特別提了一個 問題 並自問自答。出於提供一個實際的例子的考量,正好此前綜合地閱讀 Spark 的 RPC 實現、Flink 基於 Akka 的 RPC 實現和 Actor Model 的通信模型,寫成本文分享我閱讀分佈式計算系統 Spark 和 Flink 中的 RPC 實現的過程和思考。

簡介 Actor Model 及 Akka 的問題

通常來說,閱讀代碼的流程是首先了解自己要閱讀的代碼解決了什麼問題,這個問題的現有解決方案是什麼,有什麼優勢和缺點。大致清楚了這些背景之後再在走讀代碼的過程中思考閱讀的代碼具體是怎麼解決這個問題的,最後專注到重點難點的代碼塊的理解上。也就是說,代碼閱讀最重要的不是代碼。代碼只是將思考的結果轉換爲實際可用的軟件的手段,思考的結果或者說解決問題的方法纔是重要的內容。

分佈式計算系統的分佈式特性決定了設計過程中必然會考慮節點間的通信問題,即籠統的 RPC 需求。關於 RPC 和 RMI 及 Actor Model 具體的差別本文不做展開,主要集中在 Spark 和 Flink 的 RPC 實現來介紹 Actor Model 下的 RPC 實現。

Actor Model 的主要概念包括

  • 通信的主體 Actor

  • 通信的內容 Message

  • 單線程先到先處理的消息處理器 Mailbox

特別需要提及的是 Actor 之間的通信是通過類似於地址的 ActorRef 來引用其他的 Actor 的,同時,在實現中,需要一個支持 Actor Model 運行起來的 ActorSystem 環境。這些具體的概念和名詞屬於 Akka,我們會在後面看到它們如何在 Spark 和 Flink 中被一一對應。

Actor Model 一個很少被注意的特點是它的建模過程中只存在 tell 這一個通信原語,ask 等等只是構建在 tell 上層的方便的通信模式。這就導致一個問題,即 Actor Model 原生的編程模式是明顯不同於傳統的編程模型的。傳統的編程模型中,函數調用是有返回值的,即使採用 Future 作爲返回值的佔位符,本質上還是有一一對應的返回值的;而在 Actor Model 中,消息一經發出就被遺忘,即所謂的 fire and forget 模式。要建立當前發出的消息和稍後收到的消息之間的 ask and answer 關係,需要額外的工作。這部分的內容可以參考 Akka 官方文檔中介紹通信模式的 章節 ,本身可以作爲 Akka 最佳實踐的一部分,有時間我會專門寫一篇文章介紹 Actor Model 下完全被顛覆的編程模型以及通過在其上模擬常見的編程模型來探索 Actor Model 的最佳實踐。

關於更多 Actor Model 的概念性和介紹性資料,可以參考的資料有  Akka 的官方文檔 《反應式設計模式》 等等。

Akka 作爲目前最成熟的 Actor Model 的實現之一,以及擁有容易理解的單線程 Actor 和併發通信模型,廣泛地充當了 JVM 系的分佈式系統的 RPC 層。Akka 最近的演化有兩個重點,一個是類型化(Typed)的 Akka,另一個是在拆分行爲(Behavior)和狀態(State)的概念。前者我們後面看到 Spark 和 Flink 的 RPC 實現時就能看到選擇標準的不同,後者這裏不作展開,可能會在後續討論函數式編程的文章中再次提及。

儘管 Akka 的實現非常成熟,但是直接使用 Akka 的底層 Actor Model 的軟件卻不多。對於業務軟件來說,Akka Model 過於底層,如果要利用它帶來的好處通常會直接使用 Akka Streams 和 Akka HTTP 等上層建築;對於其他分佈式系統來說,主要有以下兩個問題。

第一個問題是兩層集羣的負擔。如果我們使用 Akka 作爲底層 RPC 的實現,本身 Akka 會有相應的基礎組件,包括 ActorSystem 或者進一步使用 Akka Cluster 的話相應的 Cluster 對象。我們的分佈式系統例如 Spark 和 Flink 本身有自己的集羣管理策略,在 Spark 中有 Driver 和 Worker 的概念,在 Flink 中有 JobManager 和 TaskManager 等概念。如果在處理本身系統的集羣管理的同時還要兼顧底層的 Akka 集羣,這樣兩層的集羣在實際開發和運維的過程當中會帶來額外的複雜性。尤其是 Akka 作爲一個功能複雜的重量級框架,並且在 Typed Akka 中做出了限制公開的直接溝通兩個 Actor 的能力,強制要求使用 Akka Cluster 的決定。同時處理兩層集羣複雜的狀態機和角色與消息的轉換將會是一個巨大的負擔。

第二個問題是版本的負擔,這也是 Spark 走向去 Akka 化的直接原因,也是 Flink 社區經常被提問的一個問題。我們知道,爲了保證分佈式系統的穩定性,它依賴的組件尤其是 RPC 實現這樣底層模塊的依賴版本會保持相當的穩定性。這樣就有一個問題,Spark 和 Flink 的用戶在使用它們的同時也很有可能使用 Akka,並且依賴的是另一個 Akka 的版本。這樣,就會出現版本不同帶來的不兼容性問題。通常來說,這一點可以通過發佈一個項目專有的第三方依賴並使用 shaded 技術重定位包名來解決問題。但是由於重定位爲了覆蓋反射調用,是在字節碼級別對全限定名和字符串的包名前綴做替換。一般來說,包名都是諸如  org.apache.spark  或者  org.apache.flink  的形式,具有唯一性,替換起來不會有什麼問題。Akka 就不一樣了,它的包名是  akka.actor  等等,跟配置名稱是一樣的。這就導致重定位會錯誤改動代碼中的配置名字符串導致運行時字符串失配出錯。版本問題在 Lightbend 全家桶裏是不存在的,例如 Play 通過接口暴露底層的 Akka 數據結構,並固定依賴到某一個版本,這樣使用 Play 的人需要 Akka 的功能是隻需要通過接口拿到對應的 Akka 數據結構就可以,但是這種方式並沒有考慮和其他系統的版本兼容問題。

雖然上述問題可以通過定製 ClassLoader 並精心調整打包策略來繞過,或者要求用戶程序使用跟系統框架兼容的 Akka 版本,但是這會導致複雜不友好的用戶體驗,而清楚簡單的用戶體驗很多時候比功能更能決定一個框架的生存空間。同時,Akka 提供的很多功能,例如 Actor Model 基石的監督(Supervise)功能,對於上層提供 Failover 機制的 Spark 和 Flink 來說是多餘的。前有用戶體驗的硬性需求,後有開發輕量化的敏捷需求,Ligetbend 系以外的成熟的分佈式系統開發自己的 RPC 實現是理所當然的選擇。

理解了 Spark 和 Flink 爲什麼要開發自己的 RPC 實現之後,我們再看到 RPC 實現具體的考量點和內容。

Spark 的 RPC 實現

Spark 開發自己的 RPC 實現以換下 Akka 的理由主要是上面提及的版本依賴問題,在社區中記錄爲  SPARK-5293

閱讀相關代碼,首先我們要定位代碼的位置。Spark 的 RPC 實現主要位於  core  模塊下的  org.apache.spark.rpc  這個包下,閱讀代碼的過程中通過跳轉到定義和查找使用點可以找到完整的脈絡。結果而言,除了實際的 RPC Endpoint 實現之外,主要相關的代碼還包括  common/network-common  路徑下網絡傳輸層相關的底層支持。

Spark 的 RPC 實現雖然是爲了替換 Akka 而誕生的,但是它實際上可以看成一個簡化版的 Akka,仍然遵循許多 Actor Model 的抽象。例如

  • RpcEndpoint 對應 Actor

  • RpcEndpointRef 對應 ActorRef

  • RpcEnv 對應 ActorSystem

RpcEndpoint 與消息處理模型

這其中從模型上來說最簡單的反而是 RpcEndpoint,因爲所有的實現邏輯是具體實現類的事情,它其實只是一個簡單的存根(Stub)。總的來說,RpcEndpoint 有以下接口

private[spark] trait RpcEndpoint {
  final def self: RpcEndpointRef = ???
  final def stop(): Unit = ???
  val rpcEnv: RpcEnv = ???

  def receive: PartialFunction[Any, Unit] = ???
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = ???

  def onError(cause: Throwable): Unit = ???
  def onConnected(remoteAddress: RpcAddress): Unit = ???
  def onDisconnected(remoteAddress: RpcAddress): Unit = ???
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = ???

  def onStart(): Unit = ???
  def onStop(): Unit = ???
}

可以看到,上面的函數我分成了四組,其中第一組是和元操作有關的,望文生義不做解釋;第三組是連接和錯誤處理相關的回調函數,主要是記錄日誌和處理邊界情況的,也不用多做介紹;第四組實現的比較多,雖然和第三組一樣是掛載在特定事件上的回調函數,但是 RpcEndpoint 啓動和關閉時常常需要做狀態初始化和終結,以及資源的申請和釋放,所以  onStart  和  onStop  是經常被實現的接口。

這裏在展開說一下第二組接口。首先是  receive ,這個接口跟 Akka 裏面 Actor 的  receive  是類似的,我們看到它的簽名是  PartialFunction[Any, Unit] ,也就是說實現起來類似於下面的代碼塊。

override def receive: PartialFunction[Any, Unit] = {
  case Message => ...
  case BoxedMessage(msg1, msg2) => ...
}

可以看到和 Untyped Akka 別無二致,也就是說 Spark 的 RPC 實現也是非類型化的,編程模型上基於消息和模式匹配來做的。後面我們會看到 Flink 對這一點做了不同的選擇,介紹完 Flink 的情況後我們會做一個總的探討。

另一個接口就比較有意思了, receiveAndReply  實現了接收信息後返回的功能。由於沒有實現 Akka 中上下文  sender()  的邏輯,Spark 使用了另一個接口來處理需要返回的調用。我們分兩點說明  sender()  的問題和 Spark 基於 Actor Model 實現了傳統的具有返回值的調用的方式。

第一點, sender()  主要的問題是,它是一個方法調用,而不是一個確定性的值。這是函數式編程的擁躉喜歡討論的話題,即在不同的時刻調用  sender()  會返回不同的值。乍一看我們在每次處理一條消息的時候都調用  sender()  獲得當前消息的發送來源並沒有問題,不過這個方法在 Akka 社區給新手帶來了不少麻煩。

最大的問題還是上面提到的調用點的問題。通常來說,由於 Actor Model 中的 Actor 是單線程的處理消息的,你在同一個消息處理過程中多次調用  sender()  返回的都是當前消息的來源。不過,在一個常見的場景中,你在處理消息的時候發起了另一個異步動作,在異步動作中調用  sender()  來獲取當前消息的來源。由於異步動作觸發的時間是未知的,實際上當它觸發時再次調用  sender()  的時候,可能返回的就是另一條消息的來源了。這個問題很好解決,即用一個變量保存當前的  sender()  後面傳遞這個對象而不是再次調用  sender()  獲取對象。顯然,Spark 的  receiveAndReply  中的參數  context  就是這個可用於發回消息的上下文,與  sender()  類似。而在 Typed Akka 中,由於  sender()  無法確切的類型化,因此採用的是將消息來源直接編碼在發送的消息中的方式以在需要的時候使用它回覆消息,這要求 ActorRef 在不同的 ActorSystem 上正確的序列化和反序列化。

第二點,我們看到這裏的時候就會想,那我現在有兩個 receive 函數,雖然我可以根據需不需要發送回覆消息把消息處理邏輯拆分到不同的函數里,但是 Spark 又是怎麼知道應該把入站的請求分配到哪個函數的呢?這個就涉及到 Spark RPC 實現的細節。簡單的說我們先看到調用兩個 receive 函數的片段。

// Inbox.scala
class Inbox {
def process(dispatcher: Dispatcher): Unit = {
// ...
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch
{
case e: Throwable =>
context.sendFailure(e)
throw e
}

case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})

case OnStart =>
endpoint.onStart()
// ...

case OnStop =>
// ...
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")

case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)

case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)

case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
// ...
}
}

簡單掃過回調系列函數,我們看到 Spark RPC 判斷將消息轉往何處主要是看消息的類型是  RpcMessage  還是  OneWayMessage 。從名字中我們就可以看出,前者指的是調用並返回的消息,後者是 fire and forget 的消息。我們跳轉到定義並查找初始化點,可以發現生成這兩種不同信息的差異的根源發生在  RpcEndpointRef  是調用  ask  還是  send  上,在唯一的 Netty 實現上一路會經過  NettyRpcEnv  對應的  ask  和  send  方法,生成不同的消息發送到遠端。這也就是前面說的 Spark 原生的支持 ask 語義的意思。從熟悉的變成模型出發,可以把 ask 當成返回值不爲 void 的函數或者 Pascal 中的 function,send 當成返回值爲 void 的函數或者 Pascal 中的 procedure。

send 的語義是比較清楚的,關於 ask 的語義還有一個值得討論的點。我們知道 ask 會有一個返回值,這個返回值是真正有意義的返回值的佔位符 Future,而 Future 一般的處理方式在經過拼接和轉換之後終究是會有一個 onSuccess 或者 onFailure 的觸發動作,這個觸發在哪個線程上執行是很重要的。這涉及到我們在編寫 receive 函數的時候對異步行爲和同步策略的判斷。Spark 的實現類似於 Akka 中 AskPattern 引入 PromiseActorRef 的方式,生成一個 Promise 並在對應的返回收到時完成,這個 Promise 作爲 ask 的返回值。相關的回調邏輯發生在  NettyRpcEnv#askAbortable  中,可以看到,本地消息中 Promise 的完成發生在發送消息的同一個線程上,而遠端消息中 Promise 的完成一路探查到 TransportClient 和 TransportChannelHandler 可以發現完成在 Netty 的 channelRead0 上,也就是說,Spark 的 ask 返回的 Future,其完成的時間點並不一定和 RpcEndpoint 的主線程同步。這可能會導致在不加同步策略下的一些問題,例如通過 ask 詢問一個遠端節點的狀態和遠端節點主動 send 過來的狀態同時觸發狀態處理邏輯而導致競態條件。補充說明,Spark 的 RpcEndpoint 本身也可能併發的處理消息,僅當它是 ThreadSafeRpcEndpoint 或 IsolatedRpcEndpoint 時才表現出類似於 Actor Model 下單線程 Actor 的行爲。上面提到的 ask 導致競態條件的問題在 Akka 中也存在,這倒不算 BUG,只是在使用的時候需要注意採用合適的同步策略。

RpcEnv 與消息分派模型

接下來我們看到更接近 RPC 實現的核心的代碼。RpcEnv 是正確的處理 RpcEndpoint 存在和運行及其支持的網絡環境的上下文,目前 Spark 中只有基於Netty 的實現。

對於服務端來說,RpcEnv 是支持 RpcEndpoint 正常運行的環境,調度線程處理消息並負責 RpcEndpoint 的生命週期管理;對於客戶端來說,可以使用 RpcAddress 等方式從 RpcEnv 中獲取可用的本地或遠端的 RpcEndpointRef,這是一個 RpcEndpoint 的位置透明的引用或者叫句柄,可以通過調用它的 send 或 ask 方法來向 RpcEndpoint 發送信息。

對於消息的分派,我們從消息的入站和出站來看。

首先看出站,即本 RpcEnv 向 RpcEndpoint 發送消息。注意這裏如果是本地的 RpcEndpoint,會將消息直接通過 Dispatcher 分派到本地的 RpcEndpoint 上,嚴格來說不算出站。如果是遠端的 RpcEndpoint,NettyRpcEnv 會通過 postToOutbox 方法,對於 ask 來的方法的回覆,構造的消息來源 RpcEndpointRef 會帶有網絡層的 client,因此是直接返回;而對於本地直接出站的消息,則會根據接收者的地址放入 Outbox 的隊列中。一個地址對應着一個 Outbox,在 Outbox 中的消息異步的被取出併發送。

接着看入站,入站的消息會統一先由 NettyRpcEnv 交給 Dispatcher,Dispatcher 在根據消息的元數據分派到對應的處理 RpcEndpoint 上。Dispatcher 中每一個和 RpcEndpoint 一一對應的地址都會被關聯上一個 MessageLoop,類似於 EventLoop 它會負責處理發給 RpcEndpoint 的初步分派後的消息。每個 RpcEndpoint 實際綁定的消息處理觸發器是 Inbox,Inbox 相當於 Actor Model 中的 Mailbox,負責接收外部發到當前 RpcEndpoint 即 Actor 的消息。DedicatedMessageLoop 只服務於一個 RpcEndpoint,因此它也只持有一個 Inbox,當消息由 Dispatcher 發給 DedicatedMessageLoop 之後,它就轉發給唯一的 Inbox;SharedMessageLoop 可服務於多個 RpcEndpoint,所以它的內部有一個 RpcEndpoint 地址對應到 Inbox 的映射,收到 Dispatcher 初步分派後的消息後它會再次進行分派發送到具體的 RpcEndpoint 中。這種 MessageLoop 的設計對應的是一般的 RpcEndpoint 和 IsolatedRpcEndpoint,主要是提供不同的同步保證和線程配置。

具體到 Inbox 的消息就比較直接了,拋開狀態管理和異常管理不談,主要的內容就是一個同步的先進先出的隊列處理發佈進來的消息,如上一節代碼片段所貼,最終根據消息的類型調用 RpcEndpoint 的不同方法。

Flink 的 RPC 實現

現在我們轉過頭來看 Flink 的 RPC 實現。總的來說 Flink 的 RPC 實現依然是基於 Akka 的,這一點與 Spark 基於 Netty 開發的一套不同。Flink 社區有去掉 Akka 依賴的計劃,但進度只是  FLINK-4346  把接口抽象出來的程度,其底層實現仍然是 Akka,並沒有解決一開始我們提到的使用 Akka 帶來的問題。

我們看到 FLINK-4346 描述的目標,先從整體上了解它的設計方向。

It should address the following issues:

- Add type safety to the sender and receiver of messages. We want proper types methods to be called, rather than having generic message types and pattern matching everywhere. This is similar to typed actors.

- Make the message receivers testable without involving actors, i.e. the methods should be callable directly. When used with other component, the receiver will be wrapped in an actor that calls the methods based on received messages.

- We want to keep the paradigm of single-threaded execution per "actor"

首先我們可以看到的是它仍然強調了 Actor Model 的核心之一,單線程的 Actor 消息處理。其次,我們可以看到和 Spark 有兩個重要的不同點。

其一是不同於 Akka 的 testkit 套路,Flink 強調遠端調用和本地調用在編程模型上的統一性,從而可以在不引入 Actor 一套的情況下直接調用 Actor 的方法來進行測試。這一點實際上跟 RMI 是比較相似的,可以創建一個本地的對象調試,需要訪問遠端對象的時候就創建一個遠端對象的引用。關於這個調用編程模型上的統一性,後面講到 RpcGateway 和 RpcEndpoint 以及反射調用的時候會看到細節,總的來說這一套類似於 Akka 社區已經放棄的 Typed Actors 實現 Actor Model 類型化的方案。

其二是類型化,上面我們提到的編程模型本身跟類型關係不大。Flink 爲了更好的實現防禦性編程,期望在調用對應的遠端方法的時候能夠使用上類型系統的優勢來保證參數和返回值的類型匹配,其中主要是返回值的匹配和對應的 RpcGateway 不像無類型的 ActorRef 或 RpcEndpointRef 一樣難以判斷哪些消息是合法的。不過由於 FLINK-4346 的歷史侷限性,它借鑑了當時 Typed Actors 的實現方案,這個方案後來被廢棄。

由於不需要像 Akka 或 Spark 那樣從 Netty 或者 Aeron 這樣的網絡層框架重新搭建消息分派系統,Flink 的討論主要集中在它復刻 Typed Actors 的代碼和線程模型上。

RMI 式的類型化 RPC 實現

Flink 中的 RPC 實現主要在  flink-runtime  模塊下的  org.apache.flink.runtime.rpc  包中。由於複用了 Akka 的基礎設施,它並不像 Spark 那樣直接依賴傳輸層的實現,也不需要自己的分派信息。上次 Flink 的 PMC Chair Stephan Ewen 來北京,和他交流的時候確認了 Flink 只把 Akka 作爲 RPC 底層來用,並沒有使用 Akka 豐富的監督等其他功能,並且在未來有去掉 Akka 依賴的計劃。

Flink 的 RPC 實現的主要抽象包括

  • ActorSystem 的封裝 RpcService

  • Actor 與 RpcEndpoint 兩層之間的膠合層 RpcServer

  • 業務邏輯的載體 RpcEndpoint

  • RpcEndpoint 的位置透明的引用 RpcGateway

  • 迷之線程模型輔助接口 MainThreadExecutable 和 MainThreadExecutor

可以看到,這個 Spark 和 Akka 基本一一對應的骨架是不一樣的,主要的矛盾點在 RpcServer 這一層上。這是因爲相比於前兩者直接實現 Actor 或其等價物,Flink 的 RPC 實現是基於 Akka 的 Actor 實現了自己的 Actor 等價物 RpcEndpoint,這就導致模型的對應關係適配。

這個問題我們談到 RpcServer 的具體代碼的時候再提。Flink 的代碼不能像 Spark 那樣按照不同的類型來看,因爲類的實現可能涉及到反射訪問另一個類,這種情況下按照功能點來閱讀代碼會更好理解。

我們首先看到上面抽象的構造過程。最後的輔助接口放在下一節講,其他的抽象構造過程分別如下。

RpcServices 目前的唯一實現 AkkaRpcService 是 Akka 的 ActorSystem 的封裝,基本可以理解成 ActorSystem 的一個適配器。所以其構造過程也比較簡單,就是將適配的對象引用保存後返回。複雜的是由 RpcServices 構造的 RpcServer。

RpcServer 的構造有兩個觸發點。我們先看到連接遠端的 RpcEndpoint 時通過  RpcServices#connect  構造的 RpcServer。這個方法的兩個重載的區別只在於是否實現 fencing 的功能,即區分監聽同一地址的不同任期的 RpcEndpoint。由於 Flink 的 JobManager 等 RpcEndpoint 會通過主節點選舉選出主節點,監聽同一個地址的可能是節點的不同任期,而上一個任期的請求的回覆應該被過濾掉以免影響當前任期的節點狀態。這點先簡單帶過,我們看到 connect 除此以外的共同部分,摘要如下。

private <C extends RpcGateway> CompletableFuture<C> connectInternal(
  String address,
  Class<C> clazz,
  Function<ActorRef, InvocationHandler> invocationHandlerFactory
) {
  // ...
  final ActorSelection actorSel = actorSystem.actorSelection(address);
  final Future<ActorIdentity> identify = Patterns
            .ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
            .mapTo(ClassTag.apply(ActorIdentity.class));

  final CompletableFuture<ActorRef> actorRefFuture = FutureUtils.toJava(identify).thenApply(
    (ActorIdentity actorIdentity) -> {
      if (actorIdentity.getRef() != null) {
        return actorIdentity.getRef();
      } else ...
    });

  final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
            (ActorRef actorRef) -> FutureUtils.toJava(
                Patterns.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
                    .mapTo(ClassTag.apply(HandshakeSuccessMessage.class))));

  return actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (actorRef, ignored) -> {
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
                ClassLoader classLoader = getClass().getClassLoader();
                C proxy = (C) Proxy.newProxyInstance(
                    classLoader,
                    new Class<?>[]{clazz},
                    invocationHandler);

                return proxy;
            },
            actorSystem.dispatcher());
}

連接的過程主要分成三個階段,第一個階段是通過 ActorSelection 和 Identify 找到和地址字符串對應的遠端 Actor 的引用,接着發送握手消息確保遠端的 Actor 正常工作,隨後將這個 ActorRef 打包爲一個 InvocationHandler 並轉換爲對應類型的代理後返回。這裏前兩個階段都是 Akka 的基本操作,這裏重點介紹一下最後一個階段,並說明它就是所謂的 RMI 式的 RPC 實現。

InvocationHandler 本身是 Java 內置的接口,其定義如下。

public interface InvocationHandler {
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
}

這個接口是給 Java 內置的代理功能使用的,invoke 方法的三個參數分別代表方法的接收方、方法引用和參數列表,或者我們用參數名簡單的對應到方法調用,那就是  proxy.method(args)  這樣的形式。我們看到穿件 Proxy 那一行的官方文檔註釋。

public class Proxy {
  /**
   * Returns an instance of a proxy class for the specified interfaces
   * that dispatches method invocations to the specified invocation
   * handler.
   *
   * <p>{@code Proxy.newProxyInstance} throws
   * {@code IllegalArgumentException} for the same reasons that
   * {@code Proxy.getProxyClass} does.
   */
  public static Object newProxyInstance(
    ClassLoader loader,
    Class<?>[] interfaces,
    InvocationHandler h
  ) throws IllegalArgumentException {
    // ...
  }
}

可以看出,所謂的代理對象就是可以處理  interfaces  定義的類型能接受的調用,並把這些調用轉交給 InvocationHandler 來處理的對象。我們在調用 connect 方法時傳遞的 class 參數即是 RpcGateway 的一個子接口,而 RpcGateway 的子接口例如 JobMasterGateway 或 TaskExecutorGateway 則定義了 JobMaster 或 TaskExecutor 這個 RpcEndpoint 所能接受的調用。通過這種方法我們將產生一個跟特定的 RpcGateway 的子接口語義相同的對象,而這個對象所有的 InvocationHandler 在 Flink 中的實現恰好就是 RpcServer 的有效實現。RpcServer 本身也是 RpcGateway 的一個子接口。最後這一點其實沒有太多的理論支持,只是爲了讓編譯通過和更好的處理  InvocationHandler#invoke  的邏輯所做的妥協,這也是 Flink 的 RPC 實現因爲阻抗適配而帶來的理解難度其中之一。

我們說到,在 Flink 裏面,InvocationHandler 和 RpcServer 實際上指的是同一種東西,它們的實現只有兩個,AkkaInvocationHandler 和 FencedAkkaInvocationHandler,後者如前所述與 fencing 相關,不做過多分析。我們從前者觸發,主要的邏輯出現在排除了本身方法調用之後的實際代理工作  invokeRpc  方法上。這個方法的流程如下。

  • 從所代理的方法的簽名中查出可能的 RpcTimeout 的參數位置並抽取參數,這個超時時間主要是爲了 ask 功能提供的。

  • 構造相應的 RpcInvocation 消息,這個是 Flink 專用的唯一的 Actor 類型 AkkaRpcActor 能識別的消息類型。根據 RpcGateway 或者說 RpcServer 的位置分別產生 LocalRpcInvocation 消息或 RemoteRpcInvocation 消息。兩者的主要區別在於是否支持序列化,因爲只有發往遠端的消息才需要考慮序列化相關的事項。這個消息包含了方法調用的元信息,即方法名,參數列表和參數類型列表。由於發送的對象是確定的,就是和 RpcServer 保存的 ActorRef 對應的對象,因此不需要指定方法接收者。

  • 根據方法的返回值類型進行不同的處理。如果是 void 即無返回值,則進行 tell 後返回;如果是 CompletableFuture 則進行 ask 後轉換返回的 Future 的類型後返回 Future;如果是其他非 CompletableFuture 的返回值,則類似於前者,但是阻塞在 Future 上等待取得返回值後返回。

可以看到,在這裏,方法調用被轉換成了 RpcInvocation 這樣的方法調用元信息,在遠端接受到這些信息後通過反射進行調用,具體可以參照  AkkaRpcActor#handleRpcInvocation  的內容。從外表上看,開發者拿到 RpcGateway 的代理對象後,就像操作它們的子接口,例如 JobMasterGateway 一樣,調用其接口,例如 registerTaskManager 或 offerSlot 等。而實際的操作經過 AkkaInvocationHandler 的解釋變成發到一個 RpcEndpoint 的消息,這個過程與 RMI 是異曲同工的。

RpcServer 的另一個構造點是  RpcServices#startServer ,在唯一的實現 AkkaRpcServices 裏它會根據傳入的 RpcEndpoint,解析這個具體的 RpcEndpoint 子類的對象實現的接口,通過  ActorSystem#actorOf  創建對應的 AkkaRpcActor 並拿到 ActorRef 後同 connect 最後階段一樣構造出 RpcServer 的代理對象。這個代理對象由於實現了代理 RpcEndpoint 的方法的邏輯,也即它所實現的 RpcGateway 的邏輯,所以在  RpcEndpoint#getSelfGateway  的時候也可以被強轉成對應的 RpcGateway 來返回。

RpcEndpoint 的構造就比較簡單了,是直接的調用構造函數的構造,其基礎構造函數如下。

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}

可以看到就是在這裏發起對  RpcService#startServer  的調用。

總的來說,Flink 的 RPC 實現概念混亂,試圖實現 Actor Model 但是概念對應上由於其本質上是在 Actor Model 上糊了一層 Actor Model 但是又複用了底層的 ActorSystem 而導致說不清道不明,阻抗失配。同時,在編程上依賴巧合,例如代理同時是 RpcServer 又是 RpcGateway 的子接口,依賴反射,以及下一節中會講到的同一功能多種暴露手段。可以說是一個勉強能用但是擴展困難,出現問題難以排查而且性能絕非最優的實現。

MainThreadExecutable 與線程模型

上一節中提到 Flink 的 RPC 實現出現了同一功能的多種暴露手段,也出現了【迷之線程模型輔助接口 MainThreadExecutable 和 MainThreadExecutor】這樣的字眼。這一節就展開的介紹下 Flink RPC 的線程模型。

首先,Flink 的 RPC 實現是基於 Akka 的,所以 Akka 的 Dispatcher 以及上層的 tell 和 ask 的線程模型是一樣的。這裏主要神奇的是 MainThreadExecutable 這個接口。我們先看到它的定義。

public interface MainThreadExecutable {
  void runAsync(Runnable runnable);
  <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout);
  void scheduleRunAsync(Runnable runnable, long delay);
}

同上,這裏有一個 Fenced 的子接口,但是主要與 fencing 相關,不做展開。我們看到這個接口的方法,猜想是我們可以將一個 Runnable 或者 Callable 交給一個此接口的實現去異步地執行。實際情況確實有點像,我們看到它的實現,欸,唯一的實現居然是老朋友,概念糅合的集中點,AkkaInvocationHandler 這個類。真巧。

它的實現除去一些邊界條件和檢查代碼概要如下。

@Override
public void runAsync(Runnable runnable) {
  scheduleRunAsync(runnable, 0L);

}


@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {
if (isLocal) {
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
tell(new RunAsync(runnable, atTimeNanos));
} else {
throw new RuntimeException(...);
}
}

@Override
public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
if (isLocal) {
return(CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);
} else {
throw new RuntimeException(...);
}
}

public void scheduleRunAsync(Runnable runnable, long delayMillis) {
if (isLocal) {
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
tell(new RunAsync(runnable, atTimeNanos));
} else {
throw new RuntimeException(...);
}
}

@Override
public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
if (isLocal) {
return(CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);
} else {
throw new RuntimeException(...);
}
}

至於 RunAsync 和 CallAsync 的處理邏輯則存在於 AkkaRpcActor 中,簡單地說,當 AkkaRpcActor 收到這個消息時,如果是 schedule 且未到時間就會調度到 Dispatcher 線程中等待,否則立即執行,對於 callAsync 也就是 ask,還會通過 tell 返回結果。再結合上面的代碼,我們發現,喔,原來這個方法調用只能在 local 的情況下使用,而且進一步看其上層在 RpcEndpoint 處暴露的接口是 protected 的。在實際應用的時候,callAsync 和 scheduleRunAsync 基本沒人用,runAsync 則用的不少,如果你熟悉 Akka 的話,你會發現這基本上和  self() ! Msg  沒有太大的差別。

那麼爲什麼 Flink 要引入這個接口呢?下面我從 Flink 的 RPC 實現三處和線程模型有關的接口來對比 Flink 的實現,其中有一處就是這裏的 runAsync 等。

另一處是 getSelfGateway 方法。我們剛纔說,runAsync 基本上就是  self() ! Msg ,那麼 getSelfGateway 不就是這裏的  self()  嗎?嗯,確實是的。Flink 的 getSelfGateway 方法主要用於測試的時候測試代碼拿到一個 RpcServer 或者你直接管他叫 ActorRef 來進行消息發送或者適配對應的類型簽名,另一個主要的作用則是在本地不同的 Actor 之間傳遞 ActorRef,遠端我們有  RpcService#connect  方法來搞定,本地原則上也可以這麼搞,不過 getSelfGateway 看起來更方便一點,也減少了 ActorSelection 和後續確認和握手的來回消息傳遞。那麼 runAsync 和這個玩意到底有什麼區別呢?答,通過 RMI 的方法進行調用,方法必須擁有姓名,而 runAsync 可以神奇的傳遞 Runnable 而使得你不需要去改 RpcGateway 的接口就可以給自己發消息。底層實際上就是一個通用的 RunAsync 消息。着實神奇。如果你的 runAsync 的內容是調用一個 RpcGateway 上註冊的方法,那麼你實際上也可以寫成 getSelfGateway 然後通過點語法調用這個方法。

再另一處是  RpcEndpoint#getMainThreadExecutor  和  RpcEndpont#getRpcService.getExecutor()  這兩個通常在拼接 CompletableFuture 的 Async 系列方法時作爲 Executor 傳入。其中前一個實際上就是上面提到的 runAsync 的又一層包裝,當你將它作爲 Executor 傳入的時候,我們看看它的 execute 和 schedule 方法是怎麼寫的。

private final MainThreadExecutable gateway;

public void runAsync(Runnable runnable) {
  gateway.runAsync(runnable);
}

public void scheduleRunAsync(Runnable runnable, long delayMillis) {
  gateway.scheduleRunAsync(runnable, delayMillis);
}

@Override
public void execute(@Nonnull Runnable command) {
  runAsync(command);

}


@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
FutureTask<Void> ft = new FutureTask<>(command, null);
scheduleRunAsync(ft, delayMillis);
return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
}

所以其實這就是在外面像在裏面一樣調用 runAsync 的一層接口。

後者則是直接拿到 Akka ActorSystem 中的 Dispatcher 作爲 Executor,將動作發佈到 ActorSystem 的 Dispatcher 中去調度執行。這裏有一點要特別注意的是前面提到的 MainThreadExecutor,它執行時的線程就是 Dispatcher 當中的一個哦。所以如果你把兩個競爭的調用,一個放在 MainThreadExecutor 裏跑,一個放在 ActorSystem Dispatcher 裏跑,那麼就有可能會出現死鎖,而且這個死鎖不是必現的,甚至執行順序關係都不是引發死鎖決定性的因素,首先看你發佈到 Dispatcher 的作業是否被分派到 MainThreadExecutor 那個線程上。由於 Flink 把這兩個 Executor 直接暴露出來,並且非常方便獲得,兩者很容易誤用,所以相關的併發錯誤在歷史的進程中,尤其是 Dispatcher 和 JobMaster 那一塊,綜合上其他原因,發生並且被修復過很多次。

Flink 的 RPC 實現把一個簡單的 tell 和 ask 的模型,先是搞成 RMI 導致性能堪憂並且實現高度依賴難以理解、維護和擴展的反射,再是暴露出多個功能重複的接口,活生生的把 Java 寫成了 Perl 的模樣,降低了開發者犯錯的難度,增加了開發者犯錯的幾率。雖然我個人很喜歡 Perl,但是在一個幾百萬行的大型項目裏混進一塊 Perl 風格的代碼,恐怕還是敬謝不敏。不過另一個角度說,Flink 能把事情搞得這麼複雜,但是系統還算能正常的工作,也算 Java 作爲一門開發語言的魅力所在。

閱讀代碼的技巧簡述

上面就是我閱讀 Spark 和 Flink 的 RPC 實現代碼的過程和思考。由於評論性的文字已經內聯在閱讀的過程中,而且這點篇幅其實還遠遠沒有展開一些有趣的或者關鍵的技術細節,這裏就不再做評述。

回到最初的問題,不少同學來問我代碼怎麼閱讀。其實代碼本身是人的思維的具象化的一種表現,閱讀代碼不應該只是去讀代碼本身,一行一行的看它的執行路徑,這樣與機器何異?閱讀代碼首先應該思考的是所要閱讀的代碼解決了什麼問題,這個問題爲什麼存在,它的現有解法和一般解法是什麼。瞭解到這些基本信息之後,在閱讀代碼的過程中,對於同質化的部分就略讀或者通讀過去,對於配置和錯誤處理和邊界情況掃讀,重要的邊界情況再單獨看看。主要精力集中在差異化的部分,對比差異化的部分的考量點,分清孰優孰劣,或者在軟件開發的過程中,通常沒有一方完全好過另一方,有的只是權衡(trade off)。例如在上面的內容中,好像我把 Flink 說得一無是處,那主要是因爲我的工作跟它相關,每天深受這些坑折磨,實際上類型化是一件非常有意義的事,Untyped Akka 和 Spark 當你面對一堆只能靠名字來猜他背後是啥的 Ref 的時候,其實你也是在依賴命名約束或者叫命名巧合來編程。

另外,這裏推薦一篇介紹 Spark RPC 的 文章 ,它與本文的不同除了範圍以外,主要是更加偏重實踐,有作者本人的腳手架倉庫可以實驗,並且做了時序圖和 UML 類圖。

圖表是非常好的表意手段,寫作本文時我原本想引用一個 Actor Model 的概念圖,但是一時找不到了。對於 Spark 和 Flink 的 RPC 實現,一張粗略的類責任鏈和所屬關係圖也會一圖勝千言。不過時序圖和 UML 類圖恐怕還是太古板和複雜了,就算畫得出來,我也高度懷疑到底有誰沒事盯着那玩意看。粗略的類圖是可以的,UML 類圖容易關注點失焦;時序圖像我在上面分析線程模型和同步策略的時候,對應的時序關係是要分析的。但是事無鉅細的時序圖恐怕沒有必要,畢竟客戶端到服務端大體就是那麼回事。如前所述,我們閱讀代碼的時候,主要是要關注差一點。事無鉅細的 UML 類圖和時序圖太容易把一些瑣碎的細節也列上去了。

有實驗環境來測試嘛,當然是最好的。在閱讀 RPC 實現的過程中,雖然我沒有把相關的邏輯抽出來做實驗,但是測試覆蓋率高的項目,其單元測試和可執行的 example 本身就是良好的實驗場地。單元測試可能是我見過的最優秀的理解一塊代碼意圖的方式之一了。

總的來說,本文展示的是在初步瞭解一個方向的代碼編寫常識後,針對某一功能點進行主題閱讀和對比整理的過程。在一開始閱讀代碼的時候,可以先針對某個特定的實現,先把它的邏輯理順,等到對問題的抽象和解決方案的抽象有一定的感覺之後再進行對比閱讀,有的放矢,快速沉澱總結。

相關文章