整理:謝縣東 (Flink 社區志願者)

1.PyFlink 的發展史

1.1、v1.8.x

  1. Flink 在 1.8 版本的時候就已經提供 Python API,只在 Datase/Stream 上提供支持。

  2. 存在一些問題,比如:

  • Table API 不支持 Python。

  • 兩套各自獨立實現的一個 Python API。

  • 底層實現是 JPython,JPython 無法支持 Python3.x。

1.2、v1.9.x

  1. 2019 年 8 月發佈。

  2. 支持 Python Table API。

1.3、v1.10.x

  1. 2020 年 2 月發佈。

  2. 提供了 Python UDF 的支持。

  3. 提供 UDF 的依賴管理。

1.4、未來發展

  1. 提供 Pandas UDF 的支持。

  2. 提供用戶自定義的一些 UDF Metrics。

  3. ML API。

  4. 在易用性方面,提供 SQL DDL 支持 Python UDF。

  5. 在後面的一些版本中,我們也希望越來越多的人能夠參與到 PyFlink 的貢獻和開發中去。

2.PyFlink 核心功能及原理介紹

PyFlink 核心功能將主要從每個版本的劃分來跟大家進行介紹,第1個 PyFlink 1.9 版本里面提供 Python Table API 的支持,然後是 PyFlink 1.10 裏面提供了 Python UDF 還有相關依賴管理,最後 1.11 版本里面提供了 Pandas UDF 和用戶自定義的 Metrics。

2.1、Python Table API (PyFlink 1.9)

■ 1.Python Table API

什麼是 Python Table API 呢?我們可以從編程的角度來介紹一下。Python Table API 大概提供了一些 Python 的 API ,比如這裏主要可以看一下 Table 的接口, Table 接口上有很多 Table 相關的算子,這些算子可以分爲兩類:

  • 1.跟 sql 相關的算子。比如 select、filter、join、window 等;

  • 2.在 sql 的基礎上擴展的一些算子。比如 drop_columns(..),可以用來提升 sql 的便利性,比如:

當有一個很大的表並且想刪除某一列的時候,可以用 drop_columns 來刪除某一列。

對於我們來說,可以隨意組合 Table 上的方法,然後編寫不同的業務邏輯。我們接下來看一下,如何用 Table API 來寫一個 WordCount 的例子,可以讓大家有一個比較完整的認識。

■  2.WordCount

如下圖所示,是一個完整的 Python Table API 的 WordCount 的例子。主要可以包含4個部分。

  • 首先,我們需要去初始化環境,比如第 6 行,我們先拿到了一個 ExecutionEnvironment,然後第 7 行,去創建一個 TableEnvironment。

  • 創建 TableEnvironment 之後,需要去定義 source 跟 sink ,這裏 source 跟 sink 都是指定了輸入和輸出的文件路徑,還指定了文件中 Table 對應的一些字段,以及字段對應的數據類型。而且可以定義輸出分隔符。

  • 定義好 source 跟 sink 之後,再來看一下如何編寫計算邏輯。可以用 from_path 算子來讀取 source 表,讀取完之後,就可以進行 group by 的一些聚合,做 group by 跟 wordcount。

  • 做完之後,可以把結果表用 insert_into 進行輸出。最後調用 Environment 的 execute 來提交作業。

經過上面4步,我們就完整的寫出了一個 Python Table API 的 WordCount。那麼對於 WordCount 例子,它的底層實現邏輯是怎麼樣的呢?接下來看一下,Python Table API 的一個架構。

■  3.Table API 架構

  • 通過這個架構圖,可以看到,Python Table API 是建立在 Java Table API 的基礎上的,我們並沒有單獨的從上到下實現一套 Python Table API。

  • Python Table API 是一個特別的存在,它是在 Java Table API 的基礎上加了一層薄薄的 API,這兩層 API 是可以相互調用的。

  • 在 client 端的時候,會起一個 Python VM 然後也會起一個 Java VM ,兩個 VM 進行通信。通信的細節可以看下面這張圖。

  • 我們可以看到 Python 跟 Java VM 裏面都會用 Py4J 各自起一個 Gateway。然後 Gateway 會維護一些對象。

  • 比如我們在 Python 這邊創建一個 table 對象的時候,它也會在相應的 Java 這邊創建一個相同 table 對象。如果創建一個 TableEnvironment 對象,在 Java 部分也會創建一個 TableEnvironment 對象。

  • 如果你調用 table 對象上的方法,那麼也會映射到 Java 這邊,所以是一個一一映射的關係。

  • 基於這一套架構,我們可以得出一個結論:如果你用 Python Table API 寫出了一個作業,這個作業沒有 Python UDF 的時候,那麼這個作業的性能跟你用 Java 寫出來的作業性能是一樣的。因爲底層的架構都是同一套 Java 的架構。

剛剛我們介紹了 PyFlink 1.9 版本里面的 Python Table API ,也提到了 table 的接口上面提供了很多不同的算子,而且可以用這些算子去組合,實現不同的業務邏輯。但是對於這些算子來說,它的功能還無法滿足一些特定的情況,比如某些業務需要編寫一些自定義的邏輯,此時就需要強依賴 Python UDF,所以在 PyFlink 1.10 版本里面,提供了 Python UDF 並且提供了相應的依賴管理。

2.2、Python UDF & 依賴管理 (PyFlink 1.10)

■  1.Python UDF 架構

  • 如果你的作業是包含一個 Python UDF 的作業,那麼從提交的時候,就是左邊的架構圖,然後 deploy 到 Remote 端的時候,可以看到 Remote 端的架構圖分爲兩個部分。左邊部分是 Java 的 Operator,右邊部分是 Python 的 Operator。

  • 大體的流程我們可以大概看一下:

  1. 在 open 方法裏進行 Java Operator 和 Python Operator 環境的初始化。

  2. 環境初始化好之後,會進行數據處理。當 Java Operator 收到數據之後,先把數據放到一個 input buffer 緩衝區中,達到一定的閾值後,纔會 flash 到 Python 這邊。Python 處理完之後,也會先將數據放到一個結果的緩衝區中,當達到一定閾值,比如達到一定的記錄的行數,或者是達到一定的時間,纔會把結果 flush 到這邊。

  3. state 訪問的鏈路。

  4. logging 訪問的鏈路。

  5. metrics 彙報的鏈路。

■  2.Python UDF 的使用

  • PyFlink-1.9 版本中,Python API 中支持註冊使用 Java UDF,使用方法如下:可以調 TableEnvironment 上的 register_java_function 這個方法,有兩個參數,一個參數是給 UDF 的命名,第 2 個是 Java 類的路徑。

table_env.register_java_function("func1", "java.user.defined.function.class.name")

下面是一個例子:

  • Python UDF 的使用:

可以調 TableEnvironment 上的 register_function 這個方法,有兩個參數,一個參數是給 UDF 起的名字,第 2 個是 python_udf 的一個對象。

table_env.register_function("func1", python_udf)

下面是一個例子:

■  3.Python UDF 的定義方式

PyFlink 裏面也支持一些其他的方式去定義 UDF,我們可以看一下,總共有4種方式:

  1. 可以繼承 ScalaFunction 基類,並重寫 eval 方法。

  2. 直接定義一個 Named Function,然後再用 UDF 的簽名去聲明 UDF 的輸入類型和輸出類型。

  3. 也可以用剛剛例子裏面的 Lambda Function 的這種方式,來定義 Python UDF。

  4. 最後一種是 Callable Function 的方式。也是聲明其輸入和輸出的類型。

■  4.依賴管理

寫完 UDF 的時候,經常遇到一個問題,UDF 裏面可能會有一些依賴,如何去解決這些依賴問題呢?PyFlink 提供了 4 種依賴的 API,如下所示。

  • 依賴文件

如果 UDF 裏面依賴一個文件的話,可以用 add_python_file 加載依賴的文件的路徑,指定完之後,作業提交的時候,就會把這個文件分發到集羣,那麼在遠程執行的時候,你的 UDF 就可以去訪問這個文件。

table_env.add_python_file(file_path)
  • 依賴存檔(打包)文件

可能會去依賴一個存檔的文件,這個時候你可以用 add_python_archive 方法,傳入兩個參數。第 2 個參數是一個可選的參數。第 1 個參數表示對你存檔文件的重命名。如果調用了 API,那麼在 UDF 裏面就可以去訪問存檔文件裏面的所有文件。

table_env.add_python_archive("py_env.zip", "myenv")

# the files contained in the archive file can be accessed in UDF

def my_udf():

with open("myenv/py_env/data/data.txt") as f:

  • 依賴第三方項目

可以用 set_python_requirements 方法去指定你的第三方依賴。也是有兩個參數,第 1 個參數是傳一個文件,文件中寫了所依賴的第三方項目,以及它對應的版本。第 2 個參數是一個可選的參數,如果集羣是一個有網絡的環境,那麼第 2 個參數可以不填,當第 2 個參數不填的時候,作業提交開始初始化的時候, Python 就會去根據你的 requirements 文件裏面配置的依賴,自動的去網絡下載你的依賴,然後安裝。如果集羣是沒有網絡的,可以預先把這些依賴下載好,下載到 cached 的目錄裏面去。然後把目錄也一起提交到集羣,集羣拿到這個目錄會去安裝這些依賴。

# commands executed in shell

echo numpy==1.16.5 > requirements.txt

pip download -d cached_dir -r requirements.txt --no-binary :all:

# python code

table_env.set_python_requirements("requirements.txt", "cached_dir")

  • 指定 Python Interpreter 路徑

假設你的 Python UDF 運行的時候,會依賴某一個版本的 Python 解釋器。那麼這個時候可以去指定你所希望 Python UDF 運行的一個解釋器的路徑。

table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

2.3、Pandas UDF & User-defined Metrics (PyFlink 1.11)

我們在 Pyflink 1.11 的版本里面提供了 Pandas UDF,還有用戶自定義的 Metrics。當然 Pyflink 1.11 版本里面,不光是這兩個功能,我這裏主要是介紹一下這兩個功能。Pyflink 1.11 版本也會即將在 2020 年的 6 月份進行發佈。

接下來會從功能和性能兩個角度來介紹一下 Pandas UDF。

■ 1.Pandas UDF – 功能

我們先來看一下功能方面,如果你要編寫一個 Pandas UDF,那麼跟剛纔定義普通 UDF 的形式基本上是一致的。這裏只需要去聲明一個 udf_type,指定爲 Pandas 就行了。

指定之後, UDF 運行起來的時候系統傳的 i 跟 j 就變成一個 pandas.Series 的數據結構。這個時候可以直接用 series 來進行操作。與此同時會有一個好處,就是我們拿到的是一個 pandas 的數據結構,可以調用 pandas 相關的一些庫函數,並且可以調用一些數值計算相關的庫函數,這樣可以極大的擴展功能。不需要再去實現一套邏輯。

■  2.Pandas UDF - 性能

那麼性能上 Pandas UDF 的好處,主要有兩點。

  1. 減少了調用的開銷,因爲剛剛說到了系統傳給 UDF 的是一個 pandas.series,它相當於是將多行的數據一次性的傳給了 UDF。而普通 UDF 處理多行時,每行都需要調用一次 UDF。所以對比可以發現,Pandas UDF 可以顯著減少 UDF 的調用開銷。

  2. 可以減少 UDF 的序列化和反序列化開銷。這裏具體介紹一下, Pandas UDF  是如何減少了序列化和反序列化的。

我們可以看一下右邊這個圖,左邊是 Java Operator,右邊是 Python Operator。假設 Operator 收到了一個 X,然後 X 在這裏會進行一個序列化,變成 arrow 的內存數據格式,這個時候用 X’ 來表示。那麼這個時候 Java 這邊會把 X’ 傳給 Python,Python 就可以直接來訪問 arrow 數據結構,因爲 pandas 底層的數據結構就是用 arrow 來表示的,所以這個時候不需要在 Python 進行反序列化,可以直接來操作 X’。然後在 X’ 加一之後,得到 Y’, Y’ 也是直接生成的 arrow 內存數據格式,這裏也不需要反序列化。那麼把 Y’ 傳到 Java 時,就需要進行一個反序列化。

我們可以發現,只需要在 Java 進行一個序列化和反序列化。Python 這邊可以省去了序列化和反序列化開銷。

而且這裏需要提出的一點是,如果你的輸入 X 也是一個 arrow 的內存數據格式,那麼 Java 這邊的序列化跟反序列化也是可以避免的。比如你的 source 是一個 Parquet Source,那麼它輸出的數據格式也是 arrow 數據格式,這個時候就可以避免掉 Java 的序列化和反序列化。所以,Pandas UDF 也是可以減少序列化反序列化的開銷。

■  3.User-defined Metrics

我們再來看一下用戶自定義 Metrics。

  • Metric 註冊

先來看一下 Metric 的註冊,Metric 註冊可以是在 metric_group 上調用對應的 Metric 方法來註冊。

  • Metric Scope

metric_group 還可以調用他的 add_group 方法去定義你的 Metric 的一個域,可以對 Metric 進行分類。

  • Metric 類型

目前 PyFlink 裏面提供的 Metric 類型有以下4種:

    • Counter

類似累加器。一開始需要在 open 方法裏面進行 Counter 的註冊,然後調用 match_group 上 Counter 方法,這裏我們給了一個 Metric 的名字叫 my_counter。定義完之後,就可以在 Eval 方法裏面進行使用。然後 Counter 可以提供 Inc 方法,你可以調用 Inc 進行相應的增加。 

    • Gauge

它是用來反映一個瞬時值。假設我們需要在 Metric 上顯示 length 值的變化情況。那麼我們需要用 Gauge 方法來註冊,名字是 my_gauge。第2個參數這裏需要注意它是一個 UDF ,我們需要返回要監控數值的值是什麼,返回這個值。然後在 Eval 方法裏或者其他 UDF 的調用裏可以改變這個值。框架底層就會不斷去彙報這個值當前值是多少。

    • Meter

Meter 這種 Metric 是表示當前這一秒往前一個時間區間內所有數值相加的一個均值。我們看可以調用 Meter 方法來註冊。第2個參數是一個默認的參數,默認是60秒,表示60秒內所有值的一個均值。這裏需要注意的是,Meter 每一秒都會去彙報當前這一秒往前60秒時間區間內,所有值的均值。可以用 Meter 的 mark_event 方法來彙報。

    • Distribution (sum/count/min/max/mean)

最後一種是 Distribution 的一個 Metric 類型,它對你的值能提供一些 sum/count/min/max/mean 等統計信息。可以調用 metric_group 上的 Distribution 這個方法。更新上可以調用 distribution.update。 

3. PyFlink 的 Demo 演示

接下來對這些核心功能做一些 Demo 的演示跟講解。此處我們提供了一個 playgrounds 的 git。主要是希望幫助大家更快速地熟悉 PyFlink 所有的功能及使用,並附上了相關代碼示例。 具體參考信息請見下方鏈接:

https://github.com/pyflink/playgrounds

4.PyFlink 社區扶持計劃

  • 爲什麼要發起 PyFlink 社區扶持計劃?

用戶逐漸變多、有經驗用戶少

https://survey.aliyun.com/apps/zhiliao/B5JOoruzY

初步審覈符合條件後我們會在收到問卷的 10 個工作日內與您聯繫。

  • 扶持目標

面向所有 PyFlink 社區企業用戶

  • PyFlink 問題支持&共享

如果你有一些相關的問題或者是其他的一些意見,可以發到社區的郵件列表裏面去。

點擊「 下方鏈接 」可回顧作者分享視頻~

https://ververica.cn/developers/flink-training-course1/

福利來了   

Apache Flink 極客挑戰賽

萬衆矚目的第二屆 Apache Flink 極客挑戰賽來啦!本次大賽全面升級,重量級助陣嘉賓專業指導,強大的資源配置供你發揮創意,還有 30w 豐厚獎金等你帶走~聚焦  Flink 與 AI 技術的應用實踐,挑戰疫情防控的世界級難題,你準備好了麼?

(點擊圖片可瞭解更多大賽信息)

點擊「 閱讀 原文 」即可報名

相關文章