Today I'm continuing to update the austin project. If you haven't followed the series, you can catch up through my earlier articles (and don't forget to like as you read!). I recommend not skipping ahead, because this article builds on knowledge and business context covered before, and I won't repeat it here.
What we want to achieve today is isolating the data consumed by the handler module. Before getting into that, let's review the previous implementation.
After austin-api receives a request, it sends a message to Kafka with the topicName austin. Then austin-handler, with a groupName of austinGroup, listens to the austin topic and performs the actual message sending.
From a system-architecture perspective, the austin project can send various types of messages: SMS, WeChat mini program, email, and so on.
With a single topic and a single group, have you ever considered this question: what happens if one sending channel's interface throws exceptions or times out?
Right: messages get blocked, because every channel consumes the same topic through the same consumer group.
01. Data isolation
How do we break the deadlock? Simple: multiple topics and multiple groups will do.
Does that solve everything? No. Even within the same channel, different types of messages behave very differently. For example, a push marketing message might target 40 million users at some point.
It isn't realistic to deliver to all 40 million users in a short time, which most likely means that notification-type push messages would be affected.
Break it again? Simple. After all, we anticipated this when designing our message templates: each template has a msgType field identifying which type it belongs to, so we can split consumer groups by message type.
In theory, we could give every message type on every channel its own topic and group. Since data is isolated between topics, and consumption is isolated between groups, consumption would then certainly be data-isolated.
However, my current approach is a single topic with multiple groups. Consumption is isolated, but the produced topic is shared. I think this keeps the code clearer and easier to understand; if it becomes a bottleneck later, we can change it then.
02. Consumer Design
As established above, data isolation is achieved via a single topic with multiple groups. I currently define 6 channels (IM / push / email / SMS / mini program / WeChat service account) and 3 message types (notification / marketing / verification code), which amounts to 18 consumers.
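To make the 6 × 3 = 18 combinations concrete, here is a minimal sketch that enumerates one consumer group per channel and message type. The channel and type names here are illustrative stand-ins; the real project derives group names from its own enums.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupIdEnum {
    // Hypothetical names for the 6 channels and 3 message types described in the text
    static final String[] CHANNELS = {"im", "push", "email", "sms", "miniProgram", "officialAccount"};
    static final String[] MSG_TYPES = {"notice", "marketing", "authCode"};

    /** Build one groupId per (channel, message type) pair. */
    public static List<String> allGroupIds() {
        List<String> groupIds = new ArrayList<>();
        for (String channel : CHANNELS) {
            for (String msgType : MSG_TYPES) {
                groupIds.add(channel + "." + msgType);
            }
        }
        return groupIds;
    }

    public static void main(String[] args) {
        // 6 channels x 3 types = 18 consumer groups
        System.out.println(allGroupIds().size());
    }
}
```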
After pulling a message from Kafka, my tentative plan takes a few steps: discard -> deduplicate -> actually send.
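The three steps above can be sketched as a tiny pipeline. This is only an illustration of the flow, assuming hypothetical names (MessagePipeline, an in-memory dedup set standing in for whatever store the project actually uses):

```java
import java.util.HashSet;
import java.util.Set;

public class MessagePipeline {
    // Stand-in for a real dedup store (e.g. Redis); an in-memory set for illustration
    private final Set<String> seenIds = new HashSet<>();

    /** Returns true only if the message survives all steps and is sent. */
    public boolean process(String msgId, boolean shouldDiscard) {
        if (shouldDiscard) {        // step 1: discard (e.g. the template was taken offline)
            return false;
        }
        if (!seenIds.add(msgId)) {  // step 2: deduplicate by message id
            return false;
        }
        send(msgId);                // step 3: actually send
        return true;
    }

    private void send(String msgId) {
        System.out.println("sending " + msgId);
    }
}
```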
Deduplication and message sending are both inherently network-IO-intensive. So, to improve throughput, I decided to add a layer of buffering between consuming from Kafka and processing.
A buffer layer improves throughput, but it also introduces other problems. For example: if the application restarts while data in the buffer has not yet been consumed, will it be lost?
We can look at how to address those problems later (keep following the series; there will be more once the project is optimized). I still think the advantages of buffering outweigh the disadvantages, so back to the buffer.
My first instinct for the buffer was to implement the producer-consumer pattern.
At first I thought the implementation was simple: consume Kafka messages as the producer, throw the data into a blocking queue, and start multiple threads to consume from the queue.
Then I thought about it again: isn't a plain thread pool exactly that? A thread pool is itself an implementation of producer-consumer.
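That insight is easy to see in code: `submit()` produces tasks into the pool's work queue, and the worker threads consume them. A minimal sketch (pool sizes and queue capacity are illustrative, not the project's actual tuning):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PendingDemo {
    /** Submit n small tasks to a bounded pool and return how many were handled. */
    public static int runTasks(int n) throws InterruptedException {
        ExecutorService pending = new ThreadPoolExecutor(
                2, 2,                            // worker threads: the "consumers"
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100)); // work queue: the buffer in between
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pending.submit(handled::incrementAndGet); // submit() acts as the "producer"
        }
        pending.shutdown();
        pending.awaitTermination(5, TimeUnit.SECONDS);
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(10)); // prints 10
    }
}
```

In the real project the Kafka listener plays the producer role, submitting each consumed message to the pool instead of processing it inline.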
As a result, the architecture becomes the following diagram:
03. Code design
Let's first look at the consumer side, the Receiver class. It looks simple: there is only one method, annotated with @KafkaListener, which consumes from Kafka and then hands the message over to pending for processing.
I use the @KafkaListener annotation to pull messages from Kafka rather than the lower-level Kafka API, for no reason other than this: the project doesn't need to be perfect in its early stage, and I can work out another solution if it becomes a bottleneck. That said, it still caused me plenty of trouble while writing it.
The first problem: @KafkaListener is an annotation. Judging from the source, its attributes can only take Spring EL expressions or read configuration values. But remember, my goal is to have multiple groups consuming the same topic, and I can't very well define a separate consumption method for each group, right? (Writing that kind of broken code, I couldn't sleep at night.)
After a whole evening of digging through tech blogs I couldn't find a solution, and I even posted to my WeChat Moments to complain and ask if anyone had run into it. The next day I carefully went through Spring's official documentation and finally found my answer.
Official documentation comes through again!
Once there's a solution, things get easier. Since I have to isolate each message type of each channel, I'll just enumerate them all and be done with it!
My Receiver is a multi-instance bean, so I only need to traverse this list (the consumers are initialized in the ReceiverStart class).
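The trick documented by Spring Kafka is the `__listener` SpEL pseudo-variable: inside a `@KafkaListener` expression it refers to the current listener bean instance, so a prototype-scoped Receiver can carry its own groupId. Here is a sketch of that pattern; the topic name and field names are illustrative, not the project's exact code:

```java
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Receiver {

    private String groupId; // set per instance (e.g. by ReceiverStart) before the container starts

    public String getGroupId() { return groupId; }
    public void setGroupId(String groupId) { this.groupId = groupId; }

    // "#{__listener}" resolves to this bean instance, so each prototype
    // instance registers its own consumer group on the same topic.
    @KafkaListener(topics = "austin", groupId = "#{__listener.groupId}")
    public void consumer(String message) {
        // hand the message to the pending layer (thread pool) here
    }
}
```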
With the problem of dynamically passing a groupId into @KafkaListener and creating multiple consumers solved, I ran into a second problem: Spring has an @Async annotation for elegantly running a method on a thread pool. I hadn't used @Async before, but after looking at how it works and how it's used, I found it quite elegant (elegance never goes out of style). However, with @Async I must create the thread pool myself, and each consumer needs its own thread pool. And I can't very well define a separate thread-pool-creation method for each group, right? (Writing that kind of broken code, I couldn't sleep at night.)
This time I went through the official docs and various tech blogs, but couldn't solve my problem: in a Spring environment, dynamically passing a thread-pool instance into the @Async annotation, with the pool's creation parameterized by conditions.
In the end I could only give up on the @Async annotation and implement it programmatically:
Below is the implementation of TaskPendingHolder (it simply creates a corresponding thread pool for each consumer); whether to make it dynamic will be considered later:
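As a rough sketch of that idea, a holder keeps one dedicated pool per consumer group, looked up by groupId. Pool parameters here are illustrative placeholders:

```java
import java.util.Map;
import java.util.concurrent.*;

public class TaskPendingHolder {
    private final Map<String, ExecutorService> holder = new ConcurrentHashMap<>();

    /** Create one isolated thread pool for each consumer group up front. */
    public TaskPendingHolder(Iterable<String> groupIds) {
        for (String groupId : groupIds) {
            holder.put(groupId, new ThreadPoolExecutor(
                    2, 2,                             // illustrative sizing
                    60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100))); // bounded buffer per group
        }
    }

    /** Each consumer fetches its own pool by groupId and submits tasks to it. */
    public ExecutorService route(String groupId) {
        return holder.get(groupId);
    }
}
```

Because every group routes to its own pool, a slow channel only backs up its own queue instead of blocking the others, which is the whole point of the isolation.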
The Task implementation is fairly simple for now: it just calls the corresponding Handler directly to send the message:
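In skeleton form, the Task is just a Runnable that delegates to a channel Handler. The Handler interface and method name below are simplified stand-ins for the project's real classes:

```java
public class TaskDemo {
    /** Stand-in for a concrete channel handler (SMS, push, email, ...). */
    interface Handler {
        void doHandler(String message);
    }

    /** A unit of work the pending thread pool executes. */
    static class Task implements Runnable {
        private final Handler handler;
        private final String message;

        Task(Handler handler, String message) {
            this.handler = handler;
            this.message = message;
        }

        @Override
        public void run() {
            handler.doHandler(message); // delegate sending to the channel's handler
        }
    }

    public static void main(String[] args) {
        StringBuilder sent = new StringBuilder();
        new Task(sent::append, "hello").run();
        System.out.println(sent); // prints hello
    }
}
```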
The code looks simple and the business logic easy to follow, but keep in mind that even many small companies' production projects lack this kind of design; writing everything in one straight shot is all too common (it's not that the feature can't be built, nor that the code can't run; most importantly, it's not that the person can't run away either).
This article mainly conveys one idea: when consuming from MQ, multiple groups can achieve data isolation; and if you want to improve consumption throughput, you can add a layer of buffering (provided the consumption is IO-intensive).