Posted by Hao Liang's Blog

Part 3 Building a distributed machine learning workflow

If you’ve survived the training up to this point, congratulations! You’ve just learned many common patterns that can be used in real-world machine learning systems, as well as how to weigh the tradeoffs when deciding which patterns to apply to your own system.

In the last part of the book, we will build an end-to-end machine learning system to apply what we learned previously. We will gain hands-on experience implementing many of the previously learned patterns in this project. We’ll learn how to solve problems at a larger scale and take what’s developed on our laptops to large distributed clusters.

In chapter 7, we’ll go through the project background and system components. Then, we’ll go through the challenges in each of these components and share the patterns that we will apply to address them. Chapter 8 covers the basic concepts of the four technologies (TensorFlow, Kubernetes, Kubeflow, and Argo Workflows) and provides an opportunity to gain hands-on experience with each of them to prepare for our implementation of the final project.

In the last chapter of the book, we’ll implement the end-to-end machine learning system with the architecture we designed in chapter 7. Our complete implementation of each of the components will incorporate the previously discussed patterns. We’ll use the technologies we learned in chapter 8 to build different components of a distributed machine learning workflow.

Project overview and system architecture

This chapter covers

- Providing a high-level overall design of our system
- Optimizing the data ingestion component for multiple epochs of the dataset
- Deciding which distributed model training strategy best minimizes overhead
- Adding model server replicas for high-performance model serving
- Accelerating the end-to-end workflow of our machine learning system

In the previous chapters, we learned how to choose and apply the right patterns for building and deploying distributed machine learning systems, and we gained practical experience managing and automating machine learning tasks. In chapter 2, I introduced a couple of practical patterns that can be incorporated into data ingestion, which is usually the first process of a distributed machine learning system and is responsible for monitoring incoming data and performing the preprocessing steps necessary to prepare for model training.

In chapter 3, we explored some challenges dealing with the distributed training component, and I introduced a couple of practical patterns that can be incorporated into it. The distributed training component is the most critical part of a distributed machine learning system and is what distinguishes it from general distributed systems. In chapter 4, we covered the challenges involved in distributed model serving systems, and I introduced a few commonly used patterns. You can use replicated services to achieve horizontal scaling and the sharded services pattern to process large model serving requests. You also learned how to assess model serving systems and determine whether the event-driven design is beneficial in real-world scenarios.

In chapter 5, we discussed machine learning workflows, one of the most essential components in machine learning systems, as it connects all other components in a machine learning system. Finally, in chapter 6, we discussed some operational efforts and patterns that can greatly accelerate the end-to-end workflow and reduce maintenance and communication efforts when engineering teams collaborate with teams of data scientists or machine learning practitioners before the systems become production ready.

For the remaining chapters of the book, we will build an end-to-end machine learning system to apply what we learned previously. You will gain hands-on experience implementing many patterns we’ve previously discussed. You’ll learn how to solve problems at a larger scale and take what you’ve developed on your laptop to large distributed clusters. In this chapter, we’ll go through the project background and system components. Then we’ll go through the challenges related to the components and discuss the patterns we can apply to address them.

Note that although we won’t dive into the implementation details in this chapter, in the remaining chapters, we’ll use several popular frameworks and cutting-edge technologies—particularly TensorFlow, Kubernetes, Kubeflow, Docker, and Argo Workflows—to build the components of a distributed machine learning workflow.

7.1 Project overview

For this project, we will build an image classification system that takes raw images downloaded from the data source, performs necessary data cleaning steps, builds a machine learning model in a distributed Kubernetes cluster, and then deploys the trained model to the model serving system for users to use. We also want to establish an end-to-end workflow that is efficient and reusable. Next, I will introduce the project background and the overall system architecture and components.

7.1.1 Project background

We will build an end-to-end machine learning system to apply what we learned previously. We’ll build a data ingestion component that downloads the Fashion-MNIST dataset and a model training component to train and optimize the image classification model. Once the final model is trained, we’ll build a high-performance model serving system to start making predictions using the trained model.

As previously mentioned, we will use several frameworks and technologies to build distributed machine learning workflow components. For example, we’ll use TensorFlow with Python to build the classification model on the Fashion-MNIST dataset and make predictions. We’ll use Kubeflow to run distributed machine learning model training on a Kubernetes cluster. Furthermore, we’ll use Argo Workflows to build a machine learning pipeline that consists of many important components of a distributed machine learning system. The basics of these technologies will be introduced in the next chapter, and you’ll gain hands-on experience with them before diving into the actual implementation of the project in chapter 9. In the next section, we’ll examine the project’s system components.

7.1.2 System components

Figure 7.1 is the architecture diagram of the system we will be building. First, we will build the data ingestion component responsible for ingesting data and storing the dataset in the cache using some of the patterns discussed in chapter 2. Next, we will build three different model training steps that train different models and incorporate the collective communication pattern addressed in chapter 3. Once we finish the model training steps, we will build the model selection step that picks the top model. The selected optimal model will be used for model serving in the following two steps. At the end of the model serving steps, we aggregate the predictions and present the result to users. Finally, we want to ensure all these steps are part of a reproducible workflow that can be executed at any time in any environment.

We’ll build the system based on the architecture diagram in figure 7.1 and dive into the details of the individual components. We’ll also discuss the patterns we can use to address the challenges in building those components.

Figure 7.1 The architecture diagram of the end-to-end machine learning system we will be building. The machine learning workflow is triggered; three model training steps train different models; a model selection step picks the top model to be used in the following model serving steps; and the results from the two model serving steps are aggregated via a result aggregation step and presented to users.

7.2 Data ingestion

For this project, we will use the Fashion-MNIST dataset, introduced in section 2.2, to build the data ingestion component, as shown in figure 7.2. This dataset consists of a training set of 60,000 examples and a test set of 10,000 examples. Each example is a 28 × 28 grayscale image of one of Zalando’s articles, associated with a label from 10 classes. Recall that the Fashion-MNIST dataset is designed to serve as a direct drop-in replacement for the original MNIST dataset for benchmarking machine learning algorithms. It shares the same image size and the same structure of training and testing splits.

Figure 7.2 The data ingestion component (dark box) in the end-to-end machine learning system, which runs once the machine learning workflow is triggered

As a recap, figure 7.3 is a screenshot of the collection of images for all 10 classes (T-shirt/top, trouser, pullover, dress, coat, sandal, shirt, sneaker, bag, and ankle boot) from Fashion-MNIST, where each class takes three rows in the screenshot.

Figure 7.4 is a closer look at the first few example images in the training set together with their corresponding text labels.

The downloaded Fashion-MNIST dataset should take only about 30 MB on disk when compressed, so it’s easy to load the entire downloaded dataset into memory at once.
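Since we’ll build the classification model with TensorFlow, one way to pull the dataset in is the built-in Keras helper. The following is a minimal sketch; the exact loading code we use in chapter 9 may differ.

```python
import tensorflow as tf

# Download (and cache locally) the Fashion-MNIST dataset via Keras.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

print(x_train.shape)  # (60000, 28, 28) -- 60,000 grayscale training images
print(x_test.shape)   # (10000, 28, 28) -- 10,000 grayscale test images
print(y_train[:5])    # integer labels in the range 0-9, one per class
```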

7.2.1 The problem

Although the Fashion-MNIST data is not large, we may want to perform additional computations before feeding the dataset into the model, which is common for tasks that require additional transformations and cleaning. We may want to resize, normalize, or convert the images to grayscale. We also may want to perform complex mathematical operations such as convolution operations, which can require large additional memory space allocations. Our available computational resources may or may not be sufficient after we load the entire dataset in memory, depending on the distributed cluster size.

Figure 7.3 A screenshot of the collection of images from the Fashion-MNIST dataset for all 10 classes (T-shirt/top, trouser, pullover, dress, coat, sandal, shirt, sneaker, bag, and ankle boot), where every three rows show example images from one class (for example, the top three rows are images of T-shirts)

Figure 7.4 A closer look at the first few example images in the training set with their corresponding labels in text

In addition, the machine learning model we are training from this dataset requires multiple epochs on the training dataset. Suppose training one epoch on the entire training dataset takes 3 hours. If we want to train two epochs, the time needed for model training would double, as shown in figure 7.5.

Figure 7.5 A diagram of model training for multiple epochs at times t0, t1, etc., where each epoch takes 3 hours

In real-world machine learning systems, a larger number of epochs is often needed, and training each epoch sequentially is inefficient. In the next section, we will discuss how we can tackle that inefficiency.

7.2.2 The solution

Let’s take a look at the first challenge we have: the mathematical operations in the machine learning algorithms may require a lot of additional memory space allocations, while computational resources may or may not be sufficient. Given that we don’t have much free memory, we should not load the entire Fashion-MNIST dataset into memory directly. Let’s assume that the mathematical operations we want to perform on the dataset can be performed on subsets of the entire dataset. Then, we could use the batching pattern introduced in chapter 2: group a number of data records from the entire dataset into batches and train the machine learning model sequentially, one batch at a time.

To apply the batching pattern, we first divide the dataset into smaller subsets or mini-batches, load each individual mini-batch of example images, perform expensive mathematical operations on each batch, and then use only one mini-batch of images in each model training iteration. For example, we can perform convolution or other heavy mathematical operations on the first mini-batch, which consists of only 20 images, and then send the transformed images to the machine learning model for model training. We then repeat the same process for the remaining mini-batches while continuing to perform model training.

Since we’ve divided the dataset into many small subsets (mini-batches), we can avoid any potential problems with running out of memory when performing various heavy mathematical operations on the entire dataset necessary for achieving an accurate classification model on the Fashion-MNIST dataset. We can then handle even larger datasets using this approach by reducing the size of the mini-batches. With the help of the batching pattern, we are no longer concerned about potential out-of-memory problems when ingesting the dataset for model training. We don’t have to load the entire dataset into memory at once, and instead, we are consuming the dataset batch by batch sequentially.

For example, if we have a dataset with 1,000 records, we can first take 500 of the 1,000 records to form a batch and then train the model using this batch of records. Subsequently, we can repeat this batching and model training process for the remaining records. Figure 7.6 illustrates this process, where the original dataset gets divided into two batches and processed sequentially. The first batch gets consumed to train the model at time t0, and the second batch gets consumed at time t1.
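As a rough sketch of what the batching pattern looks like with TensorFlow’s tf.data API (the mini-batch size of 20 and the tiny model are illustrative, and for brevity we still start from the in-memory NumPy arrays rather than a streaming source):

```python
import tensorflow as tf

# Load the raw arrays once; in a real pipeline the records could be
# streamed from files or a remote store instead.
(x_train, y_train), _ = tf.keras.datasets.fashion_mnist.load_data()

def preprocess(images, labels):
    # Expensive per-batch transformations would go here; we only normalize.
    images = tf.cast(images, tf.float32) / 255.0
    return images, labels

train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .batch(20)        # group records into mini-batches of 20 images
    .map(preprocess)  # transformations are applied one mini-batch at a time
)

model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(10, activation="softmax"),
])
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# The model consumes one mini-batch per training iteration, so the whole
# dataset never needs to be transformed in memory at once.
model.fit(train_ds, epochs=1)
```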

Figure 7.6 The dataset is divided into two batches and consumed sequentially for model training. The first batch is consumed to train the model at time t0, and the second batch is consumed at time t1.

Now, let’s tackle the second challenge mentioned in section 7.2.1: we want to avoid wasting time if we need to train a machine learning model that involves iterating on multiple epochs of the original dataset. Recall that, in chapter 2, we talked about the caching pattern, which would solve this type of problem. With the help of the caching pattern, we can greatly speed up the re-access to the dataset for the model training process that involves training on the same dataset for multiple epochs.

We can’t do anything special for the first epoch, since it’s the first time the machine learning model sees the entire training dataset. However, we can cache the training examples in memory, making them much faster to re-access when needed for the second and subsequent epochs.

Let’s assume that the single laptop we use to train the model has sufficient computational resources, such as memory and disk space. As the machine learning model consumes training examples from the dataset, we can hold off on recycling them and instead keep the consumed examples in memory. For example, in figure 7.7, after we have finished fitting the model for the first epoch, we can store a cache for both batches used for the first epoch of model training.
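With tf.data, a single cache() call captures this behavior. The sketch below is illustrative rather than the exact chapter 9 code.

```python
import tensorflow as tf

(x_train, y_train), _ = tf.keras.datasets.fashion_mnist.load_data()

train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y))
    .batch(20)
    .cache()  # epoch 1 fills the in-memory cache; later epochs read from it
)

model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(10, activation="softmax"),
])
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# Only the first of the five epochs pays the full cost of reading and
# preprocessing the data; the rest are served from the in-memory cache.
model.fit(train_ds, epochs=5)
```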

Then, we can start training the model for the second epoch by feeding the stored in-memory cache to the model directly without repeatedly reading from the data source for future epochs. Next, we will discuss the model training component we will build in our project.

Figure 7.7 A diagram of model training for multiple epochs at times t0, t1, etc. using the cache, so that repeatedly reading from the data source is unnecessary

7.2.3 Exercises

1. Where do we store the cache?
2. Can we use the batching pattern when the Fashion-MNIST dataset gets large?

7.3 Model training

In the previous section, we talked about the data ingestion component of the system we are building and how we can use the caching and batching patterns to handle large datasets and make the system more efficient. Next, let’s discuss the model training component we are building. Figure 7.8 is a diagram of the model training component in the overall architecture.

In the diagram, three different model training steps are followed by a model selection step. These model training steps can train three different models competing with each other for better statistical performance. The dedicated model selection step then picks the top model, which will be used in the subsequent components in the end-to-end machine learning workflow.

In the next section, we will look more closely at the model training component in figure 7.8 and discuss potential problems when implementing this component.

Figure 7.8 The model training component (dark boxes) in the end-to-end machine learning system. Three model training steps train different models, and the model selection step picks the top two models, which will be used in the following two separate model serving steps.

7.3.1 The problem

In chapter 3, I introduced the parameter server and the collective communication patterns. The parameter server pattern is handy when the model is too large to fit on a single machine, such as the one for tagging entities in the 8 million YouTube videos (section 3.2). The collective communication pattern is useful for speeding up the training process for medium-sized models when the communication overhead is significant. Which pattern should we select for our model training component?

7.3.2 The solution

With the help of parameter servers, we can effectively resolve the challenge of building an extremely large machine learning model that may not fit on a single machine: even when the model is too large for one machine, we can still train it efficiently with parameter servers. For example, figure 7.9 is an architecture diagram of the parameter server pattern using multiple parameter servers. Each worker node takes a subset of the dataset, performs the calculations required in each neural network layer, and sends the calculated gradients to update one model partition stored in one of the parameter servers.

Because all workers perform calculations in an asynchronous fashion, the model partitions each worker node uses to calculate the gradients may not be up to date. For instance, two workers can block each other when sending gradients to the same parameter server, which makes it hard to gather the calculated gradients on time and requires a strategy to resolve the blocking problem. Unfortunately, in real-world distributed training systems where parameter servers are incorporated, multiple workers may send the gradients at the same time, and thus many blocking communications must be resolved.

Figure 7.9 A machine learning training component with multiple parameter servers

Another challenge comes when deciding the optimal ratio between the number of workers and the number of parameter servers. For example, if many workers send gradients to the same parameter server at the same time, the problem gets even worse, and eventually the blocking communications between workers and parameter servers become a bottleneck.

Now, let’s return to our original application, the Fashion-MNIST classification model. The model we are building is not as large as large recommendation system models; it can easily fit on a single machine if we give the machine sufficient computational resources, and it’s only about 30 MB in compressed form. Thus, the collective communication pattern is a perfect fit for the system we are building.

Now, without parameter servers, each worker node stores a copy of the entire set of model parameters, as shown in figure 7.10. I previously mentioned that every worker consumes some portion of data and calculates the gradients needed to update the model parameters stored locally on this worker node (see chapter 3). We want to aggregate all the gradients as soon as all worker nodes have successfully completed their calculation of gradients. We also want to make sure every worker’s entire set of model parameters is updated based on the aggregated gradients. In other words, each worker should store a copy of the exact same updated model.
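As a rough illustration of this pattern, the sketch below uses TensorFlow’s tf.distribute.MultiWorkerMirroredStrategy, which keeps a full copy of the model on every worker and synchronizes gradients with allreduce. The worker roster comes from the TF_CONFIG environment variable, which the Kubeflow training operator typically sets for each worker; the model architecture here is purely illustrative.

```python
import tensorflow as tf

# Collective communication: every worker holds a full replica of the model,
# and gradients are combined with allreduce so all replicas stay identical.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    # Variables created inside the scope are replicated on every worker.
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dense(10, activation="softmax"),
    ])
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])

(x_train, y_train), _ = tf.keras.datasets.fashion_mnist.load_data()
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y))
    .batch(64)
)

# Each worker trains on a shard of the data; after every step, the calculated
# gradients are allreduced so every worker applies the same update.
model.fit(train_ds, epochs=3)
```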

Going back to the architecture diagram in figure 7.8, each model training step uses the collective communication pattern, taking advantage of the underlying network infrastructure to perform allreduce operations to communicate gradients between multiple workers. The collective communication pattern also allows us to train multiple medium-sized machine learning models in a distributed setting. Once the model is trained, we can start a separate process to pick the top model to be used for model serving. This step is pretty intuitive, and I’ll defer the implementation details to chapter 9. In the next section, we will discuss the model serving component of our system.

Figure 7.10 Distributed model training component with only worker nodes, where every worker stores a copy of the entire set of model parameters and consumes partitions of data to calculate the gradients

7.3.3 Exercises

1. Why isn’t the parameter server pattern a good fit for our model?
2. Does each worker store different parts of the model when using the collective communication pattern?

7.4 Model serving

We’ve talked about both the data ingestion and model training components of the system we are building. Next, let’s discuss the model serving component, which is essential to the end-user experience. Figure 7.11 shows the model serving component in the overall architecture.

Next, let’s take a look at a potential problem we will encounter when we begin building this component, as well as its solution.

7.4.1 The problem

The model serving system needs to take raw images uploaded by users and send the requests to the model server to make inferences using the trained model. These model serving requests are queued, waiting to be processed by the model server.

If the model serving system is a single-node server, it can only serve a limited number of model serving requests on a first-come, first-served basis. As the number of requests grows in the real world, the user experience suffers when users must wait a long time to receive the model serving result. In other words, all requests are waiting to be processed by the model serving system, but the computational resources are limited to this single node. How do we build a more efficient model serving system?

Figure 7.11 The model serving component (dark boxes) in the end-to-end machine learning system. The results from the two model serving steps are aggregated via a result aggregation step and presented to users.

7.4.2 The solution

The previous section lays out a perfect use case for the replicated services pattern discussed in chapter 4. Our model serving system takes the images uploaded by users and sends requests to the model server. In addition, unlike the simple single-server design, the system has multiple model server replicas to process the model serving requests asynchronously. Each model server replica takes a single request, retrieves the previously trained classification model from the model training component, and classifies images that are not in the Fashion-MNIST dataset.

With the help of the replicated services pattern, we can easily scale up our model server by adding model server replicas to the single-server model serving system. The new architecture is shown in figure 7.12. The model server replicas can handle many requests at a time since each replica can process individual model serving requests independently.

Once we’ve introduced the replicas, multiple model serving requests from users are sent to them at the same time. We therefore need to define a clear mapping between requests and model server replicas, which determines which replica processes each request.

To distribute the model server requests among the replicas, we need to add an additional load balancer layer. For example, the load balancer takes multiple model serving requests from our users. It then distributes the requests evenly among the model server replicas, which are responsible for processing individual requests, including model retrieval and inference on the new data in the request. Figure 7.13 illustrates this process.

Figure 7.12 The system architecture of the replicated model serving services. Users upload images and then submit requests to the model serving system for classification.

Figure 7.13 A diagram showing how a load balancer distributes multiple model serving requests from users evenly across the model server replicas

The load balancer uses different algorithms to determine which request goes to which particular model server replica. Example algorithms for load balancing include round robin, least-connection method, and hashing.
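To make the round-robin idea concrete, here is a tiny illustrative sketch; the replica addresses are made up, and in practice a dedicated load balancer or a Kubernetes Service would play this role rather than application code.

```python
import itertools

# Hypothetical addresses of three model server replicas.
replicas = itertools.cycle([
    "http://model-server-0:8080",
    "http://model-server-1:8080",
    "http://model-server-2:8080",
])

def route(request_id):
    """Pick the next replica in round-robin order for this request."""
    replica = next(replicas)
    print(f"sending request {request_id} to {replica}")
    return replica

# Five incoming requests are spread evenly across the three replicas.
for request_id in range(5):
    route(request_id)
```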

Note that in our original architecture diagram in figure 7.11, there are two individual model serving steps, each using a different model. Each model serving step consists of a model serving service with multiple replicas to handle the model serving traffic for its model.

7.4.3 Exercises

1. What happens when we don’t have a load balancer as part of the model serving system?

7.5 End-to-end workflow

Now that we’ve looked at the individual components, let’s see how to compose an end-to-end workflow that consists of all those components in a scalable and efficient way. We will also incorporate a few patterns from chapter 5 into the workflow. Figure 7.14 is a diagram of the end-to-end workflow we are building.

Figure 7.14 The architecture diagram of the end-to-end machine learning system we will build. The machine learning workflow is triggered; three model training steps train different models; a model selection step picks the top model to be used in the following model serving steps; and the results from the two model serving steps are aggregated via a result aggregation step and presented to users.

Instead of paying attention to individual components, we will look at the entire machine learning system, which chains all the components together in an end-to-end workflow.

7.5.1 The problems

First, the Fashion-MNIST dataset is static and does not change over time. However, to design a more realistic system, let’s assume we’ll manually update the Fashion-MNIST dataset regularly. Whenever the updates happen, we may want to rerun the entire machine learning workflow to train a fresh machine learning model that includes the new data. In other words, we need to execute the data ingestion step every time changes happen. In the meantime, when the dataset is not updated, we still want to experiment with new machine learning models, so we still need to execute the entire workflow, including the data ingestion step. The data ingestion step is usually very time-consuming, especially for large datasets. Is there a way to make this workflow more efficient?

Second, we want to build a machine learning workflow that can train different models and then select the top model, which will be used in model serving to generate predictions using the knowledge from both models. Due to the variance of completion time for each of the model training steps in the existing machine learning workflow, the start of each following step, such as model selection and model serving, depends on the completion of the previous steps. However, this sequential execution of steps in the workflow is quite time-consuming and blocks the rest of the steps. For example, say one model training step takes much longer to complete than the rest of the steps. The model selection step that follows can only start to execute after this long-running model training step has completed. As a result, the entire workflow is delayed by this particular step. Is there a way to accelerate this workflow so it will not be affected by the duration of individual steps?

7.5.2 The solutions

For the first problem, we can use the step memoization pattern from chapter 5. Recall that step memoization can help the system decide whether a step should be executed or skipped. With the help of step memoization, a workflow can identify steps with redundant workloads that can be skipped without being re-executed and thus greatly accelerate the execution of the end-to-end workflow.

For instance, figure 7.15 contains a simple workflow that only executes the data ingestion step when we know the dataset has been updated. In other words, we don’t want to re-ingest data that’s already been collected if the dataset has not been updated.

Many strategies can be used to determine whether the dataset has been updated. With a predefined strategy, we can conditionally reconstruct the machine learning workflow and control whether we would like to include a data ingestion step to be reexecuted, as shown in figure 7.16.

A cache is one way to identify whether a dataset has been updated. Since we assume our Fashion-MNIST dataset is updated regularly on a fixed schedule (e.g., once a month), we can create a time-based cache that stores the location of the ingested and cleaned dataset (assuming the dataset is located in a remote database) and the timestamp of when it was last updated.

Figure 7.15 A diagram of skipping the data ingestion step when the dataset has not been updated yet, for example when the workflow is rerun only to try a new model type or new hyperparameters

As shown in figure 7.16, the data ingestion step in the workflow will then be constructed and executed dynamically based on whether the last-updated timestamp falls within a particular window. For example, if the time window is set to two weeks, we consider the ingested data fresh if it has been updated within the past two weeks; the data ingestion step will be skipped, and the following model training steps will use the already ingested dataset from the location stored in the cache. The time window thus controls how old the cache can be while we still consider the dataset fresh enough to be used directly for model training instead of re-ingesting the data from scratch.
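A minimal sketch of this time-based memoization check is shown below. The cache file name, its fields, and the helper functions are assumptions for illustration; chapter 9 implements the equivalent logic inside the workflow itself.

```python
import json
import time
from pathlib import Path

CACHE_FILE = Path("ingestion_cache.json")  # hypothetical cache location
FRESHNESS_WINDOW = 14 * 24 * 60 * 60       # two weeks, in seconds

def dataset_is_fresh():
    """Return True if the cached dataset was ingested within the time window."""
    if not CACHE_FILE.exists():
        return False
    cache = json.loads(CACHE_FILE.read_text())
    return time.time() - cache["last_ingested_at"] < FRESHNESS_WINDOW

def ingest_data():
    # Stand-in for the time-consuming ingestion step.
    print("Re-ingesting the dataset from the data source ...")
    return "/data/fashion-mnist/cleaned"

def train_models(location):
    print(f"Training models on the dataset at {location}")

def run_workflow():
    if dataset_is_fresh():
        # Skip ingestion and reuse the dataset location stored in the cache.
        location = json.loads(CACHE_FILE.read_text())["dataset_location"]
        print(f"Skipping data ingestion; reusing dataset at {location}")
    else:
        location = ingest_data()
        CACHE_FILE.write_text(json.dumps(
            {"dataset_location": location, "last_ingested_at": time.time()}))
    train_models(location)

run_workflow()
```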

Figure 7.16 The workflow has been triggered. By reading the existing cache (or writing a new one) that contains the timestamp information, we check whether the data has been updated within the last two weeks. If the data is fresh, we skip the unnecessary data ingestion step and execute the model training step directly; otherwise, the data ingestion step is executed again.

Now, let’s take a look at the second problem: sequential execution of the steps blocks the subsequent steps in the workflow and is inefficient. The synchronous and asynchronous patterns introduced in chapter 5 can help.

When a short-running model training step finishes (for example, model training step 2 in figure 7.17), we obtain a trained machine learning model. In fact, we can use this already-trained model directly in our model serving system without waiting for the rest of the model training steps to complete. As a result, users will be able to see the results of image classification for their model serving requests that contain images as soon as we have trained one model from one of the steps in the workflow. After a second model training step (figure 7.17, model training step 3) finishes, the two trained models are sent to model serving, and users benefit from the aggregated results obtained from both models.

Figure 7.17 After a second model training step finishes (both short-running model training steps have now completed), we can pass the two trained models directly to model serving. The aggregated inference results will be presented to users instead of only the results from the first model.

As a result, we can continue to use the trained models for model selection and model serving while the long-running model training steps are still running. In other words, the steps execute asynchronously, without depending on each other’s completion: the workflow can proceed and execute the next step before the previous one finishes. The long-running model training step no longer blocks the entire workflow. Instead, the model serving system can use the already-trained models from the short-running model training steps and start handling users’ model serving requests right away.
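The following sketch illustrates the idea with plain Python futures; the step durations and function names are made up, and in the real project the workflow engine orchestrates this for us. Each trained model is handed to serving as soon as its step completes, while the slowest step is still running.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def train_model(name, duration):
    time.sleep(duration)  # stand-in for a real model training step
    return f"{name} (trained in {duration}s)"

def deploy_to_serving(model):
    print(f"Model serving now uses: {model}")

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [
        pool.submit(train_model, "model-1", 3),  # long-running step
        pool.submit(train_model, "model-2", 1),  # short-running step
        pool.submit(train_model, "model-3", 2),  # short-running step
    ]
    # as_completed yields results in the order the steps finish, so serving
    # is updated incrementally instead of waiting for the slowest step.
    for future in as_completed(futures):
        deploy_to_serving(future.result())
```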

7.5.3 Exercises

1. Which component can benefit the most from step memoization?
2. How do we tell whether a step’s execution can be skipped if its workflow has been triggered to run again?

7.6 Answers to exercises

Section 7.2
1. In memory
2. Yes

Section 7.3
1. There are blocking communications between workers and parameter servers.
2. No, each worker stores exactly the same copy of the model.

Section 7.4
1. We cannot balance or distribute the model serving requests among the replicas.

Section 7.5
1. The data ingestion component
2. Using the metadata in the step cache

Summary

- The data ingestion component uses the caching pattern to speed up the processing of multiple epochs of the dataset.
- The model training component uses the collective communication pattern to avoid the potential communication overhead between workers and parameter servers.
- We can use model server replicas, which are capable of handling many requests at one time since each replica processes individual model serving requests independently.
- We can chain all our components into a workflow and use caching to effectively skip time-consuming components such as data ingestion.