摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper-listener/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

1. 概述2. ListenerManager3. AbstractListenerManager4. AbstractJobListener5. RegistryCenterConnectionStateListener666. 彩蛋

1. 概述

本文主要分享 Elastic-Job-Lite 註冊中心監聽器。

建議前置閱讀:

《Elastic-Job-Lite 源碼分析 —— 註冊中心》

涉及到主要類的類圖如下( 打開大圖 ):

你行好事會因爲得到讚賞而愉悅 同理,開源項目貢獻者會因爲 Star 而更加有動力 爲 Elastic-Job 點贊!傳送門

2. ListenerManager

ListenerManager,作業註冊中心的監聽器管理者。管理者兩類組件:

監聽管理器註冊中心連接狀態監聽器

其中監聽管理器管理着自己的作業註冊中心監聽器。

一起從代碼層面看看:

public final class ListenerManager { private final JobNodeStorage jobNodeStorage; private final ElectionListenerManager electionListenerManager; private final ShardingListenerManager shardingListenerManager; private final FailoverListenerManager failoverListenerManager; private final MonitorExecutionListenerManager monitorExecutionListenerManager; private final ShutdownListenerManager shutdownListenerManager; private final TriggerListenerManager triggerListenerManager; private final RescheduleListenerManager rescheduleListenerManager; private final GuaranteeListenerManager guaranteeListenerManager; private final RegistryCenterConnectionStateListener regCenterConnectionStateListener;}

第一類:electionListenerManager / shardingListenerManager / failoverListenerManager / MonitorExecutionListenerManager / shutdownListenerManager / triggerListenerManager / rescheduleListenerManager / guaranteeListenerManager 是不同服務的監聽管理器,都繼承作業註冊中心的監聽器管理者的抽象類( AbstractListenerManager )。我們以下一篇文章會涉及到的分片監聽管理器( ShardingListenerManager ) 來瞅瞅內部整體實現:

public final class ShardingListenerManager extends AbstractListenerManager { @Override public void start() { addDataListener(new ShardingTotalCountChangedJobListener()); addDataListener(new ListenServersChangedJobListener()); }class ShardingTotalCountChangedJobListener extends AbstractJobListener { // .... 省略方法}class ListenServersChangedJobListener extends AbstractJobListener { // .... 省略方法}}

ShardingListenerManager 內部管理了 ShardingTotalCountChangedJobListener / ListenServersChangedJobListener 兩個作業註冊中心監聽器。具體作業註冊中心監聽器是什麼,有什麼用途,下文會詳細解析。第二類:regCenterConnectionStateListener 是註冊中心連接狀態監聽器。下文也會詳細解析。

在《Elastic-Job-Lite 源碼分析 —— 作業初始化》「3.2.4」註冊作業啓動信息,我們看到作業初始化時,會開啓所有註冊中心監聽器:

// SchedulerFacade.java/*** 註冊作業啓動信息.* * @param enabled 作業是否啓用*/public void registerStartUpInfo(final boolean enabled) { // 開啓 所有監聽器 listenerManager.startAllListeners(); // .... 省略方法}// ListenerManager.java/*** 開啓所有監聽器.*/public void startAllListeners() { // 開啓 不同服務監聽管理器 electionListenerManager.start(); shardingListenerManager.start(); failoverListenerManager.start(); monitorExecutionListenerManager.start(); shutdownListenerManager.start(); triggerListenerManager.start(); rescheduleListenerManager.start(); guaranteeListenerManager.start(); // 開啓 註冊中心連接狀態監聽器 jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);}

3. AbstractListenerManager

AbstractListenerManager,作業註冊中心的監聽器管理者的抽象類。

public abstract class AbstractListenerManager { private final JobNodeStorage jobNodeStorage; protected AbstractListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) { jobNodeStorage = new JobNodeStorage(regCenter, jobName); } /** * 開啓監聽器. */ public abstract void start(); /** * 添加註冊中心監聽器 * * @param listener 註冊中心監聽器 */ protected void addDataListener(final TreeCacheListener listener) { jobNodeStorage.addDataListener(listener); }}

#addDataListener(),將作業註冊中心的監聽器添加到註冊中心 TreeCache 的監聽者裏。JobNodeStorage#addDataListener(…) 在《Elastic-Job-Lite 源碼分析 —— 作業初始化》「2.2」緩存已經詳細解析。子類實現 #start() 方法實現監聽器初始化。目前所有子類的實現都是將自己管理的註冊中心監聽器調用 #addDataListener(...),還是以 ShardingListenerManager 舉例子:

public final class ShardingListenerManager extends AbstractListenerManager {@Overridepublic void start() { addDataListener(new ShardingTotalCountChangedJobListener()); addDataListener(new ListenServersChangedJobListener());}}

4. AbstractJobListener

AbstractJobListener,作業註冊中心的監聽器抽象類。

public abstract class AbstractJobListener implements TreeCacheListener { @Override public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { ChildData childData = event.getData(); // 忽略掉非數據變化的事件,例如 event.type 爲 CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件 if (null == childData) { return; } String path = childData.getPath(); if (path.isEmpty()) { return; } dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8)); } /** * 節點數據變化 * * @param path 節點路徑 * @param eventType 事件類型 * @param data 數據 */ protected abstract void dataChanged(final String path, final Type eventType, final String data);}

作業註冊中心的監聽器實現類實現 #dataChanged(…),對節點數據變化進行處理。#childEvent(…) 屏蔽掉非節點數據變化事件,例如:CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件,只處理 NODE_ADDED、NODE_UPDATED、NODE_REMOVED 事件。

我們再拿 ShardingListenerManager 舉例子:

public final class ShardingListenerManager extends AbstractListenerManager { class ShardingTotalCountChangedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount(); if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { shardingService.setReshardingFlag(); JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount); } } } } class ListenServersChangedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) { shardingService.setReshardingFlag(); } } private boolean isInstanceChange(final Type eventType, final String path) { return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType; } private boolean isServerChange(final String path) { return serverNode.isServerPath(path); } }}

在《Elastic-Job-Lite 源碼解析 —— 任務分片》詳細解析。

5. RegistryCenterConnectionStateListener

RegistryCenterConnectionStateListener,實現 Curator ConnectionStateListener 接口,註冊中心連接狀態監聽器。

public final class RegistryCenterConnectionStateListener implements ConnectionStateListener { @Override public void stateChanged(final CuratorFramework client, final ConnectionState newState) { if (JobRegistry.getInstance().isShutdown(jobName)) { return; } JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) { // Zookeeper 連接終端 或 連接丟失 // 暫停作業調度 jobScheduleController.pauseJob(); } else if (ConnectionState.RECONNECTED == newState) { // Zookeeper 重新連上 // 持久化作業服務器上線信息 serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())); // 持久化作業運行實例上線相關信息 instanceService.persistOnline(); // 清除本地分配的作業分片項運行中的標記 executionService.clearRunningInfo(shardingService.getLocalShardingItems()); // 恢復作業調度 jobScheduleController.resumeJob(); } }}

當註冊中心連接 SUSPENDED 或 LOST 時,暫停本地作業調度:

// JobScheduleController.javapublic synchronized void pauseJob() { try { if (!scheduler.isShutdown()) { scheduler.pauseAll(); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); }}

當註冊中心重新連接成功( RECONNECTED ),恢復本地作業調度:

/*** 恢復作業.*/public synchronized void resumeJob() { try { if (!scheduler.isShutdown()) { scheduler.resumeAll(); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); }}

查看原文 >>
相關文章