• notice
  • Congratulations on the launch of the Sought Tech site

When using RocketMQ, the issue of Consumers consuming messages when the service is not started during service startup is handled

background

When we use RocketMQ, the general Consumerstartup is the use of @PostConstructannotations. @PostConstruct: Used to execute methods that need to be executed after dependency injection is performed when performing any initialization.), or use the bean configuration.

The configuration is as follows:

Producer configuration

Configure all producers in the configuration class, inject and use them in the business, and bind the start and destruction of producers to the initialization and destruction of beans:

@Configuration public class MQProducerConfig {
     // first producer
     @Bean(initMethod = "start", destroyMethod = "shutdown")
     public DefaultMQProducer demo1MQProducer() {
         DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer.setNamesrvAddr("<nameServer>");
         defaultMQProducer.setProducerGroup("<group>");
         defaultMQProducer.setInstanceName("<instanceName>");
         // other producer configuration
         return defaultMQProducer;
     }
     // second producer
     @Bean(initMethod = "start", destroyMethod = "shutdown")
     public DefaultMQProducer demo2MQProducer() {
         DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer.setNamesrvAddr("<nameServer>");
         defaultMQProducer.setProducerGroup("<group>");
         defaultMQProducer.setInstanceName("<instanceName>");
         // other producer configuration
         return defaultMQProducer;
     }
     //...}

Consumer configuration

Configure all consumers in the configuration class, and bind the startup and destruction of consumers to the initialization and destruction of beans:

@Configuration public class MQConsumerConfig {
    // first consumer
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // other consumer configuration
        return consumer;
    }
    // second consumer
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // other consumer configuration
        return consumer;
    }
    //...}

optimization

The above configuration will start the producer and consumer when the project is started and Beanloaded , resulting in a slow start of the project, and there will be a large influx of messages before the project is started, so you can use the ApplicationRunneror CommandLineRunnerinterface after the project is started successfully. Execute the startup of MQ. Also, remove Consumerthe @PostConstructannotations.

The notes are as follows:

// Remove the initMethod configuration @Bean(destroyMethod = "shutdown")

The consumer startup in the above configuration is no longer bound to the bean initialization phase.

The new consumer configuration is as follows:

@Configuration public class MQConsumerConfig {
    // first consumer
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // other consumer configuration
        return consumer;
    }
    // second consumer
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // other consumer configuration
        return consumer;
    }
    //...}

Start the consumer uniformly on the project startup completion node:

@[email protected] class MqConsumerApplicationRunner implements ApplicationRunner {
    @Autowired
    private Map<String, DefaultMQPushConsumer> defaultMQPushConsumerMap;
    @Override
    public void run(ApplicationArguments args) {
        if (CollectionUtils.isEmpty(defaultMQPushConsumerMap)) {
            return;
        }
        defaultMQPushConsumerMap.forEach((bean, consumer) -> {
            try {
                consumer.start();
            } catch (MQClientException e) {
                log.error("Consumer bean:[{}] start error.", bean, e);
            }
        }
    }}


Tags

Technical otaku

Sought technology together

Related Topic

0 Comments

Leave a Reply

+