技術資訊 | Spark sql優化案例分享
摘要:在Join的過程中,未使用Distribute By 的兩個表,將各自的數據隨機分配到executor上,當join的時候就會出現,跨executor的數據需要互相遷移進行匹配的情況,從而引起更大量的Shuffle。但如果我們將join的連接字段通過Distribute By 重分區,神奇的一幕將發生:需要互相匹配的數據都會在相同的executor上,從而避免跨executor的數據遷移,所有的匹配都在統一個executor上進行,這樣就大大減少了Shuffle。
01
引言
首先我們從 兩種計算引擎的基本架構圖來分析Spark/MapReduce的性能 。
1.1 M apReduce
處理效率低效 :
-
Map/Reduce任務中間結果寫磁盤,多個MR之間通過HDFS交換數據; 任務調度和啓動開銷大;
-
一條SQL語句經常被拆分成多個Application,數據在多個Application之間只能通過讀寫HDFS交換;
-
無法充分利用內存;
1.2 Spark
1.2.1高 效 (比MapReduce快幾倍到幾十倍)
-
內存計算引擎,提供Cache機制來支持需要反覆迭代計算或者多次數據共享,減少數據讀取的IO開銷,另外爲了解決純內存計算帶來的數據可靠性,引入了Checkpoint機制;
-
DAG引擎,減少多次計算之間中間結果寫到HDFS的開銷;
-
Executor使用線程池模型來減少task啓動開銷,shuffle過程中避免 不必要的sort操作以及減少磁盤IO操作。
02
影響Spark任務快慢的因素
2.1因子
-
數據量 (GB級 vs TB級)
-
數據組織形式 (存儲結構,壓縮算法,數據Schema[Map等複雜結構])
-
小文件 (很多的KB級,10M級)
-
內存 (內存偏少,大量溢寫)
-
Core個數 (併發量小,shuffle等待)
-
算法 (聚合因子,過濾條件,SQL組織形式 )
-
發生大量的Shuffle (可用Broadcast替換)
-
是否使用緩存 (將經常訪問的數據緩存在Executor內存中)
2.2.Spark基本流程圖
-
Driver (AppMaster) 負責DAG,Task調度
-
Yarn負責計算資源分配
-
Worker上的Executor負責計算
Worker節點纔是計算執行的地方,所以性能優化重點將研究Executor的優化。
2.3Executor
建議不要自己配置Executor個數,使用動態分配模式:
概念:
根據當前的負載動態的增加或者刪除Executor,這樣做的 好處 在於:
在業務組的隊列資源 (vcore, memory) 資源恆定額情況下,能更好的均衡各個業務的對資源的佔用,也就是對於一個計算量較小的任務不用佔用太多資源。而對於一個計算量較大的任務,也能從集羣中獲取相對較多的資源。
而採用指定模式,則會導致任務在獲取足夠多 (可通過參數設置比例) 的Executor之前一直處於等待狀態,而這通常會浪費計算資源。
Executor動態分配模型
ExecutorAllocationManager內部會定時根據工作負載計算所需的Executor數量:
-
如果任務對Executor需求數量大於之前向集羣管理器申請的Executor數量,那麼向Yarn申請添加Executor;
-
如果任務對Executor需求數量小於之前向集羣管理器申請的Executor數量,那麼向Yarn申請取消部分Executor;
-
ExecutorAllocationManager內部還會定時向Yarn申請移除 (殺死) 過期的Executor;
2.4Core
spark.executor.cores
建議:
-
建議executor的cpu core數量設置爲2 ~ 4個比較合適;
-
在隊列有大量任務提交的情況下,還要更少,以免影響其他用戶提交的任務因申請不到cpu (vcore) 資源而卡住。
2.5Memory
spark.executor.memory
建議:
每個Executor的每個core分配的內存設置4g較爲合適。
用戶設置該值的時候需要考慮如下影響因子:
-
自己使用的executor-memory * num-executor所使用的資源不能超過所提交隊列的閾值;
-
在隊列資源共用的模式下,所申請的資源還要更小,以免申請不到資源或者阻塞其他用戶的任務;
-
用戶申請的executor-momory不能超過yarn設置的最大值,當前設置的最大值爲60g;
Spark內存區分存儲內存&計算內存
spark.storage.memoryFraction
控制存儲內存在整個內存中的比例
StorageMemory:
RDD cache, RDD Broadcast等等內容
根據應用的不同可自己動態調整,但通常情況下不需要調整。使用默認值即可,下圖展示的是Storage內存與Execution的 內存動態調節機制 。
2.6 Shuffle並行度
spark.default.parallelism
該參數用於設置每個stage的默認task數量,這個參數極爲重要,如果不設置可能會直接影響你的任務性能。 (只有在處理RDD時纔會起作用,對Spark SQL無效)
建議:
500 ~ 1000較爲合適,通常情況設置爲executor-memory * num-executor 乘積的2~3倍較爲合適;
spark.sql.shuffle.partitions
用於配置join 或聚合操作shuffle數據時使用的分區數 (對sparks SQL專用的設置)
2.7 存儲結構
目前HADOOP中常用的數據存儲結構包括:
-
Text (行式存儲)
-
CSV (行式存儲)
-
RCFile (列式存儲)
-
ORC (列式存儲)
-
Parquet (列式存儲)
目前Spark默認存儲的格式爲Parquet。 下圖展示的是相同數據以不同存儲結構存儲,存儲文件的Size對比:
2.8 列式存儲的好處
-
查詢的時候不需要掃描全部的數據,而只需要讀取每次查詢涉及的列。 這樣可以將I/O消耗降低N倍,另外可以保存每一列的統計信息 (min 、 max、sum等) ,實現部分的謂詞下推;
-
由於每一列的成員都是同構的,可以針對不同的數據類型使用更高效的數據壓縮算法,進一步減小I/O;
-
由於每一列的成員的同構性,可以使用更加適合CPU pipeline的編碼方式,減小CPU的緩存失效;
-
由於列式存儲數據量更小,Spark的Task讀取數據的時間更短,不光節省計算資源,還節省存儲資源。
列式存儲的向量化操作,相對於行式存儲一行一行的操作,列式存儲可做到 一個batch的操作,這樣的操作方式極大的 提升了運算性能 。
2.9 壓縮方式(gzip, bzip2, lzo, snappy)
案例1:
Snappy壓縮前後比例爲3:1
對於Spark任務來說,壓縮的數據帶來的好處是顯而易見的:
-
大幅節省內存
-
大幅節省磁盤
-
大幅節省數據讀取時間
各種壓縮格式特性
Spark設置壓縮格式 (例如設置snappy壓縮) :
各種壓縮格式性能對比
2.10 輸入小文件
案例1:
-
小文件太多,導致每個task讀取的數據量較小,計算的時間很短;
-
執行的時間不足以彌補JVM啓動的時間。
調節每個task任務的輸入數據大小
2.11 Shuffle&&輸出小文件
某個表的hdfs文件如下:
輸出表的分區下有20個小文件。
由於集羣中NameNode節點需要維護文件的元數據信息,太多的輸出小文件會給集羣的NameNode帶來巨大的壓力;
控制task個數
方案1:
使用hint將會使得輸入數據進行重新Repartition,調節最終task的個數以及輸出文件的個數。
通過重分區將減少或者增大分區數量以達到增加或減少task的數量,從而增大或者減少Task輸出的文件個數。
2.12 SQL自身-下推 (PushDownPredicate)
上圖中:
-
方式1從磁盤中讀取出所有的數據,在內存中過濾;
-
方式2,3將過濾從內存中下推到磁盤,在掃描磁盤的數據的時候就過濾掉數據。
概念:
所謂下推是指將過濾儘可能地下沉到數據源端,從而避免從磁盤讀取不必要數據。
下推與不下推性能對比 (DataBricks官方)
通過上圖可知,60%的下推比不下推的性能提高了2~18倍;
2.12.1謂詞下推的限制:
-
只有operator 包含的所有expression都是確定性的時候纔可以下推, 比如 rand 表達式等;
-
Filter 的字段必須要在group by 的維度字段裏面,舉個例子:
1)下面的聚合是 可以謂詞下推 的:
2)下面的聚合是 不可以謂詞下 推的:
案例 1:
Partition是String類型,此處是Int類型,導致下推失敗。
案例 2:
Regexp是非確定的,導致下推失敗
2.13 SQL自身-廣播
Spark:
對該變量進行廣播。廣播後的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。
這樣的話,可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對Executor內存的佔用開銷,降低GC的頻率;
可適當調整廣播變量大小的閾值,使得稍微大一些的數據也能被廣播:
2.14 SQL自身-緩存
將數據緩存在內存中,需要 遵循的原則爲:
-
數據重複使用
-
重新生成這部分數據的代價昂貴
SQL語句:
權衡cache與否的代價,不cache則多次使用同一份數據都需要重新計算一次。
Cache則只會計算一次,但是會佔用executor的內存資源,那是否應該cache就是把計算RDD,從hdfs上獲取數據的時間資源與緩存數據的內存資源之間進行權衡。
rdd1,rdd2不需要緩存
rdd可以緩存,rdd1,rdd2不需要緩存
2.15 表結構嵌套字段(Map,Array )
案例1:
這張表是業務用戶的表結構。
用戶行爲數據以Json形式上報,由於表的結構實在太過複雜。Column字段存在大量的Map結構,分析層面很難通過簡單的SQL語句來分析這行數據,只能以讀取HDFS,在代碼層面來做數據分析。
案例2:
Spark SQL 處理嵌套類型數據時,存在以下問題:
1)讀取大量不必要的數據:
對於嵌套數據類型的字段,如下圖中的Map 類型的people 字段,往往只需要讀取其中的子字段,如people.age。
卻需要將整個Map 類型的people 字段全部讀取出來然後抽取出people.age 字段。這會引入大量的無意義的IO 開銷。如果是幾百個Key,這也就意味着IO 被放大了幾十至幾百倍;
2)無法進行向量化讀取:
而向量化讀能極大的提升性能。Spark 不支持包含嵌套數據類型的向量化讀取,這極大地影響了包含嵌套數據類型的查詢性能;
3)不支持 Filter 下推:
Spark 不支持嵌套類型字段上的Filter 的下推;
4)重複計算:
JSON 字段,在Spark SQL 中以String 類型存在,嚴格來說不算嵌套數據類型。不過實踐中也常用於保存不固定的多個字段,在查詢時通過JSON Path 抽取目標子字段。
而大型JSON 字符串的字段抽取非常消耗CPU。對於熱點表,頻繁重複抽取相同子字段非常浪費資源。
2.16 Distribute By / Cluster By
好處:
在Join的過程中,未使用Distribute By 的兩個表,將各自的數據隨機分配到executor上,當join的時候就會出現,跨executor的數據需要互相遷移進行匹配的情況,從而引起更大量的Shuffle。
但如果我們將join的連接字段通過Distribute By 重分區,神奇的一幕將發生:需要互相匹配的數據都會在相同的executor上,從而避免跨executor的數據遷移,所有的匹配都在統一個executor上進行,這樣就大大減少了Shuffle。
使用方法:
投稿 | 大數據平臺
編輯 | sea
排版 | sea
往期推薦
在看點一下 大家都知道