
Implementing RabbitMQ dead letter queues and delayed exchanges in Spring Cloud

0 Preface

The dead letter queue is an important concept in message queues. Business scenarios also often call for delayed delivery, for example cancelling an unpaid order after 30 minutes, as 12306 does. This article explains dead letter queues and shows how to implement delayed delivery, first with a dead letter queue and then with a delayed exchange.


1. Dead letter queue


1.2 What is a dead letter?

Before looking at the dead letter queue, let's define a dead letter. A dead letter is a message that was not consumed successfully, but not every unconsumed message is a dead letter. A message becomes a dead letter in one of three ways:

(1) The consumer rejects the message (reject or nack) with the requeue parameter set to false.

(2) The message expires. There are two kinds of expiry:
    a. A time-to-live (message TTL) is set on an individual message when it is sent. If the TTL elapses before the message is consumed, it is marked as a dead letter.
    b. A message TTL is set on the queue and applies to every message in it. Any message not consumed within that time is marked as a dead letter.

(3) The queue reaches its maximum length. By default RabbitMQ makes room for a new message by dead-lettering the oldest messages at the head of the queue; only with the reject-publish-dlx overflow policy is the newly published message dead-lettered instead.


1.3 What is a dead letter queue?

Simply put, the dead letter queue is the queue that holds dead letters. That sounds trivial, so the key is to understand what a dead letter is.

The dead letter queue serves two purposes:

(1) When a queue is full, overflowing messages are routed to the dead letter queue instead of being lost, and can be consumed from there later.

(2) It can be used to implement delayed consumption. The specific implementation is explained in section 1.6.


1.4 Create dead letter exchanges and dead letter queues

Dead letter exchanges and dead letter queues are ordinary exchanges and queues that are declared specifically to hold dead letters. On the source queue, we only need to name the dead letter exchange with the deadLetterExchange method and the routing key used for dead-lettered messages with the deadLetterRoutingKey method.

As shown in the code below, we create test.queue and test.exchange as well as dead.queue and dead.exchange, and assign the dead letter exchange and dead letter routing key to test.queue.

Note: These settings change the attributes of queues and exchanges. If the queue or exchange already exists, delete it first so that it is redeclared with the new attributes; otherwise an error may be reported.

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
   // Public so that listeners and controllers can reference these names
   public static final String TEST_EXCHANGE = "test.exchange";
   public static final String TEST_QUEUE = "test.queue";
   public static final String TEST_ROUTING_KEY = "test.routing.key";
   public static final String DEAD_EXCHANGE = "dead.exchange";
   public static final String DEAD_QUEUE = "dead.queue";
   public static final String DEAD_ROUTING_KEY = "dead.routing.key";

   @Bean
   public Queue deadQueue(){
       return new Queue(DEAD_QUEUE);
   }
   @Bean
   public DirectExchange deadExchange(){
       // The demo uses a direct exchange; declare another exchange type if your business requires it
       return new DirectExchange(DEAD_EXCHANGE);
   }
   @Bean
   public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
       return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
   }
   @Bean
   public Queue testQueue(){
       // Dead letters from test.queue are republished to dead.exchange with dead.routing.key
       return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
   }
   @Bean
   public DirectExchange testExchange(){
       return new DirectExchange(TEST_EXCHANGE);
   }
   @Bean
   public Binding testQueueBinding(Queue testQueue, DirectExchange testExchange){
       return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);
   }
}


1.5 Implementing dead letter messages


1.5.1 Dead letters produced by a consumer reject or nack

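import com.rabbitmq.client.Channel;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Requires manual acknowledgements, e.g. spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Component
public class QueueListener {
   private static final Logger log = LoggerFactory.getLogger(QueueListener.class);

   @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
   public void handler(MyMessage messageInfo, Message message, Channel channel) {
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       try{
           System.out.println("Received message: "+messageInfo.toString());
           // Answer each delivery exactly once; a second reject/nack/ack for the same tag is a channel error
           // Option 1: reject with requeue=false so the message becomes a dead letter
           channel.basicReject(deliveryTag,false);
           // Option 2: nack with multiple=false and requeue=false, which also dead-letters the message
           // channel.basicNack(deliveryTag,false,false);
           // Option 3: ack to confirm the message was consumed successfully
           // channel.basicAck(deliveryTag,false);
       }catch (IOException e){
           log.error("Message processing failed: {}",e.getMessage());
           try {
               channel.basicRecover();
           } catch (IOException ex) {
               log.error("Channel recovery failed: {}",ex.getMessage());
           }
       }
   }
}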


1.5.2 Implementation based on time-to-live

(1) Set the time-to-live when sending a message

    @GetMapping("sendTestQueueWithExpiration")
   public String sendTestQueueWithExpiration(){
       MyMessage message = new MyMessage(1L,"Logistics reminder","Arrived at the loading area, pay attention to uploading documents",new Date());
       rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> {
           msg.getMessageProperties().setExpiration("5000");
           return msg;
       });
       return "Send successfully";
   }

(2) Set the time-to-live on the queue

    @Bean
   public Queue testQueue(){
       return QueueBuilder.durable(TEST_QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               // 10s expires
               .ttl(10000)
               .build();
   }


1.5.3 Implementation based on queue max_length

    @Bean
   public Queue testQueue(){
       return QueueBuilder.durable(TEST_QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               // The maximum capacity is 100
               .maxLength(100)
               .build();
   }
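
To make the overflow behaviour concrete, here is a toy model in plain Java, not RabbitMQ code. It assumes RabbitMQ's default drop-head overflow policy, where the oldest messages at the head of the queue are the ones dead-lettered when the limit is exceeded. The class name DropHeadQueueSketch and its methods are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a length-limited queue with the drop-head overflow policy (illustration only). */
final class DropHeadQueueSketch {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    DropHeadQueueSketch(int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be >= 1");
        }
        this.maxLength = maxLength;
    }

    /** Appends a message; if the limit is exceeded, the oldest messages are dead-lettered. */
    void publish(String message) {
        queue.addLast(message);
        while (queue.size() > maxLength) {
            deadLetters.add(queue.removeFirst());
        }
    }

    /** Messages still waiting in the queue, oldest first. */
    List<String> queued() {
        return new ArrayList<>(queue);
    }

    /** Messages that were pushed out to the dead letter queue, in the order they were evicted. */
    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }
}
```

With a limit of 3, publishing m1 through m5 leaves m3, m4 and m5 in the queue and dead-letters m1 and m2. If the newest message should be the one dead-lettered, the queue would need the reject-publish-dlx overflow policy instead, which this sketch does not model.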


1.6 Delayed sending of messages based on dead letter queues

As mentioned above, a dead letter queue can also be used to delay messages. The idea is:

(1) When sending the message, set its TTL to the delay we want.

(2) The consumer listens on the dead letter queue and consumes from it.

No consumer listens on the normal queue, so messages sit there until their TTL elapses. At that point they are forwarded to the dead letter queue, where the consumer picks them up.


The Problem of Delayed Sending of Messages Based on Dead Letter Queues

Suppose there are two messages, one with a 5s TTL and one with a 10s TTL, and the 10s message is sent to the queue first. RabbitMQ only checks expiry for the message at the head of the queue, so it watches the 10s message, and the 5s message behind it is not checked until the 10s message has expired and been removed. The message that should have been delayed 5s is therefore actually delayed about 10s.

Therefore, delayed messages implemented with a dead letter queue are only suitable when all messages in the queue share the same delay.

To cover more delay scenarios and make delayed messages easier to implement, we introduce the delayed exchange.
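
The head-of-queue effect can be reproduced with a few lines of plain Java. This is a simplified model, not RabbitMQ code: it assumes all messages are published at time 0, that nothing consumes from the queue, and that a message can only expire once everything ahead of it has left. The name HeadOfQueueTtlSketch is made up for this illustration.

```java
/** Simplified model of per-message TTL expiry when only the head of the queue is checked. */
final class HeadOfQueueTtlSketch {
    private HeadOfQueueTtlSketch() {
    }

    /**
     * Returns, for each message in publish order, the time in milliseconds at which
     * it is dead-lettered. All messages are assumed to be published at time 0.
     */
    static long[] deadLetterTimes(long[] ttlMillisInPublishOrder) {
        long[] times = new long[ttlMillisInPublishOrder.length];
        long headReleasedAt = 0; // time at which the previous head left the queue
        for (int i = 0; i < ttlMillisInPublishOrder.length; i++) {
            // A message leaves no earlier than its own TTL and no earlier than the message ahead of it.
            headReleasedAt = Math.max(headReleasedAt, ttlMillisInPublishOrder[i]);
            times[i] = headReleasedAt;
        }
        return times;
    }
}
```

Calling deadLetterTimes(new long[] {10000, 5000}) yields 10000 for both messages, which is the problem described above. In the opposite order, {5000, 10000}, each message is released on time.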


2. Delayed exchange

The delayed exchange is not built into RabbitMQ; it is provided by installing the delayed_message_exchange plug-in (rabbitmq_delayed_message_exchange).

We have already covered installing the plug-in, so we won't repeat it here; see the earlier blog post "springcloud: Install rabbitmq and configure the delay queue plug-in".

With a delayed exchange, the delay is handled in the exchange, and the queue is an ordinary queue. When a message is sent to the exchange, the exchange records its delay and routes it to the queue only once that time has elapsed, so a consumer listening on the queue receives the message at the specified time.

The only difference from an ordinary exchange is therefore in how the exchange is declared. Everything else works the same as with an ordinary exchange, so it is convenient to use.

Create a delayed exchange and declare its underlying exchange type through the x-delayed-type argument, which can be direct, topic, and so on. All 4 exchange types are supported; if they are unfamiliar, refer to the previous blog post.
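
As a rough mental model of why per-message delays work here, the sketch below holds each message until its own due time, independent of publish order. It is plain Java for illustration only and says nothing about how the plug-in is actually implemented; the class DelayedExchangeSketch and its methods are hypothetical, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of an exchange that releases each message at its own due time (illustration only). */
final class DelayedExchangeSketch {
    private static final class Held {
        final String id;
        final long dueAtMillis;
        final long sequence;

        Held(String id, long dueAtMillis, long sequence) {
            this.id = id;
            this.dueAtMillis = dueAtMillis;
            this.sequence = sequence;
        }
    }

    // Ordered by due time; publish order breaks ties.
    private final PriorityQueue<Held> held = new PriorityQueue<>(
            Comparator.comparingLong((Held h) -> h.dueAtMillis)
                    .thenComparingLong(h -> h.sequence));
    private long nextSequence = 0;

    /** Records the message together with the time at which it becomes deliverable. */
    void publish(String id, long nowMillis, long delayMillis) {
        held.add(new Held(id, nowMillis + delayMillis, nextSequence++));
    }

    /** Removes and returns the ids of all messages whose delay has elapsed at nowMillis. */
    List<String> releaseDue(long nowMillis) {
        List<String> released = new ArrayList<>();
        while (!held.isEmpty() && held.peek().dueAtMillis <= nowMillis) {
            released.add(held.poll().id);
        }
        return released;
    }
}
```

If message A is published with a 10s delay and then B with a 5s delay, releaseDue(5000) returns B and releaseDue(10000) returns A, so the shorter delay is no longer blocked by the longer one.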

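import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqDelayConfig {
   public static final String DELAY_EXCHANGE = "delay.exchange";
   public static final String DELAY_QUEUE = "delay.queue";
   public static final String DELAY_ROUTING_KEY = "delay.routing.key";
   @Bean
   public Exchange delayExchange(){
       Map<String, Object> arguments = new HashMap<>(1);
       // Routing behaviour of the underlying exchange once the delay has elapsed
       arguments.put("x-delayed-type","direct");
       // The exchange type x-delayed-message is provided by the plug-in; durable=true, autoDelete=false
       return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
   }
   @Bean
   public Queue delayQueue(){
       return new Queue(DELAY_QUEUE);
   }
   @Bean
   public Binding delayBinding(Queue delayQueue, Exchange delayExchange){
       return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
   }
}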

Specify the delay time, in milliseconds, when sending a message

rabbitTemplate.convertAndSend(RabbitMqDelayConfig.DELAY_EXCHANGE, RabbitMqDelayConfig.DELAY_ROUTING_KEY, "xxxx", new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // Delay in milliseconds (30 s)
        message.getMessageProperties().setDelay(30000);
        return message;
    }
});

We can also wrap this in a utility method so that it is convenient to call later

/**
* Send a delayed message
* @param exchange exchange name
* @param routeKey routing key
* @param message message body
* @param delaySecond delay in seconds
*/
public void send(String exchange, String routeKey, Object message, int delaySecond){
    rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {
        // Set the delay; setDelay expects milliseconds
        msg.getMessageProperties().setDelay(delaySecond * 1000);
        return msg;
    });
}
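
One detail worth guarding against in a helper like this: multiplying an int number of seconds by 1000 overflows silently once the delay exceeds Integer.MAX_VALUE milliseconds, which is roughly 24.8 days. The sketch below shows one possible way to convert with an explicit check, assuming the delay is still passed on as an int as in the helper above. The class DelayMillisSketch and the method fromSeconds are hypothetical names for this example.

```java
import java.util.concurrent.TimeUnit;

/** Converts a delay in seconds to int milliseconds, failing loudly instead of overflowing. */
final class DelayMillisSketch {
    private DelayMillisSketch() {
    }

    /**
     * @param delaySeconds delay in seconds, must not be negative
     * @return the same delay in milliseconds
     * @throws IllegalArgumentException if the delay is negative
     * @throws ArithmeticException if the result does not fit in an int
     */
    static int fromSeconds(long delaySeconds) {
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("delaySeconds must not be negative: " + delaySeconds);
        }
        // toMillis works in long arithmetic; toIntExact rejects values above Integer.MAX_VALUE.
        return Math.toIntExact(TimeUnit.SECONDS.toMillis(delaySeconds));
    }
}
```

In the helper, the call would become something like setDelay(DelayMillisSketch.fromSeconds(delaySecond)). The 72-hour delay used later in this article (259,200 seconds) converts without trouble; a delay of several weeks would be rejected rather than wrapped into a wrong value.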


3. Application scenarios

Delayed messages have many application scenarios. Besides the automatic cancellation of an order left unpaid for 30 minutes, mentioned at the beginning, another example is automatically signing for goods 72 hours after they arrive.

Basically any business scenario that needs a delayed trigger can be implemented with RabbitMQ delay queues.


4. Practice questions

For readers who are new to RabbitMQ, here is an exercise to strengthen your understanding through practice:

Requirement: As described above, an order that has not been signed for within 72 hours of arrival is signed for automatically. Implement this automatic signing. When an order arrives, a method is triggered that sends an automatic signing message. The order status is 1 for arrived and 2 for signed. If the order has already been signed for before the 72 hours are up, that is, its status has already been updated to 2, the automatic signing must be cancelled (in other words, the automatic signing message is ignored rather than executed).
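
As a hint for the consumer side, here is one possible shape for the status check, sketched in plain Java with an in-memory map standing in for the order table. Everything in it is an assumption made for illustration: the class AutoSignHandlerSketch, its method names, and the map-based storage. A real solution would call this kind of logic from a listener on the delay queue and update the database instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the "ignore if already signed" check for the automatic signing message. */
final class AutoSignHandlerSketch {
    static final int STATUS_ARRIVED = 1;
    static final int STATUS_SIGNED = 2;

    // Stand-in for the order table: order id -> status.
    private final Map<Long, Integer> orderStatus = new ConcurrentHashMap<>();

    void saveOrder(long orderId, int status) {
        orderStatus.put(orderId, status);
    }

    Integer statusOf(long orderId) {
        return orderStatus.get(orderId);
    }

    /**
     * Handles the delayed automatic signing message for one order.
     *
     * @return true if the order was signed automatically, false if the message was ignored
     */
    boolean onAutoSignMessage(long orderId) {
        // Atomically move 1 -> 2; an order that is already signed (or unknown) is left untouched.
        return orderStatus.replace(orderId, STATUS_ARRIVED, STATUS_SIGNED);
    }
}
```

The point of the conditional update is that the delayed message is never cancelled in the broker; it still arrives after 72 hours and the consumer simply does nothing when the status is already 2.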


