摘要:} /** * 基於消息事務的處理方式,當消費失敗進行重試,有時間間隔,當達到超時時間,就發到死信隊列,等待人工處理. * @return */ @Bean public Queue testQueue() { //設置死信交換機 return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE) //毫秒 .withArgument("x-message-ttl", CONSUMER_EXPIRE) //設置死信routingKey .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build()。/** * 延時隊列:不應該有RabbitListener訂閱者,應該讓它自己達到超時時間後自動轉到死信裏去消費 * 消息異常處理:消費出現異常後,延時幾秒,然後從新入隊列消費,直到達到TTL超時時間,再轉到死信,證明這個信息有問題需要人工干預 * * @param message */ @RabbitListener(queues = MqConfig.QUEUE) public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException { try { System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"))。

這是一個基於消息的分佈式事務的一部分,主要通過消息來實現,生產者把消息發到隊列後,由消費方去執行剩下的邏輯,而當消費方處理失敗後,我們需要進行重試,即爲了最現數據的最終一致性,在rabbitmq裏,它有消息重試和重試次數的配置,但當你配置之後,你的TTL達到 後,消息不能自動放入死信隊列,所以這塊需要手工處理一下.

rabbitmq關於消息重試的配置

rabbitmq:
    host: xxx
    port: xxx
    username: xxx
    password: xxx
    virtual-host: xxx
    ###開啓消息確認機制 confirms
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual #設置確認方式
        prefetch: 1 #每次處理1條消息
        retry.max-attempts: 3 # 最大重試次數
        retry.enabled: true #是否開啓消費者重試(爲false時關閉消費者重試,這時消費端代碼異常會一直重複收到消息)
        retry.initial-interval: 2000 #重試間隔時間(單位毫秒)
        default-requeue-rejected: true #該配置項是決定由於監聽器拋出異常而拒絕的消息是否被重新放回隊列。默認值爲true,需要手動basicNack時這些參數諒失效了

手工實現消息重試並放入死信的方式

定義隊列的相關配置

/**
   * 創建普通交換機.
   */
  @Bean
  public TopicExchange lindExchange() {
    //消息持久化
    return (TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
  }

  @Bean
  public TopicExchange deadExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build();
  }

  /**
   * 基於消息事務的處理方式,當消費失敗進行重試,有時間間隔,當達到超時時間,就發到死信隊列,等待人工處理.
   * @return
   */
  @Bean
  public Queue testQueue() {
    //設置死信交換機
    return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)
        //毫秒
        .withArgument("x-message-ttl", CONSUMER_EXPIRE)
        //設置死信routingKey
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build();
  }
  @Bean
  public Queue deadQueue() {
    return new Queue(LIND_DEAD_QUEUE);
  }
  @Bean
  public Binding bindBuildersRouteKey() {
    return BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER);
  }

  @Bean
  public Binding bindDeadBuildersRouteKey() {
    return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE);
  }

消費者實現的代碼

/**
   * 延時隊列:不應該有RabbitListener訂閱者,應該讓它自己達到超時時間後自動轉到死信裏去消費
   * 消息異常處理:消費出現異常後,延時幾秒,然後從新入隊列消費,直到達到TTL超時時間,再轉到死信,證明這個信息有問題需要人工干預
   *
   * @param message
   */
  @RabbitListener(queues = MqConfig.QUEUE)
  public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
    try {
      System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
      //當程序處理出現問題時,消息使用basicReject上報
      int a = 0;
      int b = 1 / a;
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    } catch (Exception ex) {
      //出現異常手動放回隊列
      Thread.sleep(2000);
      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }

  }
 /**
   * 死信隊列.
   *
   * @param message
   */
  @RabbitListener(queues = MqConfig.LIND_DEAD_QUEUE)
  public void dealSubscribe(Message message, Channel channel) throws IOException {
    System.out.println("Dead Subscriber:" + new String(message.getBody(), "UTF-8"));
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  }

這邊嘗試讓消費者執行出錯,然後走到catch裏使用basicNack方法把消息從新放裏隊列裏,並讓線程讓休息2秒,以避免頻繁操作,之後就是我們希望看到的代碼

2019-12-20T17:21:31.190:Subscriber:send a message to mq
2019-12-20T17:21:33.200:Subscriber:send a message to mq
2019-12-20T17:21:35.206:Subscriber:send a message to mq
2019-12-20T17:21:37.213:Subscriber:send a message to mq
2019-12-20T17:21:39.221:Subscriber:send a message to mq
Dead Subscriber:send a message to mq

這就是一個消息隊列的補償機制,使用死信隊列也可以實現 延時消息的機制 ,有時間再給大家分享!

相關文章