When using RocketMQ, the issue of Consumers consuming messages when the service is not started during service startup is handled
background
When we use RocketMQ, the general Consumer
startup is the use of @PostConstruct
annotations. ( @PostConstruct
: Used to execute methods that need to be executed after dependency injection is performed when performing any initialization.), or use the bean configuration.
The configuration is as follows:
Producer configuration
Configure all producers in the configuration class, inject and use them in the business, and bind the start and destruction of producers to the initialization and destruction of beans:
@Configuration public class MQProducerConfig { // first producer @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer demo1MQProducer() { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); defaultMQProducer.setNamesrvAddr("<nameServer>"); defaultMQProducer.setProducerGroup("<group>"); defaultMQProducer.setInstanceName("<instanceName>"); // other producer configuration return defaultMQProducer; } // second producer @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer demo2MQProducer() { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); defaultMQProducer.setNamesrvAddr("<nameServer>"); defaultMQProducer.setProducerGroup("<group>"); defaultMQProducer.setInstanceName("<instanceName>"); // other producer configuration return defaultMQProducer; } //...}
Consumer configuration
Configure all consumers in the configuration class, and bind the startup and destruction of consumers to the initialization and destruction of beans:
@Configuration public class MQConsumerConfig { // first consumer @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQPushConsumer demo1Consumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>"); consumer.setNamesrvAddr("<nameServer>"); consumer.setInstanceName("<instanceName>"); consumer.subscribe("<topic>", "<tag>"); consumer.setMessageListener(<listener>); // other consumer configuration return consumer; } // second consumer @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQPushConsumer demo2Consumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>"); consumer.setNamesrvAddr("<nameServer>"); consumer.setInstanceName("<instanceName>"); consumer.subscribe("<topic>", "<tag>"); consumer.setMessageListener(<listener>); // other consumer configuration return consumer; } //...}
optimization
The above configuration will start the producer and consumer when the project is started and Bean
loaded , resulting in a slow start of the project, and there will be a large influx of messages before the project is started, so you can use the ApplicationRunner
or CommandLineRunner
interface after the project is started successfully. Execute the startup of MQ. Also, remove Consumer
the @PostConstruct
annotations.
The notes are as follows:
// Remove the initMethod configuration @Bean(destroyMethod = "shutdown")
The consumer startup in the above configuration is no longer bound to the bean initialization phase.
The new consumer configuration is as follows:
@Configuration public class MQConsumerConfig { // first consumer @Bean(destroyMethod = "shutdown") public DefaultMQPushConsumer demo1Consumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>"); consumer.setNamesrvAddr("<nameServer>"); consumer.setInstanceName("<instanceName>"); consumer.subscribe("<topic>", "<tag>"); consumer.setMessageListener(<listener>); // other consumer configuration return consumer; } // second consumer @Bean(destroyMethod = "shutdown") public DefaultMQPushConsumer demo2Consumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>"); consumer.setNamesrvAddr("<nameServer>"); consumer.setInstanceName("<instanceName>"); consumer.subscribe("<topic>", "<tag>"); consumer.setMessageListener(<listener>); // other consumer configuration return consumer; } //...}
Start the consumer uniformly on the project startup completion node:
@[email protected] class MqConsumerApplicationRunner implements ApplicationRunner { @Autowired private Map<String, DefaultMQPushConsumer> defaultMQPushConsumerMap; @Override public void run(ApplicationArguments args) { if (CollectionUtils.isEmpty(defaultMQPushConsumerMap)) { return; } defaultMQPushConsumerMap.forEach((bean, consumer) -> { try { consumer.start(); } catch (MQClientException e) { log.error("Consumer bean:[{}] start error.", bean, e); } } }}
0 Comments