Distributed training patterns
This chapter covers

- Distinguishing the traditional model training process from the distributed training process
- Using parameter servers to build models that cannot fit in a single machine
- Improving distributed model training performance using the collective communication pattern
- Handling unexpected failures during the distributed model training process
The previous chapter introduced a couple of practical patterns that can be incorporated into the data ingestion process, which is usually the first process in a distributed machine learning system. It is responsible for monitoring incoming data and performing the preprocessing steps necessary to prepare for model training.
Distributed training, the next step after the data ingestion process, is what distinguishes distributed machine learning systems from other distributed systems. It’s the most critical part of a distributed machine learning system.
The system design needs to be scalable and reliable to handle datasets and models of different sizes and various levels of complexity. Some large and complex models cannot fit on a single machine, and for medium-size models that are small enough to fit on a single machine, it can still be hard to improve the computational performance of distributed training.
It’s also essential to know what to do when we see performance bottlenecks and unexpected failures. Parts of the dataset may be corrupted or cannot be used to train the model successfully, or the distributed cluster that the distributed training depends on may experience an unstable or even disconnected network due to weather conditions or human error.
In this chapter, I’ll explore some of the challenges involved in the distributed training process and introduce a few established patterns adopted heavily in industry. Section 3.2 discusses the challenges of training a large machine learning model that tags main themes in new YouTube videos but cannot fit on a single machine; it also shows how to overcome the difficulty using the parameter server pattern. Section 3.3 shows how to use the collective communication pattern to speed up distributed training for smaller models and avoid unnecessary communication overhead among parameter servers and workers. The last section discusses some of the vulnerabilities of distributed machine learning systems due to corrupted datasets, unstable networks, and preempted worker machines, as well as ways to address those problems.
3.1 What is distributed training?

Distributed training is the process of taking the data that has already been processed by data ingestion (discussed in chapter 2), initializing the machine learning model, and then training the model with the processed data in a distributed environment such as multiple nodes. It’s easy to get this process confused with the traditional training process of machine learning models, which takes place in a single-node environment where the datasets and the machine learning model objects are on the same machine, such as a laptop. By contrast, distributed model training usually happens in a cluster of machines that could work concurrently to greatly speed up the training process.
In addition, the dataset is often located on the local disk of a single laptop or machine in traditional model training, whereas in distributed model training, a remote distributed database is used to store the dataset, or the dataset has to be partitioned on disks of multiple machines. If the model is not small enough to fit on a single machine, it’s not possible to train the model in a traditional way with a single machine. From a network infrastructure perspective, an InfiniBand (https://wiki.archlinux.org/title/InfiniBand) or remote direct memory access (RDMA; https://www.geeksforgeeks.org/remote-direct-memory-access-rdma/) network is often preferred for distributed training instead of a single local host. Table 3.1 provides a comparison of these training methods.
Table 3.1 Comparison of traditional (nondistributed) training and distributed training for machine learning models

| Aspect | Traditional model training | Distributed model training |
| Computational resources | Laptop or single remote server | Cluster of machines |
| Dataset location | Local disk on a single laptop or machine | Remote distributed database or partitions on disks of multiple machines |
| Network infrastructure | Local hosts | InfiniBand or RDMA |
| Model size | Small enough to fit on a single machine | Medium to large |
InfiniBand and RDMA

InfiniBand is a computer networking communications standard used in high-performance computing. It features the high throughput and low latency for data interconnects, both among and within computers or storage systems, that distributed training often requires. RDMA allows one machine to access another machine’s memory directly, without involving either machine’s operating system. This permits high-throughput, low-latency networking, which is especially useful in the distributed training process, where communications among machines are frequent.
3.2 Parameter server pattern: Tagging entities in 8 million YouTube videos

Suppose that we have a dataset called YouTube-8M (http://research.google.com/youtube8m; figure 3.1) that consists of millions of YouTube video IDs, with high-quality machine-generated annotations from a diverse vocabulary of more than 3,800 visual entities (such as Food, Car, and Music). We’d like to train a machine learning model to tag the main themes of YouTube videos that the model hasn’t seen.
This dataset consists of both coarse and fine-grained entities. Coarse entities are the ones nondomain experts can recognize after studying some existing examples, and fine-grained entities can be identified by domain experts who know how to differentiate among extremely similar entities. These entities have been semiautomatically curated and manually verified by three raters to be visually recognizable. Each entity has at least 200 corresponding video examples, with an average of 3,552 training videos. When the raters identify the entities in the videos, they are given a guideline to assess how specific and visually recognizable each entity is, using a discrete scale from 1 to 5, where 1 represents an entity that a layperson can easily identify (figure 3.2).
In the online dataset explorer provided by YouTube-8M (http://research.google.com/youtube8m/explore.html), the list of entities appears on the left side, and the number of videos that belong to each entity appears next to the entity name (figure 3.3).
Note that in the dataset explorer, the entities are ordered by the number of videos in each entity. In figure 3.3, the three most popular entities are Games, Video game, and Vehicle, respectively, ranging from 415,890 to 788,288 training examples. The least popular entities (not shown in the figure) are Cylinder and Mortar, with 123 and 127 training videos, respectively.
Figure 3.2 A screenshot of a question and guideline displayed to human raters for identifying the entities in the YouTube videos to assess how visually recognizable each entity is (Source: Sudheendra Vijayanarasimhan et al. Licensed under Nonexclusive License 1.0)
Figure 3.3 A screenshot of the dataset explorer provided by the YouTube-8M website, ordering the entities by number of videos (Source: Sudheendra Vijayanarasimhan et al. Licensed under Nonexclusive License 1.0)
3.2.1 The problem

With this dataset, we’d like to train a machine learning model to tag the main themes of new YouTube videos that the model hasn’t seen. This task may be trivial for a simpler dataset and machine learning model, but that’s certainly not the case for the YouTube-8M dataset. This dataset comes with precomputed audiovisual features from billions of frames and audio segments, so we don’t have to calculate and obtain them on our own—tasks that often take a long time and require a large amount of computational resources.
Even though it is possible to train a strong baseline model on this dataset in less than a day on a single GPU, the dataset’s scale and diversity can enable deep exploration of complex audiovisual models that can take weeks to train. Is there any solution for training this potentially large model efficiently?
3.2.2 The solution

First, let’s take a look at some of the entities using the data explorer on the YouTube-8M website and see whether any relationships exist among the entities. Are these entities unrelated, for example, or do they have some level of overlap in content? After some exploration, we will make necessary adjustments to the model to take those relationships into account. Figure 3.4 shows a list of YouTube videos that belong to the Pet entity. In the third video of the first row, a child is playing with a dog.
Figure 3.4 Example videos that belong to the Pet entity (Source: Sudheendra Vijayanarasimhan et al. Licensed under Nonexclusive License 1.0)
Let’s take a look at a similar entity. Figure 3.5 shows a list of YouTube videos that belong to the Animal entity, in which we can see animals such as fish, horses, and pandas. Interestingly, a cat is getting cleaned by a vacuum in the third video of the fifth row. One might guess that this video is in the Pet entity as well because a cat can be a pet if it’s adopted by human beings.
Figure 3.5 Example videos that belong to the Animal entity (Source: Sudheendra Vijayanarasimhan et al. Licensed under Nonexclusive License 1.0)
If we’d like to build machine learning models for this dataset, we may need to do some additional feature engineering before fitting the model directly to the dataset. We might combine the audiovisual features of these two entities (Animal and Pet) into a derived feature because they provide similar information and overlap, which can boost the model’s performance depending on the specific machine learning model we select. If we continue exploring the combinations of the existing audiovisual features in the entities or perform a huge number of feature engineering steps, we may no longer be able to train a machine learning model on this dataset in less than a day on a single GPU.
If we are using a deep learning model instead of a traditional machine learning model that requires a lot of feature engineering and exploration of the dataset, the model itself learns the underlying relationships among features, such as the audiovisual features of similar entities. Each neural network layer in the model architecture consists of vectors of weights and biases that get updated over training iterations as the model gathers more knowledge from the dataset.
If we use only 10 of the 3,862 entities, we could build a LeNet model (figure 3.6) that classifies new YouTube videos into 1 of the 10 selected entities. At a high level, LeNet consists of a convolutional encoder consisting of two convolutional layers and a dense block consisting of three fully connected layers. For simplicity, we assume that each individual frame from the videos is a 28 × 28 image and that it will be processed by various convolution and pooling layers that learn the underlying feature mapping between the audiovisual features and the entities.
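To make this architecture concrete, here is a minimal PyTorch sketch of the LeNet variant in figure 3.6. It is an illustration only: it assumes 28 × 28 single-channel frames and 10 output entities, not the exact preprocessing or features used for YouTube-8M.

```python
import torch
from torch import nn

# A minimal LeNet sketch: a convolutional encoder with two convolutional
# layers, followed by a dense block with three fully connected layers.
lenet = nn.Sequential(
    nn.Conv2d(1, 6, kernel_size=5, padding=2), nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Conv2d(6, 16, kernel_size=5), nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Flatten(),
    nn.Linear(16 * 5 * 5, 120), nn.Sigmoid(),
    nn.Linear(120, 84), nn.Sigmoid(),
    nn.Linear(84, 10),               # one output per selected entity
)

frames = torch.randn(32, 1, 28, 28)  # a dummy batch of 28 x 28 frames
logits = lenet(frames)               # shape: (32, 10)
```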
Brief history of LeNet LeNet (https://en.wikipedia.org/wiki/LeNet) is one of the first published convolutional neural networks (CNNs; https://en.wikipedia.org/wiki/Convolutional_neural_network) to capture wide attention for its performance on computer vision tasks. It was introduced by Yann LeCun, a researcher at AT&T Bell Labs, to recognize handwritten digits in images. In 1989, LeCun published the first study that successfully trained CNNs via backpropagation after a decade of research and development. At that time, LeNet achieved outstanding results matching the performance of support vector machines, the dominant approach in supervised machine learning algorithms.
In fact, those learned feature maps are defined by the model’s parameters: numeric vectors that are used as the weights and biases of each layer of the model representation. For each training iteration, the model takes every frame in the YouTube videos as features, calculates the loss, and then updates those model parameters to optimize the model’s objective so that the relationships between the features and the entities can be modeled more closely.
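As a rough single-machine illustration of one such training iteration (reusing the lenet module from the previous sketch and dummy data in place of real frames and entity labels), the helper below runs a forward pass, computes the loss, and updates the parameters. The distributed patterns in the rest of this chapter change where these parameters live and how gradient updates are communicated, not this basic loop.

```python
import torch
from torch import nn

def training_step(model, optimizer, criterion, frames, entity_labels):
    """One iteration: forward pass, loss calculation, backward pass, update."""
    optimizer.zero_grad()
    loss = criterion(model(frames), entity_labels)
    loss.backward()    # compute gradients for every weight and bias
    optimizer.step()   # apply the gradients to update the model parameters
    return loss.item()

# Dummy frames and labels stand in for a real batch of video data.
optimizer = torch.optim.SGD(lenet.parameters(), lr=0.1)
loss = training_step(lenet, optimizer, nn.CrossEntropyLoss(),
                     torch.randn(32, 1, 28, 28), torch.randint(0, 10, (32,)))
```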
Figure 3.6 LeNet model architecture that could be used to classify new YouTube videos in 1 of 10 selected entities. (Source: Aston Zhang et al. Licensed under Creative Commons Attribution-ShareAlike 4.0 International Public License)
Unfortunately, this training process is slow, as it involves updating all the parameters in different layers. We have two potential solutions to speed up the training process.
Let’s take a look at the first approach. We want to make an assumption here, and we’ll remove it later when we discuss a better approach. Let’s assume that the model is not too large and we can fit the entire model using existing resources without any possibility of out-of-memory or disk errors.
In this case, we can use one dedicated server to store all the LeNet model parameters and use multiple worker machines to split the computational workloads. Figure 3.7 shows an architecture diagram.
Each worker node takes a particular part of the dataset to calculate the gradients and then sends the results to the dedicated server to update the LeNet model parameters. Because the worker nodes use isolated computational resources, they can perform the heavy computations asynchronously without having to communicate with one another. With three workers, for example, we achieve roughly a threefold speedup simply by introducing the additional worker nodes, if costs such as message passing among nodes are neglected.
This dedicated single server responsible for storing and updating the model parameters is called a parameter server. We’ve designed a more efficient distributed machine learning training system by incorporating the parameter server pattern.
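A minimal single-process sketch of this division of labor follows. It is purely conceptual: the ParameterServer class and the compute_gradients helper are hypothetical stand-ins, and in a real system the server and the workers would run on separate machines and exchange gradients over the network rather than through direct method calls.

```python
class ParameterServer:
    """Stores the full set of model parameters and applies gradient updates."""

    def __init__(self, params, lr=0.1):
        self.params = params  # dict: parameter name -> array of values
        self.lr = lr

    def push(self, grads):
        # Apply a gradient update sent by a worker (a plain SGD step).
        for name, grad in grads.items():
            self.params[name] -= self.lr * grad

    def pull(self):
        # Return the current parameters so a worker can refresh its copy.
        return self.params

def worker_loop(server, data_shard, compute_gradients):
    # Each worker repeatedly pulls the latest parameters, computes gradients
    # on its own shard of the data, and pushes the result to the server.
    for batch in data_shard:
        params = server.pull()
        grads = compute_gradients(params, batch)  # hypothetical helper
        server.push(grads)
```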
Next comes the real-world challenge. Deep learning models often get complex; additional layers with custom structures can be added on top of a baseline model. Those complex models usually take up a lot of disk space due to the large number of model parameters in those additional layers. A lot of computational resources are required to meet the memory footprint requirement for successful training. What if the model is large, and we cannot fit all of its parameters on a single parameter server?
Figure 3.7 A machine learning training component with a single parameter server
A second solution could address the challenges in this situation. We can introduce additional parameter servers, each responsible for storing and updating a particular model partition. Each worker node is responsible for taking a particular part of the dataset to update the model parameters in a model partition.
Figure 3.8 shows an architecture diagram of this pattern with multiple parameter servers. The diagram differs from figure 3.7, in which a single server stores all the LeNet model parameters and multiple worker machines split the computational workloads. Each worker node takes a subset of the dataset, performs the calculations required in each neural network layer, and then sends the calculated gradients to update one model partition that’s stored on one of the parameter servers. Note that because all workers perform their calculations asynchronously, the model partitions that each worker node uses to calculate the gradients may not be up to date. To guarantee that the model partitions each worker node is using, and each parameter server is storing, are the most recent ones, we constantly have to pull and push model updates between the worker nodes and the parameter servers.
Figure 3.8 A machine learning training component with multiple parameter servers
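One simple way to partition the model, sketched below under the same hypothetical setup as the earlier single-server example, is to route each named parameter to a server chosen by hashing the parameter's name. Workers then push each gradient to, and pull each partition from, the server that owns it.

```python
class ShardedParameterServers:
    """Partitions the model parameters across several ParameterServer instances."""

    def __init__(self, params, num_servers, lr=0.1):
        # Assign every named parameter to one server based on its name.
        # (A real system would use a stable hash so all nodes agree on the mapping.)
        shards = [{} for _ in range(num_servers)]
        for name, value in params.items():
            shards[hash(name) % num_servers][name] = value
        self.servers = [ParameterServer(shard, lr) for shard in shards]
        self.num_servers = num_servers

    def push(self, grads):
        # Send each gradient to the server that owns the corresponding partition.
        for name, grad in grads.items():
            self.servers[hash(name) % self.num_servers].push({name: grad})

    def pull(self):
        # Reassemble the full parameter set from all partitions.
        merged = {}
        for server in self.servers:
            merged.update(server.pull())
        return merged
```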
With the help of parameter servers, we can effectively resolve the challenges of building a machine learning model to tag the main themes of new YouTube videos that the model hasn’t seen. Figure 3.9 shows a list of YouTube videos that were not used for model training, tagged with the Aircraft theme by the trained machine learning model. Even when the model is too large to fit on a single machine, we can train it efficiently. Note that although the parameter server pattern is useful in this scenario, it is specifically designed for training models with a large number of parameters.
Figure 3.9 A list of new YouTube videos not used for model training, tagged with the Aircraft theme (Source: Sudheendra Vijayanarasimhan et al. Licensed under Nonexclusive License 1.0)
3.2.3 Discussion

The previous section introduced the parameter server pattern and showed how it can be used to address potential challenges in the YouTube-8M video identification application. Even though the parameter server pattern is useful when the model is too large to fit on a single machine, and even though the pattern seems like a straightforward approach to the challenge, in real-world applications we still have to make careful design decisions to keep the distributed training system efficient.
Machine learning researchers and DevOps engineers often struggle to figure out a good ratio between the number of parameter servers and the number of workers for different machine learning applications. There are nontrivial communication costs for sending the calculated gradients from workers to parameter servers, as well as for pulling and pushing updates of the most recent model partitions. If the model keeps growing and we add too many parameter servers to the system, the system will end up spending a lot of time communicating among nodes and little time performing the actual computations among neural network layers.
Section 3.3 discusses these practical challenges in more detail. The section introduces a pattern that addresses these challenges so that engineers no longer need to spend time tuning the performance of workers and parameter servers for different types of models.
3.2.4 Exercises

1 If we’d like to train a model with multiple CPUs or GPUs on a single laptop, is this process considered distributed training?
2 What’s the result of increasing the number of workers or parameter servers?
3 What types of computational resources (such as CPUs, GPUs, memory, or disk) should we allocate to parameter servers, and how much of those resources should we allocate?
3.3 Collective communication pattern

Section 3.2.2 introduced the parameter server pattern, which comes in handy when the model is too large to fit in a single machine, such as the one we would have to build to tag entities in 8 million YouTube videos. Although we could use parameter servers to handle extremely large and complex models with a large number of parameters, it’s nontrivial to incorporate the pattern into the design of an efficient distributed training system.
Section 3.2.3 stated that DevOps engineers, who support the distributed machine learning infrastructure for data scientists or analysts, often have a hard time figuring out a good ratio between the number of parameter servers and the number of workers for different machine learning applications. Suppose that there are three parameter servers and three workers in the model training component of our machine learning system, as shown in figure 3.10. All three workers perform intensive computations asynchronously and then send the calculated gradients to the parameter servers to update different partitions of the model’s parameters.
Figure 3.10 A distributed model training component that consists of three parameter servers and three worker nodes
In reality, worker nodes and parameter servers do not have a one-to-one mapping, particularly when the number of worker nodes differs from the number of parameter servers. In other words, multiple workers may send updates to the same subset of parameter servers. Now suppose that two workers have finished calculating the gradients at the same time, and both want to update the model parameters stored on the same parameter server (figure 3.11).
Figure 3.11 Two of the worker nodes have finished calculating gradients and want to push updates to the first parameter server at the same time.
As a result, the two workers are blocking each other from sending the gradients to the parameter server. In other words, the gradients from both worker nodes cannot be accepted by the same parameter server simultaneously.
3.3.1 The problem: Improving performance when parameter servers become a bottleneck

In this case, only two workers are blocking each other when sending gradients to the same parameter server, which makes it hard to gather the calculated gradients on time and requires a strategy to resolve the blocking. Unfortunately, in real-world distributed training systems that incorporate parameter servers, many workers may be sending gradients at the same time, so we must resolve many such communication blocks.
When the ratio between the number of workers and the number of parameter servers is not optimal (for example, when many workers send gradients to the same parameter server at the same time), the problem gets even worse, and eventually the blocking of communications among different workers or parameter servers becomes a bottleneck. Is there a way to prevent this problem?
3.3.2 The solution

In this situation, the two workers need to figure out an approach to continue. They have to reconcile, decide which worker will take the next step first, and then take turns sending the calculated gradients to that particular parameter server. In addition, when one worker finishes sending gradients to update the model parameters on that parameter server, the parameter server starts sending the updated model partition back to that worker. Thus, the worker has the most up-to-date model to be fine-tuned as it’s fed incoming data. If, at the same time, another worker is also sending calculated gradients to that parameter server, as shown in figure 3.12, another blocking communication occurs, and the workers need to reconcile again.
This time, unfortunately, the reconciliation may not be easy to resolve, as the worker that is trying to send the calculated gradients may not have used the latest model when calculating the gradients. This situation may be fine when the differences among model versions are small, but eventually, it may cause a huge difference in the statistical performance of the trained model.
If each parameter server stores different model partitions unevenly—perhaps the first parameter server stores two-thirds of the model parameters, as shown in figure 3.13—calculated gradients using this outdated model partition will have a huge effect on the final trained model. In such cases, we may want to drop the calculated gradients and let the other worker send the updated gradients to the parameter servers.
Now another challenge arises. What if the dropped gradients that we consider to be outdated were calculated from a larger portion of the entire training data, and it could take a long time to recalculate them using the latest model partition (figure 3.14)? In this case, we probably want to keep those gradients so we don’t waste too much time recalculating them.
Figure 3.12 One worker is pulling updates while another worker is pushing updates to the same parameter server.
Figure 3.13 An example of imbalanced model partitions in which the first parameter server contains two-thirds of the entire set of model parameters.
In real-world distributed machine learning systems with parameter servers, we may encounter many challenges and problems that cannot be resolved completely. When those situations happen, we have to consider reconciliation and tradeoff approaches. As the numbers of workers and parameter servers increase, the cost of reconciliation and of the communication required to pull and push model parameters among workers and parameter servers becomes nontrivial. The system ends up spending a lot of time communicating between nodes and little time making computations among neural network layers.
Figure 3.14 The second worker is trying to push gradients calculated from half of the training data.
Even though we may have a lot of experience with the tradeoffs and performance differences involved in applying different ratios and computational resources for parameter servers and workers, it still seems counterintuitive and time-consuming to tune toward a perfect system. In some circumstances, some of the workers or parameter servers fail during training, or the network becomes unstable, causing problems when nodes communicate via push and pull updates. In other words, the parameter server pattern may not be suitable for a particular use case if we lack the expertise or the time to work with the underlying distributed infrastructure.
Is there an alternative that avoids this problem? The parameter server pattern may be one of the few good options for large models, but for simplicity and demonstration purposes, let’s assume that the model size does not change and that the whole model is small enough to fit on a single machine. In other words, each machine has enough disk space to store the model.
With that assumption in mind, what would be an alternative to parameter servers if we want only to improve the performance of distributed training? Without parameter servers, we have only worker nodes, each of which stores a copy of the entire set of model parameters, as shown in figure 3.15.
Figure 3.15 A distributed model training component with only worker nodes. Every worker stores a copy of the entire set of model parameters and consumes partitions of data to calculate the gradients.
How do we perform model training in this case? Recall that every worker consumes some portions of data and calculates the gradients required to update the model parameters stored locally on this worker node. When all the worker nodes have successfully completed their calculations of gradients, we need to aggregate all the gradients and make sure that every worker’s entire set of model parameters is updated based on the aggregated gradients. In other words, each worker should store a copy of the same updated model. How do we aggregate all the gradients?
We are already familiar with the process for sending gradients from one node to another, such as sending the calculated gradients from a worker node to a parameter server to update the model parameters in a particular model partition. In general, that process is called point-to-point communication (figure 3.16). No other process is involved.
Figure 3.16 An example of point-to-point communication with data being transferred between two processes. Note that no other process is involved.
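As a small, concrete illustration of point-to-point communication, the sketch below (assuming PyTorch's torch.distributed package with the gloo backend, two processes on one machine, and an arbitrary local port) has rank 0 send a tensor of pretend gradients directly to rank 1; no other process takes part in the transfer.

```python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, world_size):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"   # illustrative port
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    gradients = torch.zeros(4)
    if rank == 0:
        gradients += 1.0                  # pretend these were just calculated
        dist.send(gradients, dst=1)       # point-to-point: one sender ...
    else:
        dist.recv(gradients, src=0)       # ... and exactly one receiver
        print(f"rank {rank} received {gradients}")

    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(run, args=(2,), nprocs=2)
```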
In this situation, point-to-point communication is somewhat inefficient. Only worker nodes are involved, and we need to perform some kind of aggregation on the results from all workers. Fortunately, we can use another type of communication. Collective communication allows communication patterns across all processes in a group, which is composed of a subset of all processes. Figure 3.17 illustrates collective communication between one process and a group that consists of three other processes. In this case, each worker node carries the gradients and wants to send them to a group, including the rest of the worker nodes, so that all worker nodes will obtain the results from every worker.
Figure 3.17 An example of collective communication between one process and a group that consists of three other processes
For our machine learning models, we usually perform some kind of aggregate operation on all the received gradients before sending the aggregated result to all the workers. This type of aggregation is called a reduce function, which involves making a set of numbers into a smaller set of numbers. Examples of reduce functions are finding the sum, maximum, minimum, or average of the set of numbers—in our case, the gradients we received from all the workers.
Figure 3.18 illustrates a reduce operation. Vectors v0, v1, and v2 in each of the processes in the process group are merged with the first process via a reduce operation.
When the gradients are reduced in a distributed fashion, we send the reduced gradients to all the workers so that they are on the same page and can update the model parameters in the same way, ensuring that they have exactly the same models. This kind of operation is called a broadcast operation and is often used to perform collective communications. Figure 3.19 illustrates a broadcast operation that sends a value to every process in the process group.
The combination of reduce and broadcast operations here is called allreduce, which reduces the results based on a specified reduce function and then distributes the reduced results to all processes—in our case, to all the workers so that the model stored on each worker is exactly the same and is up to date (figure 3.20). When we finish a round of an allreduce operation, we start the next round by feeding new data to the updated model, calculating gradients, and performing the allreduce operation again to gather all gradients from workers to update the model.
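The sketch below (again assuming torch.distributed with the gloo backend, running inside an already initialized process group such as the one launched in the point-to-point example) shows these operations on a dummy gradient tensor. Note that the single all_reduce call produces the same outcome as the reduce-then-broadcast pair above it, which is exactly how the allreduce operation was just described.

```python
import torch
import torch.distributed as dist

def aggregate_gradients(rank, world_size):
    # Dummy per-worker gradients: rank 0 holds [0, 0, ...], rank 1 holds [1, 1, ...], etc.
    grads = torch.full((4,), float(rank))

    # Option 1: reduce to rank 0, then broadcast the result back to every rank.
    reduced = grads.clone()
    dist.reduce(reduced, dst=0, op=dist.ReduceOp.SUM)  # only rank 0 holds the sum
    dist.broadcast(reduced, src=0)                     # now every rank holds the sum

    # Option 2: allreduce performs both steps in one collective call.
    summed = grads.clone()
    dist.all_reduce(summed, op=dist.ReduceOp.SUM)
    summed /= world_size   # average the gradients across all workers
    return summed
```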
Figure 3.18 An example of a reduce operation with the sum as the reduce function
Figure 3.19 An example of a broadcast operation that sends a value to every process in the process group
Let’s take a break to see what we’ve accomplished. We’ve successfully used the collective communication pattern, which takes advantage of the underlying network infrastructure, to perform allreduce operations for communicating gradients among multiple workers and allows us to train a medium-sized machine learning model in a distributed fashion. As a result, we no longer need parameter servers; thus, there is no communication overhead between parameter servers and workers. The collective communication pattern is useful in machine learning systems and also in distributed and parallel computing systems, where concurrency is applied to computations and communication primitives such as broadcast and reduce are critical for communicating among different nodes. We’ll apply this pattern in section 9.2.2.
Figure 3.20 An example of an allreduce operation that reduces the results on each process in the group and then sends the result to every process in the group
3.3.3 Discussion

The collective communication pattern is a great alternative to parameter servers when the machine learning model we are building is not too large. With it, there is no communication overhead between parameter servers and workers, and it’s no longer necessary to spend a lot of effort tuning the ratio between the number of workers and the number of parameter servers. In other words, we can easily add workers to speed up the model training process without worrying about performance regression.
One potential problem is worth mentioning, though. After we incorporate the collective communication pattern by applying the allreduce operation, each worker needs to communicate with all its peer workers, which may slow down the entire training process when the number of workers becomes large. Collective communication relies on the underlying network infrastructure, and the plain allreduce operation does not yet take full advantage of it.
Fortunately, we could use better collective communication algorithms to update the model more efficiently. One example is the ring-allreduce algorithm. It produces the same result as the plain allreduce operation, but the data is transferred among the workers in a ring-like fashion instead of every worker exchanging gradients with every other worker. With N workers, each worker communicates with only two of its peers, 2 * (N – 1) times, to update all the model parameters completely (a conceptual sketch appears at the end of this section). In other words, this algorithm is bandwidth-optimal; if the aggregated gradients are large enough, it will make optimal use of the underlying network infrastructure.

Both the parameter server pattern and the collective communication pattern make distributed training scalable and efficient. In practice, however, any of the workers or parameter servers may fail to start due to a lack of resources or may fail in the middle of distributed training. Section 3.4 introduces patterns that will help in those situations and make the entire distributed training process more reliable.
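Below is the conceptual sketch of ring-allreduce mentioned above, written as a plain single-process NumPy simulation so that the two phases are easy to follow: a scatter-reduce phase in which each worker accumulates one fully summed chunk, and an allgather phase in which the completed chunks travel around the ring, for 2 * (N – 1) communication steps per worker in total. Real implementations (in libraries such as NCCL or Horovod) run these exchanges concurrently across machines; this sketch only illustrates the data movement.

```python
import numpy as np

def ring_allreduce(worker_grads):
    """Simulate ring-allreduce: every worker ends up with the elementwise sum.

    worker_grads: list of equal-length 1-D NumPy arrays, one per worker.
    """
    n = len(worker_grads)
    # Split each worker's gradient vector into n chunks of roughly equal size.
    chunks = [np.array_split(g.astype(float), n) for g in worker_grads]

    # Phase 1: scatter-reduce. In step s, worker i sends chunk (i - s) to worker
    # i + 1, which adds it to its own copy. After n - 1 steps, worker i holds
    # the fully summed chunk (i + 1) mod n.
    for step in range(n - 1):
        for i in range(n):
            dst, c = (i + 1) % n, (i - step) % n
            chunks[dst][c] += chunks[i][c]

    # Phase 2: allgather. The completed chunks travel once more around the
    # ring so that every worker ends up with every fully summed chunk.
    for step in range(n - 1):
        for i in range(n):
            dst, c = (i + 1) % n, (i + 1 - step) % n
            chunks[dst][c] = chunks[i][c].copy()

    return [np.concatenate(c) for c in chunks]

# Four workers, eight gradient values each; every worker gets the full sum.
grads = [np.arange(8.0) * (i + 1) for i in range(4)]
result = ring_allreduce(grads)
assert all(np.allclose(r, sum(grads)) for r in result)
```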
3.3.4 Exercises

1 Do blocking communications happen only among the workers?
2 Do workers update the model parameters stored on them asynchronously or synchronously?
3 Can you represent an allreduce operation with a composition of other collective communication operations?
3.4 Elasticity and fault-tolerance pattern

Both the parameter server pattern and the collective communication pattern enable us to scale up the distributed model training process. Parameter servers can be useful for handling large models that don’t fit on a single machine; a large model can be partitioned and stored on multiple parameter servers, while individual workers can perform heavy computations and update each individual partition of model parameters asynchronously. When we observe too much communication overhead when using parameter servers, however, we can use the collective communication pattern to speed up the training process for medium-size models.
Let’s assume that our distributed training component is well designed; can train machine learning models efficiently; and can handle the requirements of different types of models, using patterns such as parameter server and collective communication. One thing worth mentioning is that distributed model training is a long-running task, usually persisting for hours, days, or even weeks. Like all other types of software and systems, such a long-running task is vulnerable to unexpected internal or external interventions at any minute. Following are some examples of interventions that often occur in a distributed model training system:

- Parts of the dataset are corrupted or cannot be used to train the model successfully.
- The distributed cluster that the distributed training depends on may experience an unstable or disconnected network due to weather conditions or human error.
- Some of the parameter servers or worker nodes are preempted; the computational resources they rely on are rescheduled for tasks and nodes that have higher priority.
3.4.1 The problem: Handling unexpected failures when training with limited computational resources

When unexpected interventions happen, if no actions are taken to address them, problems start to accumulate. In the first example in the preceding section, all workers use the same logic to consume the data to fit the model; when they see corrupted data that the training code is not able to handle, all of them fail eventually. In the second example, when the network becomes unstable, communications among parameter servers and workers will hang until the network recovers. In the third example, when the parameter servers or worker nodes are preempted, the entire training process is forced to stop, leading to unrecoverable failure. What should we do to help the distributed training system recover in those situations? Do we have a way to prevent unexpected failures?
3.4.2 The solution

Let’s take a look at the first situation. Assume that the training process encounters a batch of data that’s corrupted. In figure 3.21, some of the videos in the YouTube-8M dataset were accidentally modified by third-party video editing software after they were downloaded from the original source. The first worker node is trying to read those portions of the data to feed the model. The machine learning model object that was initialized earlier cannot be fed with the edited and incompatible video data.
Figure 3.21 A worker encounters new batches of training data that have been edited and cannot be consumed successfully.
When this situation happens, the training process encounters an unexpected failure: the existing code does not contain the logic to handle an edited or corrupted dataset. In other words, we need to modify the distributed model training logic to handle this situation and then retrain the model from scratch.
Let’s start the distributed training process again and see whether everything works well. We can skip the batches of data that we found to be corrupted and continue to train the machine learning model with the next batches of the remaining data.
Unfortunately, after the model has been trained for hours with half of the data, we realize that the new batches of data are being consumed much more slowly than before. After some digging and communicating with the DevOps team, we find that the network has become extremely unstable due to an incoming storm at one of our data centers (the second scenario mentioned earlier). If our dataset resides on a remote machine instead of having been downloaded to a local machine, as shown in figure 3.22, the training process gets stuck waiting for a successful connection to the remote database. While waiting, we should checkpoint (store) the current trained model parameters and pause the training process. Then we can easily resume the training process when the network becomes stable again.
Figure 3.22 A worker encounters an unstable network while fetching data from a remote database.
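A minimal sketch of this checkpoint-and-resume idea, assuming a PyTorch model and optimizer (the file path and step counter here are purely illustrative), is shown below. The key point is that both the model parameters and the optimizer state are saved, so training can later pick up exactly where it paused.

```python
import os
import torch

CHECKPOINT_PATH = "checkpoints/latest.pt"  # illustrative location

def save_checkpoint(model, optimizer, step):
    # Store everything needed to resume: parameters, optimizer state, progress.
    os.makedirs(os.path.dirname(CHECKPOINT_PATH), exist_ok=True)
    torch.save(
        {
            "step": step,
            "model_state": model.state_dict(),
            "optimizer_state": optimizer.state_dict(),
        },
        CHECKPOINT_PATH,
    )

def resume_from_checkpoint(model, optimizer):
    # Restore the most recent parameters and optimizer state, then return the
    # step at which training should continue once the network is stable again.
    checkpoint = torch.load(CHECKPOINT_PATH)
    model.load_state_dict(checkpoint["model_state"])
    optimizer.load_state_dict(checkpoint["optimizer_state"])
    return checkpoint["step"]
```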
Did the unstable network have other effects? We neglected one fact: we also rely on the network for communication between worker and parameter server nodes to send the calculated gradients and update the model parameters. Recall that if the collective communication pattern is incorporated, the training process is synchronous. In other words, one worker’s communication blocks other workers’ communications; we would need to obtain all gradients from all workers to aggregate the results to update the model parameters. If at least one worker becomes slow in communicating, the cascading effect eventually leads to a stuck training process.
In figure 3.23, three worker processes in the same process group are performing an allreduce operation. Two of the communications become slow due to the unstable network that the underlying distributed cluster is experiencing. As a result, two of the processes that depend on the slow communications do not receive some values (denoted by question marks) on time, and the entire allreduce operation is stuck until everything is received.
Figure 3.23 An allreduce process with slow communications due to the unstable network that blocks the entire training process
Can we do anything to continue training without being affected by the degrading network performance of individual nodes? In this case, first, we can abandon the two worker processes that are experiencing slow network connections; then we can abandon the current allreduce operation. Given the nature of the collective communication pattern, the remaining workers still have exactly the same copy of the model, so we can continue the training process by reconstructing a new worker process group that consists of the remaining workers and then performing the allreduce operation again.
This approach can also deal with situations in which some worker nodes are preempted, with their computational resources rescheduled to higher-priority tasks and nodes. When those workers get preempted, we reconstruct the worker process group and then perform the allreduce operation again. This approach allows us to avoid wasting resources to train the model from scratch when unexpected failures happen. Instead, we can pick up the training process from where it paused and use the existing workers to which we’ve already allocated computational resources. If we have additional resources, we can easily add workers and then reconstruct the worker process groups to train more efficiently. In other words, we can easily scale the distributed training system up and down so that the entire system is elastic in terms of available resources. Many other distributed systems apply the same idea to make sure that the systems in place are reliable and scalable.
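A simplified sketch of the group-reconstruction step follows. It assumes torch.distributed is already initialized and that the unhealthy ranks have somehow been detected (detection is outside the scope of the sketch). Note that dist.new_group must be called by every process in the original group, so in practice an elastic training framework performs a fresh rendezvous among the surviving workers instead; the sketch only illustrates the idea that training continues with allreduce restricted to the healthy workers, which all hold identical copies of the model.

```python
import torch.distributed as dist

def continue_without(unhealthy_ranks, model):
    # Build a new process group containing only the healthy workers.
    healthy_ranks = [r for r in range(dist.get_world_size())
                     if r not in unhealthy_ranks]
    healthy_group = dist.new_group(ranks=healthy_ranks)

    if dist.get_rank() in unhealthy_ranks:
        return None  # this process drops out of training

    def allreduce_gradients():
        # Subsequent gradient aggregation involves only the healthy workers.
        for param in model.parameters():
            if param.grad is not None:
                dist.all_reduce(param.grad, op=dist.ReduceOp.SUM,
                                group=healthy_group)
                param.grad /= len(healthy_ranks)

    return allreduce_gradients
```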
3.4.3 Discussion

We’ve successfully continued and recovered the distributed training process without wasting the resources we used to calculate the gradients from each worker. What if our distributed training uses parameter servers instead of collective communications with only workers?
Recall that when parameter servers are used, each parameter server stores a model partition that contains a subset of the complete set of model parameters. If we need to abandon any of the workers or parameter servers, such as when some communications failed or got stuck due to an unstable network on one parameter server or when the workers got preempted, we need to checkpoint the model partition in the failed nodes and then repartition the model partitions to the parameter servers that are still alive.
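Continuing the hypothetical ParameterServer sketch from section 3.2.2, repartitioning after a server failure might look like the following: the failed server's checkpointed partition is spread across the servers that are still alive (round-robin here, for simplicity), and the returned assignment tells workers where to push and pull those parameters from now on.

```python
def repartition(checkpointed_partition, alive_servers):
    """Spread a failed server's checkpointed parameters over the live servers."""
    assignment = {}  # parameter name -> index of the server that now owns it
    for i, (name, value) in enumerate(checkpointed_partition.items()):
        server_index = i % len(alive_servers)
        alive_servers[server_index].params[name] = value
        assignment[name] = server_index
    return assignment
```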
In reality, many challenges are still involved. How do we checkpoint the model partitions, and where do we save them? How often should we checkpoint them to make sure that they are as recent as possible?
3.4.4 Exercises

1 What is the most important thing to save in a checkpoint in case any failures happen in the future?
2 When we abandon the workers that are stuck or unable to recover without having time to make model checkpoints, where should we obtain the latest model, assuming that we are using the collective communication pattern?
3.5 Answers to exercises

Section 3.2.4
1 No, because the training happens on a single laptop.
2 The system will end up spending a lot of time communicating between nodes and a small amount of time making the computations among neural network layers.
3 We need more disk space on parameter servers to store large model partitions, and fewer CPUs/GPUs and less memory, because parameter servers do not perform heavy computations.
Section 3.3.4
1 No. They also appear between workers and parameter servers.
2 Asynchronously
3 You use a reduce operation and then a broadcast operation.

Section 3.4.4
1 The most recent model parameters
2 Under the collective communication pattern, the remaining workers still have the same copy of the model, which we can use to continue training.
Summary

- Distributed model training is different from the traditional model training process, given the size and location of the dataset, the size of the model, the computational resources, and the underlying network infrastructure.
- We can use parameter servers to build large and complex models, storing partitions of the model parameters on each server.
- If communications between workers and parameter servers develop a bottleneck, we can switch to the collective communication pattern to improve distributed model training performance for small or medium-sized models.
- Unexpected failures happen during distributed model training, and we can take various approaches to avoid wasting computational resources.