曹工雜談:分佈式事務解決方案之基於本地消息表實現最終一致性
曹工雜談:分佈式事務解決方案之基於本地消息表實現最終一致性
前言
爲什麼寫這個?其實我這邊的業務場景,嚴格來說,不算是典型的分佈式事務,需求是這樣說的:因爲我這邊負責的一個服務消費者consumer,是用戶登錄的入口;正常情況下,登錄時候要走用戶中心,這是個單獨的服務;如果用戶中心掛了,我這邊自然是沒法登錄的。
現在的需求就是說,假設用戶中心掛了,也要可以正常登錄。因爲我這個consumer其實也是緩存了用戶的數據的,在本地登錄也可以的,如果在我本地登錄的話,我就得後續等用戶中心恢復後,再把相關狀態同步過去。
基於這樣一個需求,我這邊的實現方案是:
1.配置文件裏維護一個開關,表示是否開啓:故障轉移模式。暫不考慮動態修改開關(如果要做,簡單做就提供個接口來改;複雜做,就放到配置中心裏,我們現在用的nacos,可以改了後推送到服務端)
2.如果開關是打開的,表示需要進行故障轉移,則登錄、退出登錄等各種需要訪問用戶中心的請求,都存儲到數據庫中;數據庫會有一張表,用來存放這類請求。大致如下:
CREATE TABLE `cached_http_req_to_resend` ( `http_req_id` bigint(20) NOT NULL COMMENT '主鍵', `req_type` tinyint(4) NOT NULL COMMENT '請求類型,1:推送待處置結果給第三方系統', `third_sys_feign_name` varchar(30) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '第三方系統的名稱,和feignClient的保持一致', `http_req_body` varchar(4000) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '請求體', `current_state` tinyint(4) DEFAULT NULL COMMENT '該請求當前狀態,1:成功;2:失敗;3:待處理;4:失敗次數過多,放棄嘗試', `fail_count` tinyint(4) DEFAULT NULL COMMENT '截止目前,失敗次數;超過指定次數後,將跳過該請求', `success_time` datetime DEFAULT NULL COMMENT '請求成功發送的時間', `create_time` datetime DEFAULT NULL COMMENT '創建時間', `related_entity_id` bigint(21) DEFAULT NULL COMMENT '相關的實體的id,比如在推送待處置警情時,這個id爲處警id', PRIMARY KEY (`http_req_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
3.單獨開一個schedule線程,定時去掃這個表,發現有需要處理的,就去重新發送請求就行了,成功了的,直接更新狀態爲success。
這個模式,其實就算是分佈式事務中的:本地消息表方案了。
本地消息表,有一個注意的點,就是要把保存消息的操作和業務相關操作,放到同一個事務中,這樣可以確保,業務成功了,消息肯定是落庫了的,很可靠。然後再開啓個定時任務,去掃描消息表即可。
我這邊不是發消息,而是發請求,道理是類似的。
下面開始基於代碼demo來講解。
代碼結構
這邊就是簡單的幾個module,基於spring cloud開發了一個服務提供者和一個服務消費者。服務提供者對外暴露的接口,通過api.jar的形式,提供給消費者,這種算是強耦合了,有優點,也有缺點,這裏就不討論了。
消費者通過feign調用服務提供者。有人會問,不需要eureka這些東西嗎,其實是可以不需要的,我們直接在ribbon的配置中,把服務對應的:ip和端口寫死就完了。
我們這裏就是,消費者訪問服務提供者,正常情況下直接訪問就行了;但我們這裏,模擬的就是服務A訪問不了的情況,所以會直接把請求落庫,後續由定時線程去處理。
服務提供者-api
我們看看服務提供者api,裏面僅有一個接口:
public interface FeignServiceA { /** * * @return */ @RequestMapping("/login") public Message<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO); }
服務提供者的邏輯
其中,邏輯如下:
@RestController @Slf4j public class DemoController extends BaseController implements FeignServiceA { // 1 @Override public Message<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO) { log.info("login is ok,param:{}", loginReqVO); LoginRespVO vo = new LoginRespVO(); vo.setUserName(loginReqVO.getUserName()); vo.setAge(11); vo.setToken(UUID.randomUUID().toString()); return successResponse(vo); } }
這裏1處就是提供了一個接口,接口裏返回一點點信息。測試一下:
服務消費者之正常請求服務提供者
pom.xml中依賴服務提供者的api
<dependency> <groupId>com.example</groupId> <artifactId>service-provider-A-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
feign client代碼
我們需要寫一個接口,繼承其feign api。
@FeignClient(value = "SERVICE-A") public interface RpcServiceForServiceA extends FeignServiceA { }
要調用的時候,怎麼弄呢? 直接注入該接口,然後調用對應的方法就行了,這樣就可以了。
@Autowired private RpcServiceForServiceA rpcServiceForServiceA; Message<LoginRespVO> message = rpcServiceForServiceA.login(reqVO);
但是,我們好像沒有配置註冊中心之類的東西,這個我們可以繞過,因爲最終發起調用的是,ribbon這個組件。
ribbon提供了幾個接口,其中一個,就是用來獲取服務對應的實例列表。
這裏要說的,就是下面這個接口:
package com.netflix.loadbalancer; import java.util.List; /** * Interface that defines the methods sed to obtain the List of Servers * @author stonse * * @param <T> */ public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); /** * Return updated list of servers. This is called say every 30 secs * (configurable) by the Loadbalancer's Ping cycle * */ public List<T> getUpdatedListOfServers(); }
這個接口,有多個實現,ribbon自帶了幾個實現,然後eureka 、nacos的客戶端,都自己進行了實現。
ribbon自帶的實現中,有一個叫做:
public class ConfigurationBasedServerList extends AbstractServerList<Server> { private IClientConfig clientConfig; ... @Override public List<Server> getUpdatedListOfServers() { // 1 String listOfServers = clientConfig.get("listOfServers"); return derive(listOfServers); }
1處可以看到,它獲取服務對應的實例,就是通過去配置文件裏獲取 listOfServers
這個key中配置的那些。
總之,最終我們向下面這樣配置就行了:
SERVICE-A.ribbon.ReadTimeout=3000 SERVICE-A.ribbon.listOfServers=localhost:8082 SERVICE-A.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
這裏的前綴, SERVICE-A
和之前下面這個地方一致就行了:
@FeignClient(value = "SERVICE-A") public interface RpcServiceForServiceA extends FeignServiceA { }
正常情況下,就說完了,直接調用就行,和httpclient調用沒啥本質差別。只不過ribbon提供了負載均衡、重試等各種功能。
設計表結構,在使用故障轉移模式時,保存請求
表結構我前面已經貼了,這裏就展示下數據吧(可點擊放大查看):
保存請求的代碼很簡單:
@Override public LoginRespVO login(LoginReqVO reqVO) { boolean failOverModeOn = isFailOverModeOn(); /** * 故障轉移沒有開啓,則正常調用服務 */ if (!failOverModeOn) { ... return ...; } /** * 1 使用本地數據進行服務,並將請求保存到數據庫中 */ iCachedHttpReqToResendService.saveLoginReqWhenFailOver(reqVO); /** * 返回一個 dummy 數據 */ return new LoginRespVO(); }
上面的1處,就會保存請求到數據庫。
定時線程消費邏輯
概覽
定時線程這邊,我設計得比較複雜一點。因爲實際場景中,上面的表中,會存儲多個第三方服務的請求;比如service-A,service-B。
所以,這裏的策略是:
簡單來說,就是定時線程,拿到任務後,按照第三方服務的名字來進行group by操作,比如,要發送到service-A的請求放一起,按時間排好序;要發送給service-B的放一起,排好序。
然後找到service-A,service-B各自對應的處理器,然後把數據丟給這些處理器;處理器拿到後,就會放到阻塞隊列裏;
然後此時worker線程就會被阻塞隊列給喚醒,喚醒後,就去開始處理這些請求,包括髮起feign調用,並且更新結果到數據庫中。
定時線程入口
@Scheduled(cron = "0/30 * * * * ? ") public void sendCachedFeignReq() { Thread.currentThread().setName("SendCachedFeignReqTask"); log.info("start sendCachedFeignReq"); /** * 1、獲取鎖 */ boolean success = iCommonDistributedLockService.tryLock(DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.lockName, DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.expireDurationInSeconds); /** * 進行業務邏輯處理 */ iCachedHttpReqToResendService.processCachedFeignReqForLoginLogout(); ... }
這裏還加了個分佈式鎖的操作,用數據庫實現的,還沒經過充分測試,可能會有點小問題,不過不是重點。
下面看看業務邏輯:
@Override public void processCachedFeignReqForLoginLogout() { // 1 String[] feignClients = {EFeignClient.SERVICE_A.getName()}; // 2 for (String feignClient : feignClients) { /** * 3 從數據庫獲取要發送到該服務的請求 */ List<CachedHttpReqToResend> recordsFromDb = getRecordsFromDb(feignClient); if (CollectionUtils.isEmpty(recordsFromDb)) { continue; } /** * 4 根據feign client,找到對應的處理器 */ CachedHttpReqProcessor cachedHttpReqProcessor = cachedHttpReqProcessors.stream().filter(item -> item.support(feignClient)).findFirst().orElse(null); if (cachedHttpReqProcessor == null) { throw new RuntimeException(); } /** * 5 利用對應的處理器,處理該部分請求 */ cachedHttpReqProcessor.process(recordsFromDb); } }
- 1,定義一個數組,數組中包括所有要處理的第三方系統
- 2,遍歷
- 3,根據該serviceName,比如,根據service-A,去數據庫查詢對應的請求(這裏可能和前面的圖有點出入,以這裏的代碼爲準)
- 4,根據該service-A,找到對應的處理器
- 5,利用第四步找到的處理器,來處理第三步中查到的數據
怎麼找到service-A對應的處理器
我們先看看處理器這個接口:
public interface CachedHttpReqProcessor { /** * 該處理器是否支持處理該service * @param feignClientName * @return */ boolean support(String feignClientName); /** * 具體的處理邏輯 * @param list */ void process(Collection<CachedHttpReqToResend> list); /** * worker線程的名字 * @return */ String getThreadName(); }
然後看看針對service-A的處理器,是怎麼實現的:
@Service public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor { // 1 @Override public boolean support(String feignClientName) { return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName); } @Override public String getThreadName() { return "CachedHttpReqProcessorForServiceA"; }
1處,判斷傳入的feign客戶端,是否等於 EFeignClient.SERVICE_A
,如果是,說明找到了對應的處理器。
我們這裏將這個service,註冊爲了bean;在有多個serviceA,serviceB的時候,就會有多個CachedHttpReqProcessor處理器。
我們在之前的上層入口那裏,就注入了一個集合:
@Autowired private List<CachedHttpReqProcessor> cachedHttpReqProcessors;
然後在篩選對應的處理器時,就是通過遍歷這個集合,找到合適的處理器。
具體的,大家可以把代碼拉下來看看。
CachedHttpReqProcessor的處理邏輯
對於serviceA,serviceB,service C,由於處理邏輯很大部分是相同的,我們這裏提取了一個抽象類。
@Slf4j public abstract class AbstractCachedHttpReqProcessor implements CachedHttpReqProcessor { private LinkedBlockingQueue<CachedHttpReqToResend> blockingQueue = new LinkedBlockingQueue<>(500); private AtomicBoolean workerInited = new AtomicBoolean(false); Thread workerThread; @Override public void process(Collection<CachedHttpReqToResend> list) { if (CollectionUtils.isEmpty(list)) { return; } /** * 1 直到有任務要處理時(該方法被調用時),纔去初始化線程 */ if (workerInited.compareAndSet(false, true)) { // 2 workerThread = new Thread(new InnerWorker()); workerThread.setDaemon(true); workerThread.setName(getThreadName()); workerThread.start(); } /** * 放到阻塞隊列裏 */ blockingQueue.addAll(list); }
我們這裏1處,給每個處理器,定義了一個工作線程,且只在本方法被調用時,纔去初始化該線程;爲了防止併發,使用了AtomicBoolean,保證只會初始化一次。
2處,給線程設置了Runnable,它會負責實際的業務處理。
然後3處,直接把要處理的任務,丟到阻塞隊列即可。
Worker的處理邏輯
任務已經是到了阻塞隊列了,那麼,誰去處理呢,就是worker了。如果大家忘了整體的設計,可以回去看看那張圖。
public abstract boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend); /** * 從隊列取數據;取到後,調用子類的方法去處理; * 子類處理後,返回處理結果 * 根據結果,設置成功或者失敗的狀態 */ public class InnerWorker implements Runnable { @Override public void run() { while (true) { // 1 boolean interrupted = Thread.currentThread().isInterrupted(); if (interrupted) { log.info("interrupted ,break out"); break; } // 2 CachedHttpReqToResend cachedHttpReqToResend; try { cachedHttpReqToResend = blockingQueue.take(); } catch (InterruptedException e) { log.info("interrupted,e:{}", e); break; } // 3 Integer reqType = cachedHttpReqToResend.getReqType(); if (reqType == null) { continue; } try { /** * 4 使用模板方法設計模式,交給子類去實現 */ boolean success = doProcess(reqType, cachedHttpReqToResend); // 5 if (!success) { cachedHttpReqToResend.setFailCount(cachedHttpReqToResend.getFailCount() + 1); } else { cachedHttpReqToResend.setCurrentState(CachedHttpReqToResend.CURRENT_STATE_SUCCESS); cachedHttpReqToResend.setSuccessTime(new Date()); } // 6 boolean count = iCachedHttpReqToResendService.updateById(cachedHttpReqToResend); if (count) { log.debug("update sucess"); } } catch (Throwable throwable) { log.error("e:{}", throwable); continue; } } } }
- 1,判斷是否被中斷了,這樣可以在程序關閉時,感知到;避免線程泄漏
- 2,從阻塞隊列中,獲取任務
- 3,判斷請求類型是否爲null,這個是必須要的
- 4,使用模板方法設計模式,具體邏輯,具體怎麼發請求,誰去發,交給子類實現
- 5、6,根據結果,更新這條數據的狀態。
子類中的具體邏輯
我們這裏貼個全貌:
@Service @Slf4j public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor { @Autowired private FeignServiceA feignServiceA; @Autowired private ObjectMapper objectMapper; @Override public boolean support(String feignClientName) { return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName); } @Override public String getThreadName() { return "CachedHttpReqProcessorForServiceA"; } /** * 1 根據請求type字段,我們就知道是要發送哪一個請求 * @param reqType * @param cachedHttpReqToResend * @return */ @Override public boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend) { switch (reqType) { // 2 case CachedHttpReqToResend.REQ_TYPE_LOGIN_TO_SERVICE_A: { // 3 String httpReqBody = cachedHttpReqToResend.getHttpReqBody(); try { // 4 LoginReqVO loginReqVO = objectMapper.readValue(httpReqBody, LoginReqVO.class); /** * 5 發起登錄 */ Message<LoginRespVO> message = feignServiceA.login(loginReqVO); boolean success = FeignMsgUtils.isSuccess(message); return success; } catch (Throwable e) { log.error("e:{}", e); return false; } } } return true; } }
- 1,這個類就是實現了父類中的抽象方法,這裏體現的就是模板方法設計模式
- 2,根據請求type,判斷要訪問哪個接口
- 3,4,將請求體進行反序列化
- 5,發起請求,調用feign。
代碼如何使用
具體的代碼,我放在了:
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/blockingqueue-consumer-producer
建表語句:
服務提供者A的訪問入口:
curl -i -X POST \ -H "Content-Type:application/json" \ -d \ '{ "userName": "zhangsan", "password":"123" }' \ 'http://localhost:8082/login'
服務消費者的application.properties中:
failover.mode=true
這個爲true時,就是故障轉移模式,訪問如下接口時,請求會落庫
http://localhost:8081/login.do
爲false的話,就會直接進行feign調用。
代碼中的bug
其實這個代碼是有bug的,因爲我們是定時線程,假設每隔30s執行,那假設我一開始取了10條出來,假設全部放到隊列了,阻塞隊列此時有10條,假設worker處理特別慢,30s內也沒執行完的話,定時線程會再次取出狀態沒更新的那個任務,又丟到隊列裏。
任務就被重複消費了。
大家可以想想怎麼處理這個問題,通過這個bug,我也發現,blockingqueue是一種比較徹底的解耦方式,但是,我們這裏的業務,解耦了嗎,如果業務不是解耦的,用這個方式,其實是有點問題。
過兩天我再更新這部分的方案,生產者和消費者,這裏還是需要通信的,才能避免任務重複消費的問題。
總結
要實現一個本地消息表最終一致性方案,有一定開發量,而且我這裏,消費過程中,強行引入了多線程和生產者、消費者模式,增加了部分複雜度。
不過,代碼不就是要多折騰嗎?