1. Background
Related proposal: KEP: Coscheduling based on PodGroup CRD
Source code: Coscheduling
- In some scenarios (batch workloads such as Spark jobs and TensorFlow jobs), a group of Pods can only run properly after all of them have been scheduled successfully; scheduling only part of the group leaves the job unable to run.
- The native Kubernetes scheduler cannot guarantee that scheduling starts only after the whole group of Pods has been created.
- A mechanism is needed that, once the batch of Pods has been created, estimates whether the cluster has enough resources before deciding to enter the scheduling flow.
- Without this estimate, only some of the Pods may be scheduled successfully, the job still cannot run, and the resources those Pods occupy are wasted.
2. Feature Overview
The Coscheduling plugin introduces a PodGroup CRD:
# PodGroup CRD spec
apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
name: nginx
spec:
scheduleTimeoutSeconds: 10
minMember: 3
---
# Add the label `pod-group.scheduling.sigs.k8s.io` to mark that a Pod belongs to a group
labels:
pod-group.scheduling.sigs.k8s.io: nginx
- minMember: the minimum number of Pod instances this PodGroup needs in order to run (the number of Running Pods plus the number of Waiting Pods, where Waiting means the Pod has completed the scheduling cycle but has not been bound yet); if fewer than this can be satisfied, the workload cannot run properly.
- scheduleTimeoutSeconds: the scheduling timeout for the Pods in this PodGroup; once exceeded, the whole PodGroup is marked as unschedulable.
Example:
Create a PodGroup with minMember set to 4:
apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
name: nginx
spec:
scheduleTimeoutSeconds: 10
minMember: 4
Then create a ReplicaSet with 6 replicas:
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: nginx
labels:
app: nginx
spec:
replicas: 6
selector:
matchLabels:
app: nginx
template:
metadata:
name: nginx
labels:
app: nginx
pod-group.scheduling.sigs.k8s.io: nginx
spec:
containers:
- name: nginx
image: nginx
resources:
limits:
cpu: 3000m
memory: 500Mi
requests:
cpu: 3000m
memory: 500Mi
If the cluster has enough resources for at least 4 Pods to run, scheduling is allowed (4 Pods are scheduled and run normally, 2 stay Pending):
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
nginx-4jw2m 1/1 Running 0 55s
nginx-4mn52 1/1 Running 0 55s
nginx-c9gv8 1/1 Running 0 55s
nginx-frm24 1/1 Running 0 55s
nginx-hsflk 0/1 Pending 0 55s
nginx-qtj5f 0/1 Pending 0 55s
If the cluster cannot accommodate 4 running Pods, none of the Pods in the PodGroup are allowed to be scheduled (all 6 Pods stay Pending):
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
nginx-4vqrk 0/1 Pending 0 3s
nginx-bw9nn 0/1 Pending 0 3s
nginx-gnjsv 0/1 Pending 0 3s
nginx-hqhhz 0/1 Pending 0 3s
nginx-n47r7 0/1 Pending 0 3s
nginx-n7vtq 0/1 Pending 0 3s
3. How It Works
The Coscheduling plugin implements the following scheduler framework extension points:
- QueueSort: to help a group of Pods get scheduled as quickly as possible, Pods in the scheduling queue are ordered as follows:
- Sort by Pod priority, with higher-priority Pods first.
- If the priorities are equal, sort by the creation time of the Pod or of its PodGroup, with earlier creation times first (a sketch of this timestamp lookup follows the code below).
// Less is used to sort pods in the scheduling queue in the following order.
// 1. Compare the priorities of Pods.
// 2. Compare the initialization timestamps of PodGroups or Pods.
// 3. Compare the keys of PodGroups/Pods: <namespace>/<podname>.
func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
prio1 := corev1helpers.PodPriority(podInfo1.Pod)
prio2 := corev1helpers.PodPriority(podInfo2.Pod)
if prio1 != prio2 {
return prio1 > prio2
}
creationTime1 := cs.pgMgr.GetCreationTimestamp(podInfo1.Pod, podInfo1.InitialAttemptTimestamp)
creationTime2 := cs.pgMgr.GetCreationTimestamp(podInfo2.Pod, podInfo2.InitialAttemptTimestamp)
if creationTime1.Equal(creationTime2) {
return core.GetNamespacedName(podInfo1.Pod) < core.GetNamespacedName(podInfo2.Pod)
}
return creationTime1.Before(creationTime2)
}
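The group-aware creation time used above comes from pgMgr.GetCreationTimestamp. The following is a minimal sketch of the idea rather than the plugin's exact code: getGroupCreationTimestamp and lookupGroupCreation are illustrative names, and corev1 refers to k8s.io/api/core/v1. Pods that carry the pod-group label are sorted by their PodGroup's creation time, so all Pods of one group stay adjacent in the queue, while ordinary Pods fall back to their own initial attempt timestamp.
// Illustrative sketch (not the plugin's actual code): resolve the timestamp used by Less.
// lookupGroupCreation is a hypothetical callback that returns the PodGroup's
// CreationTimestamp if the group exists.
func getGroupCreationTimestamp(
    pod *corev1.Pod,
    initialAttempt time.Time,
    lookupGroupCreation func(namespace, pgName string) (time.Time, bool),
) time.Time {
    pgName, ok := pod.Labels["pod-group.scheduling.sigs.k8s.io"]
    if !ok || pgName == "" {
        // Not part of any PodGroup: sort by the pod's own timestamp.
        return initialAttempt
    }
    if created, found := lookupGroupCreation(pod.Namespace, pgName); found {
        // All pods of the group share the group's creation time.
        return created
    }
    return initialAttempt
}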
- PreFilter: estimates whether the cluster has enough resources to schedule the batch of Pods in the PodGroup:
- If the Pod does not belong to any PodGroup (ordinary Pod scheduling), it is not intercepted and continues through the normal scheduling cycle.
- Pods whose PodGroup currently contains fewer Pods than the group's minMember are filtered out.
- Checks whether the cluster's nodes have enough resources to schedule at least minMember Pods of the group; if not, the Pod is rejected, scheduling fails, and it enters the backoff queue to be retried later (a sketch of this estimate follows the code below).
// PreFilter filters out a pod if it
// 1. belongs to a podgroup that was recently denied or
// 2. the total number of pods in the podgroup is less than the minimum number of pods
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
...
// If this Pod's PodGroup was denied recently, reject the Pod immediately (short backoff window)
if _, ok := pgMgr.lastDeniedPG.Get(pgFullName); ok {
return fmt.Errorf("pod with pgName: %v last failed in 3s, deny", pgFullName)
}
...
// Reject the Pod if its PodGroup currently contains fewer Pods than the group's minMember
if len(pods) < int(pg.Spec.MinMember) {
return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+
"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
}
...
// Check whether the cluster's nodes have enough resources to schedule minMember Pods of the group; if not, reject the Pod so that scheduling fails and it enters the backoff queue for a later retry
err = CheckClusterResource(nodes, minResources, pgFullName)
if err != nil {
klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
pgMgr.AddDeniedPodGroup(pgFullName)
return err
}
pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
return nil
}
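CheckClusterResource is where the resource estimate happens. Below is a simplified sketch of the idea, not the plugin's exact implementation: checkClusterResource and nodesFree are illustrative names, corev1 refers to k8s.io/api/core/v1, and in the real code the per-node free capacity is derived from the scheduler's NodeInfo snapshot (allocatable minus the requests of Pods already assigned to the node). The aggregate demand of minMember Pods is reduced node by node; if anything is still missing after all nodes have been visited, the group is denied.
// Illustrative sketch of the PreFilter resource estimate (not the plugin's exact code).
// nodesFree: each node's remaining allocatable resources.
// minResources: the aggregate resource request of minMember pods of the group.
func checkClusterResource(nodesFree []corev1.ResourceList, minResources corev1.ResourceList, pgFullName string) error {
    // Copy the demand so it can be reduced destructively.
    remaining := corev1.ResourceList{}
    for name, quantity := range minResources {
        remaining[name] = quantity.DeepCopy()
    }
    for _, free := range nodesFree {
        for name, need := range remaining {
            avail, ok := free[name]
            if !ok {
                continue
            }
            need.Sub(avail)
            if need.Sign() <= 0 {
                // This resource is fully covered by the nodes seen so far.
                delete(remaining, name)
                continue
            }
            remaining[name] = need
        }
        if len(remaining) == 0 {
            return nil // every requested resource can be satisfied
        }
    }
    return fmt.Errorf("PodGroup %q: insufficient cluster resources, still missing %v", pgFullName, remaining)
}
On failure the PodGroup is recorded in lastDeniedPG, which is exactly what the lastDeniedPG.Get check at the top of PreFilter consults; on success the group is cached in permittedPG for the duration of the scheduling timeout.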
- PostFilter: runs when a Pod has failed in the PreFilter or Filter stage:
- If the Pods already assigned in this PodGroup have reached minMember, this single failure is acceptable; the Pod is marked Unschedulable and the plugin returns.
- If the assigned Pods fall short of minMember by no more than 10%, the Pod enters the backoff queue and waits for the next scheduling attempt.
- If the assigned Pods fall short of minMember by more than 10%, the scheduling requests of all Pods in the PodGroup are rejected and the remaining Pods of the group no longer enter the scheduling cycle (they would fail anyway, so retrying them is pointless). A small worked example of the threshold follows the code below.
// PostFilter is used to rejecting a group of pods if a pod does not pass PreFilter or Filter.
func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
...
// If the Pods already assigned in this PodGroup have reached minMember, this failure is acceptable; mark the Pod Unschedulable and return
assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace)
if assigned >= int(pg.Spec.MinMember) {
klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
// If the assigned Pods fall short of minMember by no more than 10%, the Pod enters the backoff queue and waits for the next scheduling attempt
notAssignedPercentage := float32(int(pg.Spec.MinMember)-assigned) / float32(pg.Spec.MinMember)
if notAssignedPercentage <= 0.1 {
klog.V(4).InfoS("A small gap of pods to reach the quorum", "podGroup", klog.KObj(pg), "percentage", notAssignedPercentage)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
// If the assigned Pods fall short of minMember by more than 10%, reject all Pods of this PodGroup; its remaining Pods will not enter the scheduling cycle again (they would fail anyway, so retrying them is pointless)
cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
if waitingPod.GetPod().Namespace == pod.Namespace && waitingPod.GetPod().Labels[util.PodGroupLabel] == pg.Name {
klog.V(3).InfoS("PostFilter rejects the pod", "podGroup", klog.KObj(pg), "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(cs.Name(), "optimistic rejection in PostFilter")
}
})
cs.pgMgr.AddDeniedPodGroup(pgName)
cs.pgMgr.DeletePermittedPodGroup(pgName)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name))
}
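The 10% threshold can be made concrete with a couple of hypothetical numbers (this is a throwaway calculation, not plugin code):
package main

import "fmt"

// With minMember = 10: 9 assigned pods leave a 10% gap, so the failing pod only
// backs off and retries; 8 assigned pods leave a 20% gap, so the whole PodGroup
// is rejected.
func main() {
    minMember := int32(10)
    for _, assigned := range []int{9, 8} {
        notAssignedPercentage := float32(int(minMember)-assigned) / float32(minMember)
        fmt.Printf("assigned=%d notAssignedPercentage=%.2f rejectWholePodGroup=%v\n",
            assigned, notAssignedPercentage, notAssignedPercentage > 0.1)
    }
}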
- Permit: checks whether the Pods of this PodGroup have reached minMember. If so, the group is allowed to be scheduled; otherwise the Pod is marked Waiting and held (if the group still has not reached minMember when the scheduling timeout expires, the scheduling requests of the whole PodGroup are rejected). The wrapper that enforces the waiting and timeout is sketched after the code below.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(pod)
if pgFullName == "" {
return PodGroupNotSpecified
}
if pg == nil {
// A Pod with a podGroup name but without a PodGroup found is denied.
return PodGroupNotFound
}
assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod is not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
return Success
}
return Wait
}
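The status returned by PodGroupManager.Permit is turned into a framework decision by the plugin's Permit extension point. The following is a condensed sketch, not verbatim plugin code: cs.scheduleTimeout is assumed to be the plugin-level default timeout (mirroring pgMgr.scheduleTimeout above), and the real plugin may derive the wait duration from the PodGroup's own scheduleTimeoutSeconds. Wait keeps the Pod parked at the Permit stage; Success releases every sibling Pod that is currently waiting.
// Condensed sketch of the Permit extension point (details differ between plugin versions).
func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
    switch cs.pgMgr.Permit(ctx, pod) {
    case core.PodGroupNotSpecified:
        // Ordinary pod without a PodGroup: nothing to wait for.
        return framework.NewStatus(framework.Success, ""), 0
    case core.PodGroupNotFound:
        return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0
    case core.Wait:
        // minMember not reached yet: keep the pod in the Waiting state for up to
        // the scheduling timeout. If the timeout expires, the framework rejects
        // the waiting pods and the whole group fails.
        return framework.NewStatus(framework.Wait, ""), *cs.scheduleTimeout
    case core.Success:
        // minMember reached: release every waiting pod of the same group.
        _, pg := cs.pgMgr.GetPodGroup(pod)
        cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
            wp := waitingPod.GetPod()
            if wp.Namespace == pod.Namespace && wp.Labels[util.PodGroupLabel] == pg.Name {
                waitingPod.Allow(cs.Name())
            }
        })
        return framework.NewStatus(framework.Success, ""), 0
    }
    return framework.NewStatus(framework.Success, ""), 0
}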
- PostBind: after a Pod of the PodGroup completes the scheduling cycle, the PodGroup's status is updated accordingly.
func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nodeName string) {
pgMgr.Lock()
defer pgMgr.Unlock()
pgFullName, pg := pgMgr.GetPodGroup(pod)
if pgFullName == "" || pg == nil {
return
}
pgCopy := pg.DeepCopy()
pgCopy.Status.Scheduled++
if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember {
pgCopy.Status.Phase = v1alpha1.PodGroupScheduled
} else {
pgCopy.Status.Phase = v1alpha1.PodGroupScheduling
if pgCopy.Status.ScheduleStartTime.IsZero() {
pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
}
}
if pgCopy.Status.Phase != pg.Status.Phase {
pg, err := pgMgr.pgLister.PodGroups(pgCopy.Namespace).Get(pgCopy.Name)
if err != nil {
klog.ErrorS(err, "Failed to get PodGroup", "podGroup", klog.KObj(pgCopy))
return
}
patch, err := util.CreateMergePatch(pg, pgCopy)
if err != nil {
klog.ErrorS(err, "Failed to create merge patch", "podGroup", klog.KObj(pg), "podGroup", klog.KObj(pgCopy))
return
}
if err := pgMgr.PatchPodGroup(pg.Name, pg.Namespace, patch); err != nil {
klog.ErrorS(err, "Failed to patch", "podGroup", klog.KObj(pg))
return
}
pg.Status.Phase = pgCopy.Status.Phase
}
pg.Status.Scheduled = pgCopy.Status.Scheduled
return
}
4. Usage
Example scheduler plugin configuration:
apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
clientConnection:
kubeconfig: "REPLACE_ME_WITH_KUBE_CONFIG_PATH"
profiles:
- schedulerName: default-scheduler
plugins:
queueSort:
enabled:
- name: Coscheduling
disabled:
- name: "*"
preFilter:
enabled:
- name: Coscheduling
postFilter:
enabled:
- name: Coscheduling
permit:
enabled:
- name: Coscheduling
reserve:
enabled:
- name: Coscheduling
postBind:
enabled:
- name: Coscheduling