以下文章來源於微觀技術 ,作者TomGE

微觀技術

計算機研究生,阿里架構師,寫過專利,CSDN博客專家。負責過電商交易、社區團購、流量營銷等業務。分享後端架構技能、一線大廠面試經驗、團隊管理等話題。歡迎關注

Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分佈式消息中間件。

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於集羣中的不同的Broker Group。

快速安裝:https://rocketmq.apache.org/docs/quick-start/

源代碼:https://github.com/apache/rocketmq-spring

主要功能:

1、業務解耦。採用發佈訂閱模式,生產端發送消息到MQ Server,下游的消費端訂閱接收消息。異步形式,系統解耦,提升系統擴展性

2、削峯限流。由於消息中間件的吞吐量很高,過量的請求會暫時放在 MQ server,下游慢慢消費,避免過量請求沖垮系統

3、億級消息的堆積能力,單個隊列中的百萬級消息的累積容量。

4、高可用性:Broker服務器支持多Master多Slave的同步雙寫以及Master多Slave的異步複製模式,其中同步雙寫可保證消息不丟失。

5、高可靠性:生產者將消息發送到Broker端有三種方式,同步、異步和單向。Broker在對於消息刷盤有兩種策略:同步刷盤和異步刷盤,其中同步刷盤可以保證消息成功的存儲到磁盤中。消費者的消費模式也有集羣消費和廣播消費兩種,默認集羣消費,如果集羣模式中消費者掛了,一個組裏的其他消費者會接替其消費。

6、分佈式事務消息:這裏是採用半消息確認和消息回查機制來保證分佈式事務消息。

7、支持消息過濾:建議採用消費者業務端的tag過濾

8、支持順序消息:消息在Broker中是採用隊列的FIFO模式存儲的,也就是發送是順序的,只要保證消費的順序性即可。

9、支持定時消息和延遲消息:Broker中由定時消息的機制,消息發送到Broker中,不會立即被Consumer消費,會等到一定的時間才被消費。延遲消息也是一樣,延遲一定時間之後纔會被Consumer消費。

核心組件:

1、Namesrv

Namesrv充當路由消息的提供者。Namesrv是一個幾乎無狀態節點,多個Namesrv實例組成集羣,但相互獨立,沒有信息交換。Namesrv主要作用是:爲producer和consumer提供關於topic的路由信息。管理broker節點:監控更新broker的實時狀態。路由註冊、路由刪除(故障剔除)。

2、Broker

負責存儲消息、轉發消息。Broker是以group爲單位提供服務。一個group裏面分Master和Slave。Master和Slave存儲的數據一樣,slave從master同步數據(同步雙寫或異步複製看配置)。一個Master可以對應多個Slave,一個Slave只能對應一個Master。Master與Slave的對應關係通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId爲0表示Master,非0表示Slave。

基本概念:

1、主題(Topic) 表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。每個topic可分爲若干個分區(queue)

2、生產者組(Producer Group) 同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之後崩潰,則Broker服務器會聯繫同一生產者組的其他生產者實例以提交或回溯消費。

3、消費者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集羣消費(Clustering)和廣播消費(Broadcasting)。

4、普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

5、嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有序的。

6、消息(Message) 消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。

7、標籤(Tag) 爲消息設置的標誌,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標籤。標籤能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。

RocketMQ 特性:

外部依賴:

spring boot 已經爲RocketMQ 封裝了starter組件,只需在 pom.xml 文件中添加jar版本依賴即可:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version></dependency>

配置文件:

在配置文件 application.yaml 中配置 RocketMQ 的相關參數,具體內容如下:

rocketmq: name-server: localhost:9876 consumer: topic: maker-order-topic group: my-group1 producer: group: p-my-group1

消息生產端:

@Resourceprivate RocketMQTemplate rocketMQTemplate;private static String makerOrderTopic = "maker-order-topic";@GetMapping("/Send_make_order_message")public Object send_make_order_message { try { Long orderId = Long.valueOf(new Random.nextInt(1000000)); OrderModel orderModel = OrderModel.builder.orderId(orderId).buyerUid(200000L).amount(26.8).shippingAddress("上海").build; SendResult sendResult = rocketMQTemplate.syncSend(makerOrderTopic, orderModel); System.out.printf("Send message to topic %s , sendResult=%s %n", makerOrderTopic, sendResult); return "消息發送成功"; } catch (Exception e) { e.printStackTrace; return "消息發送失敗"; }}

消息消費端:

@Service@RocketMQMessageListener(nameServer = "${rocketmq.name-server}", topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")public class OrderConsumer implements RocketMQListener<OrderModel> { @Override public void onMessage(OrderModel orderModel) { System.out.printf("consumer received message: %s \n", JSON.toJSONString(orderModel)); }}

操作演示

瀏覽器訪問:http://localhost:9071/send_make_order_message,模擬生產端發送消息到MQ Server中。

消費端接收消息日誌:

Send message to topic maker-order-topic , sendResult=SendResult [sendStatus=SEND_OK, msgId=C0A80069816F14DAD5DC73A75B9F0014, offsetMsgId=C0A8006900002A9F0000000000058841, messageQueue=MessageQueue [topic=maker-order-topic, brokerName=192.168.0.105, queueId=2], queueOffset=0] consumer received message: {"amount":26.8,"buyerUid":200000,"orderId":895586,"shippingAddress":"上海"}

1、同步發送

同步發送是指消息發送方發出一條消息後,在收到服務端返回響應後,線程纔會執行後續代碼

OrderModel orderModel = mockOrderModel;Message message = new Message(makerOrderTopic, "TageA", JSON.toJSONString(orderModel).getBytes);SendResult sendResult = rocketMQTemplate.getProducer.send(message);

2、異步發送

異步發送是指發送方發出一條消息後,不需要等服務端返回響應。異步發送,需要實現異步發送回調接口(SendCallback),通過回調接口接收服務端響應,並處理結果

OrderModel orderModel = mockOrderModel;rocketMQTemplate.asyncSend(makerOrderTopic, orderModel, new SendCallback { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息發送成功,msgId=" + sendResult.getMsgId); } @Override public void onException(Throwable throwable) { System.out.println("發送失敗," + throwable); }});

3、順序消息

對於指定的一個Topic,所有消息根據Sharding Key分區。同一個分區內的消息按照嚴格的FIFO順序進行發佈和消費。Sharding Key是順序消息中用來區分不同分區的關鍵字段,和普通消息的Key是完全不同的概念。

比如:電商的訂單創建,以訂單ID作爲Sharding Key,那麼同一個訂單相關的消息,如創建訂單、付款、發貨、訂單退款消息、訂單物流消息都會按照發布的先後順序來消費。

for (long orderId = 0; orderId < 20; orderId++) { String shardingKey = String.valueOf(orderId % 5); OrderModel orderModel = OrderModel.builder.orderId(orderId).build; SendResult sendResult = rocketMQTemplate.syncSendOrderly(makerOrderTopic, orderModel, shardingKey); if (sendResult != null) { System.out.println(orderId + " ,發送成功"); }}

4、延時消息

Producer將消息發送到消息隊列RocketMQ服務端,但並不期望立馬投遞這條消息,而是延遲一定時間後才投遞到Consumer進行消費,該消息稱爲延時消息。

OrderModel orderModel = mockOrderModel;org.springframework.messaging.Message message = MessageBuilder.withPayload(JSON.toJSONString(orderModel).getBytes).build;//延時等級 3, 這個消息將在10s之後發送,現在只支持固定的幾個時間值//delayTimeLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";SendResult sendResult = rocketMQTemplate.syncSend(makerOrderTopic, message, 8000, 3);

5、事務消息

RocketMQ提供類似X/Open XA的分佈式事務功能,通過消息隊列RocketMQ事務消息能達到分佈式事務的最終一致。

由於網絡閃斷、生產者應用重啓等原因,導致某條事務消息的二次確認丟失,消息隊列RocketMQ服務端通過掃描發現某條消息長期處於“半事務消息”時,主動向生產者查詢該消息的最終狀態(COMMIT或Rollback),該過程稱之爲消息回查。

典型場景:在電商購物車下單時,涉及到購物車系統和交易系統,這兩個系統之間的數據最終一致性可以通過分佈式事務消息的異步處理實現。在這種場景下,交易系統是最爲核心的系統,需要最大限度地保證下單成功。而購物車系統只需要訂閱消息隊列RocketMQ的交易訂單消息,做相應的業務處理,即可保證最終的數據一致性。

發送步驟:

  • 發送方將半事務消息發送至MQ Server。
  • MQ服務端將消息持久化成功之後,向發送方返回Ack確認消息已經發送成功,此時消息爲半事務消息。
  • 發送方開始執行本地事務邏輯
  • 發送方根據本地事務執行結果向服務端提交二次確認(Commit或Rollback),服務端收到Commit狀態則將半事務消息標記爲可投遞,訂閱方將收到該消息;服務端收到Rollback狀態則刪除半事務消息,訂閱方不會收到該消息。

回查步驟:

  • 在斷網或者應用重啓的特殊情況下,上述步驟4提交的二次確認最終未到達服務端,經過固定時間後服務端將對該消息發起消息回查。
  • 發送方收到消息回查後,需要檢查對應消息的本地事務執行的最終結果。
  • 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行操作。

發送半事務消息,示例代碼如下:

OrderModel orderModel = mockOrderModel;org.springframework.messaging.Message message = MessageBuilder.withPayload(JSON.toJSONString(orderModel)).build;TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("tx_order_message", makerOrderTopic, message, null);SendStatus sendStatus = transactionSendResult.getSendStatus;LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState;System.out.println("send message status: " + sendStatus + " , localTransactionState: " + localTransactionState);

編寫RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。

@Component@RocketMQTransactionListener(txProducerGroup = "tx_order_message")public class TXProducerListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { // 執行本地事務 System.out.println("TXProducerListener 開始執行本地事務。。。"); RocketMQLocalTransactionState result; try { // 模擬業務處理( 如:創建訂單 ) // int i = 1 / 0; //模擬異常 result = RocketMQLocalTransactionState.COMMIT; // 成功 } catch (Exception e) { System.out.println("本地事務執行失敗。。。"); result = RocketMQLocalTransactionState.ROLLBACK; } return result; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 檢查本地事務( 例如檢查下訂單是否成功 ) System.out.println("檢查本地事務。。。"); RocketMQLocalTransactionState result; try { //模擬業務處理( 根據檢查結果,決定是COMMIT或ROLLBACK ) result = RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { // 異常就回滾 System.out.println("檢查本地事務 error"); result = RocketMQLocalTransactionState.ROLLBACK; } return result; }} https://github.com/aalansehaiyang/spring-boot-bulking 模塊:spring-boot-bulking-rocketmq

1、如何保證順序消息?

順序由producer發送到broker的消息隊列是滿足FIFO的,所以發送是順序的,單個queue裏的消息是順序的。多個Queue同時消費是無法絕對保證消息的有序性的。所以,同一個topic,同一個queue,發消息的時候一個線程發送消息,消費的時候一個線程去消費一個queue裏的消息。

2、怎麼保證消息發到同一個queue裏?

RocketMQ給我們提供了MessageQueueSelector接口,可以重寫裏面的接口,實現自己的算法,比如判斷i%2==0,那就發送消息到queue1否則發送到queue2。

3、如何實現消息過濾?

有兩種方案,一種是在broker端按照Consumer的去重邏輯進行過濾,這樣做的好處是避免了無用的消息傳輸到Consumer端,缺點是加重了Broker的負擔,實現起來相對複雜。另一種是在Consumer端過濾,比如按照消息設置的tag去重,這樣的好處是實現起來簡單,缺點是有大量無用的消息到達了Consumer端只能丟棄不處理。

4、如果由於網絡等原因,多條重複消息投遞到了Consumer端,你怎麼進行消息去重?

這個得先說下消息的冪等性原則:就是用戶對於同一種操作發起的多次請求的結果是一樣的,不會因爲操作了多次就產生不一樣的結果。只要保持冪等性,不管來多少條消息,最後處理結果都一樣,需要Consumer端自行實現。

去重的方案:因爲每個消息都有一個MessageId, 保證每個消息都有一個唯一鍵,可以是數據庫的主鍵或者唯一約束,也可以是Redis緩存中的鍵,當消費一條消息前,先檢查數據庫或緩存中是否存在這個唯一鍵,如果存在就不再處理這條消息,如果消費成功,要保證這個唯一鍵插入到去重表中。

5、RocketMQ是怎麼實現分佈式事務消息的?

  • Producer向broker發送半消息
  • Producer端收到響應,消息發送成功,此時消息是半消息,標記爲“不可投遞”狀態,Consumer消費不了。
  • Producer端執行本地事務。
  • 正常情況本地事務執行完成,Producer向Broker發送Commit/Rollback,如果是Commit,Broker端將半消息標記爲正常消息,Consumer可以消費,如果是Rollback,Broker丟棄此消息。
  • 異常情況,Broker端遲遲等不到二次確認。在一定時間後,會查詢所有的半消息,然後到Producer端查詢半消息的執行情況。
  • Producer 端查詢本地事務的狀態
  • 根據事務的狀態提交commit/rollback到broker端。

6、從Producer角度分析,如何確保消息成功發送到了Broker?

  • 採用同步發送,即發送一條數據等到接受者返回響應之後再發送下一個數據包。如果返回響應OK,表示消息成功發送到了broker,狀態超時或者失敗都會觸發二次重試。MQ Server端會有冪等控制。
  • 可以採用分佈式事務消息的投遞方式。
  • 如果一條消息發送之後超時,也可以通過查詢日誌的API,來檢查是否在Broker存儲成功。總的來說,Producer還是採用同步發送來保證的。

7、從Broker角度分析,如何確保消息持久化?

  • 消息只要持久化到CommitLog(日誌文件)中,即使Broker宕機,未消費的消息也能重新恢復再消費。
  • Broker的刷盤機制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲在page cache,但是同步刷盤更可靠,它是Producer發送消息後等數據持久化到磁盤之後再返回響應給Producer。
  • Broker支持多Master多Slave同步雙寫和多Master多Slave異步複製模式,消息都是發送給Master主機,但是消費既可以從Master消費,也可以從Slave消費。同步雙寫模式可以保證即使Master宕機,消息肯定在Slave中有備份,保證了消息不會丟失。

8、從Consumer角度分析,如何保證消息被成功消費?

Consumer自身維護了個持久化的offset(對應Message Queue裏的min offset),用來標記已經成功消費且已經成功發回Broker的消息下標。如果Consumer消費失敗,它會向Broker發回消費失敗的狀態,發回成功纔會更新自己的offset。如果發回給broker時broker掛掉了,Consumer會定時重試,如果Consumer和Broker一起掛掉了,消息還在Broker端存儲着,Consumer端的offset也是持久化的,重啓之後繼續拉取offset之前的消息進行消費。

相關文章