Posted by Hao Liang's Blog on Monday, January 1, 0001

完整实现

本章内容 使用 TensorFlow 实现数据摄取组件 定义机器学习模型并提交分布式模型训练作业 实现单实例模型服务器和副本模型服务器 为机器学习系统构建高效的端到端工作流

在上一章中,我们学习了将在项目中使用的 4 种核心技术的基础知识:TensorFlow、Kubernetes、Kubeflow 和 Argo Workflows。 我们了解到 TensorFlow 执行数据处理、模型构建和模型评估。 还学习了 Kubernetes 的基本概念,并启动了本地 Kubernetes 集群,我们将使用它作为我们的核心分布式基础设施。 此外,我们还成功使用 Kubeflow 向本地 Kubernetes 集群提交了分布式模型训练作业。 最后,我们学习了如何使用 Argo Workflows 构建和提交基本的 hello world 工作流和复杂的有向无环图(DAG)结构的工作流。

在本章中,我们将使用第 7 章中设计的架构来实现端到端机器学习系统。 我们将使用之前讨论的模式完整地实现每个组件。 例如,使用一些流行的框架和前沿技术,特别是在第 8 章中介绍过的 TensorFlow、Kubernetes、Kubeflow、Docker 和 Argo Workflows 来构建本章中分布式机器学习工作流的不同组件。

9.1 数据摄取 我们端到端工作流中的第一个组件是数据摄取。 我们将使用第 2.2 节中介绍的 Fashion-MNIST 数据集来构建数据摄取组件。 图 9-1 在端到端工作流左侧的深色框中显示了该组件。

图 9-1 端到端机器学习系统中的数据摄取组件(深色框)

The machine learning workflow is triggered. 机器学习工作流被触发。

Has the data been updated recently? 数据最近是否有更新?

回想一下,该数据集由 60,000 个训练样本和 10,000 个测试样本组成。 每个样本都是一个 28 × 28 的灰度图像,代表了 Zalando 的一个商品图像,并与 10 个类别中的一个标签相关联。 此外,Fashion-MNIST 数据集旨在作为原始 MNIST 数据集的直接替代品,用于对机器学习算法进行基准测试。 它保持了相同的图像大小以及训练和测试分割的结构。 图 9-2 是 Fashion-MNIST 数据集的 10 个类别(T 恤/上衣、裤子、套头衫、连衣裙、外套、凉鞋、衬衫、运动鞋、包和靴子)的图像集合,其中每个类别在图像集中占 3 行。

图 9-3 训练集中的前几个样本图像以及每个图像对应的文本标签。

在第 9.1.1 节中,我们将介绍摄取 Fashion-MNIST 数据集的单节点数据流水线的实现。 此外,第 9.1.2 节将介绍分布式数据流水线的实现,为第 9.2 节中的分布式模型训练准备数据。

图 9-2 Fashion-MNIST 数据集的 10 个类别(T 恤/上衣、裤子、套头衫、连衣裙、外套、凉鞋、衬衫、运动鞋、包和靴子)的图像集合

Every three rows represent example images that represent a class. For example, the top three rows are images of T-shirts. 每 3 行代表代表一个类别的样本图像。 例如,前 3 行是 T 恤图像。

图 9-3 训练集中的前几个样本图像及其相应的文本标签

9.1.1 单节点数据流水线 首先,我们来看看如何构建一个单节点数据流水线,这个流水线可以在你的笔记本电脑上本地运行,不需要使用本地 Kubernetes 集群。 用 TensorFlow 编写的机器学习程序消费数据的最佳方式是通过 tf.data 模块中的方法进行消费。 tf.data API 能够让用户轻松构建复杂的输入流水线。 例如,图像模型的流水线可能会聚合各个文件系统中的数据,随机转换每个图像,并从图像中创建多个批次进行模型训练。

用户通过 tf.data API 能够处理大量数据,从不同的数据格式中读取数据并执行复杂的转换操作。 它包含一个 tf.data.Dataset 抽象,代表一个元素列表,其中每个元素由一个或多个组件组成。 让我们使用图像流水线来说明这一点。 图像输入流水线中的元素可能是单个训练样本,其中一对张量代表了该图像及其标签。

以下代码提供了将 Fashion-MNIST 数据集加载到 tf.data.Dataset 对象中的代码片段,并执行一些必要的预处理步骤来为我们的模型训练做准备: 1 将数据集从范围 (0, 255] 缩放到 (0., 1.]。 2 将图像的多维数组转换为我们的模型可以接受的 float32 类型。 3 选择训练数据,将其缓存在内存中以加快训练速度,并以缓冲区大小为 10,000 的方式打乱。

代码 9-1 加载 Fashion-MNIST 数据集

我们导入了 tensorflow_datasets 模块。 TensorFlow Datasets 模块由用于图像分类、对象检测、文档摘要等各种任务的数据集组成,可与 TensorFlow 和其他 Python 机器学习框架一起使用。

tf.data.Dataset 对象是一个打乱的数据集,其中每个元素由图像、标签、shape 和数据类型信息组成,如下所示。

代码 9-2 检查 tf.data 对象

9.1.2 分布式数据流水线 现在让我们看看如何分布式地消费数据集。 我们将在下一节中使用 tf.distribute.MultiWorkerMirroredStrategy 策略进行分布式训练。 假设我们已经实例化了一个策略对象。 我们将在策略的范围内通过 Python 的 with 语法实例化我们的数据集,使用之前为单节点用例定义的相同函数。

我们需要调整一些配置来构建分布式输入流水线。 首先,我们创建重复的数据批次,其中总 batch size 等于每个副本的 batch size 乘以聚合梯度的副本数量。 这确保了我们有足够的数据来训练每个模型训练节点中的每个批次。 换句话说,同步中的副本数量等于模型训练期间参与梯度 allreduce 操作的设备数量。 例如,当用户或训练代码在分布式数据迭代器上调用 next() 函数时,每个副本上都会返回一个 batch size 大小的数据。 重新批处理(rebatch)的数据集大小将始终是副本数量的倍数。

此外,我们希望配置 tf.data 以启用自动数据分片。 由于数据集是分布式的,因此在多机训练模式下输入的数据集将自动分片。 更具体地说,每个数据集将在工作节点的 CPU 设备上创建,并且当 tf.data.experimental.AutoShardPolicy 设置为 AutoShardPolicy.DATA 时,每组工作节点将在整个数据集的子集上训练模型。 这样的好处是,在每个模型训练步骤中,每个工作节点将处理一组全局 batch size 的非重叠数据集元素。 每个工作节点将处理整个数据集并丢弃不属于自己的部分。 为了使此模式正确划分数据集元素,数据集需要按确定的顺序生成元素,这通过 TensorFlow Datasets 库来实现。

代码 9-3 配置分布式数据流水线

9.2 模型训练 我们已经介绍了本地节点和分布式数据流水线的数据摄取组件的实现,并讨论了如何在不同的工作节点之间正确地对数据集分片,以便它能够与分布式模型训练一起工作。 在本节中,我们将深入探讨模型训练组件的实现细节。 模型训练组件的架构图如图 9-4 所示。

图 9-4 整体架构中的模型训练组件示意图。 在 3 个不同的模型训练步骤后是模型选择步骤。 这些模型训练步骤将训练出 3 个不同的模型:CNN、带 dropout 层的 CNN 和带 batch normalization 层的 CNN,它们之间相互竞争以获得更好的统计性能。

Three mode training steps train different models. 3 个模式训练步骤分别训练不同的模型。

This step picks the top model that will be used in the following two separate model serving steps. 这个步骤选择将在接下来的两个独立的模型服务步骤中使用的最优模型。

我们将在第 9.2.1 节中学习如何使用 TensorFlow 来定义这 3 个模型,并在第 9.2.2 节中学习如何使用 Kubeflow 执行分布式模型训练作业。 在第 9.2.3 节中,我们将实现模型选择步骤,该步骤选择将在接下来的模型服务组件中使用的最优模型。

9.2.1 模型定义和单节点训练 接下来,我们将使用 TensorFlow 代码来定义和初始化第一个模型,即我们在前面的章节中介绍的带有 3 个卷积层的卷积神经网络 (CNN) 模型。 首先,我们使用 Sequential() 初始化模型,这意味着我们将按顺序地添加层。 第一层是输入层(input layer),我们在其中指定之前定义的输入流水线的 shape。 同时,我们为输入层命了名,以便我们可以在推理输入中传递正确的键,我们将在第 9.3 节中更深入地讨论这一点。

添加了输入层后,我们在顺序模型中添加了 3 个卷积层(convolutional layers),然后是最大池化层(max-pooling layers)和密集层(dense layers)。 然后,我们将打印出模型架构的摘要,并以 Adam 作为优化器(optimizer)、以准确性作为评估模型的指标、以稀疏分类交叉熵(sparse categorical cross-entropy)作为损失函数来编译模型。

代码 9-4 定义基本的 CNN 模型

我们成功定义了基本的 CNN 模型。 接下来,我们再基于 CNN 模型定义 2 个模型。 第一个模型添加了 batch normalization 层,让特定层中每个神经元的预激活均值和单位标准差为零。 另一个模型添加了 dropout 层,其中一半的隐藏单元将被随机丢弃,以降低模型的复杂性并加快计算速度。其余代码与基本 CNN 模型相同。

代码 9-5 定义基本的 CNN 模型变体

模型定义好后,我们可以在笔记本电脑上本地训练它们。 我们以基本的 CNN 模型为例,创建 4 个将要在模型训练期间执行的回调函数: 1 PrintLR:在每个 epoch 结束时打印学习率的回调函数 2 TensorBoard:启动交互式 TensorBoard 可视化的回调函数,以监控训练进度和模型架构 3 ModelCheckpoint:保存模型权重以供后续模型推理的回调函数 4 LearningRateScheduler:在每个 epoch 结束时衰减学习率的回调函数

一旦定义了这些回调函数,我们将其传递给 fit() 方法进行训练。 fit() 方法使用指定的 epoch 数量和每个 epoch 的 step 数量来训练模型。 请注意,这里的数字仅用于演示,是为了加快我们的本地实验速度,并不足以在实际应用中训练出高质量的模型。

代码 9-6 使用回调函数进行模型训练

我们将在日志中看到如下的模型训练进度:

基于这个摘要,在此过程中将训练 93,000 个参数。 每层参数的 shape 和数量也可以在摘要中找到。

9.2.2 分布式模型训练 现在我们已经定义了模型并可以在单机中本地训练它们,下一步在代码中插入分布式训练逻辑,以便我们可以使用之前介绍过的集合通信模式。我们将使用包含 MultiWorkerMirroredStrategy 策略的 tf.distribute 模块。 这是一个用于在多个工作节点上进行同步训练的分布式策略。 它会在所有工作节点的每个设备上创建模型层中所有变量的副本。 该策略使用分布式集合通信实现(例如:allreduce),因此多个节点可以同时工作以加快训练速度。 如果你没有合适的 GPU,你可以将 communications_options 替换为其他实现方式。 由于我们希望分布式训练可以在没有 GPU 的多个节点上运行,因此我们将其替换为 CollectiveCommunication.AUTO,以便它自动选择任何可用的硬件。

一旦我们定义了分布式训练策略,我们将根据该策略启动分布式输入数据流水线(如前面第 9.1.2 节中所述)和模型。 需要注意的是,我们必须在 strategy.scope() 内定义模型,因为 TensorFlow 知道如何根据策略将模型层中的变量复制到每个工作节点中。 这里我们根据传递给 Python 脚本的命令行参数定义不同的模型类型(CNN、带 dropout 层的 CNN 和带 batch normalization 层的 CNN)。 其他的命令行参数我们很快就会讨论到。 在 strategy.scope() 中定义了数据流水线和模型后,我们在 strategy.scope() 外使用 fit() 函数训练模型。

代码 9-7 分布式模型训练逻辑

通过 fit() 函数完成模型训练后,我们需要保存模型。 用户容易犯的一个常见错误是在所有工作节点上保存模型,这样模型可能无法正常保存,并且计算和存储资源会被浪费。 保存模型的正确方法是只将模型保存在主(chief)节点上。 我们可以检查环境变量 TF_CONFIG,它包含集群信息,例如:任务类型和索引,通过该环境变量可以知道该工作节点是否为主节点。 此外,我们希望将模型保存到节点的唯一路径下,以避免意外错误。

代码 9-8 使用主节点保存模型

到目前为止,我们已经看到了两个命令行参数,即 saved_model_dir 和 model_type。 代码 9-9 提供了解析这些命令行参数的逻辑。 除了这两个参数之外,还有另一个 checkpoint_dir 参数,我们使用它来将模型保存为 TensorFlowSavedModel 格式,该格式可以被模型服务组件解析并使用。 我们将在 9.3 节中详细讨论这个问题。 除此之外,我们还禁用了 TensorFlow Datasets 模块的进度条,以减少日志输出。

代码 9-9 主函数入口

我们刚刚编写完包含分布式模型训练逻辑的 Python 脚本。 现在将其容器化,并构建用于在本地 Kubernetes 集群中运行的分布式训练镜像。 在 Dockerfile 中,我们将使用 Python 3.9 基础镜像,通过 pip 安装 TensorFlow 和 TensorFlow Datasets 模块,并复制多机分布式训练 Python 脚本。

代码 9-10 容器化

然后我们从刚刚定义的 Dockerfile 构建镜像。 由于集群还无法访问本地镜像仓库,我们还需要将镜像导入到 k3d 集群中。 然后我们将当前命名空间切换到 kubeflow。 请参考第 8 章并按照说明安装该项目所需的组件。

代码 9-11 构建并导入 docker 镜像

一旦 Pod 执行完成,Pod 中的所有文件都将被回收。 由于我们在 Kubernetes Pod 中跨多个工作节点运行分布式模型训练,因此所有模型检查点存档(checkpoint)都将丢失。 为了解决这个问题,我们将使用 PersistentVolume (PV) 和 PersistentVolumeClaim (PVC) 作为持久化存储。

PV 是集群中由管理员配置或组件动态配置的存储。 它是集群中的一种资源,就像 node 是集群的一种资源一样。 PV 是像 Volumes 一样的卷插件,但是它们的生命周期独立于使用 PV 的任何一个 Pod。 换句话说,即使 Pod 运行完成或被删除后,PV 仍将持续存在。

PVC 是用户侧对存储的请求。 它类似于 Pod。 Pod 消耗 node 资源,PVC 消耗 PV 资源。 Pod 可以请求特定级别的资源(CPU 和内存)。 通过 PVC 可以请求特定的存储空间大小和访问模式(例如,它们可以被挂载为 ReadWriteOnce、ReadOnlyMany 或 ReadWriteMany 访问模式)。

让我们创建一个 PVC 来提交存储请求,该存储请求将在 Pod 中用于存储训练好的模型。 在这里,我们提交一个请求,需要 1 Gi 的存储空间,访问模式为 ReadWriteOnce。

代码 9-12 PVC 配置声明

接下来,创建 PVC。

代码 9-13 创建 PVC

接下来,让我们定义第 7 章中介绍的 TFJob 配置,它使用了刚刚构建的包含分布式训练脚本的镜像。 我们将必要的命令行参数传递给容器来训练基本的 CNN 模型。 在 Worker 字段中,volumes 字段指定了我们刚刚创建的 PVC 的名称,而 containers 字段中的 volumeMounts 字段指定在容器和数据卷之间挂载的目录。 该模型将保存在数据卷中的 /trained_model 目录下。

代码 9-14 分布式模型训练作业配置声明

然后将这个 TFJob 提交到集群来开始分布式模型训练。

代码 9-15 提交 TFJob

一旦 Pod 运行完成,我们从 Pod 的日志中可以看到当前以分布式的方式训练了模型,并且工作节点之间正常地进行通信:

9.2.3 模型选择 到目前为止,我们已经实现了分布式模型训练组件。 我们最终将训练 3 个不同的模型,如第 9.2.1 节中所述,然后选择最优的模型进行模型服务。 假设我们已经提交了 3 个不同的 TFJobs,每个 TFJob 使用不同的模型类型完成了训练。

接下来,我们编写 Python 代码来加载测试数据集和训练好的模型,评估它们的性能。 我们将通过 keras.models.load_model() 函数从不同的目录加载每个训练好的模型,并执行 model.evaluate() 来返回损失值和准确率。 一旦我们找到准确率最高的模型,我们就可以将该模型复制为一个新的版本,放在一个新的目录中,命名为版本 4,该目录将被模型服务组件使用。

代码 9-16 模型评估

然后,trained_model/saved_model_versions 目录下的最新版本(版本 4),将被选取用于服务组件。 我们将在下一节中具体讨论这个问题。

然后,我们将此 Python 脚本添加到 Dockerfile 中,重新构建容器镜像,并创建运行模型选择组件的 Pod。 以下是模型选择 Pod 的 YAML 配置文件。

代码 9-17 模型选择的 Pod 配置声明

查看日志,发现第三个模型的准确率最高,因此我们把它复制成一个新的版本,供模型服务组件使用:

9.3 模型服务 现在我们已经实现了分布式模型训练以及在训练好的模型中进行模型选择。 接下来将要实现的下一个组件是模型服务组件。 模型服务组件对于最终用户体验至关重要,因为其结果将直接展示给用户,如果性能不足,用户能立马感知到。 图 9-5 展示了整体架构中的模型训练组件。

图 9-5 端到端机器学习系统中的模型服务组件(深色框)

The results from the two model serving steps are then aggregated via a result aggregation step to present to users. 通过结果聚合步骤聚合 2 个模型服务步骤的结果,展示给用户。

在图 9-5 中,模型服务组件是处于模型选择和结果聚合步骤之间的 2 个深色框。 我们首先来实现第 9.3.1 节中的单服务器模型推理组件,然后在第 9.3.2 节中使其更具可扩展性并提升性能。

9.3.1 单服务器模型推理 模型推理 Python 代码与模型评估代码非常相似。 唯一的区别是我们在加载训练模型后使用 model.predict() 方法,而不是 evaluate() 方法。 这是评估模型能否按预期进行预测的一个很好的方法。

代码 9-18 模型预测

在安装完成后,你可以使用如下代码在本地启动一个 TensorFlow Serving(https://github.com/tensorflow/serving)服务。

代码 9-19 TensorFlow Serving 命令

如果我们只在本地进行实验,这看起来很简单并且效果很好。 然而,还能使用更高效的方法来构建模型服务组件,这些方法能够为分布式模型服务的成功运行打下基础,这些模型服务采用了我们在前面章节中介绍的副本模型服务器模式。

在深入研究更好的解决方案之前,让我们确保训练的模型可以进行正常地输入预测,输入数据是一个以 instances 和 image_bytes 为键的 JSON 结构图像字节列表,如下所示:

现在是时候修改我们的分布式模型训练代码,以确保模型具有与我们提供的输入兼容的正确服务签名。 我们定义了一个预处理函数,它执行以下操作: 1 从输入的字节中对图像进行解码 2 将图像大小调整为 28 × 28,以兼容我们的模型架构 3 将图像转换为 tf.uint8 类型 4 定义输入签名,类型为 string,键为 image_bytes

定义好了预处理函数,我们就可以通过 tf.TensorSpec() 定义服务签名,然后将其传递给 tf.saved_model.save() 方法来保存与输入格式兼容的模型,并在 TensorFlow Serving 进行调用推理之前对其进行预处理。

代码 9-20 模型服务签名定义

修改了分布式模型训练脚本后,我们可以参考 9.2.2 节重新构建容器镜像并重新开始训练模型。

接下来,我们将使用在第 8 章中提到的 KServe 来创建推理服务。 代码 9-21 提供了定义 KServe 推理服务的 YAML 配置。 我们需要指定模型格式,以便 KServe 知道用什么组件来服务模型(例如:TensorFlow Serving)。 此外,我们需要向训练后的模型提供 URI。 在这种情况下,我们可以指定 PVC 名称和训练模型的路径,格式为 pvc:///

代码 9-21 推理服务配置声明

安装 KServe 并创建推理服务。

代码 9-22 安装 KServe 并创建推理服务

检查服务的状态以确保它已准备好提供服务。

代码 9-23 获取推理服务的详细信息

创建服务后,我们使用 port-forward 命令将其端口转发到本地,以便在本地向其发送请求。

代码 9-24 推理服务端口转发

如果端口转发配置成功,能够看到以下输出:

打开另一个终端并执行以下 Python 脚本,向我们的模型服务服务发送一个推理请求,并打印出响应的文本内容。

代码 9-25 使用 Python 脚本发送推理请求

我们的 KServe 模型服务服务的响应(包括 Fashion-MNIST 数据集中每个类别的预测概率)如下:

也可以使用 curl 来发送请求。

代码 9-26 使用 curl 发送推理请求

输出的概率应该与我们刚刚看到的相同:

如前所述,即使我们在 KServe 的 InferenceService 字段中指定了模型的整个目录,TensorFlow Serving 模型服务组件也会从该目录中选择最新版本(版本 4)的模型,也就是我们在 9.2.3 节中选择的最优模型。我们可以从模型服务 Pod 的日志中看到。

代码 9-27 查看模型服务日志

日志输出:

9.3.2 副本模型服务器 在上一节中,我们成功地在本地 Kubernetes 集群中部署了模型服务。 在本地可以成功运行实验样例,但如果将其部署到生成环境中,提供真实的模型服务时,效果并不理想。 当前的模型服务只部署在单个 Pod 中,它所分配到的计算资源是有限的。 当模型服务请求数量增加时,单实例的模型服务将无法支持,计算资源可能会被耗尽。 这可能足以运行本地服务实验,但如果部署到为真实世界的模型服务流量提供服务的生产系统,那就远远不够理想了。

为了解决这个问题,我们需要有多个模型服务器实例来处理大量的动态模型服务请求。 幸运的是,KServe 使用了 Knative Serving 的自动扩缩容功能,可以根据每个 Pod 的平均请求数进行自动缩放。

以下配置启动了包含自动扩缩容功能的推理服务。 scaleTarget 字段指定了自动扩缩容组件监控指标的目标值。 此外,scaleMetric 字段定义了自动扩缩容组件监控的扩缩容指标类型。 指标包括并发度、RPS、CPU 和内存。 这里我们只允许每个推理服务实例处理一个并发请求。 换句话说,当请求增多时,我们会扩容创建一个新的推理服务 Pod 来处理额外的请求。

代码 9.28 副本模型推理服务

假设没有请求,我们应该只会看到一个正在运行的推理服务 Pod。接下来,发送一个持续 30 秒的突发流量,同时保持 5 个在途请求。 我们使用相同的服务主机名、入口地址、推理输入和训练模型。 然后使用 hey 工具(一个向 Web 应用程序发送请求的小程序)发送请求。 在执行以下命令之前,请按照 https://github.com/rakyll/hey 中的指引进行安装。

代码 9-29 发送流量测试负载

以下是预期的输出,其中包括了推理服务处理请求的过程概要。 例如,该服务平均每秒处理 230,160 字节的推理输入和 95.7483 个请求。 你还可以看到一个响应时间直方图和延迟分布:

正如预期的那样,5 个推理服务 Pod 同时处理请求,其中每个 Pod 仅处理一个请求。

代码 9-30 获取模型服务 Pod 列表

一旦 hey 命令完成,我们将只能看到一个正在运行的 Pod。

代码 9-31 再次获取模型服务 Pod 列表

9.4 端到端工作流 我们刚刚在前面的小节中实现了所有组件,现在是时候把这些组件整合在一起了! 在本节中,我们将使用 Argo Workflows 定义一个包含刚刚实现的组件的端到端工作流。 如果你仍然对这些组件不熟悉,请回到前面的章节,在第 8 章中复习基本的 Argo Workflows 的相关知识。

这里回顾一下我们将要实现的端到端工作流。 图 9-6 是我们正在构建的端到端工作流的示意图。 该图包括 2 个模型服务步骤用于演示,但我们只会在 Argo Workflows 中实现其中一个。 它将根据请求的流量大小自动扩容更多实例,如第 9.3.2 节中所述。

在接下来的部分中,我们将通过使用 Argo 将各个步骤按顺序连接起来组成整个工作流,然后通过实现步骤记忆化来优化后续工作流的执行。

图 9-6 正在构建的端到端机器学习系统的架构图

触发机器学习工作流。

3 个训练步骤训练不同的模型。

此步骤选择将在接下来的 2 个独立模型服务步骤中使用的最优模型。

然后,通过结果聚合步骤将 2 个模型服务步骤的结果聚合,呈现给用户。

数据最近是否有更新?

9.4.1 顺序步骤 首先,让我们看一下 entry point 模板和工作流中涉及的主要步骤。 entry point 模板名称为 tfjob-wf,由以下步骤组成(为简单起见,每个步骤都使用同名的模板): 1 data-ingestion-step 为数据摄取步骤,我们在模型训练之前使用它来下载和预处理数据集。 2 distributed-tf-training-step 是一个包含多个子步骤的步骤组,其中每个子步骤代表一种模型类型的分布式模型训练步骤。 3 model-selection-step 为模型选择步骤,该步骤从前面步骤所训练出的模型中挑选出最优的模型。 4 create-model-serving-service 通过 KServe 创建模型服务。

代码 9-32 工作流 entry point 模板

请注意,我们将 podGC 策略指定为 OnPodSuccess,因为我们将在计算资源有限的本地 k3s 集群中为不同步骤创建大量 Pod,因此在 Pod 运行成功后立即将它删除可以为后续步骤释放计算资源。 也可以使用 OnPodCompletion 策略,它会在 Pod 运行结束后将它删除,无论运行的结果是失败还是成功。 但在这个例子中我们不会使用 OnPodCompletion 策略,因为我们希望保留运行失败的 Pod 来调试问题。

此外,我们还指定了持久化卷和 PVC,以确保可以持久化使用的文件。 我们可以将下载的数据集保存到持久化卷中进行模型训练,然后保留训练后的模型以用于后续的模型服务步骤。

第一步,即数据摄取步骤,实现起来非常简单。 它只指定了容器镜像和要执行的数据摄取 Python 脚本。 Python 脚本为一行代码,用 tfds.load(name=‘fashion_mnist’) 来下载数据集到容器的本地存储,该本地存储将被挂载到持久化卷上。

代码 9-33 数据摄取步骤

下一步是一个由多个子步骤组成的步骤组,其中每个子步骤代表一种模型类型(例如:CNN、带 dropout 层和带 batch normalization 层的 CNN)的分布式模型训练步骤。 以下配置声明了所有子步骤的模板。 多个模型的分布式训练步骤规定了这些步骤将并行执行。

代码 9-34 分布式训练步骤组

我们以第一个子步骤为例,它运行基本的 CNN 分布式模型训练。 该步骤模板的主要内容是 resource 资源字段,包括以下内容:  要执行操作的自定义资源定义 (CRD) 或 manifest。在这个例子中,我们在这个步骤中创建了一个 TFJob。  表示 CRD 是否创建成功的条件。在这个例子中,我们通过 Argo 监控 status.replicaStatuses.Worker.succeeded 和 status.replicaStatuses.Worker.failed 字段。 在 TFJob 定义的 container 字段中,我们指定了模型类型并将训练后的模型保存到不同的目录下,以便在后续步骤中选择和保存用于模型服务的最优模型。 我们还要确保添加了持久化卷,以便持久化经过训练的模型。

代码 9-35 CNN 模型训练步骤

对于 distributed-tf-training-steps 中的其余子步骤,配置声明非常相似,只是保存的模型目录和模型类型参数不同。 下一步是模型选择,我们使用相同的容器镜像,但执行的是我们之前实现的模型选择 Python 脚本。

代码 9-36 模型选择步骤

确保这些附加的脚本包含在 Dockerfile 中,并且已重新构建了镜像并将其重新导入到本地 Kubernetes 集群。

实现了模型选择步骤后,工作流的最后一步是启动 KServe 模型服务步骤。 它是一个类似于模型训练步骤的 resource 模板,但包含了 KServe 的 InferenceService CRD 和与其相匹配的运行成功条件(successCondition 字段)。

代码 9-37 模型服务步骤

提交这个工作流。

代码 9-38 提交端到端工作流

数据摄取步骤完成后,与之相关联的 Pod 将被删除。 在执行分布式模型训练步骤时再次获取 Pod 列表,我们将看到以 tfjob-wf-f4bql-cnn-model- 为前缀的 Pod,它们负责监控不同模型类型的分布式模型训练任务的状态。 此外,每种模型的训练任务都包含 2 个匹配 multi-worker-training--worker- 名称的 Pod。

代码 9-39 获取 Pod 列表

完成剩余的步骤并且模型服务已成功启动后,工作流应该包含 Succeeded 状态。 现在我们完成了端到端工作流的执行。

9.4.2 步骤记忆化 为了加快工作流的后续执行速度,我们可以利用缓存机制,从而跳过最近运行过的某些步骤。 在我们的例子中可以跳过数据摄取步骤,因为我们不必重复下载相同的数据集。

首先查看数据摄取步骤的日志:

数据集已经下载到容器中的某个路径下。 如果该路径挂载到了我们的持久化卷上,那么它将可用于后续的所有工作流步骤。 让我们使用 Argo Workflows 提供的步骤记忆化功能来优化我们的工作流。

在步骤的声明配置模板中,我们在 memoize 字段中提供了缓存键(key)和缓存期限(maxAge)。 当一个步骤完成后,将其保存在缓存中。 当这一个步骤在新的工作流中再次运行时,它将检查缓存是否在过去的一小时内创建。 如果是,则跳过此步骤,工作流将继续执行后续步骤。 对于我们的应用,数据集不会变化,所以理论上应该始终使用缓存中的数据,我们在这里指定 1 小时的缓存期限是为了演示目的。 在实际应用中,你可能需要根据数据更新的频率进行缓存期限时间的调整。

代码 9-40 记忆化数据摄取步骤

第一次运行工作流,并观察工作流节点状态中的 Memoization Status 字段。 因为这是第一次运行步骤,所以缓存没有命中。

代码 9-41 检查工作流的节点状态

如果我们在 1 小时内再次运行相同的工作流,会注意到该步骤被跳过(Memoization Status 字段中的 Hit 值为 true):

此外,请注意完成时间 Finished At 和开始时间 Started At 的时间戳是相同的。 也就是说,这个步骤是立即完成的,无需重新执行。

Argo Workflows 中的所有缓存都保存在 Kubernetes ConfigMap 对象中。 缓存包含节点 ID、步骤执行结果输出、缓存创建时间戳和上次命中该缓存的时间戳。

代码 9-42 查看 configmap 的详细信息

总结概括 数据摄取组件使用 TensorFlow 为 Fashion-MNIST 数据集实现了一个分布式输入流水线,使其易于与分布式模型训练集成。 机器学习模型和分布式模型训练逻辑可以在 TensorFlow 中定义,然后借助 Kubeflow 在 Kubernetes 集群中以分布式方式执行。 单实例模型服务器和副本模型服务器都可以通过 KServe 实现。 KServe 的自动扩缩容功能可以自动创建额外的模型服务 Pod 来处理不断增加的模型服务请求。 我们实现了端到端工作流,其中包括 Argo Workflows 中系统的所有组件,并使用步骤记忆化来避免重复执行耗时且冗余的数据摄取步骤。