摘要:當 map 算子處理完前 3 條記錄 並收到檢查點屏障時,它們會將狀態以異步的方式寫入穩定存儲.。Storm需要自己實現有狀態的計算,比如藉助於自定義的內存變量或者redis等系統,保證低延遲的情況下自己去判斷實現有狀態的計算,但是Flink就不需要這樣,而且作爲新一代的流處理系統,Flink非常重視。

流式計算分爲無狀態和有狀態兩種情況。無狀態計算觀察每個獨立的事件,Storm就是無狀態的計算框架,每一條消息來了以後和前後都沒有關係,一條是一條。比如我們接收電力系統傳感器的數據,當電壓超過240v就報警,這就是無狀態的數據。但是如果我們需要同時判斷多個電壓,比如三相電路,我們判斷三相電都高於某個值,那麼就需要將狀態保存,計算。因爲這三條記錄是分別發送過來的。

Storm需要自己實現有狀態的計算,比如藉助於自定義的內存變量或者redis等系統,保證低延遲的情況下自己去判斷實現有狀態的計算,但是Flink就不需要這樣,而且作爲新一代的流處理系統,Flink非常重視。

一致性

其實就是消息傳遞的正確性。在流處理中,一致性分爲 3 個級別。

  • at-most-once:最多一次,可能會丟失。

  • at-least-once:最少一次,可能會重複,而計算的時候可能就會多次運算影響結果。

  • exactly-once:恰好保證一次,這樣得到的結果是最準確的。

最先保證 exactly-once 的系統(Storm Trident 和 Spark Streaming),但是在性能和表現力這兩個方面付出了很大的代價。爲了保證 exactly-once,這些系統無法單獨地對每條記錄運用應用邏輯,而是同時處理多條(一批)記錄,保證對每一批的處理要麼全部成功,要麼全部失敗。這就導致在得到結果前, 必須等待一批記錄處理結束。因此,用戶經常不得不使用兩個流處理框架 (一個用來保證 exactly-once,另一個用來對每個元素做低延遲處理),結果使基礎設施更加複雜。

但是,Flink解決了這種問題。

檢查點機制

檢查點是 Flink 最有價值的創新之一,因爲它使 Flink 可以保 證 exactly-once,並且不需要犧牲性能。

Flink 檢查點的核心作用是確保狀態正確,即使遇到程序中斷,也要正確。 記住這一基本點之後,我們用一個例子來看檢查點是如何運行的。Flink 爲 用戶提供了用來定義狀態的工具。例如,以下這個 Scala 程序按照輸入記錄 的第一個字段(一個字符串)進行分組並維護第二個字段的計數狀態。

val stream: DataStream[(String, Int)] = ... 
 
val counts: DataStream[(String, Int)] = stream   
.keyBy(record => record._1)   
.mapWithState((in: (String, Int), count: Option[Int]) =>     
  count match {       
    case Some(c) => ( (in._1, c + in._2), Some(c + in._2) )       
    case None => ( (in._1, in._2), Some(in._2) )     
})

該程序有兩個算子:keyBy 算子用來將記錄按照第一個元素(一個字符串) 進行分組,根據該 key 將數據進行重新分區,然後將記錄再發送給下一個算子:有狀態的 map 算子(mapWithState)。 map 算子在接收到每個元素後, 將輸入記錄的第二個字段的數據加到現有總數中,再將更新過的元素髮射出去。

輸入流中的 6 條記錄被檢查點屏障 (checkpoint barrier)隔開,所有的 map 算子狀態均爲0(計數還未開始)。 所有 key 爲 a 的記錄將被頂層的 map 算子處理,所有 key 爲 b 的記錄將被中間層的 map 算子處理,所有 key 爲 c 的記錄則將被底層的 map 算子處理。

如果輸入流來自消息傳輸系統Kafka,這個相互隔離的位置就是偏移量。

檢查點屏障像普通記錄一樣在算子之間流動。當 map 算子處理完前 3 條記錄 並收到檢查點屏障時,它們會將狀態以異步的方式寫入穩定存儲.

當沒有出現故障時,Flink 檢查點的開銷極小,檢查點操作的速度由穩定存儲的可用帶寬決定。

如果檢查點操作失敗,Flink 會丟棄該檢查點並繼續正常執行,因爲之後的 某一個檢查點可能會成功。

在這種情況下,Flink 會重新拓撲(可能會獲取新的執行資源),將輸入流 倒回到上一個檢查點,然後恢復狀態值並從該處開始繼續計算。

Flink 將輸入流倒回到上一個檢查點屏障的位置,同時恢復 map 算子的狀態值。 然後,Flink 從此處開始重新處理。這樣做保證了在記錄被處理之後,map 算子的狀 態值與沒有發生故障時的一致.

Flink 檢查點算法的正式名稱是異步屏障快照(asynchronous barrier snapshotting)。

保存點

狀態版本控制

檢查點由 Flink 自動生成,用來在故障發生時重新處理記錄,從而修正狀 態。Flink 用戶還可以通過另一個特性有意識地管理狀態版本,這個特性叫作保存點(savepoint)。

保存點與檢查點的工作方式完全相同,只不過它由用戶通過 Flink 命令行工 具或者 Web 控制檯手動觸發,而不由 Flink 自動觸發,用戶可以從保存點重啓作業,而不用從頭開始。對保存點的另一種理解是,它在明確的時間點保存應用程序狀態的版本。

在圖中,v.0 是某應用程序的一個正在運行的版本。我們分別在 t1 時刻和 t2 時刻觸發了保存點。因此,可以在任何時候返回到這兩個時間點,並且重 啓程序。更重要的是,可以從保存點啓動被修改過的程序版本。舉例來說, 可以修改應用程序的代碼(假設稱新版本爲 v.1),然後從t1 時刻開始運行 改動過的代碼。

使用保存點更新Flink 應用程序的版本。新版本可以從舊版本生成的一個 保存點處開始執行.

端到端的一致性

在該應用程序架構中,有狀態的Flink 應用程序消費來自消息隊列的數據, 然後將數據寫入輸出系統,以供查詢。

輸入數據來自Kafka,在將狀態內容傳送到輸出存儲系統的過程中,如何保證 exactly-once 呢?這 叫作端到端的一致性。本質上有兩種實現方法,用哪一種方法則取決於輸 出存儲系統的類型,以及應用程序的需求。

(1) 第一種方法是在 sink 環節緩衝所有輸出,並在 sink 收到檢查點記錄時, 將輸出“原子提交”到存儲系統。這種方法保證輸出存儲系統中只存在 有一致性保障的結果,並且不會出現重複的數據。從本質上說,輸出存 儲系統會參與 Flink 的檢查點操作。要做到這一點,輸出存儲系統需要 具備“原子提交”的能力。

(2) 第二種方法是急切地將數據寫入輸出存儲系統,同時牢記這些數據可能 是“髒”的,而且需要在發生故障時重新處理。如果發生故障,就需要將 輸出、輸入和 Flink 作業全部回滾,從而將“髒”數據覆蓋,並將已經寫 入輸出的“髒”數據刪除。注意,在很多情況下,其實並沒有發生刪除 操作。例如,如果新記錄只是覆蓋舊紀錄(而不是添加到輸出中),那麼 “髒”數據只在檢查點之間短暫存在,並且最終會被修正過的新數據覆蓋。

根據輸出存儲系統的類型,Flink 及與之對應的連接器可以一起保證端到端 的一致性,並且支持多種隔離級別。

更多Flink相關文章:

Streaming-大數據的未來

實時計算大數據處理的基石-Google Dataflow

數據架構的未來——淺談流處理架構

穿梭時空的實時計算框架——Flink對時間的處理

Flink快速入門--安裝與示例運行

大數據實時處理的王者-Flink

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算:

相關文章