摘要: 上週四在 Flink 中文社區釘釘羣中直播分享了《Demo: 基於 Flink SQL 構建流式應用》,直播內容偏向實戰演示。 這篇文章是對直播內容的一個總結,並且改善了部分內容,比如除 Flink 外其他組件全部採用 Docker Compose 安裝,簡化準備流程。 讀者也可以結合視頻和本文一起學習。

完整分享可以觀看視頻回顧:

https://www.bilibili.com/video/av90560012

Flink 1.10.0 於近期剛發佈,釋放了許多令人激動的新特性。尤其是 Flink SQL 模塊,發展速度非常快,因此本文特意從實踐的角度出發,帶領大家一起探索使用 Flink SQL 如何快速構建流式應用。

本文將基於 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建一個電商用戶行爲的實時分析應用。 本文所有的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。 本實戰演練的最終效果圖:

準備

一臺裝有 Docker 和 Java8 的 Linux 或 MacOS 計算機。

使用 Docker Compose 啓動容器

本實戰演示所依賴的組件全都編排到了容器中,因此可以通過 docker-compose 一鍵啓動。 你可以通過 wget 命令自動下載該 docker-compose.yml 文件,也可以手動下載。

mkdir flink-demo; cd flink-demo;

wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

該 Docker Compose 中包含的容器有:

  • DataGen 數據生成器。 容器啓動後會自動開始生成用戶行爲數據,併發送到 Kafka 集羣中。 默認每秒生成 1000 條數據,持續生成約 3 小時。 也可以更改 docker-compose.yml 中 datagen 的 speedup 參數來調整生成速率(重啓 docker compose 才能生效)。

  • MySQL 集成了 MySQL 5.7 ,以及預先創建好了類目表(category),預先填入了子類目與頂級類目的映射關係,後續作爲維表使用。

  • Kafka 主要用作數據源。 DataGen 組件會自動將數據灌入這個容器中。

  • Zookeeper Kafka 容器依賴。

  • Elasticsearch 主要存儲 Flink SQL 產出的數據。

  • Kibana 可視化 Elasticsearch 中的數據。

在啓動容器前,建議修改 Docker 的配置,將資源調整到 4GB 以及 4核。 啓動所有的容器,只需要在 docker-compose.yml 所在目錄下運行如下命令。

docker-compose up -d

該命令會以 detached 模式自動啓動 Docker Compose 配置中定義的所有容器。 你可以通過 docker ps 來觀察上述的五個容器是否正常啓動了。 也可以訪問 http://localhost:5601/ 來查看 Kibana 是否運行正常。

另外可以通過如下命令停止所有的容器:

docker-compose down

下載安裝 Flink 本地集羣

我們推薦用戶手動下載安裝 Flink,而不是通過 Docker 自動啓動 Flink。 因爲這樣可以更直觀地理解 Flink 的各個組件、依賴、和腳本。

1. 下載 Flink 1.10.0 安裝包並解壓(解壓目錄 flink-1.10.0): https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz

2. 進入 flink-1.10.0 目錄: cd flink-1.10.0。

3. 通過如下命令下載依賴 jar 包,並拷貝到 lib/ 目錄下,也可手動下載和拷貝。 因爲我們運行時需要依賴各個 connector 實現。

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar | \

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \

wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

4. 將 conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因爲我們會同時運行多個任務。

5. 執行 ./bin/start-cluster.sh,啓動集羣。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。 並且可以看到可用 Slots 數爲 10 個。

6. 執行 bin/sql-client.sh embedded 啓動 SQL CLI。 便會看到如下的松鼠歡迎界面。

 

使用 DDL 創建 Kafka 表

Datagen 容器在啓動後會往 Kafka 的 user_behavior topic 中持續不斷地寫入數據。 數據包含了2017年11月27日一天的用戶行爲(行爲包括點擊、購買、加購、喜歡),每一行表示一條用戶行爲,以 JSON 的格式由用戶ID、商品ID、商品類目ID、行爲類型和時間組成。 該原始數據集來自阿里雲天池公開數據集,特此鳴謝。

我們可以在 docker-compose.yml 所在目錄下運行如下命令,查看 Kafka 集羣中生成的前10條數據。

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'

{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}

{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}

...

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic 了。 在 Flink SQL CLI 中執行該 DDL。

CREATE TABLE user_behavior (

user_id BIGINT,

item_id BIGINT,

category_id BIGINT,

behavior STRING,

ts TIMESTAMP(3),

proctime as PROCTIME(), -- 通過計算列產生一個處理時間列

WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成爲事件時間列

) WITH (

'connector.type' = 'kafka', -- 使用 kafka connector

'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本

'connector.topic' = 'user_behavior', -- kafka topic

'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取

'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址

'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址

'format.type' = 'json' -- 數據源格式爲 json

);

如上我們按照數據的格式聲明瞭 5 個字段,除此之外,我們還通過計算列語法和 PROCTIME() 內置函數聲明瞭一個產生處理時間的虛擬列。 我們還通過 WATERMARK 語法,在 ts 字段上聲明瞭 watermark 策略(容忍5秒亂序), ts 字段因此也成了事件時間列。 關於時間屬性以及 DDL 語法可以閱讀官方文檔瞭解更多:

  • 時間屬性:

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html

  • DDL:

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table

在 SQL CLI 中成功創建 Kafka 表後,可以通過 show tables; 和 describe user_behavior; 來查看目前已註冊的表,以及表的詳細信息。 我們也可以直接在 SQL CLI 中運行 SELECT * FROM user_behavior; 預覽下數據(按q退出)。

接下來,我們會通過三個實戰場景來更深入地瞭解 Flink SQL 。

統計每小時的成交量

使用 DDL 創建 Elasticsearch 表

我們先在 SQL CLI 中創建一個 ES 結果表,根據場景需求主要需要保存兩個數據: 小時、成交量。

CREATE TABLE buy_cnt_per_hour (

hour_of_day BIGINT,

buy_cnt BIGINT

) WITH (

'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector

'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本

'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址

'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相當於數據庫的表名

'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當於數據庫的庫名

'connector.bulk-flush.max-actions' = '1', -- 每條數據都刷新

'format.type' = 'json', -- 輸出數據格式 json

'update-mode' = 'append'

);

我們不需要在 Elasticsearch 中事先創建 buy_cnt_per_hour 索引,Flink Job 會自動創建該索引。

提交 Query

統計每小時的成交量就是每小時共有多少 "buy" 的用戶行爲。 因此會需要用到 TUMBLE 窗口函數,按照一小時切窗。 然後每個窗口分別統計 "buy" 的個數,這可以通過先過濾出 "buy" 的數據,然後 COUNT(*) 實現。

INSERT INTO buy_cnt_per_hour

SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)

FROM user_behavior

WHERE behavior = 'buy'

GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

這裏我們使用 HOUR 內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了 INSERT INTO將 query 的結果持續不斷地插入到上文定義的 es 結果表中(可以將 es 結果表理解成 query 的物化視圖)。另外可以閱讀該文檔瞭解更多關於窗口聚合的內容: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows

在 Flink SQL CLI 中運行上述查詢後,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,因此會一直運行。

可以看到凌晨是一天中成交量的低谷。

使用 Kibana 可視化結果

我們已經通過 Docker Compose 啓動了 Kibana 容器,可以通過 http://localhost:5601 訪問 Kibana。 首先我們需要先配置一個 index pattern。 點擊左側工具欄的 "Management",就能找到 "Index Patterns"。 點擊 "Create Index Pattern",然後通過輸入完整的索引名 "buy_cnt_per_hour" 創建 index pattern。 創建完成後, Kibana 就知道了我們的索引,我們就可以開始探索數據了。

先點擊左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛創建的索引中的內容。

接下來,我們先創建一個 Dashboard 用來展示各個可視化的視圖。 點擊頁面左側的"Dashboard",創建一個名爲 ”用戶行爲日誌分析“ 的Dashboard。 然後點擊 "Create New" 創建一個新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,並保存爲”每小時成交量“。

統計一天每10分鐘累計獨立用戶數

另一個有意思的可視化是統計一天中每一刻的累計獨立用戶數(uv),也就是每一刻的 uv 數都代表從0點到當前時刻爲止的總計 uv 數,因此該曲線肯定是單調遞增的。

我們仍然先在 SQL CLI 中創建一個 Elasticsearch 表,用於存儲結果彙總數據。 主要有兩個字段: 時間和累積 uv 數。

CREATE TABLE cumulative_uv (

time_str STRING,

uv BIGINT

) WITH (

'connector.type' = 'elasticsearch',

'connector.version' = '6',

'connector.hosts' = 'http://localhost:9200',

'connector.index' = 'cumulative_uv',

'connector.document-type' = 'user_behavior',

'format.type' = 'json',

'update-mode' = 'upsert'

);

爲了實現該曲線,我們可以先通過 OVER WINDOW 計算出每條數據的當前分鐘,以及當前累計 uv(從0點開始到當前行爲止的獨立用戶數)。 uv 的統計我們通過內置的 COUNT(DISTINCT user_id)來完成,Flink SQL 內部對 COUNT DISTINCT 做了非常多的優化,因此可以放心使用。

CREATE VIEW uv_per_10min AS

SELECT

MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,

COUNT(DISTINCT user_id) OVER w AS uv

FROM user_behavior

WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

這裏我們使用 SUBSTR 和  DATE_FORMAT 還有 || 內置函數,將一個 TIMESTAMP 字段轉換成了 10分鐘單位的時間字符串,如: 12:10, 12:20。關於 OVER WINDOW 的更多內容可以參考文檔: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations

我們還使用了 CREATE VIEW 語法將 query 註冊成了一個邏輯視圖,可以方便地在後續查詢中對該 query 進行引用,這有利於拆解複雜 query。 注意,創建邏輯視圖不會觸發作業的執行,視圖的結果也不會落地,因此使用起來非常輕量,沒有額外開銷。 由於 uv_per_10min 每條輸入數據都產生一條輸出數據,因此對於存儲壓力較大。 我們可以基於 uv_per_10min 再根據分鐘時間進行一次聚合,這樣每10分鐘只有一個點會存儲在 Elasticsearch 中,對於 Elasticsearch 和 Kibana 可視化渲染的壓力會小很多。

INSERT INTO cumulative_uv

SELECT time_str, MAX(uv)

FROM uv_per_10min

GROUP BY time_str;

提交上述查詢後,在 Kibana 中創建 cumulative_uv 的 index pattern,然後在 Dashboard 中創建一個"Line"折線圖,選擇 cumulative_uv 索引,按照如下截圖中的配置(左側)畫出累計獨立用戶數曲線,並保存。

頂級類目排行榜

最後一個有意思的可視化是類目排行榜,從而瞭解哪些類目是支柱類目。 不過由於源數據中的類目分類太細(約5000個類目),對於排行榜意義不大,因此我們希望能將其歸約到頂級類目。 所以筆者在 mysql 容器中預先準備了子類目與頂級類目的映射數據,用作維表。

在 SQL CLI 中創建 MySQL 表,後續用作維表查詢。

CREATE TABLE category_dim (

sub_category_id BIGINT, -- 子類目

parent_category_id BIGINT -- 頂級類目

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3306/flink',

'connector.table' = 'category',

'connector.driver' = 'com.mysql.jdbc.Driver',

'connector.username' = 'root',

'connector.password' = '123456',

'connector.lookup.cache.max-rows' = '5000',

'connector.lookup.cache.ttl' = '10min'

);

同時我們再創建一個 Elasticsearch 表,用於存儲類目統計結果。

CREATE TABLE top_category (

category_name STRING, -- 類目名稱

buy_cnt BIGINT -- 銷量

) WITH (

'connector.type' = 'elasticsearch',

'connector.version' = '6',

'connector.hosts' = 'http://localhost:9200',

'connector.index' = 'top_category',

'connector.document-type' = 'user_behavior',

'format.type' = 'json',

'update-mode' = 'upsert'

);

第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 將該查詢註冊成一個視圖,簡化邏輯。維表關聯使用 temporal join 語法,可以查看文檔瞭解更多: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

CREATE VIEW rich_user_behavior AS

SELECT U.user_id, U.item_id, U.behavior,

CASE C.parent_category_id

WHEN 1 THEN '服飾鞋包'

WHEN 2 THEN '家裝家飾'

WHEN 3 THEN '家電'

WHEN 4 THEN '美妝'

WHEN 5 THEN '母嬰'

WHEN 6 THEN '3C數碼'

WHEN 7 THEN '運動戶外'

WHEN 8 THEN '食品'

ELSE '其他'

END AS category_name

FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C

ON U.category_id = C.sub_category_id;

最後根據 類目名稱分組,統計出 buy 的事件數,並寫入 Elasticsearch 中。

INSERT INTO top_category

SELECT category_name, COUNT(*) buy_cnt

FROM rich_user_behavior

WHERE behavior = 'buy'

GROUP BY category_name;

提交上述查詢後,在 Kibana 中創建 top_category 的 index pattern,然後在 Dashboard 中創建一個"Horizontal Bar"條形圖,選擇 top_category 索引,按照如下截圖中的配置(左側)畫出類目排行榜,並保存。

可以看到服飾鞋包的成交量遠遠領先其他類目。

Kibana 還提供了非常豐富的圖形和可視化選項,感興趣的用戶可以用 Flink SQL 對數據進行更多維度的分析,並使用 Kibana 展示出可視化圖,並觀測圖形數據的實時變化。

結尾

在本文中,我們展示瞭如何使用 Flink SQL 集成 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。 整個過程無需一行 Java/Scala 代碼,使用 SQL 純文本即可完成。 期望通過本文,可以讓讀者瞭解到 Flink SQL 的易用和強大,包括輕鬆連接各種外部系統、對事件時間和亂序數據處理的原生支持、維表關聯、豐富的內置函數等等。 希望你能喜歡我們的實戰演練,並從中獲得樂趣和知識!

作者介紹:

伍翀(雲邪),Apache Flink PMC member & Committer,阿里巴巴技術專家,北京理工大學碩士畢業。2015年加入阿里巴巴,從事 JStorm 的開發與設計。自2016年開始長期活躍於 Flink 社區,Flink/Blink SQL 模塊的核心開發之一。

Tips:點擊「 閱讀原文 」可查看全部直播彙總~

關注 Flink 中文社區,獲取更多技術乾貨

你也「 在看 」嗎?:point_down:

相關文章