摘要:当 shuffle reduce task 的数量小于等于bypassMergeThreshold 参数的值时(默认为200),就会启用bypass机制。注意,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。

Spark shuffle 调优

Spark 基于内存进行计算,擅长迭代计算,流式处理,但也会发生shuffle 过程。shuffle 的优化,以及避免产生 shuffle 会给程序提高更好的性能。因为 shuffle 的性能优劣直接决定了整个计算引擎的性能和吞吐量。

下图是官方的说明,1.2 版本之后默认是使用 sort shuffle 。这样会更加高效得利用内存。之前版本默认是 hash shuffle。

Spark shuffle 调优

SortShuffleManager 的运行机制主要分成两种,一种是普通运行机制,另一种是 bypass 运行机制。当 shuffle reduce task 的数量小于等于bypassMergeThreshold 参数的值时(默认为200),就会启用bypass机制。

Spark shuffle 调优

我们看下面,图 1-1 是 spark shuffle 过程的普通机制。

Spark shuffle 调优

图 1-1 sort shuffle 普通机制

看图说话,map task 的计算结果会写入一个内存数据结构中,这个数据结构根据算子,如使用 reduceByKey 这类聚合算子的话,这个内存结构是 Map, 一边通过 Map聚合,一边写入内存;如是使用 join 这类普通算子的话,这个内存结构是 Array,直接写入内存。这个内存结构默认大小是 5 M。

在 shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过 5 M 时,比如现在内存结构中的数据是 5.01 M 那么它会申请 5.01*2-5 = 5.02 M 的内存给内存结构,如果成功不会发生溢写,不成功则会发生溢写。

在溢写之前,会根据 key 对内存结构的数据进行排序,然后分批写入磁盘。每一批默认是 10000 条数据。也就是排序好的数据会以每批1 w 条数据的形式写入磁盘。写入磁盘时,由 Java 的 BufferedOutputStream 来实现的,作为缓冲流,现将数据写入缓冲区,等待缓冲区满了再溢出到磁盘,这样减少了磁盘的 IO ,提高了写的性能。

task 完成会后,会将所有的磁盘文件进行一次 Merge 成为一个磁盘文件,所以一个 task 只对应一个磁盘文件,但是还要为下游的 stage 提供数据,所以还要有一个索引文件,其中标识了下游的各个 task 的数据在磁盘文件中的 start offset 和 end offset。

上面提到了,下游的 stage 需要去寻找上一个 stage 产生的数据,也就是所谓的 shuffle 文件寻址。可以看我这篇文章:Spark 的 shuffle 文件寻址流程

bypass 机制,先来看一张流程图 1-2,与上面的普通机制进行对比。

Spark shuffle 调优

图 1-2 bypass 机制

前面也提到了,bypass 的触发条件是 shuffle reduce task 的数量小于我们设置的 bypassMergeThreshoold 参数。(默认是 200)

通过对比普通机制,可以看出 bypass 机制不会进行排序的过程。shuffle write 过程不会对数据进行排序,这样的话,就节省了这部分的性能开销。

大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。

因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。

注意,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。

后续的文章将会依次从代码开发、资源参数、数据倾斜方面展开。本文只讲解 shuffle 参数的调优。

spark.shuffle.file.buffer

参数:默认是 32 k。

说明:表示写入磁盘文件之前缓冲区的大小。

建议:如果资源充足,可以适当按倍数增加,比如 64 k, 从而减少 shuffle write 过程中溢写到磁盘文件的系数,减少磁盘 IO 次数,进而提升性能。1-5%

spark.reducer.maxSizeInFlight

参数:默认是 48 M。

说明:表示 shuffle read 过程拉取数据的 buffer 大小。

建议:如果资源充足,可以适当按倍数增加,比如 96 M, 从而减少拉取次数,减少网络传输的次数,进而提升性能。1-5%

spark.shuffle.io.maxRetries

参数:默认是 3。

说明:表示拉取数据的时候,执行失败重试的时间间隔。

建议:如果一个作业的 shuffle 过程特别耗时,可以加大该参数,比如 60 次,以避免由于 JVM 的 full gc 或者网络原因造成数据拉取失败。

spark.shuffle.io.retryWait

参数:默认是 5 s。

说明:表示拉取数据的时候,重试的最大时长。如果超过这个次数还没有拉取成功,这个任务就会失败。

建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.manager

参数:默认是 sort。

说明:用于设置 ShuffleManager 的类型。

建议:如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。

spark.shuffle.sort.bypassMergeThreshold

参数:默认是 200。

说明:用于设置ShuffleManager的类型。

建议:如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。

spark.shuffle.consolidateFiles

参数:默认是 false。

说明:如果使用 HashShuffleManager,该参数有效。如果设置为true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

更多参数,官方上都有说明:

Spark shuffle 调优

注意,影响一个Spark作业性能的因素,主要还有代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。可以关注我后续的文章。

如果对您有帮助,欢迎点好看、关注、转发。

相关文章