1.入門

1.簡介

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現。 工作原理:現在索引中找到對應的值,然後根據匹配的索引記錄找到對應的數據行。 select *from student where id = 5 ;先在索引上按id=5查找,然後返回包含該值的數據行。

2.應用場景

異步處理

場景:用戶註冊後,需要發註冊郵件和註冊短信。 傳統方式:將註冊信息寫入數據庫後,發送註冊郵件,再發送註冊短信,以上三個任務全部完成後才返回給客戶端。 這有一個問題是,郵件,短信並不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西。

引入消息隊列後,把發送郵件,短信不是必須的業務邏輯異步處理。 應用解耦

場景:雙11是購物狂節,用戶下單後,訂單系統需要通知庫存系統,傳統的做法就是訂單系統調用庫存系統的接口。缺點:當庫存系統出現故障時,訂單就會失敗;訂單系統和庫存系統高耦合。

消息隊列:訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。庫存系統:訂閱下單的消息,獲取下單消息,進行庫操作。

流量削峯 場景:秒殺活動,一般會因爲流量過大,導致應用掛掉,爲了解決這個問題,一般在應用前端加入消息隊列。 消息隊列:1.用戶的請求,服務器收到之後,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉到錯誤頁面.2.秒殺業務根據消息隊列中的請求信息,再做後續處理。

3.架構

Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸。

Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。

Queue:消息的載體,每個消息都會被投到一個或多個隊列。

Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。

vhost:虛擬主機,一個broker裏可以有多個vhost,用作不同用戶的權限分離。

Producer:消息生產者,就是投遞消息的程序。

Consumer:消息消費者,就是接受消息的程序。

Channel:消息通道,在客戶端的每個連接裏,可建立多個channel。

2.特性

1.任務分發機制

Round-robin(輪詢分發)

在默認情況下,RabbitMQ將逐個發送消息到在序列中的下一個消費者(而不考慮每個任務的時長等等,且是提前一次性分配,並非一個一個分配)。平均每個消費者獲得相同數量的消息。這種方式分發消息機制稱爲Round-Robin(輪詢)。 RabbbitMQ的分發機制非常適合擴展,而且它是專門爲併發程序設計的,如果現在load加重,那麼只需要創建更多的Consumer來進行任務處理。 Fair dispatch(公平分發)

輪訓不看消費者爲應答的數目,只是盲目的將第n條消息發給第n個消費者。可能存在有些服務很忙依然分發給它,有些很輕鬆卻沒有任務。爲了解決這個問題,我們使用basicQos( prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的消息給同一個消費者。 當消息處理完畢後,有了反饋,纔會進行第二次發送。還有一點需要注意,使用公平分發,必須關閉自動應答,改爲手動應答。

2.消息應答

爲了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ可以刪除它了。

如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解爲這個消息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會不丟失任何消息了。

沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ纔會重新投遞。即使處理一條消息會花費很長的時間。

消息應答是默認打開的。我們明確地把它們關掉了(autoAck=true)。現在將應答打開,一旦我們完成任務,消費者會自動發送消息應答。

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3.消息持久化

當消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍將失去!當RabbitMQ退出或者崩潰,將會丟失隊列和消息。除非你不要隊列和消息。兩件事兒必須保證消息不被丟失:我們必須把“隊列”和“消息”設爲持久化。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

4.發佈/訂閱

生產者只能發送消息給Exchanges(轉發器),轉發器是非常簡單的。一方面它接受生產者的消息,另一方面向隊列推送消息。轉發器必須清楚的知道如何處理接收到的消息。附加一個特定的隊列嗎?附加多個隊列?或者是否丟棄?這些規則通過轉發器的類型進行定義。

類型有:Direct、Topic、Headers和Fanout。

channel.exchangeDeclare("logs", "fanout");

轉發器轉發消息到隊列。關聯轉發器和隊列的叫Binding。

channel.queueBind(queueName, "logs", "");

Direct exchange(直接轉發) 準確匹配 Topic exchange(主題轉發器) 模糊匹配

(星號)可以代替任意一個標識符 ;#(井號)可以代替零個或多個標識符。

.orange. ”,Q2綁定鍵是“ .*.rabbit”,Q3綁定鍵是“lazy.#”。這些綁定可以概括爲:Q1只對橙色的動物感興趣。Q2則是關注兔子和所有懶的動物。

路由鍵爲“quick.orange.rabbit”的消息會被路由到2個隊列中去。而“lazy.orange.elephant”的消息同樣會發往2個隊列。另外“quick.orange.fox” 僅僅發往第一個隊列,而”lazy.brown.fox”則只發往第二個隊列。 “quick.brown.fox”則所有的綁定鍵都不匹配而被丟棄。

3.代碼

producer:

String EXCHANGE_NAME = "direct_logs";
		/**
		 * 創建連接連接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 設置MabbitMQ所在主機ip或者主機名
		factory.setHost("127.0.0.1");
		// 創建一個連接
		Connection connection = factory.newConnection();
		// 創建一個頻道
		Channel channel = connection.createChannel();
		// 指定轉發——廣播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
		//所有日誌嚴重性級別
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次發送一條不同嚴重性的日誌
			
			// 發送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//參數1:exchange name
			//參數2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			system.out.println(message);
		}
		// 關閉頻道和連接
		channel.close();

consumer:

ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打開連接和創建頻道,與發送端一樣
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 聲明一個隨機隊列
		String queueName = channel.queueDeclare().getQueue();
	    
	    String severity="error";//只關注error級別的日誌,然後記錄到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 創建隊列消費者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    system.out.println(message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);
相關文章