摘要:在Join的過程中,未使用Distribute By 的兩個表,將各自的數據隨機分配到executor上,當join的時候就會出現,跨executor的數據需要互相遷移進行匹配的情況,從而引起更大量的Shuffle。但如果我們將join的連接字段通過Distribute By 重分區,神奇的一幕將發生:需要互相匹配的數據都會在相同的executor上,從而避免跨executor的數據遷移,所有的匹配都在統一個executor上進行,這樣就大大減少了Shuffle。

01

引言

首先我們從 兩種計算引擎的基本架構圖來分析Spark/MapReduce的性能

1.1 M apReduce

 

處理效率低效 :

  • Map/Reduce任務中間結果寫磁盤,多個MR之間通過HDFS交換數據; 任務調度和啓動開銷大;

  • 一條SQL語句經常被拆分成多個Application,數據在多個Application之間只能通過讀寫HDFS交換;

  • 無法充分利用內存;

1.2 Spark

1.2.1高 (比MapReduce快幾倍到幾十倍) 

  • 內存計算引擎,提供Cache機制來支持需要反覆迭代計算或者多次數據共享,減少數據讀取的IO開銷,另外爲了解決純內存計算帶來的數據可靠性,引入了Checkpoint機制;

  • DAG引擎,減少多次計算之間中間結果寫到HDFS的開銷;

  • Executor使用線程池模型來減少task啓動開銷,shuffle過程中避免 不必要的sort操作以及減少磁盤IO操作。

02

影響Spark任務快慢的因素

2.1因子

  • 數據量 (GB級 vs TB級)

  • 數據組織形式 (存儲結構,壓縮算法,數據Schema[Map等複雜結構])

  • 小文件 (很多的KB級,10M級)

  • 內存 (內存偏少,大量溢寫)

  • Core個數 (併發量小,shuffle等待)

  • 算法 (聚合因子,過濾條件,SQL組織形式 )

  • 發生大量的Shuffle (可用Broadcast替換)

  • 是否使用緩存 (將經常訪問的數據緩存在Executor內存中) 

2.2.Spark基本流程圖

 

  • Driver (AppMaster) 負責DAG,Task調度

  • Yarn負責計算資源分配

  • Worker上的Executor負責計算

Worker節點纔是計算執行的地方,所以性能優化重點將研究Executor的優化。

2.3Executor

建議不要自己配置Executor個數,使用動態分配模式:

概念:

根據當前的負載動態的增加或者刪除Executor,這樣做的 好處 在於:

在業務組的隊列資源 (vcore, memory) 資源恆定額情況下,能更好的均衡各個業務的對資源的佔用,也就是對於一個計算量較小的任務不用佔用太多資源。而對於一個計算量較大的任務,也能從集羣中獲取相對較多的資源。

而採用指定模式,則會導致任務在獲取足夠多 (可通過參數設置比例) 的Executor之前一直處於等待狀態,而這通常會浪費計算資源。

Executor動態分配模型

 

ExecutorAllocationManager內部會定時根據工作負載計算所需的Executor數量:

  • 如果任務對Executor需求數量大於之前向集羣管理器申請的Executor數量,那麼向Yarn申請添加Executor;

  • 如果任務對Executor需求數量小於之前向集羣管理器申請的Executor數量,那麼向Yarn申請取消部分Executor;

  • ExecutorAllocationManager內部還會定時向Yarn申請移除 (殺死) 過期的Executor;

2.4Core

spark.executor.cores

建議:

  • 建議executor的cpu core數量設置爲2 ~ 4個比較合適;

  • 在隊列有大量任務提交的情況下,還要更少,以免影響其他用戶提交的任務因申請不到cpu (vcore) 資源而卡住。

2.5Memory

spark.executor.memory

建議:

每個Executor的每個core分配的內存設置4g較爲合適。

用戶設置該值的時候需要考慮如下影響因子:

  • 自己使用的executor-memory * num-executor所使用的資源不能超過所提交隊列的閾值;

  • 在隊列資源共用的模式下,所申請的資源還要更小,以免申請不到資源或者阻塞其他用戶的任務;

  • 用戶申請的executor-momory不能超過yarn設置的最大值,當前設置的最大值爲60g;

Spark內存區分存儲內存&計算內存

 

spark.storage.memoryFraction

控制存儲內存在整個內存中的比例

StorageMemory:

RDD cache, RDD Broadcast等等內容

根據應用的不同可自己動態調整,但通常情況下不需要調整。使用默認值即可,下圖展示的是Storage內存與Execution的 內存動態調節機制

 

2.6 Shuffle並行度

spark.default.parallelism

該參數用於設置每個stage的默認task數量,這個參數極爲重要,如果不設置可能會直接影響你的任務性能。 (只有在處理RDD時纔會起作用,對Spark SQL無效)

建議:

500 ~ 1000較爲合適,通常情況設置爲executor-memory * num-executor 乘積的2~3倍較爲合適;

spark.sql.shuffle.partitions

用於配置join 或聚合操作shuffle數據時使用的分區數 (對sparks SQL專用的設置)

2.7 存儲結構

目前HADOOP中常用的數據存儲結構包括:

  • Text (行式存儲)

  • CSV (行式存儲)

  • RCFile (列式存儲)

  • ORC (列式存儲)

  • Parquet (列式存儲)

目前Spark默認存儲的格式爲Parquet。 下圖展示的是相同數據以不同存儲結構存儲,存儲文件的Size對比:

 

2.8 列式存儲的好處

  • 查詢的時候不需要掃描全部的數據,而只需要讀取每次查詢涉及的列。 這樣可以將I/O消耗降低N倍,另外可以保存每一列的統計信息 (minmax、sum等) ,實現部分的謂詞下推;

  • 由於每一列的成員都是同構的,可以針對不同的數據類型使用更高效的數據壓縮算法,進一步減小I/O;

  • 由於每一列的成員的同構性,可以使用更加適合CPU pipeline的編碼方式,減小CPU的緩存失效;

  • 由於列式存儲數據量更小,Spark的Task讀取數據的時間更短,不光節省計算資源,還節省存儲資源。

 

列式存儲的向量化操作,相對於行式存儲一行一行的操作,列式存儲可做到 一個batch的操作,這樣的操作方式極大的 提升了運算性能

 

2.9 壓縮方式(gzip, bzip2, lzo, snappy)

案例1:

Snappy壓縮前後比例爲3:1

 

對於Spark任務來說,壓縮的數據帶來的好處是顯而易見的:

  • 大幅節省內存

  • 大幅節省磁盤

  • 大幅節省數據讀取時間

各種壓縮格式特性

 

Spark設置壓縮格式 (例如設置snappy壓縮)

各種壓縮格式性能對比

 

2.10 輸入小文件

案例1:

 

  • 小文件太多,導致每個task讀取的數據量較小,計算的時間很短;

  • 執行的時間不足以彌補JVM啓動的時間。

調節每個task任務的輸入數據大小

2.11 Shuffle&&輸出小文件

某個表的hdfs文件如下:

輸出表的分區下有20個小文件。

由於集羣中NameNode節點需要維護文件的元數據信息,太多的輸出小文件會給集羣的NameNode帶來巨大的壓力;

控制task個數

方案1:

使用hint將會使得輸入數據進行重新Repartition,調節最終task的個數以及輸出文件的個數。

通過重分區將減少或者增大分區數量以達到增加或減少task的數量,從而增大或者減少Task輸出的文件個數。

2.12 SQL自身-下推 (PushDownPredicate)

 

上圖中:

  • 方式1從磁盤中讀取出所有的數據,在內存中過濾;

  • 方式2,3將過濾從內存中下推到磁盤,在掃描磁盤的數據的時候就過濾掉數據。

概念:

所謂下推是指將過濾儘可能地下沉到數據源端,從而避免從磁盤讀取不必要數據。

下推與不下推性能對比 (DataBricks官方)

 

通過上圖可知,60%的下推比不下推的性能提高了2~18倍;

2.12.1謂詞下推的限制:

  • 只有operator 包含的所有expression都是確定性的時候纔可以下推, 比如 rand 表達式等;

  • Filter 的字段必須要在group by 的維度字段裏面,舉個例子:

1)下面的聚合是 可以謂詞下推 的:

2)下面的聚合是 不可以謂詞下 推的:

案例 1:

Partition是String類型,此處是Int類型,導致下推失敗。

 

案例 2:

Regexp是非確定的,導致下推失敗

 

2.13 SQL自身-廣播

Spark:

對該變量進行廣播。廣播後的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。

這樣的話,可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對Executor內存的佔用開銷,降低GC的頻率;

可適當調整廣播變量大小的閾值,使得稍微大一些的數據也能被廣播:

 

2.14 SQL自身-緩存

將數據緩存在內存中,需要 遵循的原則爲:

  • 數據重複使用

  • 重新生成這部分數據的代價昂貴

SQL語句:

權衡cache與否的代價,不cache則多次使用同一份數據都需要重新計算一次。

Cache則只會計算一次,但是會佔用executor的內存資源,那是否應該cache就是把計算RDD,從hdfs上獲取數據的時間資源與緩存數據的內存資源之間進行權衡。

 

  rdd1,rdd2不需要緩存

rdd可以緩存,rdd1,rdd2不需要緩存

2.15 表結構嵌套字段(Map,Array )

案例1:

 

這張表是業務用戶的表結構。

用戶行爲數據以Json形式上報,由於表的結構實在太過複雜。Column字段存在大量的Map結構,分析層面很難通過簡單的SQL語句來分析這行數據,只能以讀取HDFS,在代碼層面來做數據分析。

案例2:

Spark SQL 處理嵌套類型數據時,存在以下問題:

1)讀取大量不必要的數據:

對於嵌套數據類型的字段,如下圖中的Map 類型的people 字段,往往只需要讀取其中的子字段,如people.age。

卻需要將整個Map 類型的people 字段全部讀取出來然後抽取出people.age 字段。這會引入大量的無意義的IO 開銷。如果是幾百個Key,這也就意味着IO 被放大了幾十至幾百倍;

2)無法進行向量化讀取:

而向量化讀能極大的提升性能。Spark 不支持包含嵌套數據類型的向量化讀取,這極大地影響了包含嵌套數據類型的查詢性能;

3)不支持 Filter 下推:

Spark 不支持嵌套類型字段上的Filter 的下推;

4)重複計算:

JSON 字段,在Spark SQL 中以String 類型存在,嚴格來說不算嵌套數據類型。不過實踐中也常用於保存不固定的多個字段,在查詢時通過JSON Path 抽取目標子字段。

而大型JSON 字符串的字段抽取非常消耗CPU。對於熱點表,頻繁重複抽取相同子字段非常浪費資源。

2.16 Distribute By / Cluster By

好處:

在Join的過程中,未使用Distribute By 的兩個表,將各自的數據隨機分配到executor上,當join的時候就會出現,跨executor的數據需要互相遷移進行匹配的情況,從而引起更大量的Shuffle。

但如果我們將join的連接字段通過Distribute By 重分區,神奇的一幕將發生:需要互相匹配的數據都會在相同的executor上,從而避免跨executor的數據遷移,所有的匹配都在統一個executor上進行,這樣就大大減少了Shuffle。

 

使用方法:

 

投稿 | 大數據平臺

編輯 | sea

排版 | sea

往期推薦

在看點一下 大家都知道

相關文章