rabbitmq~消息失敗後重試達到 TTL放到死信隊列(事務型消息補償機制)
摘要:} /** * 基於消息事務的處理方式,當消費失敗進行重試,有時間間隔,當達到超時時間,就發到死信隊列,等待人工處理. * @return */ @Bean public Queue testQueue() { //設置死信交換機 return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE) //毫秒 .withArgument("x-message-ttl", CONSUMER_EXPIRE) //設置死信routingKey .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build()。/** * 延時隊列:不應該有RabbitListener訂閱者,應該讓它自己達到超時時間後自動轉到死信裏去消費 * 消息異常處理:消費出現異常後,延時幾秒,然後從新入隊列消費,直到達到TTL超時時間,再轉到死信,證明這個信息有問題需要人工干預 * * @param message */ @RabbitListener(queues = MqConfig.QUEUE) public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException { try { System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"))。
這是一個基於消息的分佈式事務的一部分,主要通過消息來實現,生產者把消息發到隊列後,由消費方去執行剩下的邏輯,而當消費方處理失敗後,我們需要進行重試,即爲了最現數據的最終一致性,在rabbitmq裏,它有消息重試和重試次數的配置,但當你配置之後,你的TTL達到 後,消息不能自動放入死信隊列,所以這塊需要手工處理一下.
rabbitmq關於消息重試的配置
rabbitmq: host: xxx port: xxx username: xxx password: xxx virtual-host: xxx ###開啓消息確認機制 confirms publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual #設置確認方式 prefetch: 1 #每次處理1條消息 retry.max-attempts: 3 # 最大重試次數 retry.enabled: true #是否開啓消費者重試(爲false時關閉消費者重試,這時消費端代碼異常會一直重複收到消息) retry.initial-interval: 2000 #重試間隔時間(單位毫秒) default-requeue-rejected: true #該配置項是決定由於監聽器拋出異常而拒絕的消息是否被重新放回隊列。默認值爲true,需要手動basicNack時這些參數諒失效了
手工實現消息重試並放入死信的方式
定義隊列的相關配置
/** * 創建普通交換機. */ @Bean public TopicExchange lindExchange() { //消息持久化 return (TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build(); } @Bean public TopicExchange deadExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build(); } /** * 基於消息事務的處理方式,當消費失敗進行重試,有時間間隔,當達到超時時間,就發到死信隊列,等待人工處理. * @return */ @Bean public Queue testQueue() { //設置死信交換機 return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE) //毫秒 .withArgument("x-message-ttl", CONSUMER_EXPIRE) //設置死信routingKey .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build(); } @Bean public Queue deadQueue() { return new Queue(LIND_DEAD_QUEUE); } @Bean public Binding bindBuildersRouteKey() { return BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER); } @Bean public Binding bindDeadBuildersRouteKey() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE); }
消費者實現的代碼
/** * 延時隊列:不應該有RabbitListener訂閱者,應該讓它自己達到超時時間後自動轉到死信裏去消費 * 消息異常處理:消費出現異常後,延時幾秒,然後從新入隊列消費,直到達到TTL超時時間,再轉到死信,證明這個信息有問題需要人工干預 * * @param message */ @RabbitListener(queues = MqConfig.QUEUE) public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException { try { System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8")); //當程序處理出現問題時,消息使用basicReject上報 int a = 0; int b = 1 / a; channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception ex) { //出現異常手動放回隊列 Thread.sleep(2000); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } /** * 死信隊列. * * @param message */ @RabbitListener(queues = MqConfig.LIND_DEAD_QUEUE) public void dealSubscribe(Message message, Channel channel) throws IOException { System.out.println("Dead Subscriber:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
這邊嘗試讓消費者執行出錯,然後走到catch裏使用basicNack方法把消息從新放裏隊列裏,並讓線程讓休息2秒,以避免頻繁操作,之後就是我們希望看到的代碼
2019-12-20T17:21:31.190:Subscriber:send a message to mq 2019-12-20T17:21:33.200:Subscriber:send a message to mq 2019-12-20T17:21:35.206:Subscriber:send a message to mq 2019-12-20T17:21:37.213:Subscriber:send a message to mq 2019-12-20T17:21:39.221:Subscriber:send a message to mq Dead Subscriber:send a message to mq
這就是一個消息隊列的補償機制,使用死信隊列也可以實現 延時消息的機制
,有時間再給大家分享!