摘要: 人工智能應用場景中,Flink 在包括特徵工程,在線學習,在線預測等方面都有一些獨特優勢,爲了更好的支持人工智能的使用場景,Flink 社區以及各個生態都在努力。本文將介紹近期 Flink 在人工智能生態系統中的工作進展, 主要內容包括:

  1. Flink 構建 AI 系統的背景

  2. Flink ML Pipeline 和算法庫 Alink

  3. 分析和 AI 的統一工作流(AI Flow)

  4. Flink 在流運行模式下迭代的架構設計

Tips: 點擊文末「 閱讀原文 」,即可獲取作者現場分享視頻。

一. Flink 構建 AI 系統的背景

1. Lambda 架構及批流統一數據處理

首先向大家介紹 Flink 構建 AI 系統的背景。

Lambda 架構是大數據處理領域中一種典型的架構。如上圖所示,圖中上方藍色線代表的是離線批處理的數據流,下方黑色線代表的是實時流模式的數據流。爲了兼顧整個大數據處理中的吞吐和實時性,我們通常會將離線的數據處理和實時的數據處理結果進行合併後對外提供服務。

在 Lambda 架構裏,在離線數據處理和實時數據處理過程中,我們會用到兩種不同的計算引擎,同時維護兩份代碼。但這兩個流程對數據的處理邏輯其實是一樣的,這樣會導致兩個問題:

  • 維護代價高 ( 包括代碼的維護代價,及不同的計算引擎對部署的維護代價 )

  • 很難保證在線離線處理邏輯一致

爲了解決這兩個問題,實現批和流的數據統一,就有了 Flink 和 Spark。Flink 和 Spark 都可以處理批數據,對流數據的處理可以由 Flink 和 Spark streaming 來完成。這樣就避免了上述維護兩套代碼和兩套系統及邏輯一致性的問題。

2. 機器學習在線離線處理

在機器學習場景下,在線離線處理也會面臨一些問題。上圖是一個典型的機器學習處理過程。首先會將離線的數據進行預處理和特徵工程(如紅框標註所示),然後進行離線的模型訓練,訓練好的模型會推到線上做推理。推理模塊加載模型後,在線的數據也會有進行預處理和特徵工程的過程,將處理之後的數據餵給模型做在線推理。

這樣的過程也會面臨如上述大數據處理中的問題,我們同樣會維護離線的數據處理和在線推理的數據處理兩份代碼。

在機器學習領域除了離線的模型訓練以外,還有在線的模型訓練。如下圖所示,我們通常會將預處理的數據寫到一個 Message Queue 中(如 Kafka),然後進行 Online training,training 的過程是持續不斷的,期間會不斷的產生動態的模型,然後推送給在線的推理模塊進行推理。在線的機器學習的特點就是模型的動態更新、持續訓練和不斷驗證。同時需要比較複雜的模型監控,模型部署和模型回滾等策略。

於是就產生了機器學習中的 Lambda 架構:

一般來說,在線的模型訓練並不是從頭訓練一個模型,而是通過離線訓練出一個基準的模型,然後推給在線,在線流程再在這個基準模型上進行在線的訓練。這樣同樣存在離線和在線兩份代碼,涉及兩套不同的系統。也會增加維護的複雜度。

那麼 Flink 在這套架構中有什麼樣的價值。Flink 是天生支持批流一體的計算引擎,在在線機器學習領域的預處理過程中,我們很自然地會使用 Flink 進行數據處理,在離線的機器學習訓練中也可以使用 Flink 進行批次的預處理。同時,在在線推理的過程中,也會使用 Flink 進行推理(因爲推理是一個對時間比較敏感的過程)。所以 Flink 在機器學習 Lambda 架構中的價值體現在在線的數據的預處理,離線數據的預處理,在線的推理。

所以是否能將機器學習中的 Lambda 架構進行批流統一?

如上圖所示,除了數據處理和推理可以使用 Flink 外,Online training 和 offline training 的過程也可以用 Flink 計算引擎替代。這樣做的好處是:用戶只需要寫一份代碼就可以同時實現在線和離線的訓練,並且避免了邏輯的不一致性和維護的難度。在線和離線訓練中我們通常會使用比如一些深度學習的計算框架比如 TensorFlow 或者 Pytorch 運行在 Flink 上進行模型訓練。第二個好處是,我們可以通過使用 Flink 形成一站式的數據處理解決方案。如下圖:

在整個數據上,Flink 提供了豐富的接口,包括 SQL、DataStream、CEP,如果再加上 ML 的接口就可以共享整個數據集,不會涉及到不同的系統之間數據拷貝的過程。此外,在將數據打通之後,我們可以使用 SQL,DataStream 這些豐富的 API 來處理數據。

二. Flink ML Pipeline 和算法庫 Alink

1. Flink AI 生態系統架構

上圖是整個機器學習的 Lambda 架構圖。對應機器學習任務中的不同階段:

  • 首先是數據的管理和獲取階段 (Data Acquisition),在這個階段 Flink 提供了非常豐富的 connector(包括對 HDFS,Kafka 等多種存儲的支持),Flink 目前還沒有提供對整個數據集的管理。

  • 下一個階段是整個數據的預處理 (Preprocessing) 特徵工程 部分,在這個階段 Flink 已經是一個批流統一的計算引擎,並且提供了較強的 SQL 支持。

  • 之後是模型訓練過程 (Model Training),在這個過程中,Flink 提供了 Iterator 的支持,並且有如 Alink,MLlib 這樣豐富的機器學習庫支持,且支持 TensorFlow,Pytorch 這樣的深度學習框架。

  • 模型產出之後是 模型驗證和管理階段 (Model Validation & Serving),這個階段 Flink 目前還沒有涉足。

  • 最後是線上推理階段 (Inference),這個階段 Flink 還沒有形成一套完整的方案。同時形成了 Flink ML Pipeline,以及目前正在做的 Flink AI Flow。

2. Flink ML Pipeline

上圖是 Flink ML Pipeline 的介紹,該 Pipeline 主要涉及兩個抽象,第一個是 Transformer 抽象,是對數據預處理和在線推理的抽象。第二個抽象是 Estimator 抽象,主要是對整個模型訓練的抽象。兩個抽象最大的差異是 Transformer 是將一份數據轉化爲另一份處理後的數據,而 Estimator 是將數據進行訓練轉化爲模型。

3. 算法庫 Alink

Alink 主要是重寫了 Flink 中很多機器學習庫。其有兩個重要的特點,一是 Alink 是基於 Flink 的 ML Pipeline。第二是基於 Flink Table API。Flink Table API 天然就是批流統一的。

ML Pipeline 簡單案例:

4. ML Pipeline 價值

Flink ML Pipeline 最大的價值在於爲最終用戶統一了模型訓練和推理的 API,用戶只需要關心 Estimator 的 Transformer 裏面的邏輯即可。此外,Pipeline 將整個訓練過程進行了持久化,確保了訓練和推理之間的邏輯一致性,解決了之前 Lambda 架構中維護兩份代碼可能會導致的邏輯不一致問題。

三. 分析和AI的統一工作流(AI Flow)

1. AI Flow 背景

在整個機器學習任務中,有一部分是和模型訓練相關的,還有一部分是數據分析,特徵工程相關的。因此 AI Flow 的目標是將這整個流程串起來,提供一個端到端的解決方案。

2. AI Flow 概述

AI Flow 的 API 中包含了 Example(對數據的 API),Transformer(對預處理流程及推理的 API),Trainer(訓練流程的 API),Model(模型管理的 API)等。在每個模塊中都會產生一些中間 meta 數據,AI Flow 將這些數據存儲在 Meta Store 中。這些 API 只是定義了機器學習中的一些處理邏輯,AI Flow 中的 Translator 則將這些邏輯轉化爲真正可執行的任務。

我們的目標是實現整個機器學習 lambda 架構的批流統一,用戶寫一份機器學習處理邏輯,可以同時作用於離線的學習過程,同時也可以支持在線的學習過程。 Translator 的作用就是將用戶通過 AI Flow API 寫好的 code 轉化爲可執行的任務。 目前 AI Flow 中包含兩類 Translator,第一類是 Batch Translator,第二類是 Stream Translator。

在 Translator 中還有一個抽象是 Components,包含與 AI Flow API 相對應的一些 Components。 這些 Components 可以對應地解析 AI Flow API 中用戶定義的邏輯。 通過 Translator 將用戶定義好的邏輯處理完之後,通過 Deployer 把生成的任務部署到不同的環境,包括本地環境,K8S,Yarn 等。

3. AI Flow 原理

用戶通過 AI Flow API 寫的代碼在編譯之後會生成一個邏輯的執行計劃,這些邏輯執行計劃由很多節點組成,每個節點都有對應的 Translator 中的 Component 解析,解析過程有可能會將其解析爲單個的 job,或者一個 job 的一部分(即多個節點生成一個 job)。這個邏輯執行計劃會傳遞給 Translator,如果是流式任務,Translator 會將其翻譯爲流式的一些 job,同樣的批式任務會翻譯成批相關的一些 job。之後會將一組job組成一個 Execution,傳遞給 Deployer 通過配置運行在本地,K8S 或 Yarn 上,最後生成一些可以運行的任務。

4. AI Flow 的特點和 Flink AI Flow

AI Flow 提供了用於部署生產環境數據分析和機器學習流水線的端到端的 API,提供了批流統一的數據分析和機器學習工作流 API。具體來說有以下幾個特點:

AI Flow:

  • 批流統一

  • 引擎與平臺無關

  • 定義執行組件關係

  • 定義數據集和 IO 格式規範

Flink AI Flow:

  • Flink 作爲默認分析引擎

  • TensorFlow / Pytorch 作爲機器學習引擎

5. 圖片分類簡單案例

首先通過 Example 對象定義處理前的圖片數據,然後通過 Transformer 對象定義預處理過程和模型的預測過程,最後將兩個 Transformer 合起來組成一個 Execution 邏輯。

6. AI Flow 總結

AI Flow 提供了部署生產環境數據分析和機器學習流水線的端到端 API,同時 AI Flow 還提供了批流一體的數據分析和機器學習工作流 API。

四. Flink 在流運行模式下迭代的架構設計

1. 背景與動機

Flink 在機器學習中有一個整體的端到端的框架,目前 Flink 是通過 DataStream 和 DataSet 兩套接口分別提供了流處理和批處理的能力。通過前面的講述,我們可以看到 Flink 流處理和批處理的能力可以用於機器學習的數據預處理階段。

其實除了這些通用的流和批的處理之外,Flink 對於機器學習中的模型訓練和圖計算這些複雜的分析場景也提供了原生支持。這些場景的特點是對數據的迭代計算要求較高,目前 Flink 對迭代計算的支持主要是在 DataSet 接口部分,因爲 DataSet 整體上是一個批處理的接口,所以 Flink 對迭代分析的支持主要是有限數據迭代分析。

例:基於 DataSet 迭代的 K-Means

以常見的 K-Means 爲例,K-Means 算法的兩個輸入爲待聚類的點和初始類中心,在 DataSet 迭代時,它會在整個計算圖中添加一個特殊節點來維護待求中心點當前的結果,在每輪迭代中,對待求中心點進行一次更新(將每個點分配到較近的中心點,之後重新計算中心點,將計算結果通過一個回邊發送到維護的待求中心點算子,從而支持超過 DAG 計算能力的處理形式),重複多輪迭代直到收斂,輸出最終的中心點。

爲了表示這種迭代的計算圖,Flink 中有幾個通用的概念,首先“迭代變量”是在迭代中需要更新的變量,“靜態數據集”是在迭代過程中會多次使用但是不會發生變化的數據,在計算過程中該數據實際上只會發送一次,Flink 對該數據做了一個基於磁盤的緩存,在每輪迭代時做了一個重放。他們都是普通的 DataSet 對象,代表來一個有限的數據集,在某一個數據集上調用 Iteration 方法,指定迭代終止條件。Flink 會自動將每一輪的迭代邏輯擴展到所有輪,用戶在寫迭代邏輯時不需要考慮收到多輪數據的情況。

爲了實現流批一體,Flink 將批處理的能力從 DataSet 的接口遷移到 DataStream 的接口之上,在 DataStream 接口之上,Flink 會引入一個 BoundedStream 的特殊子類,來實現批處理的能力。相應的,如果把批處理的能力遷移過去,DataSet 的迭代的處理能力也要進行遷移,一個選擇是直接平移相應的 DataSet 的實現,但是從前面的介紹可以發現,DataSet 目前的實現有一些問題:

  • 首先它不支持多迭代變量或者嵌套迭代的形式,但是在一些算法(如 boosting 算法)中對這兩種迭代也是有需求的。

  • 第二點是對於靜態數據,Flink 現在是做了一個基於磁盤的重放,在這種情況下,用戶是有可能基於業務邏輯做一個更高效的緩存的,但是現在在 DataSet 的迭代上無法實現這一點。

  • 最後,在 DataSet 上很難實現針對在線算法的模型訓練或者在線流處理的支持。

比如使用每一個小時的數據做一次 K-Means。

利用每小時的數據更新聚類點。

因此我們需要引入一種新的迭代機制,可以兼容在有限數據上的迭代,並且支持“無限流上每一部分數據分別進行迭代”的語義。

2. Mini-batch 流式 Iteration

上圖是 Mini-batch 流式 Iteration 示例,對於靜態數據集,其可以視爲只有一個 Mini-batch。對於流式數據,可以將其拆成多個 Mini-batch,他們之間可以獨立並行迭代。算子可以自動將單個 Mini-batch 操作擴展到每一個 Mini-batch 上。

還有一種情況是使用無限的數據集訓練一個統一的模型,這種情況下不同的 Mini-batch 共享同一組狀態(待更新的模型),其執行機制如上圖所示。

在上述迭代過程中,算子需要知道每個 Mini-batch 的迭代終止情況,我們稱之爲進度追蹤,其目標是提供各 Mini-batch 各輪迭代終止的通知。這種能力通過 Assigner 節點在數據流中插入特殊的標記消息,然後其它算子對標記消息進行對齊來實現。關於進度追蹤詳細描述,如下:

3. Mini-batch 迭代 API

通過上面的描述可以列出 Mini-batch 迭代 API 的基本框架:

4. 總結

總結來說,我們設計了一種新的基於 Mini-batch 的流式迭代機制,這種迭代機制既可以兼容原來的 DataSet 上基於有限數據上的迭代,也可以支持對無限流上每一部分數據分別進行迭代。未來,可以更好地支持在線的機器學習訓練和在線的圖處理的場景。

作者介紹:

陳戊超(仲卓) ,阿里巴巴技術專家,加入阿里巴巴之前曾就職於百度 spider 部門,在阿里巴巴主要參與搜索離線業務和機器學習平臺建設。目前關注在 Flink 上支持機器學習框架方向。

高贇(雲騫) ,阿里巴巴技術專家,2017 年博士畢業於中國科學院後加入阿里巴巴實時計算 Flink 團隊,主要從事 Flink Runtime 層的設計與研發。

  一個預告  

3 月 17 日下週二 )晚上 20:00,Flink 社區系列直播課程將邀請 Apache Flink PMC 跟大家分享 PyFlink 的相關進展,您將瞭解以下內容:

  1. 細緻分析 PyFlink 所適用的場景,並進行 CDN 日誌分析的案例演示

  2. 首次詳細剖析 PyFlink API 和 UDF 的架構,並分享架構背後的思考

  3. 首次披露  PyFlink 1.11 功能規劃和性能指標

  4. 首次談及 PyFlink 的使命願景,一張 PyFlink 大圖勾勒未來願景

  5. 首次透露 PyFlink 的核心貢獻者的個人聯繫信息,PyFlink 的任何問題可以隨時拋來

釘釘掃描下方二維碼即可觀看~

▼ 更多技術文章 

Flink Batch SQL 1.10 實踐

Flink SQL 如何實現數據流的 Join?

Demo: 基於 Flink SQL 構建流式應用

Flink DataStream 關聯維表實戰

Flink 1.10 Native Kubernetes 原理與實踐

從開發到生產上線,如何確定集羣大小?

在 Flink 算子中使用多線程如何保證不丟數據?

一行配置作業性能提升53%!Flink SQL 性能之旅

性能提升約 7 倍!Apache Flink 與 Apache Hive 的集成

Flink 1.10 和 Hive 3.0 性能對比(附 Demo 演示 PPT)

Flink on Zeppelin (3) - Streaming 篇

Flink on Zeppelin (2) - Batch 篇

Flink on Zeppelin (1) - 入門篇

關注 Flink 中文社區,獲取更多技術乾貨

你也「 在看 」嗎? :point_down:

相關文章