供稿 |  eBay DSS Team

作者 | 田川曉陽

編輯 | 顧欣怡

本文4490字,預計閱讀時間14分鐘

更多幹貨請關注“eBay技術薈”公衆號

導讀

新一代數據開發分析平臺Zeta eBay DS S (Data Services and Solutions) 團隊自主研 ,主要針對在Spark SQL運行過程中可能存在的 性能隱患及Spark執行計劃圖的缺 ,提出相應解決方案,旨在降低Spark SQL優化門檻,助力eBay用戶解放分析效能,也希望對同業人員有所啓發和幫助

1. 背景介紹

eBay的大部分數據倉庫之前一直構建於商業數據倉庫系統Teradata之上,從 2017年 開始,eBay決定完全基於 Hadoop 平臺來構建數據倉庫,並開始了恢弘的數據搬遷之旅。然而隨着以 Spark 爲主要計算引擎的新開源平臺的落戶,複雜的數據訪問模式和調優分析以及未經結構化的數據展現等問題開始變成全量投產的瓶頸。

爲了解決這個困境, eBay DSS(Data Services & Solutions) 部門挺身而出,自主研發了Zeta——新一代數據開發分析平臺 ,旨在爲數據工程師、數據分析師和數據科學家提供跨平臺且涵蓋全生命週期的數據服務。平臺支持基於元數據的數據探索、大數據開發、數據測試、數據分析以及終端的數據可視化等功能。目前,平臺已累計服務超過 2000多名 用戶,現有 1000多名 數據工程師、分析師及產品經理高度依賴Zeta平臺來完成日常的數據開發處理和分析工作。

2. 痛點分析

隨着大量分析場景的湧入和使用的激增,目前Hadoop平臺每天要運行 上萬個 Job,處理超過 500PB 的數據。與此同時,低質量Spark SQL的 大量 提交導致性能問題屢屢發生,這嚴重影響了平臺性能,導致了資源浪費。然而,讓用戶自己對Spark SQL進行優化存在很高的技術門檻。對於缺少Spark經驗的使用者來說,進行自主排查並解決問題無疑是一項痛苦甚至不可能完成的任務。

爲了能夠儘早地發現性能隱患、降低SQL優化的門檻、助力eBay用戶解放分析效能,Zeta團隊基於以往的實踐對這些問題進行了一些探索並研發出了獨有的解決方案,接下來本文將對此進行詳細闡述。

3. 行業分析

3.1 關 於性能分析的行業解決方案

1)Doctor Elephant:作爲一個開源的Hadoop和Spark性能監控調優工具,主要是通過採集Spark進程運行過程中記錄下來的log來進行性能分析並給出建議;但是採集的數據粒度較粗,並且分析過程滯後,無法實時地看到Job的運行情況的分析,而且給出的建議是針對Spark Job運行過程中底層計算框架級別信息的,對於Spark SQL的Job來說 不具有針對性 ,還需要結合Spark內核知識來進行解讀。

2)Ganglia等監控工具:這類工具主要通過採集Spark Metrics和log等在運行過程中暴露出來的內部數據來進行可視化展示,以供用戶進行性能分析,但是缺點是 太過於底層還需要用戶自己做大量的統計分析工 作,不夠直接。

3)Spark UI中提供的SQL Physical Plan執行計劃圖:最貼近用戶實際需求,並且可以實時地看到Job運行的狀態,但是存在一些問題,下文會詳細描述。

3.2  現有Spark執行計劃圖的缺陷

Spark UI 中SQL的 物理執行計劃圖 是我們觀察Job運行狀態最直接的一個重要途徑,但這個物理執行計劃圖存在很多缺陷,比如缺失了很多和Spark進程內部情況相關的度量數據以及和JVM相關的重要信息,而這些缺失的信息都是幫助我們洞察性能瓶頸時不可或缺的線索。針對這種情況,Zeta團隊進行了一系列的優化改造。

其實Spark執行計劃圖的主要缺陷在於用戶很難直接從圖上提供的信息中找出性能瓶頸。在執 行Spark SQL的時候,Spark UI中存在兩個可以用來觀察分析自己Job運行情況的Tab,分別是 Task level的運行狀態圖 (圖1)和 SQL 的Physical Plan DAG (圖2):

圖1 Task Level

(點擊可查看大圖)

圖2 Physical Plan DAG

(點擊可查看大圖)

Task level的Tab內容更加偏底層一些,我們可以獲取到很多重要的數據信息,比如當前已處理的數據數量以及和JVM相關的重要數據。但問題是,在SQL特別複雜的時候很難將狀態信息和自己的SQL的邏輯計劃對應起來。 SQL Tab 展示了物理執行計劃圖,更貼近用戶的邏輯計劃圖,但是缺少Spark執行過程中偏底層的狀態信息,比如task粒度的信息。如果發生了數據傾斜,無法直接在這張圖上看出問題所在。

因此,在現有的Spark UI 佈局下,對於用戶來說,想要debug就必須在不同的圖之間反覆切換,同時還要結合Spark內核原理來分析狀態信息從而診斷自己Job的問題所在,這對大部分數據分析師和產品經理來說具有一定的學習成本。而且即便發現了問題,這部分狀態信息也不一定能給用戶提供解決問題的具體方向,Spark當前所暴露出來的內部信息滿足不瞭解決問題的需求。

4. Zeta 解決方案

4.1  核心問題

一句 SQL從解析到實際被Spark進程執行的過程中會歷經如下幾個階段:

圖3(點擊可查看大圖)

如圖3所示,SQL最終會被轉換成底層的RDD,整個任務執行的DAG圖會被分解成一個或者多個具有依賴關係的stage並最終以task爲執行單元發送到Spark Executor的進程中去執行。大部分情況下,到了這個階段就已經無法再從task執行的上下文中找出這個task到底是在執行SQL上對應的哪塊邏輯,因爲經過Codegen等一系列優化之後理論上已經無法在實際的物理執行過程和最初的邏輯計劃上建立映射關係。

對於數據傾斜這種情況,我們可以通過觀察Spark UI來進行判定。 如果某個stage執行了很長時間,其中少部分task處理的數據又比其他task多很多,那麼就證明出現了數據傾斜。 以多張表做join爲例,如果在shuffle的過程中產生了數據傾斜,爲了儘可能將數據分散到不同的進程中進行處理,從而達到平衡工作負載的目的,比較通用的有以下幾個方法:修改邏輯,將shuffle時的key儘可能打散;加大shuffle的分區數量從而使數據分散到更多的分區中去;單獨找出產生了極大傾斜的key,在邏輯中單獨處理最後再和其他部分union起來。

在準備開始解決這個問題之前,我們必須要回答兩個核心問題:

1)如何找出SQL邏輯中發生了傾斜的那個部分?

2)如果發生了傾斜,又該如何知道到底傾斜在了哪一些key上呢?

對於問題1),一般來說數據傾斜都發生在會產生shuffle的操作上,比如join和group by等操作。對Spark內核比較熟悉的用戶可以根據Spark UI上DAG的實時計劃圖大致推斷出對應在SQL上的邏輯操作。

而問題2)就需要用戶自己花額外的功夫來對錶的數據做統計分析,比如算出表中用來做join的字段中每個值的數量,並按照值的大小進行排序,由此可以觀察出主要有哪些造成傾斜最嚴重的值,從而進行鍼對優化。

如果這兩個問題Spark能夠自動在執行任務的時候解決,並通過一張圖的形式很直觀地表達出來,然後在用戶執行Spark SQL的過程中實時推送給用戶,那就可以即時地幫助用戶發現執行過程中存在的性能問題,也就能節省大量分析推斷和數據調研的時間,大大提高開發效率。

4.2  定 制化DAG圖

經過內部用戶的調研和分析,我們決定通過修改Spark源碼來滿足以上需求。在現有Spark UI的基礎上構建一張能同時包含具有這兩張圖關鍵特性的DAG圖,既要反映出實際Spark進程中的關鍵狀態信息,又要儘可能地幫助用戶,將出現問題的狀態映射到SQL中的某個邏輯塊上。

Spark SQL經過解析後的Physical Plan中,每個物理算子節點都實現了對RDD的轉換過程。所以當最終SQL的執行過程經過一系列轉換變成RDD的轉換過程後,一個物理算子就可以映射到RDD DAG圖中的某一段路徑上,然後根據RDD DAG圖劃分stage的規則,從而將stage映射到SQL Physical plan的某個或者多個相鄰節點上,再將Task level的狀態映射到Physical Plan的節點上了。

Task level的狀態信息非常豐富,包括輸入數據的大小以及和shuffle相關的狀態信息等,這些信息都是幫助判定是否在某個算子上發生了傾斜的重要線索。同時,SQL Physical Plan的物理算子中,有的算子恰好保留了和邏輯執行計劃及對應操作相關的上下文,如SortMergeJoinExec、ShuffledHashJoinExec算子保留了join操作時用到的key的上下文。因此,這樣一來就解決了4.1中的 問題1)

對於 問題2) ,因爲要計算key的值就必須引入額外的計算,而在實際的計算中key的基數又是很大的,所以爲了不對Job的整體性能造成過大的影響, 我們只需要計算Top N的那幾個key即可。 從優化傾斜的角度來講,我們往往只需要找到傾斜最嚴重的部分key就可以了,而且這些key應該也只是少數。

接下來需要考慮的,就是該如何插入這段計算邏輯,以及如何讓這段邏輯覆蓋大部分情況下的傾斜度計算。在Spark的shuffle write階段,其實writer寫的時候就已經遍歷了每一條數據,但是這個階段太抽象,想在該階段對數據進行統計計算並還原成實際處理數據的key並不是很容易,需要做非常多的workaround,這樣就不能直接地解決問題。

因此我們還是決定從 物理算子 入手,通過修改部分物理算子的算法,來達到在做原有計算邏輯的同時也對數據做統計計算。於是我們在這當中對少部分物理算子的算法做了較大的重構。由於做了重構設計,因此當動態計算key的功能啓動之時,這少部分原本支持Codegen的物理算子將無法支持Codegen。

在實際的計算過程中,如果開啓動態計算key的功能,將會爲每一個TaskSet創建一個定製化的AdvancedTaskSetManager,主要作用一是執行原有物理算子的邏輯,二是當發現某些task存在數據傾斜的時候,會額外啓動一個TaskSet’來執行統計計算的邏輯,如圖4所示:

圖4(點擊可查看大圖)

這個TaskSet’的執行邏輯和正常TaskSet的執行邏輯一樣,都是修改過算法後的執行邏輯。不同的是AdvancedTaskSetManager會爲這兩個TaskSet分別注入不通的TaskContext,從而控制實際Task 在Runtime中執行不同的邏輯分支,一部分進行正常的計算,另外一部分進行統計計算並將結果返回到Driver端進行聚合,從而達到統計彙總的目的。這個改動對Spark原有代碼有一定的侵入性。

好了,這下我們需要的運行時的數據都拿到了,接下來要做的就是構建這幅圖了。Spark在Driver的初始化進程中會創建一個Spark UI對象,Spark UI會啓動一個Jetty的web服務來供外部訪問,Driver內部的狀態存儲對象AppStatusStore會爲不同的Tab提供後端Render頁面的數據,運行時Spark UI內部的狀態如圖5所示:

圖5(點擊可查看大圖)

構建的第一步便是記錄構建圖形所需要的數據。Spark會在運行過程的某些邏輯中構建對應的事件,以便記錄上下文並異步發送到Spark Driver內部的消息總線LiveListenerBus中。而且會有特定的Listener在總線的隊列中監聽特定的事件,當SQL被解析完畢並且準備開始執行的時候,會發出 SparkListenerSQLExecutionStart 事件。該事件中包含了SQL的物理計劃執行圖,像SparkPlanInfo,Driver內部的SQLAppStatusListener會監聽這個事件並根據SparkPlanInfo準備將來做SQL Physical Plan後端渲染的數據。

因此,我們在SQL執行前物理計劃樹的遍歷階段記錄下每個算子和對應RDD的上下文信息(圖6),並以事件的形式發送到消息總線中,再由我們定製化的Listener監聽捕捉並和已有的物理計劃圖進行整合即可(圖7)。

圖6(點擊可查看大圖)

圖7(點擊可查看大圖)

而在每個stage開始執行和執行完畢的時候,也會發出相應的事件,這些事件中就包含了上文提到的各種統計信息和額外被注入的診斷信息。因此這些事件也會被監聽,並用來update當前我們定製化的DAG圖的狀態。以一個實際生產中的案例作爲參考, 原始DAG圖 (圖8)和 經過定製化後的DAG圖 (圖9)分別如下所示:

圖8 社區版DAG圖

(點擊可查看大圖)

圖9 經過定製後的DAG圖

點擊可查看大圖)

此案例是 實際生產中 兩張表進行join的時候產生了數據傾斜,可見 社區版原始的DAG圖 中只展示了少量的和內存以及數據量等相關的信息,而這些數據並不足以幫助我們觀察出內在的性能問題。反觀 經過定製化的DAG圖 ,可以看到在這個階段運行過程中檢測到了數據傾斜,並且顯示傾斜發生在join操作上,還提示了join的字段爲user_id,並同時計算出了傾斜最嚴重的值爲1。而這個傾斜最嚴重的值在相關業務場景中其實是髒數據,於是用戶根據這個信息更改了SQL,將user_id爲1的數據filter掉了,從而大大減少了shuffle時候的數據量,大幅縮短了整體的運行時間,解決了內存溢出的問題。

5. 線上效果

5.1 減 少事故發生率

在數據平臺上上線這個定製化DAG圖的Spark版本後,來自數據分析師和產品經理的support request變少了,同時許多數據開發人員在Job正式上線到生產環境之前,會通過數據平臺運行生產中的Job並使用這個DAG圖來分析執行過程中的性能問題,從而提前採取措施來對Job進行優化,減少了生產中的事故。

5.2 性 能影響

需要注意的是,開啓了自動計算傾斜 key值後,Job性能會有一定的下降,最壞的情況下整體Job的執行時間相較於不開啓的時候多了 30% 左右,同時整體的CPU時間也提升了一些,性能影響最小的一個Job整體時間大概增加了 5% 。總體趨勢是當Spark開啓了DynamicAllocation之後,隨着數據量的增大,對性能的影響會逐漸加強,這主要是由於額外的統計分析計算消耗了CPU時間,而且由於需要關閉特定算子的Codegen,所以相對於之前會產生更多的虛函數調用。此外,爲了從Yarn申請更多的container來啓動Executor運行額外的TaskSet,也增加了更多的調度時間。

6. 未來優化方向及社區跟進

當前DAG圖的內部實現原理還有很多可以優化的地方,比如在計算極大傾斜的key的階段, 我們可以使用採樣的方法來替代全體掃描 。因爲對於大部分情況來說,傾斜的key往往是少部分,採樣雖然存在誤差,但是基本能夠找出發生了問題的key,這樣可以節省不少CPU時間 。同時也可以根據當前採集到的信息對SQL的物理執行過程進行動態優化 ,比如可以結合社區最新版本的AdaptiveExecution功能,做更加深入的動態調優從而提升整體的執行性能。在開發定製化DAG的時候 Spark 3.0 還沒有正式發佈,在3.0版本中社區對DAG圖也做了更進一步的優化和增強,未來會考慮與其進行整合。

您可能還感興趣:

實戰 | 利用Delta Lake使Spark SQL支持跨表CRUD操作

一探究竟 | eBay流量管理之DSR在基礎架構中的運用及優化

乾貨 | Rheos SQL: 高效易用的實時流式SQL處理平臺

分享 | “三高”產品設計的這些坑,你是不是也踩過?(上)

分享 | “三高”產品設計的這些坑,你是不是也踩過?(下)

一探究竟 | eBay流量管理之看不見的手

解密 | 一樁由數據潔癖引發的DNS懸案

:point_down:點擊 閱讀原文 ,一鍵投遞 

    eBay大量優質職位,等的就是你

相關文章