前言

不進行優化的代碼就是耍流氓。

總體來說大數據優化主要分爲三點,一是充分利用CPU,二是節省內存,三是減少網絡傳輸。

一、Hive/MapReduce調優

1.1 本地模式

Hive默認採用集羣模式進行計算,如果對於小數據量,可以設置爲單臺機器進行計算,這樣可以大大縮減查詢觸發任務時間。

用戶可以通過設置hive.exec.mode.local.auto 的值爲true,來讓Hive在適當的時候自動啓動這個優化。

set hive.exec.mode.local.auto=true; //開啓本地 mr
//設置 local mr 的最大輸入數據量,當輸入數據量小於這個值時採用 local mr 的方式,
默認爲 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//設置 local mr 的最大輸入文件個數,當輸入文件個數小於這個值時採用 local mr 的方
式,默認爲 4
set hive.exec.mode.local.auto.input.files.max=10;

1.2 null值過濾OR隨機分配null值

  • null值過濾

    對於key值傾斜,有的時候是無效的null導致的,這個時候可以考慮過濾掉。

    hive (default)> insert overwrite table jointable 
    select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = 
    o.id;
  • null值隨機分配

    如果null不是異常數據,那麼可以採用隨機分配將null值分到不同分區,解決數據傾斜。

    insert overwrite table jointable
    select n.* from nullidtable n full join ori o on 
    case when n.id is null then concat('hive', rand()) else n.id end = o.id;

1.3 Count(distinct)去重統計優化

對於大數據量去重,可以採用分組的方式進行優化。

hive (default)> select count(id) from (select id from bigtable group by id) a;

1.4 行列過濾

對關聯表進行過濾時,可以考慮在關聯時就進行過濾,提高查詢時間。

hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;

1.5 數據傾斜

小文件合併

set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

複雜文件增加Map數

增加 map 的方法爲:根據computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,調整 maxSize 最大值。讓 maxSize 最大值低於 blocksize 就可以增加 map 的個數。

設置最大切片值爲100個字節

hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
hive (default)> select count(*) from emp;

合理設置Reduce數

  • 調整 reduce 個數方法一

    (1)每個 Reduce 處理的數據量默認是 256MB

    hive.exec.reducers.bytes.per.reducer=256000000

    (2)每個任務最大的 reduce 數,默認爲 1009

    hive.exec.reducers.max=1009

    (3)計算 reducer 數的公式

  • 調整 reduce 個數方法二

    在 hadoop 的 mapred-default.xml 文件中修改,設置每個 job 的 Reduce 個數

    set mapreduce.job.reduces = 15;

1.6 並行執行

在共享集羣中設置併發執行可以提高運行速度。

set hive.exec.parallel=true; //打開任務並行執行
set hive.exec.parallel.thread.number=16; //同一個 sql 允許最大並行度,默認爲 8。

當然,得是在系統資源比較空閒的時候纔有優勢,否則,沒資源,並行也起不來。

1.7 嚴格模式

Hive 提供了一個嚴格模式,可以防止用戶執行那些可能意向不到的不好的影響的查詢。通過設置屬性 hive.mapred.mode 值爲默認是非嚴格模式 nonstrict 。開啓嚴格模式需要修改 hive.mapred.mode 值爲 strict,開啓嚴格模式可以禁止 3 種類型的查詢

1).對於分區表,除非 where 語句中含有分區字段過濾條件來限制範圍,否則不允許執行。

2).對於使用了 order by 語句的查詢,要求必須使用 limit 語句。

3).限制笛卡爾積的查詢。對關係型數據庫非常瞭解的用戶可能期望在執行 JOIN 查詢的時候不使用 ON 語句而是使用 where 語句,這樣關係數據庫的執行優化器就可以高效地將WHERE 語句轉化成那個 ON 語句。

1.8 JVM重用

在小文件場景或者task特別多的情況下,執行時間都很短。JVM重用可以使用同一個JVM在同一個Job裏面重複使用N次。N值在mapred-site.xml文件中進行配置。

<property>
 <name>mapreduce.job.jvm.numtasks</name>
 <value>10</value>
 <description>How many tasks to run per jvm. If set to -1, there is
 no limit. 
 </description>
</property>

缺點:JVM重用會一直佔用使用到的task插槽,以便進行重用,如果遇到了某個job裏面的reduce task分配不均勻,導致出現某幾個task佔用task時間很長,其它task空閒也沒法被其它job使用,只有所有的task都執行完後纔會釋放。

1.9 推測執行

對於某些耗時的任務,可以啓動推測執行,這樣就會把“拖後腿”的任務找出來,然後啓動個備份任務執行相同的數據。最後選出執行最快的爲最終結果。

設置開啓推測執行參數:Hadoop 的 mapred-site.xml 文件中進行配置:

<property>
 <name>mapreduce.map.speculative</name>
 <value>true</value>
 <description>If true, then multiple instances of some map tasks 
 may be executed in parallel.</description>
</property>
<property>
 <name>mapreduce.reduce.speculative</name>
 <value>true</value>
 <description>If true, then multiple instances of some reduce tasks 
 may be executed in parallel.</description>
</property>

不過 hive 本身也提供了配置項來控制 reduce-side 的推測執行:

<property>
 <name>hive.mapred.reduce.tasks.speculative.execution</name>
 <value>true</value>
 <description>Whether speculative execution for reducers should be turned on. 
</description>
 </property>

PS:對於時差要求很苛刻的建議關閉掉推測執行。對於執行很長的任務也不建議開啓,因爲會浪費很大資源。

1.10 HDFS小文件解決方案

1)Hadoop Archive:

是一個高效地將小文件放入 HDFS 塊中的文件存檔工具,它能夠將多個小文件打包成

一個 HAR 文件,這樣就減少了 namenode 的內存使用。

2)Sequence file:

sequence file 由一系列的二進制 key/value 組成,如果 key 爲文件名,value 爲文件內容,

則可以將大批小文件合併成一個大文件。

3)CombineFileInputFormat:

CombineFileInputFormat 是一種新的 inputformat,用於將多個文件合併成一個單獨的

split,另外,它會考慮數據的存儲位置。

二、Spark調優

2.1 性能監控方式

Spark Web UI

通過 http://master:4040我們可以獲得運行中的程序信息。

(1)stages和tasks調度情況;

(2)RDD大小和內存使用情況;

(3)系統環境信息;

(4)正在執行的executor信息;

設置歷史服務器記錄歷史信息:

(1)在$SPARK_HOME/conf/spark-env.sh中設置:

export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50 Dspark.history.fs.logDirectory=hdfs://master01:9000/directory"

說明:spark.history.retainedApplica-tions僅顯示最近50個應用。

spark.history.fs.logDirectory:Spark History Server頁面只顯示該路徑下的信息。

(2)$SPARK_HOME/conf/spark-defaults.conf

spark.eventLog.enabled true

spark.eventLog.dir hdfs://hadoop000:8020/directory #應用在運行過程中所有的信息均記錄在該屬性指定的路徑下

spark.eventLog.compress true

(3)HistoryServer 啓動

$SPARK_HOMR/bin/start-histrory-server.sh

(4)HistoryServer 停止

$SPARK_HOMR/bin/stop-histrory-server.sh

--同樣executor的logs也是查看的一個出處:

Standalone 模式:$SPARK_HOME/logs

YARN 模式:在 yarn-site.xml 文件中配置了 YARN 日誌的存放位置:yarn.nodemanager.log-dirs,或使用命令獲取 yarn logs -applicationId。

其它監控工具

Nmon

Jmeter

Jprofiler

2.2 調優要點

內存調優要點

1.對象佔內存,優化數據結構

(1)使用對象數組以及原始類型(primitive type)數組以替代 Java 或 者 Scala 集合類(collection class)。fastutil 庫爲原始數據類型提供了非常方便的集合類,且兼容 Java 標準類庫。

(2)儘可能地避免採用含有指針的嵌套數據結構來保存小對象。

(3)考慮採用數字 ID 或者枚舉類型以便替代 String 類型的主鍵。

(4)如果內存少於 32GB,設置 JVM 參數-XX:+UseCom-pressedOops以便將 8 字節指針修改成 4 字節。與此同時,在 Java 7 或者更高版本,設置 JVM 參數-XX:+UseC-----ompressedStrings 以便採用 8 比特來編碼每一個 ASCII 字符。

2.頻繁 GC 或者 OOM

針對這種情況,首先要確定現象是發生在 Driver 端還是在 Executor 端,然後在分別處理。

Driver 端:通常由於計算過大的結果集被回收到 Driver 端導致,需要調大 Driver 端的內存解決,或者進一步減少結果集的數量。

Executor 端:

(1)以外部數據作爲輸入的 Stage:可以增加 partition 的數量(即 task 的數量)來減少每個 task 要處理的數據,來減少 GC 的可能性。

(2)以 shuffle 作爲輸入的 Stage:解決數據傾斜問題。

開啓推測機制

在 spark-default.conf 中添加:spark.speculation true

推測機制與以下幾個參數有關:

  1. spark.speculation.interval 100:檢測週期,單位毫秒;
  2. spark.speculation.quantile 0.75:完成 task 的百分比時啓動推測;
  3. spark.speculation.multiplier 1.5:比其他的慢多少倍時啓動推測。

數據傾斜優化

  • 查找數據傾斜代碼

    根據shuffler確定數據傾斜代碼,然後通過隨機取樣找到傾斜數據。

    val sampledPairs = pairs.sample(false, 0.1)
    val sampledWordCounts = sampledPairs.countByKey()
    sampledWordCounts.foreach(println(_))

緩解/消除數據傾斜

避免數據源傾斜

比如數據源是Kafka,通常一個分區對應一個Task,所以如果分區數據不均衡,則導致spark處理不均衡。

比如數據源是Hive,如果Hive數據不均衡,也會導致Spark數據傾斜。

解決方案是預處理或者其它。

調整並行度

比如reduceByKey(1000)。如果是group by,join需要設置參數即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200,對於很多場景來說有點過小。設置完後不同的key就能分到不同的task去處理。

將join中的shuffler避免掉

針對一個大表一個小表的join操作,使用廣播變量將較小的數據進行廣播,這樣就可以把join改爲map操作。

兩階段聚合

針對RDD執行ReduceByKey等聚合shuffler算子,以及Spark Sql執行GroupByKey等聚合算子,針對數據傾斜,可以先在key前面打上隨機前綴,進行聚合,然後再把前綴去掉進行聚合,有效解決值分配不均勻問題。

示例如下:

// 第一步,給 RDD 中的每個 key 都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
 new PairFunction<Tuple2<Long,Long>, String, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
 throws Exception {
 Random random = new Random();
 int prefix = random.nextInt(10);
 return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
 }
 });
// 第二步,對打上隨機前綴的 key 進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
 new Function2<Long, Long, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Long call(Long v1, Long v2) throws Exception {
 return v1 + v2;
 }
 });
// 第三步,去除 RDD 中每個 key 的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
 new PairFunction<Tuple2<String,Long>, Long, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
 throws Exception {
 long originalKey = Long.valueOf(tuple._1.split("_")[1]);
 return new Tuple2<Long, Long>(originalKey, tuple._2);
 }
 });
// 第四步,對去除了隨機前綴的 RDD 進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
 new Function2<Long, Long, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Long call(Long v1, Long v2) throws Exception {
 return v1 + v2;
 }
 });

兩階段聚合案例

  1. 通過如下 SQL,將 id 爲 9 億到 9.08 億共 800 萬條數據的 id 轉爲9500048 或者 9500096,其它數據的 id 除以 100 取整。從而該數據集中,id 爲 9500048 和 9500096 的數據各 400 萬,其它 id 對應的數據記錄數均爲 100 條。這些數據存於名爲 test 的表中。
  2. 對於另外一張小表 test_new,取出 50 萬條數據,並將 id(遞增且唯一)除以 100 取整,使得所有 id 都對應 100 條數據。
  3. 通過如下操作,實現傾斜 Key 的分散處理:
  4. 將 leftRDD 中傾斜的 key(即 9500048 與 9500096)對應的數據單獨過濾出來,且加上 1 到 24 的隨機前綴,並將前綴與原數據用逗號分隔(以方便之後去掉前綴)形成單獨的 leftSkewRDD。
  5. 將 rightRDD 中傾斜 key 對應的數據抽取出來,並通過 flatMap 操作將該數據集中每條數據均轉換爲 24 條數據(每條分別加上 1 到 24 的隨機前綴),形成單獨的 rightSkewRDD。
  6. 將 leftSkewRDD 與 rightSkewRDD 進行 Join,並將並行度設置爲 48,且 在 Join 過 程 中 將 隨 機 前 綴 去 掉 , 得 到 傾 斜 數 據集的 Join 結 果skewedJoinRDD。
  7. 將 leftRDD 中不包含傾斜 Key 的 數 據 抽 取 出 來 作 爲 單 獨 的leftUnSkewRDD。
  8. 對 leftUnSkewRDD 與原始的 rightRDD 進行 Join,並行度也設置爲 48,得到 Join 結果 unskewedJoinRDD。
  9. 通過 union 算子將 skewedJoinRDD 與 unskewedJoinRDD 進行合併,從而得到完整的 Join 結果集。

具體實現代碼如下:

public class SparkDataSkew{
 public static void main(String[] args) {
 int parallelism = 48;
 SparkConf sparkConf = new SparkConf();
 sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
 sparkConf.set("spark.default.parallelism", parallelism + "");
 JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
 JavaPairRDD<String, String> leftRDD = 
javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
 .mapToPair((String row) -> {
 String[] str = row.split(",");
 return new Tuple2<String, String>(str[0], str[1]);
 });
 JavaPairRDD<String, String> rightRDD = 
javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
 .mapToPair((String row) -> {
 String[] str = row.split(",");
 return new Tuple2<String, String>(str[0], str[1]);
 });
 String[] skewedKeyArray = new String[]{"9500048", "9500096"};
 Set<String> skewedKeySet = new HashSet<String>();
 List<String> addList = new ArrayList<String>();
 for(int i = 1; i <=24; i++) {
 addList.add(i + "");
 }
 for(String key : skewedKeyArray) {
 skewedKeySet.add(key);
 }
 Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
 Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);
 JavaPairRDD<String, String> leftSkewRDD = leftRDD
 .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
 .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new 
Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));
 JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> 
tuple) -> skewedKeys.value().contains(tuple._1()))
 .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
 .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
 .collect(Collectors.toList())
 .iterator()
 );
 JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
 .join(rightSkewRDD, parallelism)
 .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, 
String>(tuple._1().split(",")[1], tuple._2()._2()));
 JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> 
tuple) -> !skewedKeys.value().contains(tuple._1()));
 JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, 
parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, 
String>(tuple._1(), tuple._2()._2()));
 skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, 
String>> iterator) -> {
 AtomicInteger atomicInteger = new AtomicInteger();
 iterator.forEachRemaining((Tuple2<String, String> tuple) -> 
atomicInteger.incrementAndGet());
 });
 javaSparkContext.stop();
 javaSparkContext.close();
 } 
}

大表隨機添加 N 種隨機前綴,小表擴大 N 倍

過濾少數導致傾斜的 key

2.3 Shuffle調優

調優概述

代碼開發,資源分配和數據傾斜是重中之重,除此之外,Shuffler作爲一個補充,也需要學習下。

shuffler相關參數調優

  • spark.shuffle.file.buffer

    默認值:32K

    參數說明:緩衝大小,超過緩衝大小纔會寫入磁盤。

    調優建議:如果作業可用的內存資源較爲充足的話,可以適當增加這個參數的大小(),從而減少 shuffle write 過程中溢寫磁盤文件的次數,也就可以減少磁盤 IO 次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5%的提升。

  • spark.reducer.maxSizeInFlight

    默認值:48m

    參數說明:這個 buffer 緩衝決定了每次能夠拉取多少數據。

    調優建議:如果作業可用的內存資源較爲充足的話,可以適當增加這個參數的大小(比如 96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

  • spark.shuffle.io.maxRetries

    默認值:3

    參數說明:拉去失敗重試次數。

    調優建議:對於那些包含了特別耗時的 shuffle 操作的作業,建議增加重試最大次數(比如 60 次),以避免由於 JVM 的 full gc 或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的 shuffle 過程,調節該參數可以大幅度提升穩定性。

  • spark.shuffle.io.retryWait

    默認值:5s

    參數說明:重試拉取數據的等待時間,默認是5s。

    調優建議:建議加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性。

  • spark.shuffle.memoryFraction

    默認值:0.2

    參數說明:分配給聚合操作的內存比例,默認是20%。

  • spark.shuffle.manager

默認值:sort

2.4 程序開發調優

原則一:避免創建重複的 RDD

對同一個數據源不要建立多個RDD。

原則二:儘可能複用同一個 RDD

數據有包含關係的RDD能重用的就重用。

原則三:對多次使用的RDD進行持久化

每次你對RDD執行算子操作時,都會從源頭處重新計算一遍,所以一般會採取持久化方式,這樣就直接從內存取了。

對多次使用的RDD進行持久化示例:

// 如果要對一個 RDD 進行持久化,只要對這個 RDD 調用 cache()和 persist()即可。
// 正確的做法。
// cache()方法表示:使用非序列化的方式將 RDD 中的數據全部嘗試持久化到內存中。
// 此時再對 rdd1 執行兩次算子操作時,只有在第一次執行 map 算子時,纔會將這個 rdd1 從源頭處計
算一次。
// 第二次執行 reduce 算子時,就會直接從內存中提取數據進行計算,不會重複計算一個 rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手動選擇持久化級別,並使用指定的方式進行持久化。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER 表示,內存充足時優先持久化到內存中,
//內存不充足時持久化到磁盤文件中。
// 而且其中的_SER 後綴表示,使用序列化的方式來保存 RDD 數據,此時 RDD 中的每個 partition
//都會序列化成一個大的字節數組,然後再持久化到內存或磁盤中
// 序列化的方式可以減少持久化的數據對內存/磁盤的佔用量,進而避免內存被持久化數據佔用過多,
//從而發生頻繁 GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
 .persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

原則四:儘量避免使用shuffle類算子

開發過程中,能避免則儘可能避免使用 reduceByKey、join、distinct、repartition 等會進行 shuffle 的算子,儘量使用 map 類的非shuffle 算子。這樣的話,沒有 shuffle 操作或者僅有較少 shuffle 操作的Spark 作業,可以大大減少性能開銷。

Broadcast 與 map 進行 join 代碼示例:

// 傳統的 join 操作會導致 shuffle 操作。
// 因爲兩個 RDD 中,相同的 key 都需要通過網絡拉取到一個節點上,由一個 task 進行 join 操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map 的 join 操作,不會導致 shuffle 操作。
// 使用 Broadcast 將一個數據量較小的 RDD 作爲廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在 rdd1.map 算子中,可以從 rdd2DataBroadcast 中,獲取 rdd2 的所有數據。
// 然後進行遍歷,如果發現 rdd2 中某條數據的 key 與 rdd1 的當前數據的 key 是相同的,
//那麼就判定可以進行 join。
// 此時就可以根據自己需要的方式,將 rdd1 當前數據與 rdd2 中可以連接的數據,
//拼接在一起(String 或 Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建議僅僅在 rdd2 的數據量比較少(比如幾百 M,或者一兩 G)的情況下使用。
// 因爲每個 Executor 的內存中,都會駐留一份 rdd2 的全量數據。

原則五:使用map-side預聚合的shuffle操作

如果因爲業務需要,一定要使用 shuffle 操作,無法用 map 類的算子來替代,那麼儘量使用可以 map-side 預聚合的算子。

使用reduceByKey,aggregateByKey代替groupByKey,因爲reduceByKey和aggregateByKey會進行預聚合,groupByKey不會。

原則六:使用高性能的算子

使用 reduceByKey/aggregateByKey 替代 groupByKey,詳情見“原則五:使用 map-side 預聚合的 shuffle 操作”。

使用 mapPartitions 替代普通 map。

使用 filter 之後進行 coalesce 操作。

使用 repartitionAndSortWithinPartitions 替代 repartition 與 sort 類操作。

原則七:廣播大變量

有時在開發過程中,會遇到需要在算子函數中使用外部變量的場景,那麼此時就應該使用 Spark的廣播(Broadcast)功能來提升性能。因爲如果不使用廣播變量,那麼每個任務會拉取數據並創建一個副本,這樣會大大增加網絡開銷,並佔用系統內存。如果使用廣播變量的話,數據就會保留一份。

廣播大變量代碼示例:

// 以下代碼在算子函數中,使用了外部的變量。
// 此時沒有做任何特殊操作,每個 task 都會有一份 list1 的副本。
val list1 = ...
rdd1.map(list1...)
// 以下代碼將 list1 封裝成了 Broadcast 類型的廣播變量。
// 在算子函數中,使用廣播變量時,首先會判斷當前 task 所在 Executor 內存中,是否有變量副本。
// 如果有則直接使用;如果沒有則從 Driver 或者其他 Executor 節點上遠程拉取一份放到本地 Executor
內存中。
// 每個 Executor 內存中,就只會駐留一份廣播變量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原則八:使用 Kryo 優化序列化性能

代碼示例:

// 創建 SparkConf 對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器爲 KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 註冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原則九:優化數據結構

Java 中,有三種類型比較耗費內存:

1.對象。 2.集合類型,比如HashMap,LinedList等。3.字符串,每個字符串內部都有一個字符數組以及長度等額外信息。

2.5 運行資源調優

在spark-submit調節資源參數來提高資源利用率。

  • num-executors

    參數說明:設置spark作業總共用多少個executor來執行。

    參數調優建議:每個spark作業一般設置50~100個左右的Executor進程比較合適。太小不能充分利用資源,太大隊列無法提供足夠的資源。

  • executor-memory

    參數說明:設置每個Executor進程的內存。

    參數調優建議:每個 Executor 進程的內存設置 4G~8G 較爲合適。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors 乘 以 executor-memory,是不能超過隊列的最大內存量的。此外,如果你是跟團隊裏其他人共享這個資源隊列,那麼申請的內存量最好不要超過資源隊列最大總內存的 1/3~1/2,避免你自己的 Spark 作業佔用了隊列所有的資源,導致別的同學的作業無法運行。

  • executor-cores

    參數說明:設置每個Executor進程CUP core數量。因爲每個cpu core一個時間只能執行一個task,所以cpu core數量越多,執行速度越快。

    參數調優建議:Executor 的 CPU core 數量設置爲 2~4 個較爲合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core 限制是多少,再依據設置的 Executor 數量,來決定每個 Executor進程可以分配到幾個 CPU core。同樣建議,如果是跟他人共享這個隊列,那 麼 num-executors * executor-cores 不 要 超 過 隊 列 總 CPU core 的1/3~1/2 左右比較合適,也是避免影響其他同學的作業運行。

  • driver-memory

    參數說明:設置Driver進程的內存。

    參數調優建議:Driver 的內存通常來說不設置,或者設置 1G 左右應該就夠了。

  • spark.default.parallelism

    參數說明:該參數用於設置每個 stage 的默認 task 數量。這個參數極爲重要,如果不設置可能會直接影響你的 Spark 作業性能。

    參數調優建議:Spark 作業的默認 task 數量爲 500~1000 個較爲合適。很 多 同 學 常 犯 的 一 個 錯 誤 就 是 不 去 設 置 這 個 參 數 , 那 麼 此 時 就 會 導 致Spark 自己根據底層 HDFS 的 block 數量來設置 task 的數量,默認是一個HDFS block 對應一個 task。通常來說,Spark 默認設置的數量是偏少的(比如就幾十個 task),如果 task 數量偏少的話,就會導致你前面設置好的Executor 的參數都前功盡棄。試想一下,無論你的 Executor 進程有多少個,內存和 CPU 有多大,但是 task 只有 1 個或者 10 個,那麼 90%的 Executor進程可能根本就沒有 task 執行,也就是白白浪費了資源!因此 Spark 官網建議的設置原則是,設置該參數爲 num-executors * executor-cores 的 2~3倍較爲合適,比如 Executor 的總 CPU core 數量爲 300 個,那麼設置 1000個 task 是可以的,此時可以充分地利用 Spark 集羣的資源。

  • spark.storage.memoryFraction

    參數說明:設置持久化數據在Executor佔比,默認是0.6。

    根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。

    參數調優建議:根據實際,可以適當提高,讓數據寫入內存。

  • spark.shuffle.memoryFraction

    參數說明:該參數用於設置 shuffle 過程中一個 task 拉取到上個 stage的 task 的輸出後,進行聚合操作時能夠使用的 Executor 內存的比例,默認是 0.2。

    參數調優建議:如果 Spark 作業中的 RDD 持久化操作較少,shuffle 操作較多時,建議降低持久化操作的內存佔比,提高 shuffle 操作的內存佔比比例,避免 shuffle 過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由於頻繁的 gc 導致運行緩慢,意味着 task執行用戶代碼的內存不夠用,那麼同樣建議調低這個參數的值。

    資源參數的調優,沒有一個固定的值,需要同學們根據自己的實際情況(包括 Spark 作業中的 shuffle 操作數量、RDD 持久化操作數量以及 spark web ui 中顯示的作業 gc 情況),同時參考給出的原理以及調優建議,合理地設置上述參數。

    資源參數參考示例:

    以下是一份 spark-submit 命令的示例,大家可以參考一下,並根據自己的實際情況進行調節。

    ./bin/spark-submit \
    --master yarn-cluster \
    --num-executors 100 \
    --executor-memory 6G \
    --executor-cores 4 \
    --driver-memory 1G \
    --conf spark.default.parallelism=1000 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \

三、Flink調優

3.1 Backpressure調優

  • web.backpressure.cleanup-interval

    說明:當啓動反壓數據採集後,獲取反壓前等待時間,默認是60s。

  • web.backpressure.delay-between-samples:Stack Trace

    說明:抽樣到確認反壓狀態之間的時延,默認爲50ms。

  • web.backpressure.num-samples

    說明:設定Stack Trace抽樣數以確定反壓狀態,默認爲100。

3.2 Checkpointing優化

通過調整Checkpointing之間的時間間隔進行優化。

val env=StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(milliseconds)

3.3 狀態數據壓縮

目前可用的壓縮算法是Snappy,設置如下:

val env=StreamExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig
config.setUseSnapshotCompression(true)

3.4 Flink內存優化

Flink,Spark等大數據引擎都實現了自己的內存管理,有效解決JVM內存溢出問題。

  • JobManager配置

    jobmanager.heap.size:設定JobManager堆內存大小,默認爲1024MB。
  • TaskManager配置

    TaskManager作爲Flink集羣中的工作節點,所有任務的計算邏輯均執行在TaskManager之上,因此對TaskManager內存配置顯得尤爲重要,可以通過以下參數配置對TaskManager進行優化和調整。

    taskmanager.heap.size

    說明:設定TaskManager堆內存大小,默認值爲1024M,如果在Yarn的集羣中,TaskManager取決於Yarn分配給TaskManager Container的內存大小,且Yarn環境下一般會減掉一部分內存用於Container的容錯。

    taskmanager.jvm-exit-on-oom

    說明:設定TaskManager是否會因爲JVM發生內存溢出而停止,默認爲false,當TaskManager發生內存溢出時,也不會導致TaskManager停止。

    taskmanager.memory.size

    說明:設定TaskManager內存大小,默認爲0,如果不設定該值將會使用taskmanager.memory.fraction作爲內存分配依據。

    taskmanager.memory.fraction

    說明:設定TaskManager堆中去除Network Buffers內存後的內存分配比例。該內存主要用於TaskManager任務排序、緩存中間結果等操作。例如,如果設定爲0.8,則代表TaskManager保留80%內存用於中間結果數據的緩存,剩下20%的內存用於創建用戶定義函數中的數據對象存儲。注意,該參數只有在taskmanager.memory.size不設定的情況下才生效。

    taskmanager.memory.off-heap

    說明:設置是否開啓堆外內存供Managed Memory或者Network Buffers使用。

    taskmanager.memory.preallocate

    說明:設置是否在啓動TaskManager過程中直接分配TaskManager管理內存。

    taskmanager.numberOfTaskSlots

    說明:每個TaskManager分配的slot數量。

3.5 設定Network內存比例

taskmanager.network.memory.fraction

說明:JVM中用於Network Buffers的內存比例。

taskmanager.network.memory.min

說明:最小的Network Buffers內存大小,默認爲64MB。

taskmanager.network.memory.max

說明:最大的Network Buffers內存大小,默認1GB。

taskmanager.memory.segment-size

說明:內存管理器和Network棧使用的Buffer大小,默認爲32KB。

3.6 堆內存調優

默認Flink使用的Parallel Scavenge的垃圾回收器,可以改用G1垃圾回收器。

啓動參數:

env.java.opts= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:+PrintGCDetails

  • -XX:MaxGCPauseMillis:設置允許的最大GC停頓時間,默認是200ms。

  • -XX:G1HeapRegionSize:每個分區的大小,默認值會根據整個堆區的大小計算出來,範圍是1M~32M,取值是2的冪,計算的傾向是儘量有2048個分區數。

  • -XX:MaxTenuringThreshold=n:晉升到老年代的“年齡”閾值,默認值爲15。

  • -XX:InitiatingHeapOccupancyPercent:一般會簡寫IHOP,默認是45%,這個佔比跟併發週期的啓動相關,當空間佔比達到這個值時,會啓動併發週期。如果經常出現FullGC,可以調低該值,今早的回收可以減少FullGC的觸發,但如果過低,則併發階段會更加頻繁,降低應用的吞吐。

  • -XX:G1NewSizePercent:年輕代最小的堆空間佔比,默認是5%。

  • -XX:G1MaxNewSizePercent:年輕代最大的堆空間佔比,默認是60%。

  • -XX:ConcGCThreads:併發執行的線程數,默認值接近整個應用程序數的1/4。

  • -XX:-XX:G1HeapWastePercent:允許的浪費空間的佔比,默認是5%。如果併發標記可回收的空間小於5%,則不會拋出MixedGC。

  • -XX:G1MixedGCCountTarget:一次全局併發標記之後,後續最多執行的MixedGC次數。默認值是8。

    系列傳送門

相關文章