经常使用 Apache Spark 从 Kafka 读数的同学肯定会遇到这样的问题:某些 Spark 分区已经处理完数据了,另一部分分区还在处理数据,从而导致这个批次的作业总消耗时间变长;甚至导致 Spark 作业无法及时消费 Kafka 中的数据。为了简便起见,本文讨论的 Spark Direct 方式读取 Kafka 中的数据,这种情况下 Spark RDD 中分区和 Kafka 分区是一一对应的,更多的细节请参见官方文档,这里就不介绍。

那么有没有办法解决这个问题呢?我们先来看看社区是咋解决这个问题。

SPARK-22056 这个 issue 正好提出了这个问题,并给出了一种解决方案。也就是修改了 KafkaRDD 类的 getPartitions 方法:

Spark 从 Kafka 读数设置子并发度问题

原实现:

override def getPartitions: Array[Partition] = {

offsetRanges.zipWithIndex.map { case (o, i) =>

val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))

new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)

}.toArray

}

修改后的实现:

override def getPartitions: Array[Partition] = {

val subconcurrency = if (kafkaParams.contains("topic.partition.subconcurrency"))

kafkaParams.getOrElse("topic.partition.subconcurrency

相关文章