TensorFlow (hereinafter TF) is an open-source deep learning framework released by Google and is widely used in Meituan's recommendation system scenarios. However, the official version of TensorFlow does not yet fully support industrial-scale scenarios. During large-scale production use, Meituan encountered the following challenges:
All parameters are represented as Variables, which must preallocate a large amount of memory for more than 10 billion sparse parameters, wasting resources;
Distributed scaling works for only hundreds of Workers, and scalability to thousands of Workers is poor;
Because large-scale sparse parameters cannot be dynamically added, deleted, or incrementally exported, Online Learning cannot be supported;
Large-scale clusters inevitably encounter slow machines and machine failures, and because the framework layer cannot handle them, tasks run abnormally.
These problems are not flaws in TensorFlow's design but lie mostly in its underlying implementation. Considering the usage habits of Meituan's many businesses and compatibility with the community, we built on the native TensorFlow 1.x architecture and interfaces and carried out deep, multi-dimensional customization, covering large-scale sparse parameter support, training modes, distributed communication optimization, pipeline optimization, and operator optimization and fusion, to solve the core pain points of this scenario.
In terms of capability, the new system currently achieves near-linear speedup for models with hundreds of billions of parameters, supports distributed training on thousands of Workers, can finish training on a full year of sample data within one day, and supports Online Learning. Its architecture and interfaces are also more user-friendly, and it is used by Meituan's internal business departments, including Meituan Takeaway, Meituan Select, Meituan Search, the Advertising Platform, and Dianping Feeds. This article focuses on our work on large-scale distributed training optimization, in the hope that it will be helpful or inspiring.
2 Large-scale training optimization challenges
2.1 Challenges brought by business iteration
As Meituan's business has grown, the scale and complexity of its recommendation models have also grown rapidly, specifically:
Training data: the number of training samples has grown from tens of billions to hundreds of billions, nearly a 10-fold increase.
Sparse parameters: the number of sparse parameters has grown from hundreds to thousands, also nearly a 10-fold increase; the total number of parameters has grown from hundreds of millions to tens of billions, a 10- to 20-fold increase.
Model complexity: models are becoming increasingly complex, and single-step computation time has increased more than 10-fold.
For high-traffic businesses, a single training experiment has grown from a few hours to several days, while keeping an experiment within one day is a basic requirement in this scenario.
2.2 System Load Analysis
2.2.1 Problem Analysis Tool Chain
TensorFlow is a very large open-source project with millions of lines of code. Its native monitoring metrics are too coarse-grained and do not support global monitoring, which makes complex performance bottlenecks hard to locate. Based on CAT, Meituan's open-source monitoring system, we built fine-grained, full-link monitoring for TensorFlow (Figure 1 below) that can accurately pinpoint performance bottlenecks.
Figure 1 TensorFlow PS architecture full link monitoring
Performance optimization also involves a large number of performance tests and result analyses, which is very labor-intensive. We therefore built an automated experiment framework (Figure 2 below) that runs multi-round experiments automatically, collects the relevant monitoring metrics, and generates reports.
Figure 2 Automated experiment framework
2.2.2 Load Analysis from a Business Perspective
In the recommendation system scenario, we use the asynchronous training mode of the TensorFlow Parameter Server (PS) architecture to support our businesses' distributed training needs. What load changes do the business changes described above bring to this architecture? See Figure 3 below:
Figure 3 Large-scale training load analysis of the TensorFlow PS architecture
In summary, the load consists mainly of communication pressure, PS concurrency pressure, and Worker computation pressure. In distributed systems, load problems are usually solved by horizontal scaling. Although this seems to solve the problem, our experiments show that once the number of PSs reaches a certain point, single-step training time actually increases, as shown in Figure 4 below:
Figure 4 Experiment on adding PSs to improve training performance
The core reason is that each Worker training step must complete communication with all PSs synchronously, and each additional PS adds N communication links (one from each of the N Workers), which greatly increases link latency (Figure 5 below). Because a training job runs millions to tens of millions of steps, the added link latency eventually outweighs the benefit of the extra PS compute concurrency.
Figure 5 Link overhead caused by adding PS
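To make this trade-off concrete, here is a toy model that is entirely our own assumption, not Meituan's measurement: the total PS-side work `compute` is split evenly over k PSs, and each PS link has an independent, exponentially distributed latency with mean `mean_latency`. Because a Worker step waits for the slowest PS, the expected communication cost is the expected maximum of k such latencies, which is `mean_latency * H_k` (H_k being the k-th harmonic number). The function names are hypothetical.

```python
def harmonic(k: int) -> float:
    """k-th harmonic number H_k = 1 + 1/2 + ... + 1/k."""
    return sum(1.0 / i for i in range(1, k + 1))


def expected_step_time(k: int, compute: float, mean_latency: float) -> float:
    """Toy model of one Worker step with k PSs (illustrative assumptions only).

    - PS-side work is split evenly, so each PS needs compute / k;
    - per-PS link latencies are i.i.d. exponential with mean `mean_latency`,
      so waiting for the slowest of k links costs mean_latency * H_k on average.
    """
    return compute / k + mean_latency * harmonic(k)


def best_ps_count(max_k: int, compute: float, mean_latency: float) -> int:
    """Return the k in [1, max_k] that minimizes the modeled step time."""
    return min(range(1, max_k + 1),
               key=lambda k: expected_step_time(k, compute, mean_latency))


if __name__ == "__main__":
    # The compute term shrinks as 1/k while the slowest-link term keeps growing,
    # so beyond some k adding PSs makes each step slower.
    for k in (1, 8, 34, 64):
        print(k, round(expected_step_time(k, 100.0, 3.0), 2))
    print("best k:", best_ps_count(64, 100.0, 3.0))
```

Under these assumptions the compute term C/k eventually stops paying for the slowly but steadily growing H_k term, which is the shape of the curve described above.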
For this system, the core difficulty of optimization is therefore how to optimize distributed computation with a limited number of PS instances.
3 Optimization Practices
3.1 Introduction to large-scale sparse parameters
In recommendation models, most parameters are sparse, and the most important operation on sparse parameters is Embedding, which usually carries the heaviest load and is the focus of subsequent optimization. Because we redefined sparse parameters and the later optimizations build on this, we introduce this work first.
To build an Embedding module in native TensorFlow, users must first create a Variable large enough to hold all the sparse parameters and then learn the Embedding on that Variable. However, training Embeddings on Variables has several drawbacks:
The size of the Variable must be set in advance. For scenarios with tens or even hundreds of billions of parameters, this wastes a huge amount of space;
Training is slow, and custom optimizations for sparse models are not possible.
We first addressed the basic capability gap by replacing Variable with a HashTable, using the sparse feature ID as the key and the Embedding vector as the value. Compared with the native Variable-based Embedding, this approach has the following advantages:
The HashTable grows automatically during training, which avoids redundant storage and frees users from sizing it in advance, lowering the cost of use.
We implemented a series of custom optimizations for the HashTable solution that greatly improve training speed compared with Variable; it can train models with hundreds of billions of parameters and scales well.
Because sparse parameters can be scaled dynamically, we built Online Learning support on top of it.
The API is compatible with the community version and is used almost exactly like the native Variable, so the migration cost is very low.
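The core difference from a preallocated Variable can be illustrated with a plain Python dictionary. This is a single-process sketch under our own assumptions (lazy uniform initialization, no eviction, no PS partitioning), not the production HashTable; the class name `HashTableEmbedding` is hypothetical.

```python
import random
from typing import Dict, List


class HashTableEmbedding:
    """Hypothetical dynamically growing embedding table.

    Keys are sparse feature IDs and values are embedding vectors. A vector is
    created lazily the first time its ID is looked up, so memory grows with the
    number of IDs actually seen rather than with a preset vocabulary size.
    """

    def __init__(self, dim: int, seed: int = 0, init_scale: float = 0.01):
        self.dim = dim
        self.init_scale = init_scale
        self._rng = random.Random(seed)
        self._table: Dict[int, List[float]] = {}

    def lookup(self, ids: List[int]) -> List[List[float]]:
        out = []
        for fid in ids:
            vec = self._table.get(fid)
            if vec is None:  # first occurrence: allocate and initialize this row
                vec = [self._rng.uniform(-self.init_scale, self.init_scale)
                       for _ in range(self.dim)]
                self._table[fid] = vec
            out.append(vec)
        return out

    def __len__(self) -> int:
        return len(self._table)


if __name__ == "__main__":
    table = HashTableEmbedding(dim=8)
    # An ID as large as 10**12 costs one row here; a Variable indexed by ID
    # would need 10**12 rows allocated up front.
    table.lookup([10**12, 7, 7])
    print(len(table))  # 2
```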
A simplified version of the implementation based on the PS architecture is shown in Figure 6 below:
Figure 6 HashTable scheme supporting large-scale sparse parameters
The core process can be roughly divided into the following steps:
Sparse feature IDs (which we usually encode uniformly in advance) enter the Embedding module. Through the Send-Recv mechanism built into TensorFlow, these IDs are sent to the PS side, where operators such as Lookup query the underlying HashTable and assemble the Embedding vectors.
The Worker pulls these Embedding vectors back for subsequent training and computes the gradients of these parameters through backpropagation; the optimizer on the PS side then pulls these gradients.
The PS-side optimizer first calls the Find operator to fetch, from the HashTable, the original sparse parameter vectors corresponding to the gradients together with the corresponding optimizer parameters. It then computes the updated Embedding vectors and optimizer parameters with the optimization algorithm and writes them back to the HashTable with the Insert operator.
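The three steps can be sketched in a single process as follows. This is an illustration under our own assumptions: `PSShard` and `worker_step` are hypothetical names, RPCs are replaced by function calls, plain SGD stands in for the real optimizer, and a per-ID update counter stands in for the optimizer parameters stored next to each embedding.

```python
from typing import Dict, List, Tuple

Vector = List[float]


class PSShard:
    """Hypothetical PS shard: value = (embedding, per-ID update counter)."""

    def __init__(self, dim: int):
        self.dim = dim
        self.table: Dict[int, Tuple[Vector, int]] = {}

    def lookup(self, ids: List[int]) -> List[Vector]:
        """Step 1: query and assemble embeddings (zeros for unseen IDs)."""
        result = []
        for fid in ids:
            if fid not in self.table:
                self.table[fid] = ([0.0] * self.dim, 0)
            result.append(list(self.table[fid][0]))  # copy, as if sent over RPC
        return result

    def apply_gradients(self, ids: List[int], grads: List[Vector], lr: float) -> None:
        """Step 3: Find -> optimizer update -> Insert."""
        for fid, g in zip(ids, grads):
            emb, count = self.table[fid]                       # Find
            new_emb = [w - lr * gi for w, gi in zip(emb, g)]   # SGD update
            self.table[fid] = (new_emb, count + 1)             # Insert


def worker_step(ps: PSShard, ids: List[int], targets: List[Vector], lr: float) -> None:
    embs = ps.lookup(ids)  # Step 1: pull embeddings from the PS
    # Step 2: gradient of the toy loss 0.5 * ||emb - target||^2 w.r.t. emb
    grads = [[e - t for e, t in zip(emb, tgt)] for emb, tgt in zip(embs, targets)]
    ps.apply_gradients(ids, grads, lr)  # Step 3: push gradients to the PS


if __name__ == "__main__":
    ps = PSShard(dim=2)
    worker_step(ps, [5], [[1.0, 2.0]], lr=0.5)
    print(ps.table[5])  # ([0.5, 1.0], 1)
```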
3.2 Distributed Load Balancing Optimization
This is a classic direction in distributed computing optimization. The PS architecture is a typical "bucket model": to complete one training step, a Worker must interact with all PSs, so balance among PSs is critical. In practice, however, we found that time consumption across PSs was unbalanced. The causes include not only load imbalance from the simple Round-Robin graph partitioning logic of the TensorFlow PS architecture, but also imbalance caused by heterogeneous machines.
For recommendation models, our main strategy is to automatically split all sparse parameters and large dense parameters evenly across the PSs, which solves most of these problems. In practice, we also encountered a problem that was hard to diagnose: the native Adam optimizer implementation causes PS load imbalance. It is described in detail below.
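The effect of even splitting versus whole-variable Round-Robin placement can be shown with a small load calculation. This sketch is our own simplification: parameter count is used as the only load proxy, the variable sizes are made up, and the function names are hypothetical.

```python
from typing import List


def round_robin_load(var_sizes: List[int], num_ps: int) -> List[int]:
    """Place whole variables on PSs in turn (Round-Robin placement)."""
    load = [0] * num_ps
    for i, size in enumerate(var_sizes):
        load[i % num_ps] += size
    return load


def even_split_load(var_sizes: List[int], num_ps: int) -> List[int]:
    """Split every variable into num_ps shards, one per PS."""
    load = [0] * num_ps
    start = 0  # rotate which PSs receive the remainder so it does not pile up
    for size in var_sizes:
        base, extra = divmod(size, num_ps)
        for p in range(num_ps):
            load[p] += base
        for j in range(extra):
            load[(start + j) % num_ps] += 1
        start = (start + extra) % num_ps
    return load


if __name__ == "__main__":
    sizes = [1000, 10, 10, 10]  # one large embedding plus a few small variables
    print(round_robin_load(sizes, 2))  # [1010, 20]: PS0 is the bottleneck
    print(even_split_load(sizes, 2))   # [515, 515]
```

Because a Worker step waits for the slowest PS, the step time in this model follows the maximum entry of the load list, which is why splitting large variables matters more than balancing variable counts.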
The Adam optimizer's parameter update involves two beta values. In native TensorFlow, these two betas are shared by all Variables (or HashTables) optimized by the same optimizer, and they are placed on the same PS as the first Variable (in lexicographical order of names). This causes a problem: each optimizer has only one β_1 and one β_2, located on a single PS. During parameter updates, that PS therefore receives far more requests than the others and becomes a performance bottleneck.
Figure 7 Adam optimization algorithm
However, inspecting the Adam algorithm shows that β_1 and β_2 are both constants, and the parts highlighted in blue are relatively independent computations that each PS can perform on its own. Given this observation, the optimization is intuitive: we create redundant β parameters for the Adam optimizer on each PS and compute t and α locally, eliminating the PS hotspot caused by uneven load.
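A minimal sketch of the idea, assuming the standard Adam update in which the bias-corrected step size is α_t = α·sqrt(1 − β_2^t)/(1 − β_1^t). Each shard keeps its own copy of the constant betas and its own step counter t, so no shard needs to read shared beta state from another PS. The class name `LocalAdamShard` is hypothetical, and the sketch assumes every shard is stepped exactly once per training step, so that its local t matches the global step count.

```python
import math
from typing import List


class LocalAdamShard:
    """Adam state for the parameters held by one PS (hypothetical sketch).

    beta1/beta2 are constants, and the bias correction depends only on t, so
    each shard computes alpha_t locally from its own counter.
    """

    def __init__(self, params: List[float], lr: float = 0.001,
                 beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-8):
        self.params = list(params)
        self.lr, self.beta1, self.beta2, self.eps = lr, beta1, beta2, eps
        self.m = [0.0] * len(self.params)
        self.v = [0.0] * len(self.params)
        self.t = 0  # local step counter, assumed to advance once per training step

    def step(self, grads: List[float]) -> None:
        self.t += 1
        # Bias-corrected step size, computed locally on this shard.
        alpha_t = self.lr * math.sqrt(1 - self.beta2 ** self.t) / (1 - self.beta1 ** self.t)
        for i, g in enumerate(grads):
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * g * g
            self.params[i] -= alpha_t * self.m[i] / (math.sqrt(self.v[i]) + self.eps)


if __name__ == "__main__":
    # Two shards updated independently match one optimizer over all parameters.
    a, b = LocalAdamShard([1.0], lr=0.1), LocalAdamShard([2.0], lr=0.1)
    whole = LocalAdamShard([1.0, 2.0], lr=0.1)
    for _ in range(3):
        a.step([0.5]); b.step([-0.25]); whole.step([0.5, -0.25])
    print(a.params + b.params == whole.params)  # True
```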
This optimization is general and clearly effective: in one Meituan business model, removing the β hotspot improved performance by 9%. Moreover, because it eliminates the global dependency on β, it also improves the scalability of the PS architecture, yielding better speedup than before as the number of Workers increases.
3.3 Communication optimization
According to the analysis in Section 2.2, the system is also under heavy communication pressure, so we focused our communication optimization on RDMA. First, a brief introduction: compared with traditional communication over the socket-based TCP/IP stack, RDMA offers zero-copy and kernel bypass, which reduce network latency as well as CPU usage. This makes RDMA well suited to the communication patterns of deep learning models.
RDMA mainly comprises three protocols: InfiniBand, RoCE (v1, v2), and iWARP. In Meituan's internal deep learning scenarios, we use RoCE v2. In deep learning training, especially dense model training (NLP, CV, etc.), RDMA has become standard for large-scale distributed training. For large-scale sparse model training, however, open-source systems offer very limited RDMA support: the TensorFlow Verbs communication module has not been updated for a long time, and its communication performance is unsatisfactory. We therefore made extensive improvements on top of it.
Our optimized version improved performance by 20% to 40% when training the DLRM model on the public 1TB Click Logs dataset with more than 100 Workers. On several Meituan business models, the communication layer we rebuilt on TensorFlow Seastar also delivered speedups of 10% to 60%. We have also contributed this work back to the community.
3.3.1 Memory Registration Optimization
RDMA has three data transfer modes: SEND/RECV, WRITE, and READ. With WRITE and READ, the initiator directly reads from or writes to remote memory without the remote side being aware of it; these modes suit bulk data transfer. Inside TensorFlow, RDMA-based data transfer uses the one-sided WRITE mode.
Figure 8 RDMA transmission mode
Before RDMA can transfer data, memory must be allocated in advance and registered with the network card (the Memory Registration process, hereinafter MR) so that the NIC can operate on it directly. Allocating new memory and registering it with the device is time-consuming. Figure 9 below shows the time required to register memory of different sizes with the NIC: as the registered size grows, MR binding time increases rapidly.
Figure 9 MR process overhead
In the community version of TensorFlow RDMA, Tensors are still created with the unified BFC Allocator, and every created Tensor is registered as an MR. As noted above, MR registration has a performance cost, and frequent registration of large memory regions significantly degrades performance. During training, only Tensors involved in cross-node communication need MR; the rest do not. The optimization is therefore straightforward: we identify and manage the communication Tensors and register MRs only for Tensors that are transferred across nodes.
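The selective policy can be sketched without RDMA hardware by modeling registration as a callback. In a real implementation that callback would wrap something like libibverbs' `ibv_reg_mr`; here `register_mr`, `Buffer`, and `SelectiveRegisteringAllocator` are hypothetical names, and whether a Tensor crosses nodes is assumed to be known at allocation time.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Buffer:
    size: int
    registered: bool = False


@dataclass
class SelectiveRegisteringAllocator:
    """Hypothetical allocator that registers memory with the NIC only for
    Tensors that will be sent across nodes."""
    register_mr: Callable[[Buffer], None]  # stand-in for the real MR registration
    registered_bytes: int = 0

    def allocate(self, size: int, crosses_node: bool) -> Buffer:
        buf = Buffer(size)
        if crosses_node:  # only communication Tensors pay the MR cost
            self.register_mr(buf)
            buf.registered = True
            self.registered_bytes += size
        return buf


if __name__ == "__main__":
    calls = []
    alloc = SelectiveRegisteringAllocator(register_mr=calls.append)
    alloc.allocate(1024, crosses_node=False)  # local intermediate: no MR
    alloc.allocate(4096, crosses_node=True)   # sent to a remote PS: registered
    print(len(calls), alloc.registered_bytes)  # 1 4096
```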
3.3.2 RDMA Static Allocator
The RDMA static allocator extends the MR registration optimization above. That optimization reduced the number of MR registrations by skipping Tensors that are not transmitted. However, large-scale sparse training often runs hundreds or thousands of Workers in parallel, which brings new problems:
In the PS architecture, PSs and Workers act as each other's client and server. Taking the PS side as an example, when the number of Workers grows to thousands, MR registration on the PS side becomes very frequent, increasing memory allocation and registration time.
In sparse scenarios, the shape of the same operator's output Tensor may change from step to step, so existing MRs are hard to reuse, leading to heavy memory fragmentation and the overhead of repeated MR registration.
To address these problems, we introduced an MR static allocator.
Figure 10 MR static allocator
The core design ideas are:
Although the output Tensor shape of the same operator may change in sparse scenarios, the overall range of variation is bounded. Through monitoring and analysis, we can find a relatively stable memory size that meets the Tensor storage needs across steps.
Based on this, we changed the original per-Tensor (per-Request) MR allocation strategy: we pre-allocate a large region once, register it with the NIC, and then hand out space from it using our own allocation strategy. This greatly reduces the frequency of MR requests; in most cases, only one MR registration is needed for the entire training process.
We introduced a simple exchange protocol that packs the shape and data of a transmitted Tensor together and writes them to the client. Following the protocol, the client parses the Tensor size and then reads the data, which avoids the multiple rounds of negotiation that Tensor shape changes cause in the native implementation.
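The exchange protocol idea can be illustrated with Python's `struct` module: the shape travels in the same buffer as the data, so the receiver learns the size from the payload itself instead of negotiating it first. The wire layout below (uint32 rank, int64 dimensions, float32 payload, little-endian) is our own assumption, not the format used in production, and the function names are hypothetical.

```python
import struct
from typing import List, Tuple

# Hypothetical wire layout (little-endian, no padding):
#   uint32 ndim | ndim x int64 dims | float32 payload


def _num_elements(shape: List[int]) -> int:
    count = 1
    for d in shape:
        count *= d
    return count


def pack_tensor(shape: List[int], data: List[float]) -> bytes:
    if _num_elements(shape) != len(data):
        raise ValueError("data length does not match shape")
    header = struct.pack(f"<I{len(shape)}q", len(shape), *shape)
    return header + struct.pack(f"<{len(data)}f", *data)


def unpack_tensor(buf: bytes) -> Tuple[List[int], List[float]]:
    (ndim,) = struct.unpack_from("<I", buf, 0)
    shape = list(struct.unpack_from(f"<{ndim}q", buf, 4))
    offset = 4 + 8 * ndim  # header size: rank field plus one int64 per dimension
    data = list(struct.unpack_from(f"<{_num_elements(shape)}f", buf, offset))
    return shape, data


if __name__ == "__main__":
    wire = pack_tensor([2, 3], [0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
    print(len(wire), unpack_tensor(wire))  # 44 ([2, 3], [0.0, 1.0, ..., 5.0])
```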
Figure 11 MR static allocator construction flow
Specifically, we introduced an Allocation Analysis module. At the start of training, it analyzes historical allocation data to determine the actual MR size to pre-allocate and the space to reserve for each Tensor. We then pause training and construct the Allocator, which includes creating the MR and synchronizing information between the two ends of the communication. This information is used to build the MR Info Map, whose key is the unique identifier of each transmitted Tensor (its ParsedKey, determined when the computation graph is partitioned) and whose Info structure contains the local address pointer, offset, size, ibv_send_wr-related information, and so on. Training then resumes, and subsequent Tensor transfers are sent and received through the statically allocated MR, which also eliminates the repeated negotiation caused by shape changes.
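The MR Info Map can be sketched as a table of reserved slots inside one pre-registered region. Assumptions, all ours: string keys stand in for ParsedKey, the per-Tensor reservation is the largest size observed during analysis times a headroom factor, rounded up to 64 bytes, and a Tensor that outgrows its slot simply raises an error (a real system would need a fallback path). `MRInfo` and `StaticMRAllocator` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class MRInfo:
    offset: int  # start of this Tensor's reserved slot inside the big region
    size: int    # reserved bytes for the slot


class StaticMRAllocator:
    """One pre-registered region, one reserved slot per transmitted Tensor.

    `observed` maps a Tensor key (standing in for ParsedKey) to the largest
    size seen during the analysis phase; `headroom` is an assumed margin.
    """

    def __init__(self, observed: Dict[str, int], headroom: float = 1.25, align: int = 64):
        self.info: Dict[str, MRInfo] = {}
        offset = 0
        for key in sorted(observed):  # deterministic layout on both ends
            size = int(observed[key] * headroom)
            size = (size + align - 1) // align * align  # round up to alignment
            self.info[key] = MRInfo(offset, size)
            offset += size
        self.total_size = offset  # the single region that would be registered
        self.registrations = 1    # registered once, then reused every step

    def slot(self, key: str, nbytes: int) -> MRInfo:
        info = self.info[key]
        if nbytes > info.size:
            raise MemoryError(f"{key}: {nbytes} bytes exceeds reserved {info.size}")
        return info


if __name__ == "__main__":
    alloc = StaticMRAllocator({"a": 100, "b": 60})
    print(alloc.total_size, alloc.slot("b", 100))  # 256 MRInfo(offset=128, size=128)
```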
3.3.3 Multi RequestBuffer and CQ Load Balancing
Besides transferring Tensor data as described above, RDMA communication in the TensorFlow community version also sends and receives transfer-related control messages, which likewise use the ibv_post_send and ibv_post_recv primitives. The native control-flow implementation has bottlenecks that limit control-flow throughput during large-scale training and thus reduce data transfer efficiency. Specifically:
Requests are written out through a single RequestBuffer shared by the requests to all clients, so control messages are effectively sent serially: the next Request can be written only after the Ack from the peer arrives, which limits request throughput.
On the client side, the RDMA Completion Queue must be polled to detect arriving requests and related state changes. The native implementation has only one Completion Queue, polled by a single thread, which limits response efficiency in large-scale distributed training.
To address these problems, we adopted Multi RequestBuffer and CQ load-balancing optimizations to remove potential throughput bottlenecks in the request-sending and request-response paths.
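A back-of-the-envelope model of both fixes, under our own simplifying assumptions: a RequestBuffer can be reused only after the peer's Ack, which takes one round trip (`rtt`), and transmission time is ignored; Completion Queues are assigned to connections by a simple modulo hash, each CQ polled by its own thread. The function names are hypothetical.

```python
import math


def request_send_time(num_requests: int, num_buffers: int, rtt: float) -> float:
    """Time to issue requests when each RequestBuffer waits one RTT for the
    peer's Ack before reuse. One buffer means fully serial sending; k buffers
    allow up to k requests in flight at once."""
    rounds = math.ceil(num_requests / num_buffers)
    return rounds * rtt


def assign_cq(connection_id: int, num_cqs: int) -> int:
    """Spread connections over several Completion Queues by modulo hashing,
    so no single polling thread handles every connection."""
    return connection_id % num_cqs


if __name__ == "__main__":
    print(request_send_time(100, 1, 1.0))  # 100.0: one request per round trip
    print(request_send_time(100, 8, 1.0))  # 13.0
    print([assign_cq(c, 4) for c in range(8)])  # [0, 1, 2, 3, 0, 1, 2, 3]
```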
3.3.4 Send-Driven & Rendezvous-Bypass
Readers familiar with the TensorFlow PS architecture will know that after the full computation graph is partitioned into a Worker-side subgraph and a PS-side subgraph, an asynchronous data exchange mode based on the Rendezvous mechanism is established so that the two subgraphs can exchange data, as shown in Figure 12 below:
Figure 12 Send-Recv pairs added by TensorFlow graph partitioning
In the partitioning logic shown above, a Recv operator represents this subgraph's demand for a Tensor, and the Tensor's producer sits behind the paired Send operator on the other device.
In its implementation, TensorFlow uses a Recv-Driven data exchange mode. As shown in the figure above, the two subgraphs on DeviceA and DeviceB execute asynchronously and concurrently. When the Recv on DeviceB executes, it sends an RPC request to DeviceA, which routes the request to its Rendezvous. If the required data has already been produced and registered by the Send operator, DeviceA fetches it locally and returns it to DeviceB. If production is not yet complete, the Recv request from DeviceB is registered in the Rendezvous; once DeviceA produces the data, the Send operator finds the registered Recv, triggers its callback, and returns the data to DeviceB.
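The "whoever arrives first waits" behavior of a Rendezvous can be sketched as follows. This is a toy, thread-safe model under our own assumptions: string keys stand in for ParsedKey, each key has a single Recv, and the RPC hop is left out (in Recv-Driven mode, DeviceA's RPC handler would call `recv_async` on DeviceA's Rendezvous). It is not TensorFlow's `Rendezvous` class.

```python
import threading
from typing import Any, Callable, Dict


class Rendezvous:
    """Toy Send/Recv meeting point.

    A value sent before any Recv is parked; a Recv issued before the value
    exists parks its callback, which the later Send fires.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._values: Dict[str, Any] = {}
        self._waiters: Dict[str, Callable[[Any], None]] = {}

    def send(self, key: str, value: Any) -> None:
        with self._lock:
            done = self._waiters.pop(key, None)
            if done is None:  # producer first: park the value
                self._values[key] = value
                return
        done(value)  # consumer first: trigger its callback outside the lock

    def recv_async(self, key: str, done: Callable[[Any], None]) -> None:
        with self._lock:
            if key not in self._values:  # consumer first: park the callback
                self._waiters[key] = done
                return
            value = self._values.pop(key)
        done(value)  # producer first: data is already here


if __name__ == "__main__":
    rv, got = Rendezvous(), []
    rv.send("emb:step1", 1)                # Send before Recv
    rv.recv_async("emb:step1", got.append)
    rv.recv_async("emb:step2", got.append)  # Recv before Send
    rv.send("emb:step2", 2)
    print(got)  # [1, 2]
```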
The Rendezvous mechanism elegantly handles data exchange when producers and consumers run at different paces. However, the Recv-Driven mode also introduces two potential problems:
We observed that in real business models, the Recv operator waits for the Send operator in the Rendezvous about as often as the Send operator waits for the Recv operator. In the latter case, the data could be sent to the peer as soon as the Send operator produces it, but because of how the mechanism is implemented, it must still wait for the Recv request to arrive before the data is pulled back, which lengthens communication.
As a data exchange hotspot, the Rendezvous itself carries non-trivial internal logic overhead.
To address these problems, we implemented another data exchange mode on RDMA, called Send-Driven mode. As the name suggests, unlike Recv-Driven mode, the Send operator writes data directly to the Recv side, which receives the data and registers it in its local Rendezvous; the Recv operator then fetches the data directly from that local Rendezvous. The process is shown in Figure 13 below:
Figure 13 Native Recv-Driven and supplementary Send-Driven mechanism
As the figure shows, the Send-Driven communication process is much simpler than the Recv-Driven one. In addition, because data is sent as soon as it is ready, the Rendezvous on one side is skipped, and when the producer runs ahead of the consumer, the consumer obtains the data sooner.
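A simple timing model shows when Send-Driven helps. Assumptions, all ours: a symmetric one-way network latency `one_way`, and no processing or Rendezvous overhead; the function names are hypothetical. In Recv-Driven mode the consumer pays a request hop plus a response hop; in Send-Driven mode the data is pushed as soon as it is ready.

```python
def recv_driven_done(producer_ready: float, recv_start: float, one_way: float) -> float:
    """Recv-Driven: the Recv sends a request (one hop), the producer answers
    once the data is ready, and the response travels back (one hop)."""
    request_arrives = recv_start + one_way
    return max(request_arrives, producer_ready) + one_way


def send_driven_done(producer_ready: float, recv_start: float, one_way: float) -> float:
    """Send-Driven: the Send pushes data as soon as it is ready (one hop); the
    Recv completes once it has started and the data is in its local Rendezvous."""
    data_arrives = producer_ready + one_way
    return max(recv_start, data_arrives)


if __name__ == "__main__":
    # Producer ahead of consumer: Send-Driven saves the request/response hops.
    print(recv_driven_done(0.0, 5.0, 1.0), send_driven_done(0.0, 5.0, 1.0))    # 7.0 5.0
    # Consumer ahead of producer: both modes finish at the same time.
    print(recv_driven_done(10.0, 0.0, 1.0), send_driven_done(10.0, 0.0, 1.0))  # 11.0 11.0
```

In this model the gain appears only when the producer is ahead, which matches the observation above that Send waiting for Recv is as common as the reverse.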
3.4 Latency Optimization
This is another classic direction of distributed computing optimization: anything in the end-to-end process that can be streamlined, merged, or overlapped deserves continued exploration. Unlike many other systems, machine learning systems can also use approximate algorithms here to obtain larger performance gains. Below are some of our optimization practices in both areas.
3.4.1 Sparse Domain Parameter Aggregation
Once sparse parameters are stored in HashTables, some auxiliary parameters must also switch to HashTable implementations, so the computation graph ends up with multiple HashTables and many related operators. In practice, we found it necessary to minimize the number of operators such as Lookup/Insert, both to reduce PS load and to reduce RPC QPS. We therefore aggregated these operations for common sparse-model usage patterns.
Take the Adam optimizer as an example: it needs two slots, m and v, to store momentum information, each with the same shape as the Embedding. The native optimizer creates these two Variables separately and reads and writes them during the backward gradient update. Likewise, with the HashTable scheme, we would need two separate HashTables to train m and v. Across the forward and backward passes, one Lookup and one Insert would then be needed for each of Embedding, m, and v, for a total of three Lookups and three Inserts.
One optimization here is to aggregate the Embedding, m, v, and the low-frequency filtering counter (see Counting HashTable in Figure 14 below) into the Value of a single HashTable, so that related operations on sparse parameters can be executed together. This greatly reduces how often sparse parameters are accessed and lowers the pressure on the PSs.
Figure 14 Parameter fusion strategy based on HashTable
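A sketch of the fused value layout, assuming the value is a flat list `[embedding | m | v | count]`. The class name `FusedSlotTable` and the `apply` helper are hypothetical, and the update function is left to the caller so that only the layout and the operation count are shown: one Find and one Insert per ID instead of three of each.

```python
from typing import Callable, Dict, List, Tuple

Vec = List[float]


class FusedSlotTable:
    """Hypothetical HashTable whose value packs [embedding | m | v | count]."""

    def __init__(self, dim: int):
        self.dim = dim
        self.table: Dict[int, Vec] = {}
        self.finds = 0
        self.inserts = 0

    def _split(self, value: Vec) -> Tuple[Vec, Vec, Vec, float]:
        d = self.dim
        return value[:d], value[d:2 * d], value[2 * d:3 * d], value[3 * d]

    def find(self, fid: int) -> Vec:
        self.finds += 1
        default = [0.0] * (3 * self.dim + 1)
        return list(self.table.get(fid, default))

    def insert(self, fid: int, value: Vec) -> None:
        self.inserts += 1
        self.table[fid] = value

    def apply(self, fid: int,
              update_fn: Callable[[Vec, Vec, Vec], Tuple[Vec, Vec, Vec]]) -> None:
        """One Find and one Insert update embedding, m, v, and the counter."""
        emb, m, v, count = self._split(self.find(fid))
        emb, m, v = update_fn(emb, m, v)
        self.insert(fid, list(emb) + list(m) + list(v) + [count + 1.0])


if __name__ == "__main__":
    t = FusedSlotTable(dim=2)
    t.apply(7, lambda e, m, v: ([x + 1.0 for x in e], m, v))
    print(t.table[7], t.finds, t.inserts)  # [1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0] 1 1
```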
This is a general optimization: with aggregation enabled, training speed improves significantly, and the gain remains positive as model size and Worker count vary. In Meituan's real business models, aggregation improves performance by about 45% compared with no aggregation.
3.4.2 Embedding pipeline optimization
In industrial production, an assembly line is a method in which each production unit focuses on one segment of the work to improve efficiency and output. In computing, the term pipeline more commonly refers to a parallelization technique in which the execution of multiple tasks overlaps. For example, in a typical RISC processor, a user program consists of many instructions, and executing an instruction can be roughly divided into fetch, decode, execute, memory access, and write-back. These stages use different hardware units, such as the instruction cache, data cache, registers, and ALU. In each cycle, the hardware units of these five stages work in parallel on different instructions, making full use of the hardware and improving the processor's instruction throughput. Processor instruction pipelining is a complex, systematic low-level technology, but its ideas are also widely applied in distributed deep learning frameworks, for example:
If distributed training is abstracted into computation and communication, most mainstream deep learning frameworks support overlapping communication with computation when executing the computational graph DAG.
If deep model training is divided into forward and backward passes, the two cannot be effectively parallelized within a single step because of their strong dependency. The communication scheduling introduced in BytePS breaks the barrier between step iterations: once some parameters from the previous iteration have been updated, the forward computation of the next iteration can start early, increasing forward/backward overlap overall.
In GPU training for CTR scenarios, parameters reside in main memory while computation runs on the GPU. To address this, Baidu AIBox schedules different hardware devices cleverly, building a parameter preparation stage that mainly uses the CPU, main memory, and NIC, and a network computation stage that uses the GPU and NVLink; overlapping the two stages yields higher training throughput.
These examples show that, in deep learning framework design, analyzing the scenario can reveal parallelizable stages from different perspectives and improve overall training throughput.
For large-scale sparse model training, the core process is to first perform the Embedding of sparse parameters and then run the dense sub-network. The sparse Embedding runs on the remote PSs and mainly consumes network resources, while the dense sub-network runs on the local Worker and mainly consumes compute resources. Together they account for most of the total time: in Meituan's real business models, they take 40+% and 50+% respectively.
Could we then perform the sparse Embedding ahead of time to overlap communication with computation and hide its cost? This is certainly feasible from a systems standpoint, but algorithmically it introduces parameter staleness, which may affect model accuracy. In real production, however, large-scale asynchronous training already introduces a lag of tens to hundreds of steps. In our tests, fetching sparse parameters one or two steps ahead did not affect model accuracy.
In the implementation, we split the entire computation graph into two subgraphs, the Embedding Graph (EG) and the Main Graph (MG), which run asynchronously and independently so that the two stages overlap (the whole splitting process is transparent to the user). EG mainly covers extracting Embedding keys from samples, querying and assembling Embedding vectors, and updating Embedding vectors. MG mainly covers the dense sub-network computation, gradient computation, and updates of the dense parameters.
Figure 15 Embedding pipeline module interaction relationship
The two subgraphs interact as follows: EG passes Embedding vectors to MG (from MG's perspective, it reads values from a dense Variable), and MG passes the gradients of the Embedding parameters back to EG. Both exchanges are expressed as TensorFlow computation graphs. We use two threads and two Sessions to execute the two graphs concurrently so that the two stages overlap, achieving higher training throughput.
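The two-thread structure can be sketched with `threading` and `queue` from the Python standard library. This only illustrates the control flow under our own assumptions: plain functions stand in for the two TensorFlow Sessions, a bounded queue limits how many precomputed batches can wait (which bounds staleness), and `run_pipeline` and its callbacks are hypothetical names.

```python
import queue
import threading
from typing import Callable, List


def run_pipeline(num_batches: int, max_lead: int,
                 embed: Callable[[int], float],
                 train: Callable[[int, float], float],
                 apply_grad: Callable[[int, float], None]) -> List[int]:
    """Run an EG-like thread and an MG-like thread concurrently.

    EG looks up embeddings for upcoming batches and applies embedding gradients;
    MG consumes embeddings and returns gradients. Returns MG's processing order.
    """
    if max_lead < 1:
        raise ValueError("max_lead must be at least 1")  # maxsize=0 would be unbounded
    emb_q: queue.Queue = queue.Queue(maxsize=max_lead)
    grad_q: queue.Queue = queue.Queue()
    processed: List[int] = []

    def eg_loop() -> None:
        applied = 0
        for i in range(num_batches):
            emb_q.put((i, embed(i)))  # blocks while max_lead batches are waiting
            while True:               # apply whatever gradients are already back
                try:
                    j, g = grad_q.get_nowait()
                except queue.Empty:
                    break
                apply_grad(j, g)
                applied += 1
        while applied < num_batches:  # drain the remaining gradients
            j, g = grad_q.get()
            apply_grad(j, g)
            applied += 1

    def mg_loop() -> None:
        for _ in range(num_batches):
            i, emb = emb_q.get()
            grad_q.put((i, train(i, emb)))
            processed.append(i)

    threads = [threading.Thread(target=eg_loop), threading.Thread(target=mg_loop)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return processed


if __name__ == "__main__":
    grads = {}
    order = run_pipeline(5, 2, embed=float, train=lambda i, e: 2.0 * e,
                         apply_grad=grads.__setitem__)
    print(order, grads)  # [0, 1, 2, 3, 4] {0: 0.0, 1: 2.0, ...}
```

In this sketch the gradient for a batch may be applied after EG has already looked up embeddings for later batches, which is exactly the bounded staleness discussed above.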
Figure 16 Embedding pipeline architecture flowchart
The above figure is the architecture flowchart of the Embedding pipeline. Intuitively, it is divided into the sample distribution module on the left, the cross-session data exchange module on the top, and the Embedding Graph and Main Graph obtained by automatic graph segmentation. The blue circle represents the new operator, and the orange arrow represents the key process of EG. The blue arrows represent the MG focus process, and the red arrows represent the sample data focus process.
An abstraction layer named Pipeline Dataset is introduced in a form transparent to the user. This layer is generated to meet the requirements of the two calculation graphs of EG/MG running at different rhythms, and supports custom configuration. In addition, in order to make the data in the entire pipeline match each other, it will also be responsible for the generation and registration of a global Batch ID. Pipeline Dataset exposes two Iterators, one for EG and one for MG. The bottom of the Pipeline Dataset shares the Dataset of each layer native to TensorFlow.
The top ExchangeManager is a static, cross-session data exchange medium that exposes data registration and data pull capabilities. The reason for abstracting this module is that EG and MG originally belonged to one computational graph, but they were disassembled into two graphs because of the pipeline, so we needed to establish a cross-session data exchange mechanism and matched them accurately. It uses the global Batch ID as the key internally, and then manages the sample data, Embeding vector, Embedding gradient, Index after Unique and other data, and is responsible for the life cycle management of these data.
The middle Embedding Graph is run by a separate TF Session in a separate thread. After obtaining the sample data through the a operator, the extraction of feature IDs is performed, and the sparse parameter query based on the HashTable method is performed. The query result is passed through the c operator. Placed in ExchangeManager. EG also contains the f operator for reverse update, which will obtain the Embedding gradient and its matching forward parameters from the ExchangeManager, and then execute the gradient update parameter logic.
The Main Graph at the bottom computes the actual dense sub-network. We implemented a trainable EmbeddingVariable by inheritance. Its construction (operator d) finds the matching Embedding vector in ExchangeManager and wraps it in an EmbeddingVariable for the dense sub-network. In addition, we add operator e to the backward (gradient) function registered for EmbeddingVariable, so that the Embedding gradient is placed into ExchangeManager for operator f in EG to consume.
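The division of labor between EG and MG can be simulated with two threads. In this toy sketch (all names, the learning rate, the embedding size, and the `LAG` setting are hypothetical), a dict stands in for the PS HashTable, two queues stand in for ExchangeManager, and the comments map each step to operators a, c, d, e, and f as we read the figure. Because EG publishes batch b + 1 before applying batch b's gradient, MG in this toy can see embeddings that lack the most recent update; the source does not say how the real system handles this.

```python
import queue
import threading
from typing import Dict, List, Tuple

LR = 0.1  # hypothetical learning rate
DIM = 2   # hypothetical embedding dimension
LAG = 1   # hypothetical: EG applies batch b's gradient after publishing batch b + LAG


def run_toy_pipeline(batches: List[List[int]]) -> Tuple[Dict[int, List[float]], List[List[List[float]]]]:
    table: Dict[int, List[float]] = {}   # stands in for the PS-side HashTable
    emb_q: queue.Queue = queue.Queue()   # EG -> MG channel (operators c -> d)
    grad_q: queue.Queue = queue.Queue()  # MG -> EG channel (operators e -> f)
    seen_by_mg: List[List[List[float]]] = []

    def apply_next_gradient() -> None:
        # Operator f: take the oldest pending gradient batch and apply plain SGD.
        _, ids, grads = grad_q.get()
        for fid, g in zip(ids, grads):
            table[fid] = [w - LR * gw for w, gw in zip(table[fid], g)]

    def embedding_graph() -> None:
        for batch_id, ids in enumerate(batches):  # operator a: fetch sample IDs
            # Forward lookup (lazily creating rows), then publish (operator c).
            embs = [list(table.setdefault(fid, [1.0] * DIM)) for fid in ids]
            emb_q.put((batch_id, ids, embs))
            if batch_id >= LAG:
                apply_next_gradient()  # gradient of batch_id - LAG
        for _ in range(min(LAG, len(batches))):
            apply_next_gradient()  # drain the last LAG gradients

    def main_graph() -> None:
        for _ in batches:
            batch_id, ids, embs = emb_q.get()  # operator d
            seen_by_mg.append(embs)
            # Toy dense net with loss = 0.5 * sum(e ** 2), so dL/de = e (operator e).
            grad_q.put((batch_id, ids, [list(e) for e in embs]))

    threads = [threading.Thread(target=embedding_graph), threading.Thread(target=main_graph)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return table, seen_by_mg


table, seen = run_toy_pipeline([[1], [1], [1]])
print(seen)      # batch 2 already reflects batch 0's update, but not batch 1's
print(table[1])  # about [0.71, 0.71] after all three updates
```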
With this design, we built a controllable EG/MG concurrent pipeline training mode. Overall, the gains of the Embedding pipeline training mode come from the following sources:
Profiling multiple business models, we found that the time ratio of EG to MG is about 3:7 or 4:6. Running these two stages in parallel effectively hides the Embedding stage, so the computation of the MG network can almost always start immediately, which greatly increases overall training throughput.
When multiple optimizers (sparse and non-sparse) are used in the TensorFlow engine, the backward computation graph is constructed repeatedly, which adds extra computation. Splitting the model into two subgraphs avoids this problem.
In the implementation, ExchangeManager is responsible not only for exchanging Embedding parameters and gradients but also for reusing metadata. For example, it saves the results of operators such as Unique, which further reduces repeated computation.
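Reusing the Unique result pays off because the backward pass needs the same mapping from each occurrence of an ID to its deduplicated row. A minimal sketch, assuming a plain dict keyed by batch ID as a hypothetical stand-in for ExchangeManager's metadata store: the forward step stores the `tf.unique`-style `(unique_ids, idx)` pair, and the backward step uses the cached `idx` to sum per-occurrence gradients into per-ID rows without deduplicating again.

```python
from typing import Dict, List, Tuple


def unique_with_index(ids: List[int]) -> Tuple[List[int], List[int]]:
    """Same contract as tf.unique: (unique IDs in first-seen order, position of each input in them)."""
    pos: Dict[int, int] = {}
    uniq: List[int] = []
    idx: List[int] = []
    for x in ids:
        if x not in pos:
            pos[x] = len(uniq)
            uniq.append(x)
        idx.append(pos[x])
    return uniq, idx


def aggregate_gradients(idx: List[int], grads: List[List[float]], num_unique: int, dim: int) -> List[List[float]]:
    """Backward pass: sum per-occurrence gradients into per-unique-ID rows using the cached idx."""
    out = [[0.0] * dim for _ in range(num_unique)]
    for row, g in zip(idx, grads):
        out[row] = [a + b for a, b in zip(out[row], g)]
    return out


metadata_cache: Dict[int, Tuple[List[int], List[int]]] = {}  # hypothetical: batch ID -> Unique result

# Forward (EG): deduplicate once and remember the result for this batch.
batch_id, ids = 0, [5, 3, 5, 5, 9]
metadata_cache[batch_id] = unique_with_index(ids)

# Backward (EG, later): reuse the cached result instead of running Unique again.
uniq, idx = metadata_cache.pop(batch_id)
per_occurrence_grads = [[1.0], [2.0], [3.0], [4.0], [5.0]]
print(uniq, aggregate_gradients(idx, per_occurrence_grads, len(uniq), dim=1))
# [5, 3, 9] [[8.0], [2.0], [5.0]]
```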
In addition, the API is designed to be transparent to the user: enabling the Embedding pipeline takes only one line of code, and the EG/MG partitioning is hidden from the user. Currently, in the training of one Meituan business, the Embedding pipeline delivers a 20%~60% performance improvement under the CPU PS architecture, and the larger the number of concurrent Workers, the greater the improvement.
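As a rough sanity check on the 3:7 and 4:6 ratios, the ideal speedup from overlapping two stages is bounded by the slower stage: (t_EG + t_MG) / max(t_EG, t_MG). This sketch assumes perfect overlap and zero exchange overhead, so it is an idealized bound for the overlap effect alone; it does not capture the savings from avoiding repeated backward-graph construction or from reusing Unique results.

```python
def ideal_pipeline_speedup(t_eg: float, t_mg: float) -> float:
    """Serial time divided by the time of the slowest stage (perfect overlap assumed)."""
    return (t_eg + t_mg) / max(t_eg, t_mg)


for t_eg, t_mg in [(3, 7), (4, 6)]:
    speedup = ideal_pipeline_speedup(t_eg, t_mg)
    print(f"EG:MG = {t_eg}:{t_mg} -> at most {speedup:.2f}x")
# EG:MG = 3:7 -> at most 1.43x
# EG:MG = 4:6 -> at most 1.67x
```

Idealized overlap alone therefore gives roughly 43% to 67%, the same order as the measured 20%~60%.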
3.5 Single instance PS concurrency optimization
As the analysis in Section 2.2 shows, we cannot keep improving the throughput of distributed tasks simply by adding more PS instances, so concurrency optimization of a single PS instance is also a very important direction. Our main optimization work is as follows.
3.5.1 High-performance HashTable
Under the PS architecture, large-scale sparse model training places high demands on concurrent HashTable reads and writes, because each PS has to bear the Embedding load of hundreds or even thousands of Workers. Considering both speed and stability, we chose tbb::concurrent_hash_map as the underlying HashTable implementation and wrapped it into a new TBBConcurrentHashTable operator. In our tests at the 100-billion-parameter scale, training with TBBConcurrentHashTable was 3 times faster than with the native MutableDenseHashTable.
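tbb::concurrent_hash_map uses fine-grained locking so that many threads can read and write different keys at the same time. Python's standard library has no TBB binding, so the sketch below illustrates the same principle with lock striping: the map is split into independently locked shards, and updates to keys in different shards never wait for each other. This is an analogy, not TBB's actual algorithm, and the class and method names are hypothetical; under CPython's GIL it demonstrates correctness under concurrency rather than a speedup.

```python
import threading
from contextlib import ExitStack
from typing import Callable, Dict, Generic, Hashable, List, Optional, TypeVar

V = TypeVar("V")


class StripedHashTable(Generic[V]):
    """Hash map split into independently locked shards ("lock striping")."""

    def __init__(self, num_stripes: int = 64) -> None:
        self._locks = [threading.Lock() for _ in range(num_stripes)]
        self._shards: List[Dict[Hashable, V]] = [{} for _ in range(num_stripes)]

    def _stripe(self, key: Hashable) -> int:
        return hash(key) % len(self._locks)

    def upsert(self, key: Hashable, update: Callable[[V], V], default: Callable[[], V]) -> V:
        """Atomically apply `update` to the value for `key` (created by `default` if absent)."""
        s = self._stripe(key)
        with self._locks[s]:  # only this shard is locked
            shard = self._shards[s]
            current = shard[key] if key in shard else default()
            shard[key] = update(current)
            return shard[key]

    def get(self, key: Hashable, fallback: Optional[V] = None) -> Optional[V]:
        s = self._stripe(key)
        with self._locks[s]:
            return self._shards[s].get(key, fallback)

    def __len__(self) -> int:
        # Take every stripe lock in a fixed order so the count is a consistent snapshot.
        with ExitStack() as stack:
            for lock in self._locks:
                stack.enter_context(lock)
            return sum(len(shard) for shard in self._shards)


table: StripedHashTable[int] = StripedHashTable(num_stripes=16)


def worker() -> None:
    # Each simulated Worker pushes one update for each of 1,000 feature IDs.
    for fid in range(1000):
        table.upsert(fid, lambda v: v + 1, default=lambda: 0)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(table), table.get(42))  # 1000 8
```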
3.5.2 HashTable BucketPool
In large-scale sparse model training, the Embedding HashTable faces a large number of concurrent operations. Profiling showed that frequent dynamic memory allocation brings a large performance overhead (even though TensorFlow Tensors have a dedicated memory allocator). We therefore optimized HashTable memory management based on the idea of memory pooling.
When the HashTable is initialized, we first create two BucketPools, one for keys and one for values, and each pool mallocs a large block of memory up front as a reserve. Because keys and values may be removed from the HashTable (for example, in Online Learning training), the memory they occupied must be reclaimed, so each BucketPool also has a ReuseQueue that maintains reclaimed memory. Every time a key and value are inserted into the internal hash table data structure, their memory allocation and release are managed by the pools. This reduces the memory allocation overhead encountered in large-scale sparse training and improves overall end-to-end training performance by about 5%.
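The BucketPool bookkeeping can be sketched in Python as follows. The slot sizes, capacity, and the `insert`/`erase` helpers are hypothetical, and a real pool would reserve another block instead of failing when exhausted. Python manages memory itself, so this shows the allocation policy (one large reservation, recycled slots served first from the ReuseQueue), not the performance win.

```python
from collections import deque
from typing import Deque, Dict, Tuple


class BucketPool:
    """Hypothetical fixed-size slot pool: one large up-front reservation plus a ReuseQueue."""

    def __init__(self, slot_size: int, capacity: int) -> None:
        self.slot_size = slot_size
        self.capacity = capacity
        self.arena = bytearray(slot_size * capacity)  # the single large up-front allocation
        self.next_fresh = 0                            # first never-used slot
        self.reuse_queue: Deque[int] = deque()         # slots reclaimed from deleted entries

    def allocate(self) -> int:
        if self.reuse_queue:  # prefer recycled memory
            return self.reuse_queue.popleft()
        if self.next_fresh < self.capacity:
            slot = self.next_fresh
            self.next_fresh += 1
            return slot
        raise MemoryError("pool exhausted (a real pool would reserve another block)")

    def release(self, slot: int) -> None:
        self.reuse_queue.append(slot)

    def view(self, slot: int) -> memoryview:
        start = slot * self.slot_size
        return memoryview(self.arena)[start:start + self.slot_size]


key_pool = BucketPool(slot_size=8, capacity=4)     # 8-byte feature IDs
value_pool = BucketPool(slot_size=16, capacity=4)  # e.g. 4 float32 values per embedding
index: Dict[int, Tuple[int, int]] = {}             # feature ID -> (key slot, value slot)


def insert(fid: int, value: bytes) -> None:
    ks, vs = key_pool.allocate(), value_pool.allocate()
    key_pool.view(ks)[:] = fid.to_bytes(8, "little")
    value_pool.view(vs)[:] = value
    index[fid] = (ks, vs)


def erase(fid: int) -> None:
    # e.g. an ID evicted during Online Learning: return its memory to the pools.
    ks, vs = index.pop(fid)
    key_pool.release(ks)
    value_pool.release(vs)


for fid in (101, 102, 103):
    insert(fid, bytes(16))
erase(102)
insert(104, bytes([7]) * 16)
print(index[104], key_pool.next_fresh)  # (1, 1) 3
```

Feature 104 lands in the slots freed by 102, so no new memory is touched.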
Figure 17 HashTable memory optimization
3.6 Throughput optimization per unit of computing power
As the analysis in Section 2.2 shows, the computational pressure on Workers is also very high. Without Worker-side optimization, maintaining throughput requires scaling out more Workers, which in turn puts more pressure on the PS. For users, improving performance with limited computing resources delivers more business value. We identified high-frequency operators through CAT and optimized them specifically. Here we share one case: the fusion of the Unique and DynamicPartition operators.
In the TensorFlow PS architecture, shared parameters, including Embedding vectors, are stored on the PS and exchanged with Workers over the network. An Embedding query usually involves the following two steps:
Because of the nature of sparse parameters, the repetition rate of the Embedding IDs extracted from samples is often as high as 70%~90%. Without deduplication, both the HashTable lookups and the network transfer would be under considerable pressure, so a Unique operation is usually performed before the query.
In large-scale sparse scenarios, storing hundreds of billions of parameters requires multiple PS machines to host them jointly. The Worker splits each query request according to the configured routing rules, so a DynamicPartition operation is usually performed before the query.
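The conventional two-step path can be sketched in plain Python. The helpers mirror the semantics of `tf.unique` and `tf.dynamic_partition` for 1-D inputs, while the routing rule (feature ID modulo the number of PS instances, similar in spirit to the "mod" partition strategy of `tf.nn.embedding_lookup`) and the sample IDs are assumptions for illustration. The data is traversed more than once: once to deduplicate, then again to compute routes and partition.

```python
from typing import Dict, List, Tuple


def unique(ids: List[int]) -> Tuple[List[int], List[int]]:
    """tf.unique-style: unique IDs in first-seen order plus each input's index into them."""
    pos: Dict[int, int] = {}
    uniq: List[int] = []
    idx: List[int] = []
    for x in ids:
        if x not in pos:
            pos[x] = len(uniq)
            uniq.append(x)
        idx.append(pos[x])
    return uniq, idx


def dynamic_partition(values: List[int], partitions: List[int], num_partitions: int) -> List[List[int]]:
    """tf.dynamic_partition semantics on a 1-D input: values[i] goes to out[partitions[i]]."""
    out: List[List[int]] = [[] for _ in range(num_partitions)]
    for v, p in zip(values, partitions):
        out[p].append(v)
    return out


NUM_PS = 2  # hypothetical number of PS shards
ids = [10, 7, 10, 3, 7, 10, 4, 10]
uniq, idx = unique(ids)                           # traversal over all IDs
routes = [fid % NUM_PS for fid in uniq]           # assumed routing rule: ID mod number of PS
shards = dynamic_partition(uniq, routes, NUM_PS)  # another traversal over the unique IDs
print(f"repetition rate = {1 - len(uniq) / len(ids):.0%}", shards)
# repetition rate = 50% [[10, 4], [7, 3]]
```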
These two steps are usually built from TensorFlow's existing operators, but in practice we found them inefficient. The main problems are:
The native Unique operator uses a relatively inefficient internal memory allocation strategy. It allocates a HashTable twice the size of its input (the Embedding IDs), but because the input is large and the repetition rate is high, the HashTable ends up oversized and very sparse. Almost every insert triggers a minor_page_fault, degrading HashTable performance. We confirmed this with Intel VTune (see Figure 18).
The Unique and DynamicPartition operators traverse the data redundantly. Both operations can be completed in a single traversal, which saves the cost of switching between operators and of the extra traversal.
Figure 18 DRAM Bound problem in Unique operator
In short, an oversized HashTable causes a large number of minor_page_faults and increases memory access time, while an undersized one may have to grow. We therefore implemented a memory-adaptive Unique operator based on a heuristic: from statistics on the repetition rate in training history, we derive a reasonable HashTable size that improves memory access performance. In addition, for the HashTable used inside the Unique operator, after a variety of tests we chose a Robin Hood HashTable to replace the implementation in native TF.
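Both ideas can be illustrated together. The sizing rule below is a hypothetical heuristic of our own (size for the expected unique count, derived from a historical repetition rate, at a target load factor, rounded up to a power of two), not Meituan's formula. The table is a textbook Robin Hood open-addressing set with linear probing: on each probe, if the resident entry is closer to its home slot than the incoming one, they swap, which keeps probe lengths short and uniform. It is a teaching sketch without deletion or growth, not the C++ HashTable used in the operator.

```python
import math
from typing import List, Optional, Tuple

GOLDEN_64 = 0x9E3779B97F4A7C15  # Fibonacci-hashing multiplier


def suggest_capacity(n_ids: int, hist_repetition_rate: float, max_load: float = 0.8) -> int:
    """Hypothetical heuristic: size for the expected unique count, not for 2x the input."""
    expected_unique = max(1, math.ceil(n_ids * (1.0 - hist_repetition_rate)))
    capacity = 1
    while capacity * max_load < expected_unique:
        capacity <<= 1
    return capacity


class RobinHoodSet:
    """Open addressing with linear probing and Robin Hood displacement (no deletion, no growth)."""

    def __init__(self, capacity: int) -> None:
        assert capacity > 0 and capacity & (capacity - 1) == 0, "capacity must be a power of two"
        self.bits = capacity.bit_length() - 1
        self.mask = capacity - 1
        self.keys: List[Optional[int]] = [None] * capacity
        self.vals = [0] * capacity
        self.dist = [-1] * capacity  # probe distance from the home slot; -1 means empty
        self.size = 0

    def _home(self, key: int) -> int:
        return ((key * GOLDEN_64) & 0xFFFFFFFFFFFFFFFF) >> (64 - self.bits)

    def get_or_insert(self, key: int, new_val: int) -> int:
        """Return the value stored for key; if absent, store new_val and return it."""
        if self.size == len(self.keys):
            raise RuntimeError("table full (a real implementation would rehash)")
        pos, d = self._home(key), 0
        cur_key, cur_val = key, new_val
        while True:
            if self.dist[pos] == -1:  # empty slot: place whatever we are carrying
                self.keys[pos], self.vals[pos], self.dist[pos] = cur_key, cur_val, d
                self.size += 1
                return new_val
            if self.keys[pos] == cur_key:  # found (only possible before any swap)
                return self.vals[pos]
            if self.dist[pos] < d:
                # Robin Hood rule: the resident is closer to home, so it yields the
                # slot and continues probing in our place.
                self.keys[pos], cur_key = cur_key, self.keys[pos]
                self.vals[pos], cur_val = cur_val, self.vals[pos]
                self.dist[pos], d = d, self.dist[pos]
            pos = (pos + 1) & self.mask
            d += 1


def robin_hood_unique(ids: List[int], capacity: int) -> Tuple[List[int], List[int]]:
    table = RobinHoodSet(capacity)
    uniq: List[int] = []
    idx: List[int] = []
    for x in ids:
        j = table.get_or_insert(x, len(uniq))
        if j == len(uniq):  # x was new
            uniq.append(x)
        idx.append(j)
    return uniq, idx


print(suggest_capacity(1_000_000, 0.8))  # 262144 slots, versus 2,000,000 for "2x input"
ids = [10, 7, 10, 3, 7, 10, 4, 10]
print(robin_hood_unique(ids, suggest_capacity(len(ids), 0.5)))
# ([10, 7, 3, 4], [0, 1, 0, 2, 1, 0, 3, 0])
```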
Furthermore, we fused the operators along the Unique and Partition path for Embedding IDs, simplifying the logic. With these optimizations, the standalone Unique operator is 51% faster, end-to-end performance of the real model improves by about 10%, and the total number of operators is reduced by 4%.
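A minimal sketch of the fusion idea, assuming a hypothetical routing rule of feature ID modulo the number of PS instances: a single traversal deduplicates the IDs and routes each new ID to its shard at the moment it is first seen, while recording a `(shard, position)` pair for every input so the PS responses can be scattered back to the original order. The function name and return layout are illustrative, not the fused TF operator's interface.

```python
from typing import Dict, List, Tuple


def fused_unique_partition(ids: List[int], num_ps: int) -> Tuple[List[List[int]], List[Tuple[int, int]]]:
    """One traversal that deduplicates IDs and routes each new ID to its PS shard."""
    seen: Dict[int, Tuple[int, int]] = {}
    shards: List[List[int]] = [[] for _ in range(num_ps)]
    locations: List[Tuple[int, int]] = []
    for fid in ids:
        loc = seen.get(fid)
        if loc is None:  # first occurrence: route it now, no second pass needed
            ps = fid % num_ps  # assumed routing rule: ID mod number of PS
            loc = (ps, len(shards[ps]))
            shards[ps].append(fid)
            seen[fid] = loc
        locations.append(loc)
    return shards, locations


ids = [10, 7, 10, 3, 7, 10, 4, 10]
shards, locations = fused_unique_partition(ids, num_ps=2)
print(shards)  # [[10, 4], [7, 3]]
# Every input ID can be recovered from its (shard, position) pair.
print([shards[ps][pos] for ps, pos in locations] == ids)  # True
```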
Throughout the key operator optimization work, Lin Lifan, Zhang Xiangze, and Gao Ming from Intel provided a great deal of technical support, and we also reused some of their optimization work. We would like to express our deepest gratitude!
4 Modeling of large-scale sparse algorithms
Once large-scale sparse capabilities were in place, the algorithms also had to be upgraded, in both features and model structure, to achieve good results in production. For example, food delivery advertising, starting from its business characteristics, introduced large-scale sparse features to upgrade the feature system in the food delivery scenario, providing higher-dimensional feature and parameter spaces and enhancing the model's fitting ability. It also redesigned the feature encoding scheme for high-dimensional sparse scenarios to solve the feature conflict problem during encoding. At the same time, the new encoding process removes some redundant feature hashing operations, which simplifies the feature processing logic to a certain extent and reduces the time spent on feature computation.
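The source does not describe Meituan's new encoding scheme. As a hedged illustration of what a feature conflict is, the sketch below contrasts hashing every feature into one small shared bucket space, where distinct features are forced to collide, with one common alternative that is assumed here and not taken from the source: reserving the high bits of a 64-bit code for the feature slot, so features from different slots can never collide. The feature names, slot IDs, and bit widths are hypothetical.

```python
import hashlib
from typing import List, Tuple


def hash64(text: str) -> int:
    """Deterministic 64-bit hash (Python's built-in str hash is salted per process)."""
    return int.from_bytes(hashlib.blake2b(text.encode(), digest_size=8).digest(), "little")


def bucketed_code(slot: str, value: str, num_buckets: int) -> int:
    """Hash 'slot=value' into a small shared bucket space; distinct features can collide."""
    return hash64(f"{slot}={value}") % num_buckets


def slot_prefixed_code(slot_id: int, value: str, value_bits: int = 48) -> int:
    """Reserve the high bits for the slot ID, so features from different slots never collide."""
    assert 0 <= slot_id < (1 << (64 - value_bits))
    return (slot_id << value_bits) | (hash64(value) & ((1 << value_bits) - 1))


features: List[Tuple[int, str, str]] = [
    (0, "user_id", "1001"), (1, "poi_id", "1001"), (2, "city", "beijing"),
    (3, "hour", "12"), (4, "category", "noodles"),
]
small = [bucketed_code(name, v, num_buckets=4) for _, name, v in features]
prefixed = [slot_prefixed_code(sid, v) for sid, _, v in features]
# 5 features into 4 buckets must collide; the slot-prefixed codes stay distinct.
print(len(set(small)), len(set(prefixed)))
```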
At the system level, training large-scale sparse models with tens of billions of parameters and tens of billions of samples greatly reduces iteration efficiency: a single experiment grows from less than one day to about a week. Besides the TensorFlow framework-level optimizations described above, the training engine team of the Meituan machine learning platform also optimized the business model specifically, improving overall throughput by 8 to 10 times (investing more computing resources would accelerate it further). This greatly improves business iteration efficiency and has helped the takeaway advertising business achieve significant gains.
5 Summary and Outlook
TensorFlow is widely used in large-scale recommender systems, but its lack of large-scale sparse, large-scale distributed training capabilities hinders business development. Building on TensorFlow's native architecture, Meituan added large-scale sparse support and optimized it deeply from multiple angles, achieving efficient distributed training with hundreds of billions of parameters and hundreds of billions of samples; the system is used at scale within Meituan. The TensorFlow community has also recognized the lack of these key capabilities and officially created SIG Recommenders in 2020 to address them through community co-development. Meituan will also actively contribute to the community in the future.
Model training for Meituan's recommendation scenarios currently runs mainly on CPUs. As the business develops, however, some models have become increasingly complex, and further optimization on the CPU is difficult (after optimization, Worker CPU utilization is already above 90%). In recent years, GPU computing power has grown by leaps and bounds: the new-generation NVIDIA A100 GPU delivers 156 TFLOPS (TF32 Tensor Cores), has 80 GB of memory, and provides 600 GB/s of inter-GPU bandwidth. For the workloads of such complex models, we designed a next-generation distributed training architecture based on the A100 GPU. After preliminary optimization, it has achieved good results on the recommendation model of one of Meituan's high-traffic businesses. We are still optimizing it further and will share the results later, so stay tuned.