After the daily order volume reached 1 million, we restructured the order center
About the author: formerly worked at Alibaba, Daily Youxian, and other Internet companies as a technical director.
Recently, many readers have left messages asking "two horses" to write more about practical work experience: real problems encountered on the job and the technical solutions behind them. By popular demand, this article walks through one order-center refactoring project.
At an e-commerce company I worked for a few years ago, business growth pushed daily orders from 300,000 to 1 million, and total orders passed 100 million. We used MySQL. According to our monitoring, peak order volume reached 2,000 per second (excluding seckill; seckill TPS reached tens of thousands, and we had a dedicated solution for it, see "Seckill System Design ~100 Million Users"). Even at that point, the order system was still a single database with a single table. Fortunately, the database server was well provisioned at the time, so the system could still bear the pressure.
But the business was still growing fast. Without a refactor, a major incident was only a matter of time. We spent a day hammering out a refactoring plan.
Refactoring? Sounds grand, but isn't it just sharding, splitting the database and tables? Indeed, sharding is the core. But beyond sharding, the plan also had to cover the admin side: operations, customer service, and business staff need to query order data across many dimensions, so how do we serve them after sharding? The cutover plan and the zero-downtime data migration plan also needed careful thought, and to keep the system stable we needed a downgrade plan as well.
Why shard the database and tables?
Sharding is for when the database hits a performance bottleneck, either IO or CPU. Both bottlenecks eventually drive up the number of active connections until it reaches the maximum the database can sustain, at which point the application has no connections available, with catastrophic results. First try optimizing code, SQL, and indexes; when little room for optimization remains, it is time to shard.
1. IO bottleneck
The first type: disk read IO bottleneck. When there is too much hot data for the database cache to hold, queries generate heavy disk reads and slow down, active connections pile up, and eventually none are available. Options: one master with multiple slaves and read/write splitting, letting several slave libraries share the query traffic; or database sharding plus horizontal table splitting (splitting one table's data across multiple tables, e.g. splitting the order table by user_id).
The second type: disk write IO bottleneck. Frequent database writes mean frequent disk write IO, which again piles up active connections until none are available. Here only database splitting helps: multiple databases share the write load. Adding horizontal table splitting, each table holds less data, so index search and update costs on insert are lower and inserts are naturally faster.
2. CPU bottleneck
The first case: SQL problems. SQL containing joins, group by, order by, or conditions on non-indexed columns adds CPU work and puts significant pressure on the CPU. Consider SQL optimization, building appropriate indexes, or moving compute-heavy logic into the application.
The second case: a single table holds too much data. Once a table exceeds, say, 100 million rows, the B+ tree grows too deep or too many rows get scanned; SQL becomes inefficient and burns a lot of CPU. Split tables according to the business scenario.
Sharding plan
There are two main approaches to sharding:
Option one: a proxy middleware such as MyCat or KingShard. The advantage is very low coupling with business code; only configuration is needed, so adoption is cheap. The disadvantages: the proxy must be deployed separately, adding a hop to every call, and the sharding logic lives entirely inside the middleware, a black box to the programmers. If the proxy itself fails (errors out or goes down), business data can neither be queried nor stored, with catastrophic results, and without familiarity with the middleware's source code, troubleshooting is very hard. One company that used MyCat was forced to change plans after an online failure and spent three days and three nights restoring the system. The CTO was let go, too!
Option two: a lightweight component such as Sharding-JDBC or TSharding, embedded as a Jar. The disadvantage is some development work and a degree of intrusion into business code. The advantages: everything is transparent to the programmers, who keep much stronger control over the sharding logic, so when a fault occurs, troubleshooting is far easier.
To be safe, we chose the second option: the lightweight Sharding-JDBC.
Before refactoring, first settle the goal, then form an expectation of future business growth (ask the relevant business leads). Design the refactor against both. For example, we wanted the rebuilt system to last two years with no major changes in that window, and the business side expected daily orders to reach 10 million within two years, roughly 10 times the current daily volume.
Based on those numbers, we split into 16 databases. At 10 million orders per day, each database averages 625,000 orders daily (10 million / 16), and its theoretical peak is about 1,250 orders per second (scaling today's peak of 2,000/s at 1 million orders/day: 2,000 x 625,000 / 1,000,000). Database pressure stays controllable and server resources are not wasted.
Each database is further split into 16 tables. Even at 10 million orders per day, two years of orders total 7.3 billion (10 million x 365 x 2). Each database then averages about 456 million rows (7.3 billion / 16) and each table about 28.5 million (456 million / 16). Per-table volume over the next two to three years stays well within a controllable range.
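The capacity arithmetic above can be sanity-checked in a few lines (a sketch; the constants simply mirror the figures in the text):

```java
public class CapacityPlan {
    static final long DAILY_ORDERS = 10_000_000L; // expected daily order volume in two years
    static final int DB_COUNT = 16, TABLES_PER_DB = 16;

    static long ordersPerDbPerDay() { return DAILY_ORDERS / DB_COUNT; }

    // scale today's observed peak of 2,000 orders/s (at 1M orders/day) to one database's share
    static long peakPerSecondPerDb() { return 2_000L * ordersPerDbPerDay() / 1_000_000L; }

    static long rowsPerTableAfterTwoYears() {
        return DAILY_ORDERS * 365 * 2 / DB_COUNT / TABLES_PER_DB;
    }

    public static void main(String[] args) {
        System.out.println(ordersPerDbPerDay());         // 625000
        System.out.println(peakPerSecondPerDb());        // 1250
        System.out.println(rowsPerTableAfterTwoYears()); // 28515625
    }
}
```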
The sub-database and sub-table are mainly used for ordering and querying on the user side. The query frequency is the highest by user_id, followed by order_id. So we choose user_id as the sharding column, do hashing by user_id, and store the order data of the same user in the same table in the same database. In this way, when a user queries an order on a web page or an app, he only needs to route to one table to obtain all the user's orders, thus ensuring the query performance.
In addition, we mixed the user ID (user_id) information in the order ID (order_id). To put it simply, the design idea of order_id is to divide order_id into two parts, the front part is user_id, and the latter part is the specific order number. The combination of the two parts constitutes order_id. This way we can easily parse out the user_id from the order_id. When querying an order by order_id, first parse out the user_id from the order_id, and then route to the specific library table according to the user_id.
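A minimal sketch of the idea (the 8-digit suffix width and the hash-based routing here are illustrative assumptions, not the production format):

```java
public class OrderIdRouting {
    static final int DB_COUNT = 16, TABLES_PER_DB = 16;
    static final int SEQ_DIGITS = 8; // illustrative width of the per-user order sequence

    // order_id = user_id prefix + zero-padded sequence suffix
    static String makeOrderId(long userId, long seq) {
        return userId + String.format("%0" + SEQ_DIGITS + "d", seq);
    }

    static long parseUserId(String orderId) {
        return Long.parseLong(orderId.substring(0, orderId.length() - SEQ_DIGITS));
    }

    // route by user_id: 16 is a power of two, so "& (16 - 1)" replaces "% 16"
    static int dbIndex(long userId)    { return Long.hashCode(userId) & (DB_COUNT - 1); }
    // use different hash bits for the table index so db and table indices are not always equal
    static int tableIndex(long userId) { return (Long.hashCode(userId) >>> 4) & (TABLES_PER_DB - 1); }

    public static void main(String[] args) {
        String orderId = makeOrderId(123456L, 42L);
        long userId = parseUserId(orderId);
        System.out.println(orderId + " -> db_" + dbIndex(userId) + ".t_order_" + tableIndex(userId));
    }
}
```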
Using 16 databases of 16 tables each has one more advantage: 16 is a power of 2, so for a non-negative hash, taking it modulo 16 gives the same result as a bitwise AND with 15 (that is, 16 - 1). A bitwise AND compiles down to a single machine instruction and is much cheaper than a division-based modulo.
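One detail worth stressing: the mask must be N - 1, not N. For a power-of-two N, a non-negative hash modulo N equals the hash AND (N - 1). A quick exhaustive check:

```java
public class ModVsAnd {
    public static void main(String[] args) {
        // for a power of two N, h % N == h & (N - 1) holds for every non-negative h
        for (int h = 0; h < 1_000_000; h++) {
            if (h % 16 != (h & 15)) throw new AssertionError("mismatch at " + h);
        }
        // beware: "& 16" would test a single bit instead, and Java's % on a negative
        // hash yields a non-positive remainder, so normalize the hash (e.g. & Integer.MAX_VALUE) first
        System.out.println("h % 16 == (h & 15) verified for 0..999999");
    }
}
```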
Some readers may ask: won't querying the database directly cause performance problems? It will. So we put Redis in front: a sharded Redis cluster caching each active user's latest 50 orders. Only the small share of requests that miss in Redis fall through to the database, which cuts query pressure. In addition, each shard has two slave databases and queries go only to the slaves, spreading each shard's load further.
Some readers may ask: why not a consistent hashing scheme? And what if a user wants orders older than the latest 50? Keep reading!
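The read path can be sketched as follows (a simplified sketch: the two maps stand in for the Redis sharded cluster and a shard's slave library, and the names are illustrative):

```java
import java.util.*;

public class OrderReadPath {
    static final int CACHE_LIMIT = 50; // latest orders per active user kept in cache

    // stand-ins: a real system would use a Redis cluster and the shard's slave database
    static Map<Long, List<String>> cache = new HashMap<>();
    static Map<Long, List<String>> slaveDb = new HashMap<>();

    static List<String> latestOrders(long userId) {
        List<String> hit = cache.get(userId);
        if (hit != null) return hit;                        // most traffic ends here
        List<String> fromDb = slaveDb.getOrDefault(userId, List.of());
        List<String> latest = fromDb.subList(0, Math.min(CACHE_LIMIT, fromDb.size()));
        cache.put(userId, new ArrayList<>(latest));         // populate the cache for next time
        return latest;
    }

    public static void main(String[] args) {
        slaveDb.put(7L, List.of("order-3", "order-2", "order-1")); // newest first
        System.out.println(latestOrders(7L)); // cache miss: falls through to the slave DB
        System.out.println(latestOrders(7L)); // cache hit
    }
}
```

A real deployment would also need cache invalidation on new orders; that is omitted here.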
Admin-side technical solution
After sharding, different users' order data is scattered across databases and tables, which is a problem for queries on anything other than user ID. For example, if an operator wants the number of iPhone 7 orders on a given day, the naive approach queries every table in every database and aggregates the results: complex to implement and very slow. We need a better solution.
We adopted an ES (Elasticsearch) + HBase combination, separating the index from the data store. Fields that may appear in search conditions, such as merchant, product name, and order date, are indexed in ES; the full order data is stored in HBase. HBase handles massive data volumes with extremely fast rowkey lookups, while ES provides powerful multi-condition retrieval. The combination plays to both systems' strengths.
The query flow: first query the matching ES index with the input conditions to get the qualifying rowkeys, then fetch from HBase by rowkey. The second step is so fast that its latency is almost negligible. As shown below:
This solution covers the admin side's need to query orders by arbitrary field conditions, and the merchants' need to query by merchant ID and other conditions. It also serves users who want history older than their latest 50 orders.
Millions of orders arrive every day, and the admin backend needs to see the latest data, so the ES index must be updated frequently. At this volume, will frequent index updates put too much pressure on ES?
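The two-step query flow can be sketched with in-memory stand-ins for the ES index and the HBase table (the maps and names are illustrative assumptions, not the real Elasticsearch or HBase client APIs):

```java
import java.util.*;

public class AdminOrderQuery {
    // stand-ins: ES maps a search condition to matching rowkeys; HBase maps rowkey -> full order row
    static Map<String, List<String>> esIndex = new HashMap<>();
    static Map<String, String> hbase = new HashMap<>();

    static List<String> query(String condition) {
        // step 1: the ES index returns the qualifying rowkeys
        List<String> rowkeys = esIndex.getOrDefault(condition, List.of());
        // step 2: point lookups in HBase by rowkey (near-negligible latency per key)
        List<String> rows = new ArrayList<>();
        for (String rk : rowkeys) rows.add(hbase.get(rk));
        return rows;
    }

    public static void main(String[] args) {
        hbase.put("u1_o1", "order o1: iphone7");
        hbase.put("u2_o9", "order o9: iphone7");
        esIndex.put("product=iphone7", List.of("u1_o1", "u2_o9"));
        System.out.println(query("product=iphone7"));
    }
}
```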
ES indexes are built from segments. Each index is divided into smaller segments; each segment is a complete inverted index, and a search scans the index's segments in turn. Every refresh creates a new segment, so a segment effectively records one batch of recent changes to the index. Since each refresh touches only individual segments, updating the index is cheap, and even at the default refresh interval of 1 second, ES copes comfortably. Because storing and scanning each segment costs memory and CPU, an ES background process continuously merges segments to keep their number down, improving scan efficiency and reducing resource consumption.
How do we synchronize order data from MySQL to HBase and ES in real time?
We use Canal to capture incremental order data from the MySQL tables in real time and push it to the RocketMQ message queue. Consumers pull the messages, write the data into HBase, and update the index in ES.
The diagram above (sourced from the Internet) illustrates how Canal works:
1. Canal emulates the MySQL slave interaction protocol, disguising itself as a MySQL slave
2. It sends the dump protocol to the MySQL master
3. The MySQL master receives the dump request and pushes the binlog to the slave (Canal)
4. Canal parses the binlog byte stream and processes it according to the application scenario
To keep data consistent and lose nothing, we use RocketMQ transactional messages to guarantee the message is sent successfully, and the consumer acks only after both the HBase write and the ES update have succeeded, guaranteeing the message is consumed correctly.
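The consumer's ack discipline can be sketched like this (the two predicates stand in for the HBase write and the ES index update; a real consumer would call the actual clients and rely on RocketMQ redelivering the message when the ack is withheld):

```java
import java.util.function.Predicate;

public class SyncConsumer {
    enum Ack { SUCCESS, RETRY_LATER }

    // each stand-in returns true when its write succeeded
    static Ack consume(String orderEvent, Predicate<String> hbaseWrite, Predicate<String> esUpdate) {
        boolean stored = hbaseWrite.test(orderEvent);
        boolean indexed = stored && esUpdate.test(orderEvent); // skip indexing if storage failed
        // ack only when both sides succeeded; otherwise the MQ redelivers the message
        return (stored && indexed) ? Ack.SUCCESS : Ack.RETRY_LATER;
    }

    public static void main(String[] args) {
        System.out.println(consume("order-1", e -> true, e -> true));  // SUCCESS
        System.out.println(consume("order-2", e -> true, e -> false)); // RETRY_LATER
    }
}
```

Note that redelivery can re-run the HBase write after it already succeeded, so both writes must be idempotent; HBase puts keyed by rowkey naturally are.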
Zero-downtime data migration
In the Internet industry, many systems carry heavy traffic even at two or three in the morning, so suspending service for a data migration is hard for the business side to accept! Here is our zero-downtime migration scheme, invisible to users.
What matters during a migration? First, data must be accurate and complete afterwards: every record correct, none lost. Second, user experience must not suffer; high-traffic consumer-facing business needs a smooth migration with no downtime. Third, the system's performance and stability must hold up after the migration.
Common migration schemes fall into three categories: attaching a slave library, double writing, and data synchronization tools. Let's take them in turn.
Attaching a slave library
Build a slave library off the master. Once the slave has fully caught up, promote it to master (the new database), then switch traffic to the new database.
This suits scenarios where the table structure is unchanged and off-peak traffic is low enough that a brief downtime for migration is acceptable. It typically occurs in platform migrations, such as moving from a self-hosted data center to a cloud platform, or from one cloud to another. Most small and mid-sized Internet systems see very little off-peak traffic; a few minutes of downtime then barely affects users and the business side can accept it, so a stop-the-world migration works. The steps:
1. Create a new slave library (the new database) and begin syncing data from the master.
2. Once the sync has caught up, pick an idle window. To keep master and slave consistent, stop the service first, then promote the slave to master. If the database is accessed by domain name, simply point the domain at the new database (the promoted slave); if by IP, switch to the new database's IP.
3. Restart the service, and the migration is complete.
The advantages of this scheme are low cost and a short migration window. The downside is that the cutover requires stopping the service. Our concurrency was high, we were sharding, and the table structure changed as well, so this option was out!
Double writing
Write to the old and new libraries at the same time, migrate the historical data to the new library in batches, and finally switch traffic to the new library and shut off reads and writes on the old one.
This suits scenarios where the data structure changes and downtime is not allowed. It typically occurs during system refactoring where the table structure changes, such as schema changes or sharding. Large Internet systems often stay busy even off-peak; a few minutes of downtime would noticeably hurt users and could even lose some, which the business side cannot accept. Hence a zero-downtime migration that users never notice.
Our concrete migration steps were as follows:
1. Prepare the code. Every place in the service layer that inserts, deletes, or updates the order table must be modified to operate on the new database (the sharded tables) and the old database at the same time. Also prepare the script that migrates the historical data, and a verification script that checks whether the new and old libraries agree.
2. Enable double writing: write the old and new libraries simultaneously. Notes: every insert, delete, and update must be double-written; for an update, if the record does not yet exist in the new library, fetch it from the old library first and write the updated record into the new library; to protect write performance, the new library can be written asynchronously through a message queue after the old-library write completes.
3. Run a script to migrate the historical data older than a chosen timestamp into the new library. Notes: 1) the timestamp must fall after double writing was enabled, say 10 minutes after, so no old data is missed; 2) records updated after that point have already been pulled into the new library by the update rule in step 2; 3) log the migration, especially errors, so that if any write fails we can repair the data from the logs and keep the two libraries consistent.
4. After step 3, verify with the script: is the new library's data accurate, and is any of it missing?
5. Once verification passes, enable double reading. Start by sending a small share of read traffic to the new library, reading both libraries. Because of write lag, a few records may temporarily differ between them, so on a new-library miss, read the old library again. Then shift read traffic to the new library step by step, effectively a grayscale rollout. If problems appear, switch the traffic back to the old library immediately.
6. Once all read traffic is on the new library, shut off writes to the old library (a hot-config switch in the code helps) and write only the new library.
7. With the migration done, the now-useless double-write and double-read code can be removed.
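The double-write update rule ("if the record is missing in the new library, backfill it from the old") can be sketched like this (plain maps stand in for the old and new databases; the row format and field names are illustrative):

```java
import java.util.*;

public class DoubleWrite {
    // stand-ins for the old database and the new (sharded) database
    static Map<String, String> oldDb = new HashMap<>();
    static Map<String, String> newDb = new HashMap<>();

    static void insert(String id, String row) {
        oldDb.put(id, row);
        newDb.put(id, row); // double write every insert (could be async via MQ)
    }

    static void update(String id, String status) {
        // read-modify-write against the old library, the source of truth during migration
        String updated = oldDb.get(id).replaceAll("status=\\w+", "status=" + status);
        oldDb.put(id, updated);
        // records created before double writing started are absent from the new library;
        // writing the full updated row backfills them, so no update is ever lost
        newDb.put(id, updated);
    }

    public static void main(String[] args) {
        oldDb.put("o1", "user=7,status=CREATED"); // legacy row, predates double writing
        update("o1", "PAID");
        System.out.println(newDb.get("o1"));      // user=7,status=PAID
    }
}
```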
Using a data synchronization tool
Clearly the double-write scheme is laborious: every code path that writes the database needs changes. Is there a better way?
We can also use a data synchronization tool such as Canal or Databus. Take Alibaba's open-source Canal as an example.
With a synchronization tool, there is no double writing to enable and no double-write code in the service layer; Canal performs the incremental synchronization directly. The steps become:
1. Prepare the code: the Canal consumer that parses the binlog byte stream and writes the parsed order data into the new library; the migration script for the historical data; and the verification script that checks whether the new and old libraries agree.
2. Start the Canal pipeline to begin syncing incremental data (new data generated online) from the old library to the new one.
3. Run a script to migrate the historical data older than a chosen timestamp into the new library. Notes: 1) the timestamp must fall after the Canal pipeline started, say 10 minutes after, so no old data is missed; 2) log the migration, especially errors, so that if any records fail to write we can repair the data from the logs and keep the two libraries consistent.
4. After step 3, verify with the script: is the new library's data accurate, and is any of it missing?
5. Once verification passes, enable double reading. Start by sending a small share of read traffic to the new library, reading both libraries. Because of sync lag, a few records may temporarily differ, so on a new-library miss, read the old library again. Shift read traffic to the new library step by step, effectively a grayscale rollout. If problems appear, switch the traffic back to the old library immediately.
6. Once all read traffic is on the new library, switch the write traffic to the new library as well (a hot-config switch helps). Note: the Canal pipeline keeps running during the switch, so any changes still landing in the old library continue to sync to the new one, and nothing is left behind.
7. Stop the Canal pipeline.
8. The migration is complete.
Scaling out and scaling in
Scaling requires re-hashing the data and writing the rows of the original databases and tables into the expanded set. The overall plan mirrors the zero-downtime migration above: either double writing or Canal-based synchronization works.
Better sub-database sub-table solution
From the previous description, it is not difficult to see that our sub-database sub-table scheme has some defects. For example, the use of hash modulo method will cause uneven data distribution, and it is also very troublesome to expand and shrink.
These problems can be solved with a consistent hashing scheme. Consistent hashing based on the design principle of virtual nodes can make data distribution more uniform.
Moreover, the consistent hash adopts the ring design idea. When adding or removing nodes, the cost of data migration will be lower, and only the data of adjacent nodes needs to be migrated. However, when the capacity needs to be expanded, it is basically necessary to double the capacity, and a new node is added to each node gap on the hash ring, so that the access and storage pressure of all the original nodes can be shared.
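A minimal consistent-hash ring with virtual nodes looks like this (a textbook sketch, not what we deployed; the virtual-node count and the hash mixing are illustrative, and real code would use something like MurmurHash):

```java
import java.util.*;

public class ConsistentHashRing {
    static final int VIRTUAL_NODES = 100;          // virtual nodes per physical shard
    final TreeMap<Integer, String> ring = new TreeMap<>();

    void addNode(String node) {
        for (int i = 0; i < VIRTUAL_NODES; i++)
            ring.put(hash(node + "#" + i), node);  // spread each shard around the ring
    }

    void removeNode(String node) {
        for (int i = 0; i < VIRTUAL_NODES; i++)
            ring.remove(hash(node + "#" + i));     // only keys near these points migrate
    }

    String route(String key) {
        // first virtual node clockwise from the key's position (wrap around to the start)
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    static int hash(String s) {
        int h = s.hashCode();
        return h ^ (h >>> 16); // cheap bit mixing; illustrative only
    }

    public static void main(String[] args) {
        ConsistentHashRing r = new ConsistentHashRing();
        for (int i = 0; i < 4; i++) r.addNode("db_" + i);
        System.out.println(r.route("user_123"));
        r.addNode("db_4"); // only keys adjacent to db_4's virtual nodes move
        System.out.println(r.route("user_123"));
    }
}
```

The virtual nodes are what make the distribution even: each physical shard owns many small arcs of the ring instead of one large one.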
Due to space reasons, the consistency hash is not described in detail here. There are a lot of relevant information on the Internet. If you are interested, you can study it carefully.
Downgrade plan
When order-service pressure spikes during a big promotion, synchronous calls can be switched to asynchronous message-queue mode, relieving the order service and raising throughput.
During a big promotion, order volume is very high at certain moments. We write orders to the database asynchronously in batches, cutting the database access frequency and thus the write pressure.
The detailed steps: the backend service receives the order request and puts it straight onto the message queue. The order service consumes the message, first writes the order information to Redis, and flushes to the database in batches, either when 10 orders have accumulated or every 100 ms. After placing an order, the front-end page polls the backend for the order information and jumps to the payment page once it arrives.
Asynchronous batch writing greatly reduces the database write frequency and so dramatically lowers the write pressure on the order database. But because the order is written asynchronously, the order row and the corresponding inventory data are briefly inconsistent, and the user may not see the order immediately after placing it. Since this is a downgrade plan, a modest dip in user experience is acceptable as long as the data ends up consistent. Depending on system pressure, turn the asynchronous-batching downgrade switch on when the promotion starts and off when it ends. The process is as follows:
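The batching rule, flush when 10 orders accumulate or 100 ms elapse, whichever comes first, can be sketched as follows (the buffer and flush target are in-memory stand-ins for Redis and the order database):

```java
import java.util.*;

public class BatchOrderWriter {
    static final int BATCH_SIZE = 10;
    static final long FLUSH_INTERVAL_MS = 100;

    final List<String> buffer = new ArrayList<>();         // stands in for the Redis staging area
    final List<List<String>> database = new ArrayList<>(); // each element is one batch INSERT
    long lastFlush = System.currentTimeMillis();

    void onOrder(String order) {
        buffer.add(order);
        long now = System.currentTimeMillis();
        if (buffer.size() >= BATCH_SIZE || now - lastFlush >= FLUSH_INTERVAL_MS) flush(now);
    }

    void flush(long now) {
        if (buffer.isEmpty()) return;
        database.add(new ArrayList<>(buffer)); // one batch INSERT instead of N single-row inserts
        buffer.clear();
        lastFlush = now;
    }

    public static void main(String[] args) {
        BatchOrderWriter w = new BatchOrderWriter();
        for (int i = 1; i <= 25; i++) w.onOrder("order-" + i);
        w.flush(System.currentTimeMillis()); // drain the tail when the downgrade switch turns off
        System.out.println(w.database.size() + " batch inserts for 25 orders");
    }
}
```

A production version would run the time-based flush on a scheduled thread, so the 100 ms deadline fires even when no new orders arrive.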