LiveListenerBus继承了SparkListenerBus,并实现了将事件异步投递给监听器,达到实时刷新UI界面数据的效果。LiveListenerBus主要由以下部分组成:

eventQueue:是SparkListenerEvent事件的阻塞队列,队列大小可以通过Spark属性spark.scheduler.listenerbus.eventqueue.size进行配置,默认为10000(Spark早期版本中属于静态属性,固定为10000,这导致队列堆满时,只得移除一些最老的事件,最终导致各种问题与bug);started:标记LiveListenerBus的启动状态的AtomicBoolean类型的变量;stopped:标记LiveListenerBus的停止状态的AtomicBoolean类型的变量;droppedEventsCounter:使用AtomicLong类型对删除的事件进行计数,每当日志打印了droppedEventsCounter后,会将droppedEventsCounter重置为0;lastReportTimestamp:用于记录最后一次日志打印droppedEventsCounter的时间戳;processingEvent:用来标记当前正有事件被listenerThread线程处理;logDroppedEvent:AtomicBoolean类型的变量,用于标记是否由于eventQueue已满,导致新的事件被删除;eventLock:用于当有新的事件到来时释放信号量,当对事件进行处理时获取信号量;listeners:继承自LiveListenerBus的监听器数组;listenerThread:处理事件的线程。

异步事件处理线程

listenerThread用于异步处理eventQueue中的事件,为了便于说明,这里将展示listenerThread及LiveListenerBus中的主要代码片段,见代码清单1。

代码清单1 LiveListenerBus主要逻辑的代码片段

private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private def validateAndGetQueueSize(): Int = { val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) if (queueSize <= 0) { throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") } queueSize } private val started = new AtomicBoolean(false) private val stopped = new AtomicBoolean(false) private val droppedEventsCounter = new AtomicLong(0L) @volatile private var lastReportTimestamp = 0L private var processingEvent = false private val logDroppedEvent = new AtomicBoolean(false) private val eventLock = new Semaphore(0) private val listenerThread = new Thread(name) { setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { eventLock.acquire() // 获取信号量 self.synchronized { processingEvent = true } try { val event = eventQueue.poll //从eventQueue中获取事件 if (event == null) { // Get out of the while loop and shutdown the daemon thread if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } postToAll(event) // 事件处理 } finally { self.synchronized { processingEvent = false } } } } } }

通过分析代码清单1,listenerThread的工作步骤为:

不断获取信号量(当可以获取信号量时,说明还有事件未处理);通过同步控制,将processingEvent设置为true;从eventQueue中获取事件;调用超类ListenerBus的postToAll方法(postToAll方法对监听器进行遍历,并调用SparkListenerBus的doPostEvent方法对事件进行匹配后执行监听器的相应方法);每次循环结束依然需要通过同步控制,将processingEvent设置为false;

值得一提的是,listenerThread的run方法中调用了Utils的tryOrStopSparkContext,tryOrStopSparkContext方法可以保证当listenerThread的内部循环抛出异常后启动一个新的线程停止SparkContext(SparkContext的内容将在第4章详细介绍,tryOrStopSparkContext方法的具体实现请阅读《附录A Spark2.1核心工具类Utils》)。

LiveListenerBus的消息投递

在解释了异步线程listenerThread的工作内容后,还有一个要点没有解释:eventQueue中的事件是如何放进去的呢?由于eventQueue定义在LiveListenerBus中,因此ListenerBus和SparkListenerBus中并没有操纵eventQueue的方法,要将事件放入eventQueue只能依靠LiveListenerBus自己了,其post方法就是为此目的而生的,见代码清单2。

代码清单2 向LiveListenerBus投递SparkListenerEvent事件

def post(event: SparkListenerEvent): Unit = { if (stopped.get) { logError(s"$name has already stopped! Dropping event $event") return } val eventAdded = eventQueue.offer(event) // 向eventQueue中添加事件 if (eventAdded) { eventLock.release() } else { onDropEvent(event) droppedEventsCounter.incrementAndGet() } // 打印删除事件数的日志 val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }

从代码清单2看到post方法的处理步骤如下:

判断LiveListenerBus是否已经处于停止状态;向eventQueue中添加事件。如果添加成功,则释放信号量进而催化listenerThread能够有效工作。如果eventQueue已满造成添加失败,则移除事件,并对删除事件计数器droppedEventsCounter进行自增;如果有事件被删除,并且当前系统时间距离上一次打印droppedEventsCounter超过了60秒则将droppedEventsCounter打印到日志。

LiveListenerBus与监听器

与LiveListenerBus配合使用的监听器,并非是父类SparkListenerBus的类型参数SparkListenerInterface,而是继承自SparkListenerInterface的SparkListener及其子类。图1列出了Spark中监听器SparkListener以及它的6种最常用的实现[1]。

图1 SparkListener的类继承体系

SparkListener虽然实现了SparkListenerInterface中的每个方法,但是其实都是空实现,具体的实现需要交给子类去完成。

《Spark2.1.0之源码分析——事件总线》中首先对事件总线的接口定义进行了一些介绍,之后《Spark2.1.0事件总线分析——ListenerBus的继承体系》一文展示了ListenerBus的继承体系,然后《Spark2.1.0事件总线分析——SparkListenerBus详解》选择ListenerBus的子类SparkListenerBus进行分析,最后本文选择LiveListenerBus作为具体的实现例子进行分析,这里将通过图2更加直观的展示ListenerBus、SparkListenerBus及LiveListenerBus的工作原理。

图2 LiveListenerBus的工作流程图

最后对于图2作一些补充说明:图中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件来源,它们都是通过调用LiveListenerBus的post方法将消息交给异步线程listenerThread处理的。

查看原文 >>
相关文章