37-62 25 2 63-87 24 3 88-111 23 4 112-135 23 5 136-156 20 6 157-176 19 7 177-208 31 8 209-239 20 9
Part 2 Patterns of distributed machine learning systems
Now that you know the basic concepts and background of distributed machine learning systems, you should be able to proceed to this part of the book. We will explore some of the challenges involved in various components of a machine learning system and introduce a few established patterns adopted heavily in industries to address those challenges.
Chapter 2 introduces the batching pattern, used to handle and prepare large datasets for model training; the sharding pattern, used to split huge datasets into multiple data shards that spread among multiple worker machines; and the caching pattern, which could greatly speed the data ingestion process when a previously used dataset is re-accessed for model training.
In chapter 3, we will explore the challenges of the distributed model training process. We’ll cover the challenges of training large machine learning models that tag main themes in new YouTube videos but cannot fit on a single machine. The chapter also covers how to overcome the difficulty of using the parameter server pattern. In addition, we see how to use the collective communication pattern to speed distributed training for smaller models and avoid unnecessary communication overhead among parameter servers and workers. At the end of this chapter, we talk about some of the vulnerabilities of distributed machine learning systems due to corrupted datasets, unstable networks, and preemptive worker machines, and we see how we can address those issues.
Chapter 4 focuses on the model serving component, which needs to be scalable and reliable to handle the growing number of user requests and the growing size of individual requests. We will go through the tradeoffs of making design decisions to build a distributed model serving system. We will use the replicated services to handle the growing number of model serving requests. We will also learn how to assess model serving systems and determine whether the event-driven design would be beneficial in real-world scenarios.
In chapter 5, we’ll see how to build a system that executes complex machine learning workflows to train multiple machine learning models and pick the most performant models to provide good entity tagging results in the model serving system, using the fan-in and fan-out patterns. We’ll also incorporate the synchronous and asynchronous patterns to make machine learning workflows more efficient and avoid delays due to the long-running model training steps that block consecutive steps.
Chapter 6, the last chapter in this part of the book, covers some operational efforts and patterns that can greatly accelerate the end-to-end workflow, as well as reduce the maintenance and communication efforts that arise when engineering and data science teams collaborate. We’ll introduce a couple of scheduling techniques that prevent resource starvation and deadlocks when many team members work in the same cluster with limited computational resources. We will also discuss the benefits of the metadata pattern, which we could use to gain insights from the individual steps in machine learning workflows and handle failures more appropriately to reduce the negative effect on users.
第2部分 机器学习系统分布式模式
现在您已经了解了分布式机器学习系统的基本概念和背景,相信您已经准备好进一步探索了。 接下来,我们将探讨机器学习系统各个组件所涉及的一些挑战,并介绍一些在行业中广泛采用的模式来应对这些挑战。
第二章介绍了 3 种模式:1、批处理模式,用于准备和处理用于模型训练的大型数据集;2、分片模式,用于将大数据集分割成多个小的数据分片,分布在多台机器上;3、缓存模式,在访问数据集时,直接复用之前摄取的数据时,大大提高数据摄取的速度。
在第三章中,我们将探讨分布式模型训练过程中遇到的挑战。 我们将介绍训练大型机器学习模型的挑战,这些模型负责标记新上传的 YouTube 视频的主题,但不能在单台机器上运行。 本章还介绍了如何克服使用参数服务器模式的困难。 此外,我们还将了解到如何使用集合通信模式来加速小模型的分布式训练,并避免参数服务器和工作节点之间不必要的通信开销。 在本章的最后,我们讨论了分布式机器学习系统由于数据集损坏、网络不稳定和工作节点抢占而存在的一些漏洞,并探讨如何解决这些问题。
第四章重点介绍模型服务组件,该组件需要具有可扩展性和可靠性,以处理不断增长的用户请求量和单个请求数据量的大小。 我们将权衡设计决策,以构建分布式模型服务系统。并使用复制服务来处理不断增长的模型服务请求。 我们还将学习如何评估模型服务系统,并确定事件驱动的设计在现实场景中是否有益。
在第五章中,我们将了解如何构建一个执行复杂机器学习工作流的系统,以此来训练多个机器学习模型,然后使用扇入和扇出模式选择性能最佳的模型,最后在模型服务系统中提供良好的实体标记结果。 我们还将结合同步和异步模式,使工作流更加高效,这避免了由于长时间运行的模型训练步骤阻碍了其他连续步骤,导致整个工作流延迟。
第六章是本部分的最后一章,涵盖了一些可以大大加速端到端工作流的方法和模式,以及减少工程和数据科学团队协作时产生的维护和沟通工作的方法。 我们将介绍几种调度技术,这些技术可以在许多团队成员在计算资源有限的同一集群中工作时,防止资源匮乏和死锁。 我们还将讨论元数据模式的好处,我们可以利用该模式来深入理解机器学习工作流中的各个步骤,从而更适当地处理故障,以减少对用户的负面影响。
Data ingestion patterns
This chapter covers Understanding data ingestion and its responsibilities Handling large datasets in memory by consuming smaller datasets in batches(the batching pattern) Preprocessing extremely large datasets as smaller chunks on multiple machines (the sharding pattern) Fetching and re-accessing the same dataset for multiple training rounds (the caching pattern)
Chapter 1 discussed the growing scale of modern machine learning applications such as larger datasets and heavier traffic for model serving. It also talked about the complexity and challenges of building distributed systems—distributed systems for machine learning applications in particular. We learned that a distributed machine learning system is usually a pipeline of many components, such as data ingestion, model training, serving, and monitoring, and that some established patterns are available for designing each component to handle the scale and complexity of real-world machine learning applications.
All data analysts and scientists should have some level of exposure to data ingestion, either hands-on experience in building a data ingestion component or simply using a dataset from the engineering team or customer. Designing a good data ingestion component is nontrivial and requires understanding the characteristics of the dataset we want to use for building a machine learning model. Fortunately, we can follow established patterns to build that model on a reliable and efficient foundation. This chapter explores some of the challenges involved in the data ingestion process and introduces a few established patterns adopted heavily in industries. In section 2.3, we will use the batching pattern in cases where we want to handle and prepare large datasets for model training, either when the machine learning framework we are using cannot handle large datasets or requires domain expertise in the underlying implementation of the framework. In section 2.4, we will learn how to apply the sharding pattern to split extremely large datasets into multiple data shards spread among multiple worker machines; then we speed up the training process as we add worker machines that are responsible for model training on each data shard independently. Section 2.5 introduces the caching pattern, which could greatly speed up the data ingestion process when a previously used dataset is re-accessed and processed for multi-epoch model training.
2.1 What is data ingestion? Let’s assume that we have a dataset at hand, and we would like to build a machine learning system that builds a machine learning model from it. What is the first thing we should think about? The answer is quite intuitive: first, we should get a better understanding of the dataset. Where did the dataset come from, and how was it collected? Are the source and the size of the dataset changing over time? What are the infrastructure requirements for handling the dataset? We should ask these types of questions first. We should also consider different perspectives that might affect the process of handling the dataset before we start building a distributed machine learning system. We will walk through these questions and considerations in the examples in the remaining sections of this chapter and learn how to address some of the problems we may encounter by using different established patterns.
Data ingestion is the process that monitors the data source, consumes the data all at once (nonstreaming) or in a streaming fashion, and performs preprocessing to prepare for the training process of machine learning models. In short, streaming data ingestion often requires long-running processes to monitor the changes in data sources; nonstreaming data ingestion happens in the form of offline batch jobs that process datasets on demand. Additionally, the data grows over time in streaming data ingestion, whereas the size of the dataset is fixed in nonstreaming data ingestion.
Table 2.1 summarizes the differences.
数据摄取模式
本章涵盖 了解数据摄取及其职责 通过批处理较小的数据集(批处理模式)来处理内存中的大型数据集 在多台机器上将大型的数据集预处理为较小的数据块(分片模式) 在多轮训练中获取并重新访问相同的数据集(缓存模式)
第一章讨论了现代机器学习应用规模在不断地扩大,例如出现了更大的数据集和更大的模型服务流量。 它还讨论了构建分布式系统(特别是用于机器学习应用程序的分布式系统)的复杂性和挑战。 我们了解到,分布式机器学习系统通常是由许多组件构成的流水线系统,例如:数据摄取、模型训练、服务和监控,还有一些现有的模式,可用于设计各个组件来处理现实中具有一定规模和复杂性的机器学习应用。
所有数据分析师和科学家都应该对数据摄取有一定程度的了解,要么具有构建数据摄取组件的实践经验,要么简单地使用过工程团队或客户的数据集。 设计一个好的数据摄取组件并不简单,需要了解我们想要用于构建数据集的特征。 幸运的是,我们可以遵循既定的模式,在可靠高效的基础上构建该模型。 本章探讨了在数据摄取过程中遇到的一些挑战,并介绍了行业中广泛采用的一些模式。 在 2.3 节中,当我们准备处理用于模型训练的大型数据集时,我们发现机器学习框架无法处理大型数据集,并且需要具备与框架的基础实现领域相关的专业知识,因此我们将使用批处理模式。 在 2.4 节中,我们将学习如何应用分片模式将大数据集分割成多个数据分片,分布在多台机器上;然后,我们通过添加多台机器对每个数据分片进行训练,从而加快训练过程。 2.5 节介绍了缓存模式,当重新访问和处理之前使用过的数据集以进行多轮训练时,该模式可以大大加快数据摄取过程。
2.1 数据摄取的基本概念 假设我们手头有一个数据集,我们想要构建一个机器学习系统,并从中构建机器学习模型。 我们首先应该考虑什么问题呢? 直观的答案是:为了更好地理解数据集,我们应该首先考虑:数据集从哪里来,是如何收集的?数据集的来源和大小是否随时间变化?处理这些数据集对基础设施的要求是什么? 在开始构建分布式机器学习系统之前,我们还应该考虑可能影响数据集处理的各种因素。 我们将在本章剩余部分的示例中探讨这些问题和考虑因素,并学习如何使用不同的模式来解决我们可能遇到的一些问题。
数据摄取是处理数据集的过程,它监控数据源,以非流式(一次性)或流式的方式读取数据,并对数据做预处理,以便进行后续的模型训练。 简单来说,流式数据摄取通常涉及长期运行的程序,以实时监控数据源的变化;而非流式数据摄取则通过离线批处理作业按需处理固定大小的数据集。 此外,在流式数据摄取中,数据随时间的推移而增长,而在非流式数据摄取中,数据集的大小是固定的。
表 2.1 总结了这些差异。
2.2 The Fashion-MNIST dataset
Table 2.1 Comparison of streaming and nonstreaming data ingestion in machine learning applications Streaming data ingestion Nonstreaming data Ingestion Dataset size Increases over time Fixed size Infrastructure requirements Long-running processes to monitor the changes in data source Offline batch jobs to process datasets on demand
The remaining sections of this chapter focus on data ingestion patterns from a non-streaming perspective, but they can be applied to streaming data ingestion as well.
Data ingestion is the first step and an inevitable step in a machine learning pipeline, as shown in figure 2.1. Without a properly ingested dataset, the rest of the processes in a machine learning pipeline would not be able to proceed.
Model deployment Data ingestion Model training Model selection The rest of the processes in the machine learning pipeline are all dependent on the success of data ingestion. Data ingestion is the first step in a machine learning pipeline. Figure 2.1 A flowchart that represents the machine learning pipeline. Note that data ingestion is the first step in the pipeline.
The next section introduces the Fashion-MNIST dataset, which I use to illustrate the patterns in the remaining sections of this chapter. I focus on building patterns around data ingestion in distributed machine learning applications, which are distinct from data ingestion that happens on local machines or laptops. Data ingestion in distributed machine learning applications is often more complex and requires careful design to handle large-scale datasets or datasets that are growing rapidly.
2.2 Fashion-MNIST 数据集
表 2.1 机器学习应用中流式和非流式数据摄取的比较 流数据摄取 非流数据摄取 数据集大小 随时间增加 固定大小 基础设施要求 长时间运行的进程来监控数据源的变化 离线批处理作业按需处理数据集
本章的其余部分从非流式的角度关注数据摄取模式,但它们也可以应用于流式数据摄取。
数据摄取是机器学习流水线的第一步,也是不可避免的一步,如图 2.1 所示。 如果没有正确地摄取数据集,机器学习流水线中的其余过程将无法继续。
模型 部署 数据摄取 模型训练 模型选择 机器学习流程中的后续步骤都依赖于数据摄取环节的成功执行。 数据摄取是机器学习流水线的起始阶段。 图 2.1 展示了机器学习流水线的流程图。值得注意的是,数据摄取位于流水线中的第一步。
下一节将介绍 Fashion-MNIST 数据集,我将利用这个数据集来阐释本章剩余部分的其他相关模式。 重点关注构建分布式机器学习应用中数据摄取的模式,这与个人电脑或笔记本上进行的数据摄取有显著区别。 在分布式机器学习应用中进行数据摄取通常更加复杂,需要经过周密的设计来有效管理大型、快速扩展的数据集。
2.2 The Fashion-MNIST dataset
The MNIST dataset by LeCun et al. (http://yann.lecun.com/exdb/mnist/) is one of the most widely used datasets for image classification. It contains 60,000 training images and 10,000 testing images extracted from images of handwritten digits; it is used widely in the machine learning research community as a benchmark dataset to validate state-of-art algorithms and machine learning models. Figure 2.2 shows some example images of handwritten digits, with each row representing images of a particular handwritten digit.
Each row represents images for a particular handwritten digit. For example, the first row represents images of the digit 0.
Figure 2.2 A screenshot of some example images for handwritten digits from 0 to 9, with each row representing images of a particular handwritten digit (Source: Josep Steffan, licensed under CC BY-SA 4.0)
Despite wide adoption in the community, researchers have found this dataset to be unsuitable for distinguishing between stronger models and weaker ones; many simple models nowadays can achieve good classification accuracy over 95%. As a result, the MNIST dataset now serves as more a sanity check than a benchmark.
NOTE The creators of the MNIST dataset kept a list of the machine learning methods tested on the dataset. In the original paper, “Gradient-Based Learning Applied to Document Recognition,” published in 1998 on the MNIST dataset (http://yann.lecun.com/exdb/publis/index.html#lecun-98), LeCun et al. stated that they used a support-vector machine model to get an error rate of 0.8%. A similar but extended dataset called EMNIST was published in2017. EMNIST contains 240,000 training images and 40,000 testing images of handwritten digits and characters.
Instead of using MNIST in several examples throughout this book, I will focus on a quantitatively similar but relatively more complex dataset: the Fashion-MNIST dataset, which was released in 2017 (https://github.com/zalandoresearch/fashion-mnist). Fashion-MNIST is a dataset of Zalando’s article images consisting of a training set of 60,000 examples and a test set of 10,000 examples. Each example is a 28 × 28 grayscale image associated with a label from ten classes. The Fashion-MNIST dataset is designed to serve as a direct drop-in replacement for the original MNIST dataset for benchmarking machine learning algorithms. It uses the same image size and structure for training and testing splits.
Figure 2.3 shows the collection of images for all 10 classes (T-shirt/top, trouser, pullover, dress, coat, sandal, shirt, sneaker, bag, and ankle boot) from Fashion-MNIST. Each class takes up three rows of the screenshot.
Figure 2.4 provides a closer look at the first few example images in the training set, together with their corresponding text labels. Next, I discuss the scenario for the case study.
Every three rows represent example images that represent a class. For example, the top three rows are images of T-shirts.
Figure 2.3 A screenshot of the collection of images from Fashion-MNIST dataset for all 10 classes: T-shirt/top, trouser, pullover, dress, coat, sandal, shirt, sneaker, bag, and ankle boot (Source: Zalando SE, licensed under MIT License)
Figure 2.4 The first few example images in the training set (Source: Zalando SE, licensed under MIT License)
2.2 Fashion-MNIST 数据集
LeCun 等人创建的 MNIST 数据集(可通过访问 http://yann.lecun.com/exdb/mnist/ 获取)是用于图像分类的最广泛使用的数据集之一。 它包含60,000张训练图像和10,000张测试图像,这些图像都是从手写数字中提取出来的; 在机器学习研究社区中,它被广泛用作基准数据集,用于验证最新算法和机器学习模型的性能。 图 2.2 显示了一些手写数字的示例图像,每行代表特定手写数字的图像。
每行代表特定手写数字的图像。 例如,第一行代表数字0的手写图像。
图 2.2 展示了手写数字 0 到 9 的一些示例图像的截图,每行代表特定手写数字的图像(来源:Josep Steffan,遵循 CC BY-SA 4.0 许可协议)
尽管 MNIST 数据集在研究社区中得到了广泛的采用,研究人员发现该数据集不适合区分较强和较弱的模型;许多简单的模型如今可以轻松实现超过95%的良好分类准确率。 因此,MNIST 数据集由于过于简单,不作为一个基准数据集使用。
注意 MNIST 数据集的创造者们维护了一个列表,该列表记录了在该数据集上测试过的机器学习方法。 在 1998 年发表的关于 MNIST 数据集的开创性论文《基于梯度学习的文档识别应用》(Gradient-Based Learning Applied to Document Recognition)(详见:http://yann.lecun.com/exdb/publis/index.html#lecun-98)中 ,LeCun 等人提到:他们使用的支持向量机模型得到了 0.8% 的错误率。 2017 年,一个名为 EMNIST 的数据集被发布,它与 MNIST 类似但更为扩展。 EMNIST 包含 240,000 张训练图像和 40,000 张测试图像,涵盖了手写数字和字母。
在本书中,我将不会使用 MNIST 数据集作为示例,而是选择关注一个在数量上相似但在复杂度上相对更高的数据集:2017 年发布的 Fashion-MNIST 数据集(详见:https://github.com/zalandoresearch/fashion-mnist)。 Fashion-MNIST 是由 Zalando 提供的服饰图像数据集,包含 60,000 张训练集图像和 10,000 张测试集图像。每张图像都是 28 × 28 像素的黑白灰度图,与十个类别中的一个标签相对应。 该数据集的设计初衷是作为原始 MNIST 数据集的替代品,用于机器学习算法的基准测试。它使用了相同的图像大小和结构来进行训练、测试数据的分割。
图 2.3 显示了 Fashion-MNIST 中所有 10 个类别(T 恤/上衣、裤子、套头衫、连衣裙、外套、凉鞋、衬衫、运动鞋、包和踝靴)的图像集合。 每个类别的图像占据了 3 行。
图 2.4 详细介绍了训练集中的前几个示例图像及其相应的文本标签。 接下来,将讨论案例研究的具体场景。
每 3 行图像代表一个类别。例如,前 3 行代表的是T恤图像。
图 2.3 Fashion-MNIST 数据集中所有 10 个类别的图像集合:T 恤/上衣、裤子、套头衫、连衣裙、外套、凉鞋、衬衫、运动鞋、包和踝靴(图片来源:Zalando SE,遵循 MIT 许可协议)
图 2.4 训练集中的前几张示例图像(来源:Zalando SE,遵循 MIT 许可协议)
Assume that we’ve downloaded the Fashion-MNIST dataset. The compressed version should only take 30 MB on disk. Even though the dataset is small, it’s trivial to load the downloaded dataset into memory at one time by using available implementations. If we’re using a machine learning framework like TensorFlow, for example, we can download and load the entire Fashion-MNIST dataset into memory with a couple of lines of Python code, as shown in the following listing.
Loads the TensorFlow library
Downloads the Fashion-MNIST dataset and then loads it into memory
Alternatively, if the dataset is already in memory—in the form of NumPy (https:// numpy.org) arrays, for example—we can load the dataset from an in-memory array representation into formats that the machine learning framework accepts, such as tf.Tensor objects, which can easily be used for model training later. The following listing shows an example.
Listing 2.2 Loading the Fashion-MNIST dataset from memory into TensorFlow
Normalizes the images Splits the training dataset object into images and labels Loads in-memory array representation into a tf.data.Dataset object that will make it easier to use for training in TensorFlow Inspects the dataset’s information, such as shapes and data types
假设我们已经下载了 Fashion-MNIST 数据集。 这个压缩后的数据集在硬盘上只占用大约 30 MB 的空间。数据集不大,利用现有方法,一次性将下载的数据集加载到内存中是非常简单的事情。 例如,如果我们使用像 TensorFlow 这样的机器学习框架,我们可以通过几行 Python 代码下载并将整个 Fashion-MNIST 数据集加载到内存中,如下面的代码所示
代码 2.1 使用 TensorFlow 将 Fashion-MNIST 数据集加载到内存中 加载 TensorFlow 库
下载 Fashion-MNIST 数据集,然后将其加载到内存中
或者,如果数据集已经以某种形式存在于内存中(例如,以 NumPy (https://numpy.org) 数组的形式),我们可以将数据集从内存中的数组转换为机器学习框架中的对象,比如 tf.Tensor 对象,这些对象可以方便地用于后续的模型训练。 下面的代码展示了一个例子。
代码 2.2 从内存中加载 Fashion-MNIST 数据集到 TensorFlow
Normalizes the images 标准化图像 Splits the training dataset object into images and labels 将训练数据集对象分割为图像和标签 Loads in-memory array representation into a tf.data.Dataset object that will make it easier to use for training in TensorFlow 将内存中的数组加载到 tf.data.Dataset 对象中,以便用于在 TensorFlow 中进行训练 Inspects the dataset’s information, such as shapes and data types 检查数据集的信息,例如 shapes 和数据类型
2.3 Batching pattern Now that we know what the Fashion-MNIST dataset looks like, let’s examine a potential problem we might face in a real-world scenario.
2.3.1 The problem: Performing expensive operations for Fashion MNIST dataset with limited memory Even though it’s easy to load a small dataset like Fashion-MNIST into memory to prepare for model training, in real-world machine learning applications, this process can be challenging. The code snippet in listing 2.1, for example, can be used to load the Fashion-MNIST into memory to prepare for model training in TensorFlow; it embeds the features and labels arrays in our TensorFlow graph as tf.constant() operations. This process works well for a small dataset, but it wastes memory because the contents of the NumPy array will be copied multiple times and can run into the 2 GB limit for the tf.GraphDef protocol buffer that TensorFlow uses. In real-world applications, the datasets are much larger, especially in distributed machine learning systems in which datasets grow over time.
Figure 2.5 shows a 1.5-GB in-memory NumPy array representation that will be copied two times with a tf.constant() operation. This operation would result in an out-of-memory error because the total 3 GB exceeds the maximum size of the tf.Graph-Def protocol buffer that TensorFlow uses.
Figure 2.5 An example 1.5-GB in-memory NumPy array representation that hits an out-of-memory error when being converted to a tf.GraphDef protocol buffer
Problems like this one happen often in different machine learning or data loading frameworks. Users may not be using the specific framework in an optimal way, or the framework may not be able to handle larger datasets.
In addition, even for small datasets like Fashion-MNIST, we may perform additional computations before feeding the dataset into the model, which is common in tasks that require additional transformations and cleaning. For computer vision tasks, images often need to be resized, normalized, or converted to grayscale, or they may require even more complex mathematical operations, such as convolution operations. These operations may require a lot of additional memory space allocation, but we may not have many computational resources available after we load the entire dataset into memory.
2.3.2 The solution Consider the first problem mentioned in section 2.2. We’d like to use TensorFlow’s from_tensor_slices() API to load the Fashion-MNIST dataset from an in-memory NumPy array representation to a tf.Dataset object that TensorFlow’s model training program can use. Because the contents of the NumPy array will be copied multiple times, however, we can run into the 2 GB limit for the tf.GraphDef protocol buffer. As a result, we cannot load larger datasets that go beyond this limit.
It’s not uncommon to see problems like this one for specific frameworks like TensorFlow. In this case, the solution is simple because we are not making the best use of TensorFlow. Other APIs allow us to load large datasets without loading the entire dataset into in-memory representation first. 2.3 批处理模式
现在我们了解了 Fashion-MNIST 数据集的基本情况,接下来看看在现实场景中可能遇到的潜在问题。
2.3.1 问题:在内存有限的情况下对 Fashion MNIST 数据集执行耗费资源的操作 尽管像 Fashion-MNIST 这样的小型数据集,将它加载到内存中为模型训练做准备是很容易的,但在实际的机器学习应用中,这个过程可能会具有挑战性。
例如,代码片段 2.1 将 Fashion-MNIST 加载到内存中,为在 TensorFlow 中进行模型训练做好准备;它将特征和标签数组经过 tf.constant() 函数嵌入到我们的 TensorFlow 的 graph 中。
这个过程对于小型数据集来说效果很好,但它浪费了内存,因为 NumPy 数组的内容会被多次复制,并且可能会遇到 TensorFlow 使用的 tf.GraphDef 协议缓冲区的 2 GB 限制。 在实际应用中,数据集通常要大得多,尤其是在数据集随时间增长的分布式机器学习系统中。
图 2.5 显示了一个占用 1.5 GB 内存的 NumPy 数组,它将通过 tf.constant() 操作被复制两次。 这个操作会导致内存溢出错误,因为总共 3 GB 的内存分配超出了 TensorFlow 使用的 tf.Graph-Def 协议缓冲区的最大大小。
图 2.5 一个占用 1.5 GB 内存的 NumPy 数组的例子,它在转换为 tf.GraphDef 协议缓冲区时遇到内存溢出错误
The tf.constant() operation involves making additional copies of the original in-memory NumPy representation. tf.constant() 操作涉及制作原始内存中 NumPy 表示的附加副本。
These additional copies wouldresult in out of memory errorsince the total 3 GB hits themaximum size of tf.GraphDefprotocol buffer that TensorFlow uses. 这些额外的副本将导致内存不足错误,因为总共 3 GB 达到了 TensorFlow 使用的 tf.GraphDefprotocol 缓冲区的最大大小。
在不同的机器学习或数据加载框架中,这样的问题经常发生。 用户可能没有以最佳方式来使用框架,或者框架本身无法处理大数据集。
此外,即使对于像 Fashion-MNIST 这样的小型数据集,我们也可能在将数据集输入模型之前执行额外的计算,这在需要额外转换和清洗的数据预处理任务中很常见。 对于计算机视觉任务,图像通常需要调整大小、标准化或转换为灰度图,或者可能需要经过更复杂的数学运算,例如卷积运算。 这些操作可能需要分配一些额外的内存空间,但在我们将整个数据集加载到内存中后,可能就已经没有太多可用的计算资源了。
2.3.2 解决方案 考虑到 2.2 节中提到的第一个问题,我们希望使用 TensorFlow 的 from_tensor_slices() API 将内存中的 NumPy 数组(来源于 Fashion-MNIST 数据集)转换为 TensorFlow 程序可以使用的 tf.Dataset 对象。 然而,由于 NumPy 数组的内容会被多次复制,我们可能会遇到 tf.GraphDef 协议缓冲区的 2 GB 限制。 因此,我们无法加载超出此限制的大数据集。
对于类似 TensorFlow 的框架,遇到类似这样的问题并不罕见。 在这种情况下,解决方案很简单,因为我们没有很好地利用 TensorFlow。 其他 API 允许我们加载大型数据集,而无需先将整个数据集加载到内存中。
TensorFlow’s I/O library, for example, is a collection of filesystems and file formats that are not available in TensorFlow’s built-in support. We can load datasets like MNIST from a URL to access the dataset files that are passed directly to the tfio.IODataset.from_mnist() API call, as shown in the following listing. This ability is due to the inherent support that TensorFlow (https://github.com/tensorflow/io)I/O library provides for the HTTP filesystem, eliminating the need to download and save datasets in a local directory.
Listing 2.3 Loading the MNIST dataset with TensorFlow I/O
For larger datasets that might be stored in distributed file systems or databases, some APIs can load them without having to download everything at one time, which could cause memory- or disk-related problems. For demonstration purposes, without going into too many details here, the following listing shows how to load a dataset from a PostgreSQL database (https://www.postgresql.org). (You’ll need to set up your own PostgreSQL database and provide the required environment variables to run this example.)
Listing 2.4 Loading a dataset from the PostgreSQL database
Now let’s go back to our scenario. In this case, assume that TensorFlow does not provide APIs like TensorFlow I/O that can deal with large datasets. Given that we don’t have too much free memory, we should not load the entire Fashion-MNIST dataset into memory directly. Let’s assume that the mathematical operations we would like to perform on the dataset can be performed on subsets of the entire dataset. Then we can divide the dataset into smaller subsets (mini-batches), load each mini-batch of example images, perform expensive mathematical operations on each batch, and use only one mini-batch of images in each model training iteration.
If the first mini-batch consists of the 19 example images in figure 2.4, we can perform convolution or other heavy mathematical operations on those images first and then send the transformed images to the machine learning model for model training. We repeat the same process for the remaining mini-batches while continuing model training in the meantime.
Because we’ve divided the dataset into many small subsets or mini-batches, we avoid potential out-of-memory problems when performing the heavy mathematical operations necessary for achieving an accurate classification model. Then we can handle even larger datasets by reducing the size of the mini-batches. This approach is called batching. In data ingestion, batching involves grouping data records from the entire dataset into batches that will be used to train the machine learning model sequentially.
If we have a dataset with 100 records, we can take 50 of the 100 records to form a batch and then train the model using this batch of records. We repeat this batching and model training process for the remaining records. In other words, we make two batches in total; each batch consists of 50 records, and the model we are training consumes the batches one by one. Figure 2.6 illustrates the process of dividing the original dataset into two batches. The first batch gets consumed to train the model at time t0, and the second batch gets consumed at time t1. As a result, we don’t have to load the entire dataset into memory at one time; instead, we are consuming the dataset sequentially, batch by batch.
例如,TensorFlow I/O 是 TensorFlow 并未内置支持的文件系统和文件格式的集合。 我们可以通过 URL 地址加载 MNIST 数据集,直接通过 tfio.IODataset.from_mnist() API 调用获取数据集文件,如以下列表所示。 这种能力得益于 TensorFlow (https://github.com/tensorflow/io)I/O 库为 HTTP 文件系统提供的支持,用户无需下载并本地保存数据集。
代码 2.3 使用 TensorFlow I/O 加载 MNIST 数据集
对于可能存储在分布式文件系统或数据库中的较大数据集,通过一些 API 可以在不需要一次性下载全部内容的情况下加载它们,这可能会导致内存或磁盘相关的问题。 为了演示,这里不涉及太多细节,以下代码显示了如何从 PostgreSQL 数据库 (https://www.postgresql.org) 加载数据集。 (您需要设置自己的 PostgreSQL 数据库并配置运行此示例所需的环境变量。)
代码 2.4 从 PostgreSQL 数据库加载数据集
回到我们的场景中。 在这种情况下,假设 TensorFlow 没有提供像 TensorFlow I/O 这样可以处理大型数据集的 API。 并且我们没有太多的空闲内存,没办法直接将整个 Fashion-MNIST 数据集一次性加载到内存中。 假设我们希望对数据集执行的数学运算可以在整个数据集的子集上执行。 那么我们可以将数据集划分为更小的子集,称为 mini-batch(小批量),加载 mini-batch 的样本图像,对每个训练批次执行大量复杂的运算,并且在每次模型训练迭代中只使用一个 mini-batch 的图像。
如果第一个小批量由图 2.4 中的 19 个示例图像组成,我们可以首先对这些图像执行卷积或其他复杂的数学运算,然后将转换后的图像发送到机器学习模型进行模型训练。 我们对剩余的小批量重复相同的过程,继续模型训练。
由于我们已将数据集划分为许多小的子集或小批量,因此在执行复杂数学运算以实现准确分类模型时,我们可以避免潜在的内存不足问题。 然后我们可以通过减小数据子集的大小来处理更大的数据集。 这种方法称为批处理。 在数据摄取过程中,批处理涉及将整个数据集中的数据划分为多个 batch(批量),这些 batch 将依次用于模型训练。
如果我们有一个包含 100 条记录的数据集,我们可以从 100 条记录中取出 50 条形成一个 batch,然后使用这个 batch 来训练模型。 我们重复这个分批和模型训练过程来处理剩余的记录。 也就是说,我们总共制作了两个 batch;每个 batch 包含 50 条记录,我们正在训练的模型依次使用这些 batch。 图 2.6 展示了将原始数据集分成两个 batch 的过程。 第一个 batch 在时间 t0 被用于模型训练,第二个 batch 在时间 t1 被使用。 因此,我们不必一次性将整个数据集加载到内存中,而是将数据集分批后逐个 batch 顺序消费。
Figure 2.6 The dataset gets divided into two batches. The first batch gets consumed to train the model at time t0, and the second batch gets consumed at time t1.
This batching pattern can be summarized as the pseudocode in the following listing, where we continuously try to read the next batch from the dataset and train the model, using the batches until no more are left.
Listing 2.5 Pseudocode for batching
We can apply the batching pattern when we want to handle and prepare large datasets for model training. When the framework we are using can handle only in-memory datasets, we can process small batches of the entire large datasets to ensure that each batch can be handled within limited memory. In addition, if a dataset is divided into batches, we can perform heavy computations on each batch sequentially without requiring a huge amount of computational resources. We’ll apply this pattern in section 9.1.2.
2.3.3 Discussion Other considerations need to be taken into account when performing batching. This approach is feasible only if the mathematical operations or algorithms we are performing can be done on subsets of the entire dataset in a streaming fashion. If an algorithm requires knowledge of the entire dataset, such as the sum of a particular feature over the entire dataset, batching would no longer be a feasible approach, as it’s not possible to obtain this information over a subset of the entire dataset.
In addition, machine learning researchers and practitioners often try different machine learning models on the Fashion-MNIST dataset to get a better-performing, more accurate model. If an algorithm would like to see at least 10 examples for each class to initialize some of its model parameters, for example, batching is not an appropriate approach. There is no guarantee that every mini-batch contains at least 10 examples from each class, especially when batch sizes are small. In an extreme case, the batch size would be 10, and it would be rare to see at least one image from each class in all batches.
Another thing to keep in mind is that the batch size of a machine learning model, especially for deep learning models, depends strongly on allocation of resources, making it particularly difficult to decide in advance in shared-resource environments. Also, the allocation of resources that a machine learning job can use efficiently depends not only on the structure of the model being trained but also on the batch size. This codependency between the resources and the batch size creates a complex web of considerations that a machine learning practitioner must make to configure their job for efficient execution and resource use. Fortunately, algorithms and frameworks are available that eliminate manual tuning of batch size. AdaptDL (https://github.com/petuum/adaptdl), for example, offers automatic batch-size scaling, enabling efficient distributed training without requiring any effort to tune the batch size manually. It measures the system performance and gradient noise scale during training and adaptively selects the most efficient batch size. Figure 2.7 compares the effects of automatically and manually tuned batch sizes on the overall training time of the ResNet18 model (https://arxiv.org/abs/1512.03385).
Figure 2.7 A comparison of the effect of automatically and manually tuned batch sizes on the overall training time of the ResNet18 model (Source:Petuum, licensed under Apache License 2.0)
The batching pattern provides a great way to extract subsets of the entire dataset so that we can feed the batches sequentially for model training. For extremely large datasets that may not fit in a single machine, we’ll need other techniques. The next section introduces a new pattern in that addresses the challenges.
图 2.6 数据集被分为两个 batch 。第一个 batch 在时间 t0 用于训练模型,第二个 batch 在时间 t1 被使用。
这种批处理模式可以用以下伪代码概括,我们不断尝试从数据集中读取下一个 batch,并使用这些 batch 来训练模型,直到没有剩余的批为止。
代码 2.5 批处理的伪代码
当我们想要预处理用于模型训练的大型数据集时,可以使用批处理模式。 当我们使用的框架只能处理内存中的数据集时,我们可以依次处理整个数据集的每个 batch,以确保每个 batch 都可以在有限的内存内处理。 此外,数据集被分成多个 batch 后,我们可以依次对每个 batch 进行计算,但不需要耗费大量的计算资源。我们将在第 9.1.2 节中应用此模式。
2.3.3 讨论 执行批处理时还需要考虑其他因素。 只有当模型训练算法可以流式地在整个数据集的子集上完成时,这种方法才可行。 如果训练算法需要依赖整个数据集,例如:需要获取整个数据集上指定特征的总和,批处理方法将不再可行,因为我们无法通过单个数据子集获取到这些信息。
此外,机器学习研究人员和从业者经常在 Fashion-MNIST 数据集上尝试使用不同的机器学习模型,以获得性能更好、更准确的模型。 例如,如果算法希望每个分类至少有 10 个样本来初始化某些模型参数,则批处理就不是一个合适的方法。 因为它无法保证每个 mini-batch 在每个分类中至少包含 10 个样本,尤其是当 batch 的数据量较小时。 在极端情况下,即使 batch 的大小为 10,我们也很难在所有 batch 中遇到每个分类至少有一张图像的情况。
另外,模型的 batch size(批量大小),尤其是深度学习模型的 batch size,在很大程度上取决于资源的分配,这使得在共享资源环境中提前确定 batch size 非常困难。 此外,机器学习作业能够高效使用的资源分配不仅取决于正在训练的模型结构,还取决于 batch size。 资源和 batch size 之间的这种相互依赖关系共同构建了复杂的决策因素,机器学习从业者必须考虑这些因素来配置训练任务,以实现高效执行和资源使用。 幸运的是,我们可以使用一些算法和框架来自动调整 batch size。例如,AdaptDL (https://github.com/petuum/adaptdl) 提供 batch size 自动缩放功能,无需手动调整 batch size 即可实现高效的分布式训练。 它在训练过程中测量系统性能和梯度噪声规模,并自适应地选择最高效的 batch size。 图 2.7 比较了自动和手动调整 batch size 对 ResNet18 (https://arxiv.org/abs/1512.03385) 模型整体训练时间的影响。
图 2.7 比较了自动和手动调整 batch size 对 ResNet18 模型整体训练时间的影响(图片来源:Petuum,遵循 Apache License 2.0 许可协议)
批处理模式提供了一种提取整个数据集子集的方法,以便我们可以依次对每个 batch 进行模型训练。 对于可能无法在一台机器中存储的超大数据集,我们需要使用其他手段。 下一节将介绍一种新模式以解决这些挑战。
2.3.4 Exercises 1 Are we training the model using the batches in parallel or sequentially? 2 If the machine learning framework we are using does not handle large datasets, can we use the batching pattern? 3 If a machine learning model requires knowing the mean of a feature of the entire dataset, can we still use the batching pattern?
2.4 Sharding pattern: Splitting extremely large datasets among multiple machines Section 2.3 introduced the Fashion-MNIST dataset, the compressed version of which takes only 30 MB on disk. Even though it is trivial to load the whole dataset into memory at one time, it’s challenging to load larger datasets for model training. The batching pattern covered in section 2.3 addresses the problem by grouping data records from the entire dataset into batches that will be used to train the machine learning model sequentially. We can apply the batching pattern when we want to handle and prepare large datasets for model training, either when the framework we are using cannot handle large datasets or when the underlying implementation of the framework requires domain expertise.
Suppose that we have a much larger dataset at hand. This dataset is about 1,000 times bigger than the Fashion-MNIST dataset. In other words, the compressed version of it takes 30 MB × 1,000 = 30 GB on disk, and it’s about 50 GB when it’s decompressed. This new dataset has 60,000 × 1,000 = 60,000,000 training examples. We’ll try to use this larger dataset to train our machine learning model to classify images into classes in the expanded Fashion-MNIST dataset (T-shirts, bags, and so on). For now, I won’t address the detailed architecture of the machine learning model(chapter 3); instead, I’ll focus on its data ingestion component. Assume that we are allowed to use three machines for any potential speed-ups. Given our experience, because the dataset is large, we could try applying the batching pattern first, dividing the entire dataset into batches small enough to load into memory for model training. Let’s assume that our laptop has enough resources to store the entire 50 GB decompressed dataset on disk. We divide the dataset into 10 small batches (5 GB each). With this batching approach, we can handle large datasets as long as our laptop can store the large datasets and divide them into batches. Next, we start the model training process by using the batches of data.
In section 2.3, we trained the model sequentially. In other words, one batch was completely consumed by the machine learning model before the next batch was consumed. In figure 2.8, the second batch is consumed at time t1 by model fitting only after the first batch has been completely consumed by the model at time t0. t0 and t1 represent two consecutive time points in this process.
2.4.1 The problem Unfortunately, this sequential process of consuming data can be slow. If each 5 GB batch of data takes about 1 hour to complete for the specific model we are training, it would take 10 hours to finish the model training process on the entire dataset. In other words, the batching approach may work well if we have enough time to train the model sequentially, batch by batch. In real-world applications, however, there’s always demand for more efficient model training, which will be affected by the time spent ingesting batches of data.
Figure 2.8 The dataset gets divided into two batches. The first batch gets consumed to train the model at time t0, and the second batch gets consumed at time t1.
2.3.4 练习 1 我们应该并行还是顺序地读取 batch 来训练模型? 2 如果我们使用的机器学习框架不能处理大型数据集,还可以使用批处理模式吗? 3 如果机器学习模型需要知道整个数据集某个特征的平均值,我们还可以使用批处理模式吗?
2.4 分片模式:在多台机器之间分割极大的数据集 2.3 节介绍了 Fashion-MNIST 数据集,其压缩版本仅占用 30 MB 磁盘空间。 尽管很容易就能将整个数据集一次性加载到内存中,但如果要加载更大的数据集进行模型训练就变得很困难。 2.3 节中介绍的批处理模式通过将整个数据集中的数据记录分成多个 batch 后,再依次进行模型训练来解决该问题。 当我们想要预处理大型数据集进行模型训练时,无论是因为我们使用的框架无法处理大型数据集,还是因为处理大型数据集需要深入理解框架的底层实现而产生过高的学习成本,我们都可以使用批处理模式。
假设我们有一个更大的数据集,该数据集大约比 Fashion-MNIST 数据集还要大 1,000 倍。 换句话说,它压缩后需要 30 MB × 1,000 = 30 GB 的磁盘空间,解压后大约为 50 GB。 这个新数据集有 60,000 × 1,000 = 60,000,000 个训练样本。 我们将尝试使用这个更大的数据集来训练我们的模型,对扩展的 Fashion-MNIST 数据集中的图像进行分类(如T恤、包包等)。 先不讨论机器学习模型的详细架构(见第 3 章);而是重点关注其数据摄取组件。 假设我们可以使用 3 台机器来加速数据摄取。 由于数据集非常庞大,根据经验,我们可以首先尝试用批处理模式将整个数据集分成足够小的 batch,以便加载到内存中进行模型训练。 假设我们的笔记本电脑有足够的磁盘空间能够存储 50 GB 解压后的完整数据集。我们将数据集分为 10 个 batch(每个 5 GB)。 通过这种批处理方法,只要我们的笔记本电脑能够存储并将整个数据集分成多个 batch,也就具有了处理这样大型数据集的能力。 接下来,我们开始使用这些 batch 进行模型训练。
在2.3节中,我们依次顺序地训练了模型。 换句话说,在使用下一个 batch 的数据前,机器学习模型要确保已经处理完了上一个 batch 的数据。 在图 2.8 中,第一个 batch 的数据在时间 t0 被模型完全使用后,第二个批次的数据才在时间 t1 被模型开始使用。 t0 和 t1 代表这个过程中两个连续的时间点。
2.4.1 问题 不幸的是,模型顺序地消费使用数据的过程可能会很慢。如果我们正在训练的特定模型,每消费 5 GB 的 batch 大约需要 1 小时才能完成,那么完成整个数据集的模型训练将需要 10 小时。 也就是说,如果我们有足够的时间按顺序、逐批次地用数据进行模型训练,批处理方法是可行的。 但在实际应用中,我们总是希望能更高效地训练模型,这将受到读取每个 batch 所花费的时间的影响。
图 2.8 数据集被分为两个 batch。第一个 batch 的数据在时间 t0 被使用,第二个 batch 的数据在时间 t1 被使用。
2.4.2 The solution Now that we understand the slowness of training the model sequentially by using the batching pattern alone, what can we do to speed up the data ingestion part, which will greatly affect the model training process? The major problem is that we need to train the model sequentially, batch by batch. Can we prepare multiple batches and then send them to the machine learning model for consumption at the same time? Figure 2.9 shows that the dataset gets divided into two batches, with each batch being consumed to train the model at the same time. This approach does not work yet, as we cannot keep the entire dataset (two batches) in memory at the same time, but it is close to the solution.
Figure 2.9 The dataset gets divided into two batches; each batch is consumed to train the model at the same time.
Let’s assume that we have multiple worker machines, each of which contains a copy of the machine learning model. Each copy can consume one batch of the original dataset; hence, the worker machines can consume multiple batches independently. Figure 2.10 shows an architecture diagram of multiple worker machines; each consumes batches independently to train the copy of the model located on it.
Figure 2.10 An architecture diagram of multiple worker machines. Each worker machine consumes batches independently to train the copy of the model located on it.
You may wonder how multiple model copies would work if they consumed multiple different batches independently and where we would obtain the final machine learning model from these model copies. These are great questions. Rest assured that I will go through how the model training process works in chapter 3. For now, assume that we have patterns that allow multiple worker machines to consume multiple batches of datasets independently. These patterns will greatly speed up the model training process, which was slowed down due to the nature of sequential model training.
NOTE We will be using a pattern called the collection communication pattern in chapter 3 to train models with multiple model copies located on multiple worker machines. The collective communication pattern, for example, will be responsible for communicating updates of gradient calculations among worker machines and keeping the model copies in sync.
How would we produce the batches used by those worker machines? In our scenario, the dataset has 60 million training examples, and three worker machines are available. It’s simple to split the dataset into multiple non-overlapping subsets and then send each to the three worker machines, as shown in figure 2.11. The process of breaking large datasets into smaller chunks spread across multiple machines is called sharding, and the smaller data chunks are called data shards. Figure 2.11 shows the original dataset being sharded into multiple non-overlapping data shards and then consumed by multiple worker machines.
Figure 2.11 An architecture diagram in which the original dataset gets sharded into multiple nonoverlapping data shards and then consumed by multiple worker machines
2.4.2 解决方案 既然我们已经了解了仅使用批处理模式串行训练模型的速度很慢,那么我们可以做些什么来加快数据的摄取,从而大大地提高模型训练速度呢? 这里的主要问题是我们需要按顺序、逐个 batch 地训练模型。 我们能否同时将多个 batch 的数据提供给模型使用呢? 图 2.9 显示数据集被分为两个 batch ,每个 batch 都同时用于训练模型。 这种方法目前还不可行,因为我们无法同时将整个数据集(两个 batch 的数据)保存在内存中,但它已经非常接近我们所期望的解决方案了。
图 2.9 数据集被分为两个 batch;每个 batch 同时用于模型训练。
假设我们有多台机器,每台机器都存放了相同的机器学习模型副本。 每个模型副本可以消费一个 batch 的数据;因此,每台机器都可以独立地消耗多个 batch 的数据。 图 2.10 展示了多个工作节点的架构图;每个工作节点独立地消费各自的 batch 来训练模型副本。
图2.10 多个工作节点的架构图。每个工作节点独立地消费各自的 batch 来训练模型副本。
你可能会好奇,如果多个模型副本独立消费不同的 batch 训练各自的模型副本,我们将如何从这些模型副本中获得最终的模型。 请放心,我将在第 3 章中介绍模型训练的工作原理。 现在,假设我们有一个允许多个工作节点独立使用多个数据集 batch 的模式。这个模式将大大加快模型训练速度。
注意:我们将在第 3 章中使用一种名为集合通信(collection communication pattern)的模式来训练位于多个工作节点上的模型副本。 例如,集合通信模式将负责在工作节点之间传递梯度计算的结果,更新并保持模型副本之间的同步。
我们将如何生成供这些工作节点使用的 batch 呢? 在我们的场景中,数据集有 6000 万个训练样本,并且有 3 个工作节点可用。 将数据集分割成多个互不重叠的数据子集,然后将每个子集分发到 3 个工作节点上,如图 2.11 所示。 将大型数据集分解为分布在多台机器上的较小数据块的过程称为分片,这些较小的数据块称为数据分片。 图 2.11 显示了原始数据集被分片为多个互不重叠的数据分片,然后被多个工作节点所使用。
图2.11 原始数据集被分片为多个互不重叠的数据分片,然后被多个工作节点所使用的架构图
NOTE Although I am introducing sharding here, the concept isn’t new; it’s often used in distributed databases. Sharding in distributed databases is extremely useful for solving scaling challenges such as providing high availability of the databases, increasing throughput, and reducing query response time.
A shard is essentially a horizontal data partition that contains a subset of the entire dataset, and sharding is also referred to as horizontal partitioning. The distinction between horizontal and vertical comes from the traditional tabular view of a database. A database can be split vertically—storing different table columns in a separate database—or horizontally—storing rows of the same table in multiple databases. Figure 2.12 compares vertical partitioning and horizontal partitioning. Note that for vertical partitioning, we split the database into columns. Some of the columns may be empty, which is why we see only three of the five rows in the partition on the right side of the figure.
This sharding pattern can be summarized in the pseudocode in listing 2.6, where, first, we create data shards from one of the worker machines (in this case, worker machine with rank 0) and then send it to all other worker machines. Next, on each worker machine, we continuously try to read the next shard locally that will be used to train the model until no more shards are left locally.
Figure 2.12 Vertical partitioning vs. horizontal partitioning (Source: YugabyteDB, licensed under Apache License 2.0)
Listing 2.6 Pseudocode for sharding
With the help of the sharding pattern, we can split extremely large datasets into multiple data shards that can be spread among multiple worker machines, and then each of the worker machines is responsible for consuming individual data shards independently. As a result, we have just avoided the slowness of sequential model training due to the batching pattern. Sometimes it’s also useful to shard large datasets into subsets of different sizes so that each shard can run different computational workloads depending on the amount of computational resource available in each worker machine. We’ll apply this pattern in section 9.1.2.
2.4.3 Discussion We have successfully used the sharding pattern to split an extremely large dataset into multiple data shards that spread among multiple worker machines and then sped up the training process as we add additional worker machines that are responsible for model training on each of the data shards independently. This is great, and with this approach, we can train machine learning models on extremely large datasets.
Now here comes the question: What if the dataset is growing continuously and we need to incorporate the new data that just arrived into the model training process? In this case, we’ll have to reshard every once in a while if the dataset has been updated to rebalance each data shard to make sure they are split relatively evenly among the different worker machines. In section 2.3.2, we simply divided the dataset into two non-overlapping shards, but unfortunately in real-world systems, this manual approach is not ideal and may not work at all. One of the most significant challenges with manual sharding is uneven shard allocation. The disproportionate distribution of data could cause shards to become unbalanced, with some overloaded while others remain relatively empty. This imbalance could cause unexpected hanging of the model training process that involves multiple worker machines, which we’ll talk about further in the next chapter.
Figure 2.13 The original dataset gets sharded into multiple imbalanced data shards and then consumed by multiple worker machines.
Figure 2.13 is an example where the original dataset gets sharded into multiple imbalanced data shards and then consumed by multiple worker machines. It’s best to avoid having too much data in one individual shard, which could lead to slowdowns and machine crashes. This problem could also happen when we force the dataset to be spread across too few shards. This approach is acceptable in development and testing environments but not ideal in production.
In addition, when manual sharding is used every time we see an update in the growing dataset, the operational process is nontrivial. Now we will have to perform backups for multiple worker machines, and we must carefully coordinate data migration and schema changes to ensure that all shards have the same schema copy. To address that problem, we can apply autosharding based on algorithms instead of manually sharding datasets. Hash sharding, shown in figure 2.14, takes the key value of a data shard, which generates a hash value. Then the generated hash value is used to determine where a subset of the dataset should be located. With a uniform hashing algorithm, the hash function can distribute data evenly across different machines, reducing the problems mentioned earlier. In addition, data with shard keys that are close to one another are unlikely to be placed in the same shard.
注意:虽然这里介绍的是分片的概念,但这个概念并不新鲜。它经常被用于分布式数据库。 分片对于解决分布式数据库遇到中的各种挑战非常有用,例如它可以提高数据库的可用性、吞吐量和减少查询响应时间。
分片本质上是一种水平的数据拆分方式,每个分区包含了整个数据集的部分数据,因此分片也称为水平拆分。 水平拆分和垂直拆分之间的区别来自于数据库的传统表格视图。 数据库可以垂直拆分(将同一表中不同列的数据存储在不同的数据库中)或水平拆分(将同一表中不同行的数据存储在多个数据库中)。 图 2.12 比较了垂直拆分和水平拆分。 请注意,对于垂直拆分,我们将数据库按列分割。 有些列可能是空的,这就是为什么我们在图右侧的分区中只看到 5 行中的 3 行。
这种分片模式可以用代码 2.6 中的伪代码来概括,首先,我们在一个工作节点(标号为 rank 0)中创建数据分片,然后将其发送到所有其他的工作节点。 接下来,在每个节点上,我们不断尝试在本地读取下一个分片用于模型训练,直到所有分片都读取完毕。
图 2.12 垂直拆分与水平拆分(来源:YugabyteDB,遵循 Apache License 2.0 许可协议)
代码 2.6 分片伪代码
借助分片模式,我们可以将超大的数据集分割成多个数据分片,这些数据分片可以分布在多个工作节点上,然后每个节点负责独立地消费各个数据分片。 这样就避免了由于使用批处理模式而引起缓慢的串行模型训练。 有些时候,将大数据集分成不同大小的数据分片也是非常实用的,这样每个工作节点可以灵活地根据自身可用的计算资源来运行不同的计算任务,消费不同大小的数据分片。我们将在第9.1.2节中应用这种模式。
2.4.3 讨论 我们已经成功地使用分片模式将一个超大型的数据集分割成多个数据分片,这些数据分片分布在多个工作节点上,然后通过添加额外的工作节点来加速训练过程,这些节点负责独立地对每个数据分片进行模型训练。 有了这种方法,我们可以在超大型的数据集上训练机器学习模型。
现在问题来了:随着数据集不断增长,并且我们需要将新产生的数据传递给模型进行训练,应该怎么做呢? 在数据集已经更新的情况下,我们不得不定期对数据集重新进行分片,以重新平衡每个数据分片中数据量的大小,确保它们在不同的工作节点之间相对均匀地分配。 在 2.3.2 节中,我们简单地将数据集划分为两个数据不重叠的数据分片,但实际上这种手动分片的方法效果并不理想。 手动分片最大的问题之一就是分片的数据不均匀。 数据不成比例的分布可能会导致分片变得不均匀,一些分片所包含的数据过多,而另一些则相对较少。 这种不平衡可能会导致多个节点的模型训练过程意外挂起,我们将在下一章进一步讨论这个问题。
图 2.13 原始数据集被分割成多个不均匀的数据分片,然后被多个工作节点使用。
图 2.13 中的示例,原始数据集被分割成多个不均匀的数据分片,然后被多个工作节点使用。 最好避免在一个单独的分片中包含过多的数据,这可能会导致训练速度变慢甚至节点宕机。当一个数据集的分片数量太少时,也可能会出现这个问题。 这种方法在开发和测试环境中是可以接受的,但并不推荐在生产环境中使用。
此外,每次当数据集更新时,使用手动分片的方式,操作起来并不简单。 我们需要为多个工作节点的数据做备份,并且要小心地做数据迁移和调整。 为了解决这个问题,我们可以基于算法自动进行分片。哈希分片,如图 2.14 所示,获取数据分片的键值,生成哈希值。 然后使用生成的哈希值来确定数据子集应该位于哪个节点。 通过统一的哈希函数可以将数据均匀分布在不同的节点上,解决前面提到的问题。 此外,键值相近的数据不太可能被放置在同一个分片中。
Figure 2.14 A diagram of hash sharding. A hash value is generated to determine where a subset of the dataset should be located. (Source: YugabyteDB, licensed under Apache License 2.0)
The sharding pattern works by splitting extremely large datasets into multiple data shards spread among multiple worker machines; then each of the worker machines is responsible for consuming individual data shards independently. With this approach, we can avoid the slowness of sequential model training due to the batching pattern. Both the batching and sharding patterns work well for the model training process; eventually, the dataset will be iterated thoroughly. Some machine learning algorithms, however, require multiple scans of the dataset, which means that we might perform batching and sharding twice. The next section introduces a pattern to speed up this process.
2.4.4 Exercises 1 Does the sharding pattern introduced in this section use horizontal partitioning or vertical partitioning? 2 Where does the model read each shard from? 3 Is there any alternative to manual sharding?
2.5 Caching pattern Let’s recap the patterns we’ve learned so far. In section 2.3, we successfully used the batching pattern to handle and prepare large datasets for model training when the machine learning framework could not handle large datasets or the underlying implementation of the framework required domain expertise. With the help of batching, we can process large datasets and perform expensive operations under limited memory. In section 2.4, we applied the sharding pattern to split large datasets into multiple data shards spread among multiple worker machines. We speed up the training process as we add more worker machines that are responsible for model training on each data shard independently. Both of these patterns are great approaches that allow us to train machine learning models on large datasets that won’t fit on a single machine or that slows down the model training process. One fact that I haven’t mentioned is that modern machine learning algorithms, such as tree-based algorithms and deep learning algorithms, often require training for multiple epochs. Each epoch is a full pass-through of all the data we are training on, when every sample has been seen once. A single epoch refers to the single time the model sees all examples in the dataset. A single epoch in the Fashion-MNIST dataset means that the model we are training has processed and consumed all the 60,000 examples once. Figure 2.15 shows model training for multiple epochs.
Figure 2.15 A diagram of model training for multiple epochs at time t0, t1, and so on
Training these types of machine learning algorithms usually involves optimizing a large set of parameters that are heavily interdependent. In fact, it can require a lot of labeled training examples to get the model close to the optimal solution. This problem is exacerbated by the stochastic nature of batch gradient descent in deep learning algorithms, in which the underlying optimization algorithm is data-hungry. Unfortunately, the types of multidimensional data that these algorithms require, such as the data in the Fashion-MNIST dataset, may be expensive to label and take up large amounts of storage space. As a result, even though we need to feed the model lots of data, the number of samples available is generally much smaller than the number of samples that the optimization algorithm needs to reach a good-enough solution. There may be enough information in these training samples, but the gradient descent algorithm takes time to extract it. Fortunately, we can compensate for the limited number of samples by making multiple passes over the data. This approach gives the algorithm time to converge without requiring an impractical amount of data. In other words, we can train a good-enough model that consumes the training dataset for multiple epochs.
图 2.14 哈希分片示意图。生成哈希值以确定数据子集应位于哪个节点。 (来源:YugabyteDB,遵循 Apache License 2.0 许可协议)
分片模式通过将一个超大型的数据集分割成多个数据分片,这些分片分布在多个工作节点上;然后每个节点独立地消费各自的数据分片。 采用这种方法,避免了由于使用批处理模式而引起缓慢的串行模型训练。 批处理和分片模式都非常适合用于模型训练;最终,整个数据集将被更新迭代。 然而,一些机器学习算法需要对数据集进行多次扫描,这意味着我们可能会执行多次批处理和分片的步骤。 下一节将介绍一种加速此过程的模式。
2.4.4 练习 1 本节介绍的分片模式是采用水平拆分还是垂直拆分的方式? 2 模型从哪里读取到每个数据分片? 3 除了手动数据分片之外是否还有其他方式对数据进行分片?
2.5 缓存模式 回顾一下到目前为止我们所学的模式。在 2.3 节中,当机器学习框架无法处理大型数据集时,我们使用批处理模式来预处理用于模型训练的大型数据集。 使用批处理模式,我们可以处理大型数据集并在有限的内存下执行耗费计算资源的操作。 在 2.4 节中,我们使用分片模式将大型数据集分割成多个数据分片并分布在多个工作节点上。 随着更多工作节点的添加,它们能够并行且独立地对每个数据分片进行模型训练,从而加快了训练速度。 这两种模式都是行之有效的方法,使我们能够在单机无法存储大型数据集时或因数据集过大导致模型训练速度变慢时,依然能够高效地进行模型训练。 值得一提的是,现代机器学习算法(例如:基于树的算法和深度学习算法)通常需要进行多次迭代(epoch)训练。 每一次迭代是对训练数据集的完整遍历,即数据集中的每个样本都被遍历处理一次。也就是模型完成了一次数据集所有样本的读取和处理。 例如,使用 Fashion-MNIST 数据集完成了一次迭代,指的是模型已经处理了一次该数据集中的所有 60,000 个样本。 图 2.15 展示了多次迭代的模型训练。
图2.15 在 t0、t1 时刻的多次迭代模型训练图
训练这类机器学习算法通常需要优化大量相互依赖的参数。 事实上,它可能需要大量标记训练样本才能使模型接近最优解。 深度学习中的梯度下降算法(Gradient Descent)加剧了这个问题,该算法的优化需要大量的数据样本。 不巧的是,这些算法所需的多维数据类型(例如 Fashion-MNIST 数据集中的数据)的标记成本很高,并且会占用大量存储空间。 因此,尽管我们需要向模型提供大量数据,但可用样本的数量通常远小于通过算法优化达到期望效果所需的样本数量。 这些训练样本中可能包含足够的信息,但梯度下降算法提取这些信息需要大量时间。 幸运的是,我们可以通过对数据进行多次遍历来弥补样本数量有限带来的问题。 通过多次迭代的方法减少单次迭代提取的样本数据量,从而减少算法提取样本数据信息的时间, 换句话说,我们可以通过多次迭代训练数据集得到一个“足够好”的模型。
2.5.1 The problem: Re-accessing previously used data for efficient multi-epoch model training
Now that we know that we can train a machine learning model for multiple epochs on the training dataset, let’s assume that we want to do this on the Fashion-MNIST dataset. If training one epoch on the entire training dataset takes 3 hours, we need to double the amount of time spent on model training if we want to train two epochs, as shown in figure 2.16. In real-world machine learning systems, an even larger number of epochs is often required, so this approach is not efficient.
Figure 2.16 A diagram of model training for multiple epochs at time t0, t1, and so on. We spent 3 hours on each epoch.
2.5.2 The solution Given the unreasonable amount of time needed to train a machine learning model for multiple epochs, is there anything we can do to speed up the process? There isn’t anything we can do to improve the process for the first epoch because that epoch is the first time that the machine learning model sees the entire set of training datasets.
What about the second epoch? Can we make use of the fact that the model has already seen the entire training dataset once? Assume that the laptop we are using to train the model has sufficient computational resources, such as memory and disk space. As soon as the machine learning model consumes each training example from the entire dataset, we can hold off recycling, instead keeping the consumed training examples in memory. In other words, we are storing a cache of the training examples in the form of in-memory representation, which could provide speed-ups when we access it again in the following training epochs. In figure 2.17, after we finish fitting the model for the first epoch, we store a cache for both of the batches that we used for the first epoch of model training. Then we can start training the model for the second epoch by feeding the stored in-memory cache to the model directly without having to read from the data source again for future epochs.
Figure 2.17 A diagram of model training for multiple epochs at time t0, t1, and so on, using a cache instead of reading from the data source again
This caching pattern can be summarized as the pseudocode in the following listing. We read the next batch to train the model and then append this batch to the initialized cache during the first epoch. For the remaining epochs, we read batches from the cache and then use those batches for model training.
Listing 2.7 Pseudocode for caching
If we have performed expensive preprocessing steps on the original dataset, we could cache the processed dataset instead of the original dataset and avoid wasting time by processing the dataset again. The pseudocode is shown in the following listing.
Listing 2.8 Pseudocode for caching with preprocessing
Note that listing 2.8 is similar to listing 2.7. Two slight differences are that we initialize the cache with the preprocessed batch instead of the raw batch, as in listing 2.7, and we read the processed batch from the batch directly without having to preprocess the batch again before model training. With the help of the caching pattern, we can greatly speed up re-access to the dataset for a model training process that involves training on the same dataset for multiple epochs. Caching can also be useful for recovering from any failures quickly; a machine learning system can easily re-access the cached dataset and continue the rest of the processes in the pipeline. We’ll apply this pattern in section 9.1.1.
2.5.1 问题: 重新访问之前使用过的数据以进行高效的多轮模型训练
现在我们可以在训练数据集上进行多轮训练,假设我们想要用此方法在 Fashion-MNIST 数据集上做模型训练。 如图 2.16 所示,如果在整个训练数据集上迭代一次需要 3 个小时,那么如果我们想要做两轮迭代,模型训练所花费的时间将会翻倍。 在实际的机器学习系统中,通常需要多次迭代,因此这种方法效率不高。
图 2.16 在时间 t0、t1 等时刻的多轮模型训练示意图。我们在每轮迭代上花费了 3 个小时。
2.5.2 解决方案 考虑到模型多轮训练所需的时间较长,我们可以采取什么措施来加快这一过程? 对于第一轮迭代,我们无法优化数据集的读取时间,因为这是模型第一次读取整个训练数据集。
那么第二轮迭代呢?我们可以利用之前迭代中模型所使用的训练数据集吗? 假设我们用于训练模型的笔记本电脑具有足够的计算资源(例如:内存和磁盘空间)。 一旦机器学习模型使用了整个数据集中的每个样本,我们不立即回收样本数据,而是将它们保留在内存中。 也就是说,我们在内存中缓存了训练样本,这可以加快后续训练迭代中再次访问样本的速度。 在图 2.17 中,在完成第一轮迭代的模型训练后,我们将这一轮迭代所使用的两个数据批次缓存到内存中。 在第二轮迭代训练开始时,我们可以通过将缓存在内存中的数据直接提供给模型,而无需在未来的迭代中再次从数据源中读取数据。
图 2.17 t0、t1 时刻使用缓存而并非从数据源读取训练数据的多轮模型训练示意图
这种缓存模式可以用以下伪代码概括。 在第一轮迭代训练开始时,依次读取数据集中每一个 batch 的数据,将其追加到初始化缓存中。 对于剩余轮次迭代,我们直接从缓存中读取数据,然后使用这些数据进行模型训练。
代码 2.7 缓存的伪代码
如果我们在原始数据集上执行了耗时的预处理步骤,我们可以直接缓存预处理过的数据集,而不缓存原始数据集,这样避免了需要多次预处理数据集而耗费大量时间。伪代码如下所示。
代码 2.8 缓存预处理数据的伪代码
代码 2.8 与代码 2.7 类似。 两者的细微差别在于,我们初始化缓存的是预处理 batch 的数据而不是原始 batch 的数据,如代码 2.7 所示。因此,在每次模型训练前,我们直接从缓存中读取预处理过的数据,无需再对数据进行预处理。 借助缓存模式,我们可以大大加快模型训练过程中重复访问数据集的速度,该过程涉及在多个迭代轮次对同一数据集进行训练。 缓存还有助于快速故障恢复;机器学习系统可以轻松地重复访问缓存的数据集,并继续处理机器学习流水线中的其余流程。我们将在第 9.1.1 节中应用此模式。
2.5.3 Discussion We have successfully used the caching pattern to store the cache in memory on each worker machine, speeding up the process of accessing previously used data for multiple epochs of model training. What if a failure happens on the worker machine? If the training process gets killed due to an out-of-memory error, for example, we would lose all the previously stored cache in memory.
To avoid losing the previously stored cache, we can write the cache to disk instead of storing it in memory and persist it as long as the model training process still needs it. This way, we can easily recover the training process by using a previously stored cache of training data on disk. Chapter 3 discusses in depth how to recover the training process or make the training process more tolerant of failure. Storing the cache on disk is a good solution. One thing to note, however, that reading from or writing to memory is about six times faster when we are doing sequential access but about 100,000 times faster when we are doing random access rather than accessing from disk. Random-access memory (RAM) takes nanoseconds, whereas hard drive access speed is measured in milliseconds. In other words, there’s a tradeoff between storing a cache in memory and storing it on a disk due to the difference in access speedspeed. Figure 2.18 provides a diagram of model training with an on-disk cache.
Figure 2.18 A diagram of model training for multiple epochs at time t0, t1, and so on with an on-disk cache
Generally speaking, storing a cache on disk is preferable if we want to build a more reliable and fault-tolerant system; storing a cache in memory is preferable when we want to have more efficient model training and data ingestion processes. An on-disk cache can be extremely useful when the machine learning system requires reading from remote databases, whereas reading from memory cache is much faster than reading from remote databases, especially when the network connection isn’t fast and stable enough. What if the dataset gets updated and accumulated over time, as in section 2.3.3, where the data shard on each worker machine needs to be redistributed and balanced? In this case, we should take the freshness of the cache into account and update it on a schedule based on the specific application.
2.5.4 Exercises 1 Is caching useful for model training that requires training on the same dataset or on a different dataset for multiple epochs? 2 What should we store in the cache if the dataset needs to be preprocessed? 3 Is an on-disk cache faster to access than an in-memory cache?
2.6 Answers to exercises
Section 2.3.4 1 Sequentially 2 Yes. That’s one of the main use cases of batching. 3 No
Section 2.4.4 1 Horizontal partitioning 2 Locally on each worker machine 3 Automatic sharding, such as hash sharding
Section 2.5.4 1 Same dataset 2 We should store the preprocessed batches in the cache to avoid wasting time on preprocessing again in the following epochs. 3 No. Generally, an in-memory cache is faster to access.
Summary Data ingestion is usually the beginning process of a machine learning system, responsible for monitoring any incoming data and performing necessary processing steps to prepare for model training. The batching pattern helps handle large datasets in memory by consuming datasets in small batches. The sharding pattern prepares extremely large datasets as smaller chunks that are located on different machines. The caching pattern makes data fetching for multiple training rounds more efficient by caching previously accessed data that can be reused for the additional rounds of model training on the same dataset.
2.5.3 讨论 我们已经成功地使用缓存模式,将缓存存储在每个工作节点的内存中,这加快了在多个训练轮次中重复访问之前使用过的数据的过程。 但如果工作节点出现故障了怎么办? 例如,如果训练过程因内存不足而异常终止,我们将丢失之前存储在内存中的所有缓存。
为了避免丢失缓存,与其将缓存存储在内存中,不如将缓存写入到磁盘中,并在模型训练过程需要用到时一直保持持久化。 这样,我们可以使用磁盘上的训练数据缓存轻松恢复训练。 第3章深入讨论了如何恢复模型训练或使模型训练有更强的故障容忍能力。 将缓存存储在磁盘上是一个很好的解决方案。 然而需要注意的是,顺序访问数据时,内存读取或写入的速度大约是磁盘的六倍。随机访问数据时,内存读取或写入的速度大约是磁盘的 10 万倍。随机内存访问 (RAM) 的速度是纳秒级的,而硬盘的访问速度是毫秒级的。 换句话说,由于访问速度的差异,我们需要权衡将缓存存储在内存中还是将其存储在磁盘上。 图 2.18 提供了使用磁盘缓存数据进行模型训练的图表。
图 2.18 在时间 t0、t1 等时刻带有磁盘缓存的多轮次模型训练示意图
一般来说,如果我们想要构建一个可靠性和容错能力更强的系统,那么将缓存存储在磁盘上是更好的选择;而当我们想要更高效的模型训练和数据摄取过程时,将缓存存储在内存中是更好的选择。 当机器学习系统需要从远程数据库读取时,可以使用磁盘缓存。而当网络速度较慢或不稳定时,从内存缓存读取比从远程数据库读取要快得多。
如果数据集随着时间的推移而更新和累积,如第 2.3.3 节所示,每个工作节点上的数据分片需要重新分配和平衡,该怎么办? 在这种情况下,我们应该考虑缓存的有效期,并根据具体应用场景定期更新缓存。
2.5.4 练习 1 缓存一般作用于哪种模型训练场景?基于同一个数据集还是不同数据集上进行多轮次训练的场景? 2 如果数据集需要预处理,我们应该在缓存中存储什么数据? 3 磁盘缓存的访问速度是否比内存缓存更快?
2.6 习题答案
第2.3.4节 1 顺序地。 2 是的,这是批处理的主要使用场景之一。 3 否。
第2.4.4节 1 水平拆分。 2 在每个工作节点上本地读取。 3 自动分片,例如哈希分片。
第2.5.4节 1 基于同一个数据集 2 我们应该将预处理过的 batch 存储在缓存中,以避免在后续训练轮次中耗费时间进行重复地预处理。 3 不是,通常内存的访问速度更快。
总结 数据摄取通常是机器学习系统的起始阶段,它负责读取数据并对其执行必要的预处理步骤,为后续模型训练做准备。 批处理模式通过从数据集中划分多个小的 batch 来处理大型数据集。 分片模式将超大数据集划分为多个小数据块,存储在不同节点上。 缓存模式通过缓存之前访问过的数据,加快多轮训练的数据读取速度,这些数据可以在同一数据集上多个轮次的模型训练中重复使用。
加载Python内置的OS库,用于加载与PostgreSQL数据库相关的环境变量 构造用于访问 PostgreSQL 数据库的端点 从数据库中的 AirQualityUCI 表中选择两列并实例化 tf.data.Dataset 对象 检查数据集的规格,例如每列的形状和数据类型
两批数据集依次用于模型训练。
读取数据集中的下一批 使用该批次训练模型 训练当前批次后读取下一批
两批数据集依次用于模型训练。
垂直分区将不同的表列存储在单独的数据库中。 水平分区将同一个表的行存储在多个数据库中。
从等级 0 的工作机器创建分片并将其发送到所有其他工作机器 读取该工作机器中本地可用的下一个分片 使用我们刚刚从本地工作机器读取的分片来训练模型 一旦我们完成当前分片的训练,就读取下一个分片
哈希函数采用数据分片的键值来生成哈希值。 根据生成的哈希值,数据集的子集位于集群的不同机器上。
读取下一批数据集 初始化该批次的缓存 通过迭代批次来训练模型 使用之前缓存的批次来多轮次训练模型
使用预处理批次初始化缓存 从缓存中检索已处理的批次并将其用于模型训练
请注意,缓存存储在磁盘上,并且其读/写操作通常比内存缓存慢。