• notice
  • Congratulations on the launch of the Sought Tech site

RabbitMQ multi-consumer sequential consumption message implementation

Recently, a project message center has been set up to transfer messages generated in various systems.RabbitMQ is used.Since there are multiple consumer services in the UAT environment and production environment, some messages require sequential consumption, so you need to take Certain measures are taken to ensure the sequential consumption of messages.Here are three methods we continue to optimize:

1.The first thing we considered was relatively simple.The direct switch used specifies a specific consumer server to monitor the queue, and other consumer servers do not. For example, there are three consumer machines, C1, C2, and C3.We decide that C1 consumes messages, and C2 and C3 do not listen. When we start C1, we add C1_IP to the startup script and do processing in the code.When the consumer server starts, if the current server IP is the C1_IP of the startup script, then this C1 will monitor and consume messages. This method has a single point of failure problem.If the C1 server goes down, the remaining two nodes in the entire message center cannot consume the queue, resulting in the accumulation of queue messages. If there are abundant monitoring measures, after monitoring that C1 is down, you can manually configure C2_IP (or C3_IP) to the startup script to restart the C2 server (C3 server) to consume messages.

2.In order to solve the single point of failure problem, we use fanout switches, each consumer creates a dedicated queue, so if the producer generates two sequential messages m1 and m2 (they have a common batch number batchNo and The unique message number msgID) will be pushed to each queue, as shown in the figure below. At the same time, when consumers consume, they need to cooperate with the database to implement it.After the consumer listens to the message, it will be stored in the library (the content of the library includes the m1 message information and the consumer IP).According to the unique indexability of msgID, the message will be discarded and consumed.For m2, it is necessary to retrieve whether the consumer IP of m1 is the current IP from the library table, and discard the message if it is not. But this scheme has a disadvantage: if consumer1 dies after consuming m1, m2 can only be consumed after consumer1 is normal, and cannot be transferred to other consumers for consumption, which will be unfriendly to some business scenarios (of course, this place can be considered The dead letter exchange dead letter queue transfers, but the architecture is more complicated).


 3.The third method is similar to the second method, using fanout switches, and each consumer creates a dedicated queue. But without the help of a database, by accessing the API interface of rabbitMQ, the IPs of all consumers in these three queues are obtained and placed in the list.After the consumers listen to the message, they judge whether their ip is the minimum value in the ip set.If so, consume, if otherwise, discard the message. Once the consumer with the smallest IP goes down, there will only be two IPs left in the list, and the consumer selected by the subsequent message will choose the smallest IP consumption from these two IPs. For the same reason, it also has the shortcomings of the second scheme.

Finally, attach the code for obtaining minIP through rabbitmq's api (input parameter consumerIps is a list with initial size=0), as follows:

 private String findUsefulMinIP(List<String> consumerIps) {
       String minIp = null;
       SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
       try {
           RestTemplate rest = new RestTemplateBuilder().basicAuthentication(username, password).build();
           JSONArray result2 = rest.getForObject(moccMQApiUrl, JSONArray.class);
           if(result2 != null && result2.size()> 0) {
               log.info("===clear the ips===new query start===");
           for(int m=0; m<result2.size(); m++) {
               LinkedHashMap itmap = (LinkedHashMap) result2.get(m);
               LinkedHashMap queueMap = (LinkedHashMap)itmap.get("queue");
               if(!queueMap.values().stream().anyMatch(v -> v.toString().indexOf(moccQueue)>=0)) {
               LinkedHashMap consumerMap = (LinkedHashMap)itmap.get("channel_details");
           log.info("===query from mq===consumerIps={}", consumerIps);
       } catch (RestClientException e) {
           log.error(e.getMessage(), e);
       minIp = Collections.min(consumerIps);
       return minIp;


Technical otaku

Sought technology together

Related Topic


Leave a Reply