Posted by Hao Liang's Blog on Monday, January 1, 0001

Cube 的 Workers,联合起来!

本章内容包括:

回顾编排系统中 worker 组件的目的 回顾 Task 和 Docker 结构体 定义并实现处理传入任务的算法 构建一个简单的状态机以在任务状态之间进行转换 实现 worker 启动和停止任务的方法 想象一下运行一个提供静态页面的 web 服务器。在许多情况下,在一台物理或虚拟机上运行单个实例的 web 服务器已经足够了。然而,随着网站的受欢迎程度增加,这种设置会带来几个问题:

资源可用性——考虑到机器上运行的其他进程,是否有足够的内存、CPU 和磁盘空间来满足我们的 web 服务器的需求? 弹性——如果运行 web 服务器的机器宕机,我们的网站也会随之宕机。 运行多个 web 服务器实例有助于我们解决这些问题。 在本章中,我们将重点完善第2章中勾勒出的 Worker 框架。它将使用我们在第3章中介绍的 Task 实现。在本章结束时,我们将使用 worker 的实现来运行多个简单 web 服务器实例,就像我们之前描述的场景中那样。

4.1 Cube Worker 在编排系统中,worker 组件使我们能够轻松扩展应用程序,例如在前述场景中的 web 服务器。图4.1展示了我们如何运行三个网站实例,分别由 W1、W2 和 W3 代表,每个实例运行在一个独立的 worker 上。在这个图中,重要的是要意识到 Worker 这个术语有双重含义:它既代表运行在物理或虚拟机上的编排系统的 worker 组件,也代表这些物理或虚拟机。

Manager Load balancer User Developer Orchestration system Worker1 Worker2 Worker3 W1 W2 W3

图4.1 这个图中的 worker 框既代表物理或虚拟机,也代表运行在这些机器上的编排系统的 Worker 组件。

现在我们不太可能遇到资源可用性问题了。因为我们在三个不同的 worker 上运行了我们网站的三个实例,用户请求可以分散到这三个实例上,而不是集中到单个机器上的单个实例。 同样,我们的网站现在对故障更具弹性。例如,如果图4.2中的 Worker1 崩溃,它会带走我们网站的 W3 实例。虽然这可能会让我们感到难过,并需要一些工作来让 Worker1 重新上线,但我们网站的用户不应该察觉到任何差异。他们仍然能够继续向我们的网站发出请求,并获得预期的静态内容。

Manager Load balancer User Developer Orchestration system Worker1 Worker2 Worker3 W1 W2 W3

图4.2 在 worker 节点故障的情况下,运行在其他节点上的 web 服务器仍然可以响应请求。

worker 由执行特定角色的较小组件组成。这些组件如图4.3所示,包括一个 API、一个运行时、一个任务队列、一个任务数据库(DB)和指标。在本章中,我们将重点关注其中的三个组件:运行时、任务队列和任务数据库。我们将在后续章节中处理其他两个组件。

图4.3 我们的 worker 将由这五个组件组成,但本章将仅关注运行时、任务队列和任务数据库。

worker 通过将任务存储在数据库中来维护其任务的状态。 API 是 worker 的入口。它是 worker 从 manager 接收任务的方式。 当 worker 从 manager 接收到任务时,它首先将任务放入任务队列,直到可以处理它们。 worker 通过 Docker 运行时启动和停止任务。 worker 提供关于其主机机器和正在运行的任务的指标。这些指标通过 API 提供给 manager。

Worker

Task DB

Task queue

Runtime (Docker)

Metrics

图4.3 我们的 worker 将由这五个组件组成,但本章将仅关注运行时、任务队列和任务数据库。

4.2 任务与 Docker

在第1章中,我们提到任务是编排系统中最小的工作单元。然后在第3章中,我们在 Task 结构体中实现了这个定义,如清单4.1所示。这个结构体是 worker 的主要关注点。它从 manager 接收任务并运行它。我们将在本章中反复使用这个结构体。 作为最小的工作单元,任务通过作为 Docker 容器运行来执行其工作。因此,任务与容器之间存在一对一的对应关系。worker 使用这个结构体来启动和停止任务。

清单4.1 第2章中定义的 Task 结构体

在第3章中,我们还定义了 Docker 结构体,如下清单所示。worker 将使用这个结构体来将任务作为 Docker 容器启动和停止。

清单4.2 第3章中定义的 Docker 结构体

这两个对象将是允许我们的 worker 启动和停止任务的核心。

4.3 队列的角色

看看清单4.3,回顾一下 Worker 结构体的样子。这个结构体保持了我们在第2章中留下的状态。 worker 将使用 Worker 结构体中的 Queue 字段作为需要处理的传入任务的临时存放区。当 manager 向 worker 发送任务时,任务会进入队列,worker 将按先进先出的顺序处理这些任务。

清单4.3 第2章中的 Worker 框架

需要注意的是,Queue 字段本身是一个结构体,它定义了几个方法,我们可以用来将项目推入队列(Enqueue)、从队列中弹出项目(Dequeue)以及获取队列长度(Len)。Queue 字段是 Go 中组合的一个例子。因此,我们可以使用其他结构体来组合新的、更高级的对象。还要注意,Queue 是从 github.com/golangcollections/collections/queue 包中导入的。因此,我们正在重用别人为我们编写的 Queue 实现。如果你还没有这样做,你需要将这个包指定为依赖项(参见附录)。

4.4 数据库的角色

worker 将使用 Db 字段来存储其任务的状态。这个字段是一个映射,其中键是 github.com/google/uuid 包中的 uuid.UUID 类型,值是我们任务包中的 task 类型。关于使用映射作为 Db 字段,有一点需要注意。我们从映射开始是为了方便。这将使我们能够快速编写出可工作的代码。但这也带来了一个权衡:每次我们重启 worker 时,我们都会丢失数据。这个权衡在开始时是可以接受的,但以后我们会用一个持久的数据存储来替换这个映射,这样在重启 worker 时就不会丢失数据。

4.5 任务计数

最后,TaskCount 字段提供了 worker 被分配的任务的简单计数。我们将在下一章中使用这个字段。

4.6 实现 worker 的方法

现在我们已经回顾了 Worker 结构体中的字段,让我们继续讨论第2章中我们预先定义的方法。下一个清单中的 RunTask、StartTask 和 StopTask 方法目前除了打印一条语句外没有做太多事情,但在本章结束时,我们将完全实现每一个方法。

清单4.4 预定义的 RunTask、StartTask 和 StopTask 方法

我们将按与清单4.4中相反的顺序实现这些方法。这样做的原因是 RunTask 方法将使用另外两个方法来启动和停止任务。

4.6.1 实现 StopTask 方法

StopTask 方法并不复杂。它的唯一目的是停止运行的任务,记住任务对应于一个正在运行的容器。其实现如清单4.5所示,可以总结为以下步骤:

  1. 创建一个 Docker 结构体的实例,允许我们使用 Docker SDK 与 Docker 守护进程通信。
  2. 调用 Docker 结构体的 Stop() 方法。
  3. 检查停止任务时是否有任何错误。
  4. 更新任务 t 的 FinishTime 字段。
  5. 将更新后的任务 t 保存到 worker 的 Db 字段。
  6. 打印一条信息性消息并返回操作结果。

清单4.5 我们对 StopTask 方法的实现

注意,StopTask 方法返回一个 task.DockerResult 类型。这个类型的定义可以在清单4.6中看到。如果你还记得,Go支持多重返回类型。我们本可以将 DockerResult 结构体中的每个字段枚举为 StopTask 方法的返回类型。虽然这种方法在技术上没有问题,但使用 DockerResult 方法允许我们将与操作结果相关的所有信息封装到一个结构体中。当我们想了解操作结果的任何信息时,只需查看 DockerResult 结构体即可。

清单4.6 DockerResult 类型的定义 4.6.2 实现 StartTask 方法 接下来,让我们实现 StartTask 方法。与 StopTask 方法类似,StartTask 也相对简单,但启动任务的过程有几个额外的步骤。具体步骤如下:

更新任务 t 的 StartTime 字段。 创建一个 Docker 结构体的实例,以便与 Docker 守护进程通信。 调用 Docker 结构体的 Run() 方法。 检查启动任务时是否有任何错误。 将运行中的容器 ID 添加到任务 t 的 Runtime.ContainerId 字段。 将更新后的任务 t 保存到 worker 的 Db 字段。 返回操作结果。 这些步骤的实现可以在下一个清单中看到。

清单4.7 我们对 StartTask 方法的实现 通过在 StartTask 方法中记录 StartTime,并结合在 StopTask 方法中记录 FinishTime,我们以后可以在其他输出中使用这些时间戳。例如,在本书的后面部分,我们将编写一个命令行界面,允许我们与编排器进行交互,StartTime 和 FinishTime 值可以作为任务状态的一部分输出。

在我们继续讨论这两个方法之前,我想指出,它们都没有直接与 Docker SDK 交互。相反,它们只是调用我们创建的 Docker 对象上的 Run 和 Stop 方法。是 Docker 对象处理与 Docker 客户端的直接交互。通过将与 Docker 的交互封装在 Docker 对象中,我们的 worker 不需要了解底层实现细节。

StartTask 和 StopTask 方法是我们 worker 的基础。但查看我们在第2章中创建的框架,我们发现还有一个基础方法缺失。我们如何将任务添加到 worker 中?记住,我们说过 worker 会使用其 Queue 字段作为传入任务的临时存储,当它准备好时,它会从队列中弹出一个任务并执行必要的操作。

让我们通过添加下一个清单中的 AddTask 方法来解决这个问题。这个方法执行一个简单的任务:将任务 t 添加到 Queue 中。

清单4.8 worker 的 AddTask 方法 4.6.3 关于任务状态的插曲 现在剩下的就是实现 RunTask 方法了。然而,在我们这样做之前,让我们暂停一下,回顾一下 RunTask 方法的目的。在第2章中,我们说过 RunTask 方法将负责识别任务的当前状态,然后根据状态启动或停止任务。但为什么我们甚至需要 RunTask 呢?

处理任务有两种可能的情况:

任务是第一次提交,因此 worker 不会知道它。 任务是第 n 次提交,其中提交的任务表示当前任务应过渡到的期望状态。 在处理从 manager 接收到的任务时,worker 需要确定它正在处理哪种情况。我们将使用一个简单的启发式方法来帮助 worker 解决这个问题。

记住我们的 worker 有 Queue 和 Db 字段。对于我们的简单实现,worker 将使用 Queue 字段来表示任务的期望状态。当 worker 从队列中弹出一个任务时,它会将其解释为“将任务 t 置于状态 s”。worker 会将其 Db 字段中已有的任务解释为现有任务,即它至少已经见过一次的任务。如果一个任务在 Queue 中但不在 Db 中,那么这是 worker 第一次看到这个任务,我们默认启动它。

除了识别它正在处理的两种情况中的哪一种,worker 还需要验证从当前状态到期望状态的过渡是否有效。

让我们回顾一下我们在第2章中定义的状态。下一个清单显示了我们有 Pending、Scheduled、Running、Completed 和 Failed 状态。

清单4.9 定义任务有效状态的 State 类型 但这些状态代表什么呢?我们在第2章中解释了这些状态,但让我们快速回顾一下:

Pending:这是每个任务的初始状态,起点。 Scheduled:一旦 manager 将任务调度到 worker 上,任务就会进入这个状态。 Running:当 worker 成功启动任务(即启动容器)时,任务进入这个状态。 Completed:当任务以正常方式完成其工作(即没有失败)时,任务进入这个状态。 Failed:如果任务失败,它会进入这个状态。 为了强化这些状态的含义,我们还可以回顾第2章中的状态图,如图4.4所示。

图4.4 任务在其生命周期中经历的状态

我们已经定义了与任务相关的状态,但还没有定义任务如何从一个状态过渡到下一个状态。也没有讨论哪些过渡是有效的。例如,如果一个任务已经在运行状态(Running),它能否过渡到已调度状态(Scheduled)?如果一个任务失败了,它是否应该能够从失败状态(Failed)过渡到已调度状态(Scheduled)?

在回到 RunTask 方法之前,我们需要解决状态过渡的问题。为此,我们可以使用状态表来建模我们的状态和过渡,如表 4.1 所示。

这个表有三列,分别表示任务的当前状态(CurrentState)、触发状态过渡的事件(Event)和任务应过渡到的下一个状态(NextState)。表中的每一行代表一个特定的有效过渡。注意,没有从运行状态(Running)到已调度状态(Scheduled)或从失败状态(Failed)到已调度状态(Scheduled)的过渡。

表 4.1 显示了从一个状态到另一个状态的有效过渡

现在我们对状态和它们之间的过渡有了更好的理解,可以将我们的理解转化为代码。像 Borg、Kubernetes 和 Nomad 这样的编排器使用状态机来处理状态过渡问题。然而,为了尽量减少我们需要处理的概念和技术数量,我们将把状态过渡硬编码到 stateTransitionMap 类型中,如清单 4.10 所示。这个映射编码了我们在表 4.1 中确定的过渡。

stateTransitionMap 创建了一个状态(State)和状态切片([]State)之间的映射。因此,这个映射中的键是当前状态,值是有效的过渡状态。例如,待处理状态(Pending)只能过渡到已调度状态(Scheduled)。然而,已调度状态(Scheduled)可以过渡到运行状态(Running)、完成状态(Completed)或失败状态(Failed)。

清单 4.10 stateTransitionMap 映射

除了 stateTransitionMap,我们还将实现 Contains 和 ValidStateTransition 辅助函数,如清单 4.11 所示。这些函数将执行实际的逻辑,以验证任务是否可以从一个状态过渡到下一个状态。

让我们从 Contains 函数开始。它接受两个参数:一个类型为 State 的切片 states 和一个类型为 State 的 state。如果在 states 切片中找到 state,则返回 true;否则返回 false。

ValidStateTransition 函数是 Contains 函数的包装器。它为函数的调用者提供了一种方便的方式来询问:“嘿,任务能否从这个状态过渡到那个状态?”所有的繁重工作都由 Contains 函数完成。

你应该将以下清单中的代码添加到项目的 task 目录中的 state.go 文件中。

清单 4.11 辅助方法

4.6.4 实现 RunTask 方法 现在我们终于可以更具体地讨论 RunTask 方法了。虽然花了一些时间才到这里,但我们需要先解决那些其他细节,才能讨论这个方法。而且因为我们做了这些准备工作,实现 RunTask 方法会顺利一些。

正如我们在本章前面所说,RunTask 方法将识别任务的当前状态,然后根据该状态启动或停止任务。我们可以使用一个相当简单的算法来确定工作者是否应该启动或停止任务。它看起来像这样:

从队列中取出一个任务。 将其从接口类型转换为 task.Task 类型。 从工作者的数据库中检索任务。 检查状态过渡是否有效。 如果队列中的任务处于已调度状态(Scheduled),调用 StartTask。 如果队列中的任务处于完成状态(Completed),调用 StopTask。 否则,存在无效的过渡,返回错误。 现在我们只需要在代码中实现这些步骤,如以下清单所示。

清单 4.12 我们对 RunTask 方法的实现

调用 Dequeue() 方法

将任务转换为正确的类型

尝试从数据库中检索相同的任务

如果有一个有效的状态过渡并且队列中的任务处于已调度状态,调用 StartTask 方法

如果队列中的任务处于完成状态,调用 StopTask 方法

如果没有有效的过渡,设置 result 变量的 Error 字段

返回结果

我们首先调用 Dequeue() 方法从工作者的队列中弹出一个任务。注意,我们正在检查是否从队列中接收到任务。如果没有,这意味着队列是空的,那么我们记录一条消息并返回一个 Error 字段为 nil 的结果。接下来,我们必须将从队列中弹出的任务转换为正确的类型,即 task.Task。这一步是必要的,因为队列的 Dequeue 方法返回一个接口类型。现在我们有了一个来自队列的任务,所以我们需要尝试从数据库中获取相同的任务。如果我们在数据库中找不到任务,这意味着这是我们第一次看到这个任务,我们将其添加。然后我们可以使用我们在本章前面创建的 ValidStateTransition 函数。注意,我们传递的是来自数据库的状态 taskPersisted.State 和来自队列的状态 taskQueued.State。如果有一个有效的状态过渡并且队列中的任务处于已调度状态,那么我们调用 StartTask 方法。或者,如果有一个有效的状态过渡但队列中的任务处于完成状态,我们调用 StopTask 方法。如果没有有效的过渡——换句话说,从 taskPersisted.State 到 taskQueued.State 的过渡无效——那么我们设置 result 变量的 Error 字段。

4.7 综合实现 呼,我们终于完成了!在实现工作者的方法时,我们覆盖了很多内容。如果你还记得第3章,我们最后写了一个程序,使用了我们在本章早些时候完成的工作。在本章中,我们将继续这种做法。

不过,在此之前,请记住在第3章中,我们构建了 Task 和 Docker 结构体,这些工作使我们能够启动和停止容器。本章的工作建立在上一章的基础之上。因此,我们将再次编写一个程序来启动和停止任务。工作者操作的是任务级别,而 Docker 结构体操作的是更低级别的容器。

现在让我们编写一个程序,将所有内容整合到一个功能齐全的工作者中。你可以注释掉上一章 main.go 文件中的代码,或者为本章创建一个新的 main.go 文件。

这个程序很简单。我们创建一个工作者 w,它具有我们在本章开头讨论的 Queue 和 Db 字段。接下来,我们创建一个任务 t。这个任务以已调度状态(Scheduled)开始,并使用名为 strm/helloworld-http 的 Docker 镜像。稍后会详细介绍这个镜像。在创建工作者和任务后,我们调用工作者的 AddTask 方法并传递任务 t。然后调用工作者的 RunTask 方法。这个方法会从队列中取出任务 t 并执行相应的操作。它捕获 RunTask 方法的返回值并存储在变量 result 中。(如果你记得 RunTask 返回的类型,可以加分。)

此时,我们有一个正在运行的容器。休眠30秒后(你可以根据需要更改休眠时间),我们开始停止任务的过程。我们将任务的状态更改为已完成(Completed),再次调用 AddTask 并传递相同的任务,最后再次调用 RunTask。这次,当 RunTask 从队列中取出任务时,任务将有一个容器 ID 和不同的状态。结果,任务被停止。以下清单展示了我们的程序,创建一个工作者,添加一个任务,启动它,最后停止它。

清单 4.13 将所有内容整合到一个功能齐全的工作者中

让我们暂停一下,讨论一下前面代码清单中使用的镜像。在本章开头,我们讨论了使用编排器(特别是工作者组件)扩展静态网站的场景。这个镜像 strm/helloworld-http 提供了一个静态网站的具体示例:它运行一个提供单个文件的 Web 服务器。为了验证这种行为,当你运行程序时,在另一个终端中输入 docker ps 命令。你应该看到类似于清单 4.14 的输出。在该输出中,你可以通过查看 PORTS 列找到 Web 服务器运行的端口。然后打开浏览器并输入 localhost:。在以下清单的输出中,我会在浏览器中输入 localhost:49161。输出已被截断以提高可读性。

清单 4.14 docker ps 命令的截断输出

当我在我的机器上浏览到服务器时,我看到“Hello from 90566e236f88”。继续运行程序。你应该看到类似于以下清单的输出。

清单 4.15 运行你的主程序

恭喜!你现在有一个功能齐全的工作者。在进入下一章之前,玩一下你构建的内容。特别是,修改清单 4.13 中的主函数,创建多个工作者,然后向每个工作者添加任务。

总结 任务作为容器执行,这意味着任务和容器之间存在一对一的关系。 工作者对任务执行两个基本操作:启动或停止。这些操作导致任务从一个状态过渡到下一个有效状态。 工作者展示了 Go 语言如何支持对象组合。工作者本身是其他对象的组合;特别是,工作者的 Queue 字段是 github.com/golang-collections/collections/queue 包中定义的结构体。 我们设计和实现的工作者很简单。我们使用了清晰简洁的流程,易于在代码中实现。 工作者不直接与 Docker SDK 交互。相反,它使用我们的 Docker 结构体,这是 SDK 的包装器。通过将与 SDK 的交互封装在 Docker 结构体中,我们可以保持 StartTask 和 StopTask 方法的小巧和可读性。