摘要:當使用確認機制的時候,生產者從連接或者channel故障中恢復過來時,會重發沒有被Broker確認簽收的消息。RabbitMQ中消費者有3種簽收消息方式:1、channel.basicAck(long deliveryTag, boolean multiple):確認簽收消息。

一般而言,如果你選擇RabbitMQ,那肯定就是把可靠性放在第一位。畢竟,RabbitMQ可是金融行業消息隊列的標配。如果把性能放在第一位,那毫無疑問,必須是Kafka。但是,可靠性畢竟是相對的,就拿大火的阿里雲,AWS雲,或者傳統的IBM小型機,Oracle數據庫,沒有誰敢說自己可靠性100%,都是說幾個9。所以,本文的目的很明確,就是儘可能的提高我們RabbitMQ的可靠性,從發送、存儲、消費、集羣、監控、告警等多個維度給出可行性方案,指導開發者以及運維人員獲取更加可靠的消息投遞,保障我們的業務系統安全、可靠、穩定的運行。

數據可靠性是和RabbitMQ節點、生產者、消費者以及服務器等息息相關的。本文比較長,大概分爲如下幾個段落:

  1. 確認機制

  2. 生產者

  3. 消費者

  4. 隊列鏡像

  5. 告警

  6. 監控和Metrics

  7. 健康檢查

如下是一張RabbitMQ架構圖,本文對可靠性的分析,會涉及到架構圖中的方方面面:

1. 確認機制

當連接出現問題的時候,在客戶端和服務端之間的消息可能正在投遞中,還沒有被Broker接收,它們可能正在被編碼或者解碼,或者一些其他的情況。在這種場景下,消息並沒有被投遞,那麼它們是需要被重新投遞以保障業務穩定性。確認機制讓服務端和客戶端知道什麼時候需要做這些事情,它對於生產者和消費者保障數據安全是非常重要的。

確認機制能被用在兩個方向:允許消費者告訴服務器(Broker)它已經收到了消息,也允許服務器告訴生產者它接收到了消息。前者就是我們常說的消費者Ack,後者就是我們常說的生產者Confirm。

1.1 生產者/消費者確認

生產者確認以及消費者確認接下來會單獨的段落進行詳細的介紹,並且有示例代碼,這裏就不過多的講解了。

1.2 確認機制總結

確認機制的使用,能夠保證最少一次(at least once)投遞。如果沒有確認機制,消息就非常可能會丟失,這時候只能保證最多一次(at most once )。至於恰好一次投遞,目前還沒有哪個中間件可以保證,畢竟分佈式系統非常複雜,尤其是網絡的不可控,不確定的因素太多太多。

2. 生產者

當使用確認機制的時候,生產者從連接或者channel故障中恢復過來時,會重發沒有被Broker確認簽收的消息。如此一來,消息就可能被重複發送,因爲可能是由於網絡故障等原因,Broker發送了確認,但是生產者沒有收到而已。亦或者,消息壓根就沒有發送到Broker那裏去。正因爲生產者爲了可靠性可能會重發消息,所以在消費者消費消息處理業務時,還需要去重,或者對接受到的消息做冪等處理(推薦冪等處理)。

生產者增加確認機制非常簡單,channel開啓confirm模式,然後增加監聽即可:

// 選擇確認機制

channel.confirmSelect();

// 確認消息監聽

channel.addConfirmListener(new ConfirmListener() {

@Override

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("消息已經ack,tag: " + deliveryTag);

}


@Override

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

// 對於消費者沒有ack的消息,可以做一些特殊處理

System.out.println("消息被拒籤,tag: " + deliveryTag);

}

});

說明:RabbitMQ還有事務機制(txSelect、txCommit、txRollback),也能保障消息的發送。不過事務機制是 「同步阻塞」 的,所以不推薦使用。而confirm模式是 「異步」 機制。如下圖所示(圖片摘自《Rabbit實戰指南》,皮皮廝的書籍,非常值得一看),是事務機制以及confirm方式TPS性能對比,我們可以很明顯的看到,事務機制是性能最差的:

在一些很重要的業務場景,我們還需要確保消息被正確路由到了隊列。爲了確保消息被正確路由到一個已知的隊列,我們需要確保消息被正確的從交換器傳遞到了隊列中,並且還需要確保目標隊列至少有一個消費者。我們試想,如果一個隊列都沒有任何消費者,那發送的消息相當於石沉大海了。

前者,我們可以通過給交換器綁定一個備份交換器解決這個問題。比如一個direct類型的交換器,如果發送消息時路由KEY不匹配,那麼這條消息就會進入備份交換器中,而不會被丟失:

Map<String, Object> argsMap = new HashMap<>();

argsMap.put("alternate-exchange", ALTER_EXCHANGE_NAME);

// map參數中alternate-exchange的值,就是申明綁定的備份交換器名稱

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, argsMap);

RabbitMQ支持的4種交換器類型中,只有fanout是不存在路由不到隊列的情況,因爲它會自動路由到所有隊列中,跟綁定KEY沒有任何關係。所以,在滿足你業務的前提下,筆者建議,儘可能使用fanout類型交換器。

後者,我們可以通過死信交換器(DLX,也被稱爲私信隊列)來解決這個問題,假設一些消息沒有被消費,那麼它就會被轉移到綁定的死信交換器上,對於這類消息,我們消費並處理死信隊列即可:

Map<String, Object> argsMap = new HashMap<>();

// 死信交換器/死信隊列

argsMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);

// 設置隊列過期時間(第一次設置一個值後,以後不能設置一個更大的值)

argsMap.put("x-message-ttl", 60000);


channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

// 死信的關係一定要在queue申明時指定,而不能在exchange申明時指定

channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);

3. 消費者

只有消費者確認的消息,RabbitMQ纔會刪除它,不確認就不會被刪除。所以,在消費端,建議關閉自動確認機制。應該在收到消息處理完業務以後,手動確認消息。消費者手動確認實現代碼如下:

DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties props, byte[] body){

System.out.println("死信隊列接受到的消息:" + new String(body));

// 手動確認消息接受成功

channel.basicAck(envelope.getDeliveryTag(), false);

// channel.basicNack(envelope.getDeliveryTag(), false, false);

}


};

// 推模式,並且關閉自動確認機制,即autoAck=false

channel.basicConsume(QUEUE_NAME, false, consumer);

注意上面的basicAck方法:void basicAck(long deliveryTag, boolean multiple) 第二個參數multiple。要說明這個參數的含義,首先需要講清楚一個概念 「deliveryTag」 ,即投遞消息唯一標識符,它是一個 「單調遞增」 的Long類型正整數。假設此次basicAck的tag爲123130,如果multiple=false,那麼表示只確認簽收這一條消息。如果multiple=true,那麼表示確認簽收tag小於或等於123130的所有消息。

「最大Tag值」:Delivery tag是一個64位長整型值,也就是說它的最大值是9223372036854775807(2^64-1),不過這個tag值是按channel劃分的。也就是說,理論上每一個channel都可以發送2^64-1條消息。因此,我們認爲在實際情況中,tag基本上不可能達到它的上限值。

前面提到,由於網絡等一些原因引起了故障,就會導致消息被重發。因此,消費端一定要做好處理重複消息的準備,強烈建議在消費端實現 「冪等」 的業務邏輯。比如以支付送積分爲例,那麼,消息體中肯定有訂單號這個業務屬性唯一的ID。那麼在消費這個消息送積分的業務代碼中,需要根據這個訂單號做冪等處理,即 「同一個訂單號只能送一次積分」

在消費端還有一種情況,就是當前消費者認爲它不能處理當前消息。因此,它就拒絕簽收(basic.reject或者basic.nack)這個消息。那麼生產者也需要監聽這些消息並做特殊的業務處理。

RabbitMQ中消費者有3種簽收消息方式:1、channel.basicAck(long deliveryTag, boolean multiple):確認簽收消息;2、channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):不確認簽收消息,並設置是否重入隊列中,並且可以批量不確定簽收tag之前的所有還未ack的消息;3、channel.basicReject(long deliveryTag, boolean requeue):直接拒絕簽收這一條消息,並設置是否重入隊列中。

4. 隊列鏡像

爲了防止在Broker中丟失消息,交換器、隊列和消息都應該設置爲持久化。除此以外,隊列和消息還應該被複制,爲了應對操作系統未及時fsync刷盤、Broker重啓、Broker服務器硬件故障、或者Broker crash故障等問題。

集羣節點提供了冗餘能力,能容忍單節點故障。在RabbitMQ集羣中,所有的定義都可以被複制,例如交換器、綁定關係、用戶等。但是隊列有所不同,默認只存在一個節點上(這一點和kafka很不一樣,kafka的Topic可以設置多個副本)。不過,可以通過配置把隊列 「鏡像」 到多個節點上,從而讓隊列不會有單點故障的問題:

當節點發生故障時,如果這時候有鏡像隊列,還會自動發生新的選舉,就能選舉出新的Master。從而不會因爲某個節點故障而導致隊列不可用,而且整個過程對業務完全無感知。

不過需要注意的是,鏡像並不能保證所有隊列的高可用, 「排他性隊列」 (Exclusive Queues)就不行。因爲排他性隊列生命週期與它的連接綁定在一起,當Broker節點發生故障重啓後,排他性隊列是會自動刪除的,因此不能被鏡像(Mirrored)!

5. 告警

這裏我們主要說一下磁盤告警以及內存告警。因爲出現下面兩種情況時,RabbitMQ會停止從客戶端網絡socket中讀數據: 1、內存使用達到配置上限;2、磁盤使用達到配置上限;

內存使用上限有3種設置方式:

## 設置節點可使用RAM百分比,超過這個百分比就會告警

vm_memory_high_watermark.relative = 0.4

## 設置節點可使用RAM的上限,單位爲byte即字節

vm_memory_high_watermark.absolute = 1073741824

## RabbitMQ 3.6.0+,設置節點可使用RAM的絕對大小,它的優先級低於relative

vm_memory_high_watermark.absolute = 2GB

磁盤使用上限也有3種設置方式:

## 設置磁盤剩餘空間閾值,當可用空間低於這個值就會觸發告警

disk_free_limit.absolute = 51200


## with RabbitMQ 3.6.0+.

disk_free_limit.absolute = 500KB

disk_free_limit.absolute = 50MB

disk_free_limit.absolute = 5GB


## 設置磁盤剩餘空間爲有效RAM的多少倍,比如當前可用RAM爲2G,且設定這個參數爲2。那麼當磁盤可用空間低於2*2=4G時就會觸發告警

disk_free_limit.relative = 2.0

由上可知:開啓磁盤告警以及內存告警非常簡單,需要說明的是,設置絕對值大小時,MB和M是不一樣的,M即MiB表示 mebibytes (2^20 ,1,048,576 bytes),而MB表示 megabytes (10^6 ,1,000,000 bytes)。

只要滿足這兩種情況其中的一種情況,服務器就會臨時阻塞連接,並且連接心跳也會斷開。這時候我們通過rabbitmqctl或者RabbitMQ管理後臺查看,所有網絡連接都是 「blocking」 。如果我們是RabbitMQ集羣,那麼內存和磁盤告警就會影響整個集羣。只要其中一個節點內存或者磁盤達到上限,整個集羣的所有節點都會阻塞連接。

一些客戶端包是支持連接阻塞提醒的,如果你使用的客戶端包恰好也支持,建議基於此增加監控告警。此外,運維需要針對RabbitMQ集羣服務器增加內存和磁盤使用率告警。假設RabbitMQ磁盤告警是隻剩10%,那麼運維設置的告警應該要大於10%,比如15%,從而能夠提前發現隱患,提前介入處理問題,避免磁盤使用達到上限而引起線上問題,背個P0故障可不好看。

6. 監控和Metrics

生產環境我們應該有完善且合理的監控機制,從而做到防患於未然。監控能在問題還未暴露出來時,就能提前發現問題。監控最重要的事情就是對Metrics的採集和分析。我們把Metrics分爲兩大類: 「RabbitMQ的Metrics」「基礎設施的Metrics」

6.1 基礎設施的Metrics

這塊的監控,我們需要收集運行RabbitMQ節點的所有服務器,以及應用的一些metrics,包括但不限於:

  1. CPU統計情況;

  2. 內存使用率;

  3. 虛擬內存統計;

  4. RabbitMQ節點數據目錄下磁盤剩餘可用空間;

  5. 磁盤IO情況;

  6. 網絡吞吐量(接收量、發送量、最大網絡吞吐量等);

  7. 網絡延遲情況(RabbitMQ集羣所有節點以及客戶端之間的網絡延遲);

  8. 文件描述符;

對基礎設施以及系統內核Metrics的監控工具非常多,而且都很成熟。比如:Prometheus、Datadog、Zabbix等。它們都能很好的收集Metrics信息,然後存儲並可視化展示,並且可以自定義告警規則。

6.2 RabbitMQ的Metrics

接下來我們說一下對RabbitMQ集羣本身的監控。RabbitMQ的管理後臺UI暴露了節點很多metrcis信息,並且RabbitMQ還通過HTTP API把這些信息暴露出來方便我們二次開發,自定義監控系統就非常依賴它的HTTP API。HTTP API訪問參考:curl -i -u root:root123 'http://localhost:15672/api/overview'。

我們先說RabbitMQ管理後臺,這個後臺做的還是很不錯的。但是如果用於監控的話,就顯得有些不足了:

  1. 強依賴被監控的系統(應該要解耦被監控的系統和監控系統);

  2. 只存儲最近一天的數據(超過一天的數據就沒有了);

  3. 用戶接口不夠強大;

  4. 它的權限系統是依賴RabbitMQ權限系統的;

可喜的是,RabbitMQ從3.8版本開始,已經支持Prometheus和Grafana了,也推薦應用在生產環境上。接下來說一下RabbitMQ監控系統需要關注RabbitMQ的哪些metrics。

6.2.1 集羣Metrics

我們可以通過 'http://localhost:15672/api/overview' 得到RabbitMQ集羣相關信息,。結果(部分字段)如下:

{

... ...

"message_stats": {

"ack": 212,

"ack_details": {

"rate": 0.0

},

"confirm": 143,

"confirm_details": {

"rate": 0.0

},

"publish": 323,

... ...

},

"queue_totals": {

"messages": 197,

"messages_details": {

"rate": 0.0

},

...

},

"object_totals": {

"channels": 2,

"connections": 2,

"consumers": 1,

"exchanges": 24,

"queues": 10

},

... ...

}

這裏我們需要關注的一些主要指標有:

  • message_stats.ack:消費者確認接受消息的數量;

  • message_stats.confirm:生產者得到Broker已經確認的消息(ConfirmListener中的handleAck);

  • message_stats.publish:最近發佈的消息總量(它只有最近的數據,沒有RabbitMQ集羣整個生命週期的數據。前面兩個指標ack和confirm一樣,也都是最近的數據。如果RabbitMQ閒置一段時間,這幾個值都會歸零);

  • object_totals.channels:channel數量,等價於RabbitMQ管理後臺 「Channels」 頁面中交換器數量;

  • object_totals.connections:連接數量,等價於RabbitMQ管理後臺 「Connections」 頁面中交換器數量;

  • object_totals.consumers:消費者數量;

  • object_totals.exchanges:交換器數量,等價於RabbitMQ管理後臺 「Exchanges」 頁面中交換器數量;

  • object_totals.queues:隊列數量,等價於RabbitMQ管理後臺 「Queues」 頁面中隊列數量;

說明:object_totals這個節點下幾個字段的值,我們也可以在RabbitMQ管理後臺 「Overview」 頁面的 「Global counts」 中得到,如下圖所示:

6.2.2. 節點Metrics

首先,我們通過 'http://localhost:15672/api/nodes/' 可以獲取到RabbitMQ集羣中所有節點的詳細信息,它返回的是一個JSON數組。也可以通過 'http://localhost:15672/api/nodes/{nodeName}' 獲取某個節點信息(這裏的nodeName就是第一個URL結果中字段name的值,例如:"name": "rabbit@afeideMacBook-Pro"),所以這個URL返回的是一個JSON對象。結果(部分字段)如下:

{

"partitions": [],

"os_pid": "36637",

"fd_total": 4864,

"sockets_total": 4285,

"mem_limit": 3435973836,

"mem_alarm": false,

"disk_free_limit": 50000000,

"disk_free_alarm": false,

"proc_total": 1048576,

"rates_mode": "basic",

"uptime": 115368753,

"run_queue": 1,

"processors": 4,

... ...

}

這裏我們需要關注的一些主要指標有:

  • mem_used:已經使用的內存;

  • mem_limit:限制最大允許使用的內存;

  • mem_alarm:bool類型值,是否開啓了內存告警;

  • disk_free_limit:磁盤還剩下多少空間告警閾值,也就是說當磁盤可用空間小於這個值時就會告警;

  • disk_free_alarm:bool類型值,是否開啓了磁盤告警;

  • fd_total:總計可用文件描述符數量;

  • fd_used:已經使用的文件描述符數量;

  • sockets_total:總計可用sockets數量;

  • sockets_used:已經使用sockets數量;

6.2.3 隊列Metrics

隊列的metrics都可以通過'http://localhost:15672/api/queues/afei/queue-normal-afei'獲取。得到的json結果如下:

{

"consumer_details": [],

"arguments": {

"x-dead-letter-exchange": "exchange-dlx-afei",

"x-message-ttl": 3000

},

"auto_delete": false,

... ...

"idle_since": "2020-05-05 7:45:58",

"incoming": [],

"memory": 19900,

"message_bytes": 0,

... ...

"state": "running",

"vhost": "/afei"

}

我們需要關注的一些主要指標有:

  • memory:使用的內存;

  • messages:未被確認的消息總數 + 準備投遞的消息總數;

  • messages_ready:準備投遞的消息總數;

  • messages_unacknowledged:未被確認的消息總數;

  • state:當前隊列的狀態,running狀態表示正常;

  • idle_since:表示當前隊列有多久沒有被消費者消費了,如果這個時間相比當前時間是很久以前,很有可能說明當前隊列沒有生產者已經廢棄。

6.2.4 應用Metrics

現在大部分系統都是分佈式的,在這樣的系統中,當某個組件出問題時,通常很難立即定位問題。所以,系統的每個部分,包括應用本身,都應該被監控起來。

一些基礎設施級別和RabbitMQ的Metrics能看出系統當前有異常行爲和問題,但是,還不能定位根據原因。例如,通過監控我們很容易知道某個節點磁盤空間不足,但是很難麻煩就知道爲什麼不足。所以,我們需要應用Metris信息來協助我們排查這樣的問題:區分哪個消息生產者速度異常、哪個消息者出現重複性的失敗、消費者速度趕不上消息產生的速度等。

應用程序跟蹤的Metrics可能是特定系統的,但是也有一些Metrics與大多數系統都是有關聯的,例如:Connection opening rate、Channel opening rate、Connection failure (recovery) rate、Publishing rate、Delivery rate、Positive delivery acknowledgement rate、Negative delivery acknowledgement rate、Mean/95th percentile delivery processing latency... ...

7. 健康檢查

下面推薦一些RabbitMQ監控檢查的命令:

  • rabbitmq-diagnostics -q ping:如果節點沒有任何問題,那麼返回Ping succeeded。

  • rabbitmq-diagnostics -q status:會顯示當前RabbitMQ節點即broker的很多信息,例如內存信息、磁盤信息、虛擬內存信息、告警信息、文件描述符等。

  • rabbitmq-diagnostics -q alarms:該命令可以檢查RabbitMQ本地以及集羣節點是否有告警信息,如果沒有,那麼返回這樣的信息:Node rabbit@afeideMacBook-Pro reported no alarms, local or clusterwide;

說明: 「rabbitmq-diagnostics」 這個命令還有很多其他的用法,我們可以通過rabbitmq-diagnostics --help查看它還能監控和檢查哪些Metrics。

最後推薦一些第三方監控工具,如下表格所示,按照字母排序,排名不分先後。這些工具能力上各有千秋,不過一般都是既能收集基礎設施級別的Metrics,也能收集RabbitMQ的Metrics。當然,筆者能力有限,可能還有其他一些好用的工具並沒有列舉出來:

名稱 簡介 地址
AppDynamics RabbitMQ監控擴展,從RaabitMQ管理API上收集RabbitMQ的metrics並上傳到AppDynamics Controller上 https://github.com/Appdynamics/rabbitmq-monitoring-extension
AWS CloudWatch 把RabbitMQ集羣的Metrics信息推送到AWS CloudWatch上的RabbitMQ插件 https://github.com/noxdafox/rabbitmq-cloudwatch-exporter
DataDog 通過Datadog代理監控RabbitMQ https://github.com/DataDog/integrations-core/tree/master/rabbitmq
Prometheus 通過Prometheus暴露RabbitMQ一些核心Metrics https://github.com/rabbitmq/rabbitmq-prometheus
Zabbix 通過Zabbix監控RabbitMQ https://git.zabbix.com/projects/ZBX/repos/zabbix/browse/templates/app/rabbitmq_http

END

如果讀完覺得有收穫的話,歡迎點【好看】,關注【阿飛的博客】,查閱更多精彩歷史!!!

相關文章