[Scheduler Plugin Series] Co-Scheduling: A Scheduler Plugin for Batch Scheduling Groups of Pods

Posted by Hao Liang's Blog on Sunday, November 21, 2021

1. Background

Related proposal: KEP: Coscheduling based on PodGroup CRD

Source code: Coscheduling

  • In some scenarios (batch workloads such as Spark jobs or TensorFlow jobs), a group of Pods can only do useful work after every one of them has been scheduled; if only part of the group is scheduled, the job still cannot run
  • The native Kubernetes scheduler has no way to wait until the whole group of Pods has been created before it starts scheduling them
  • What is needed is a mechanism that, once the batch of Pods has been created, estimates whether the cluster has enough resources for the group and only then lets the Pods enter the scheduling flow
  • Without that estimation, only some of the Pods would be scheduled, the job still could not run, and the resources those Pods occupy would be wasted

2. Feature Overview

The Coscheduling scheduler plugin introduces a PodGroup CRD object:

# PodGroup CRD spec
apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
  name: nginx
spec:
  scheduleTimeoutSeconds: 10
  minMember: 3
---
# Add the label `pod-group.scheduling.sigs.k8s.io` to mark that a pod belongs to a group
labels:
  pod-group.scheduling.sigs.k8s.io: nginx
  • minMember: the minimum number of instances the PodGroup needs at runtime (the number of Running Pods plus the number of Waiting Pods, where Waiting means the Pod has finished the scheduling cycle but has not been bound yet); with fewer Pods than this, the workload cannot run properly
  • scheduleTimeoutSeconds: the scheduling timeout for the Pods in the PodGroup; once it is exceeded, the whole PodGroup is marked as unschedulable

An example:

Suppose we create a PodGroup whose minMember is 4:

apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
  name: nginx
spec:
  scheduleTimeoutSeconds: 10
  minMember: 4

and create 6 Pod replicas through a ReplicaSet:

apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  replicas: 6
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      name: nginx
      labels:
        app: nginx
        pod-group.scheduling.sigs.k8s.io: nginx
    spec:
      containers:
      - name: nginx
        image: nginx
        resources:
          limits:
            cpu: 3000m
            memory: 500Mi
          requests:
            cpu: 3000m
            memory: 500Mi

If the cluster has enough resources for at least 4 Pods to run, scheduling is allowed (4 Pods are scheduled and running, 2 remain Pending):

$ kubectl get pods
NAME          READY   STATUS    RESTARTS   AGE
nginx-4jw2m   1/1     Running   0          55s
nginx-4mn52   1/1     Running   0          55s
nginx-c9gv8   1/1     Running   0          55s
nginx-frm24   1/1     Running   0          55s
nginx-hsflk   0/1     Pending   0          55s
nginx-qtj5f   0/1     Pending   0          55s

If the cluster cannot fit 4 running Pods, none of the Pods in the PodGroup are allowed to be scheduled (all 6 Pods remain Pending):

$ kubectl get pods
NAME          READY   STATUS    RESTARTS   AGE
nginx-4vqrk   0/1     Pending   0          3s
nginx-bw9nn   0/1     Pending   0          3s
nginx-gnjsv   0/1     Pending   0          3s
nginx-hqhhz   0/1     Pending   0          3s
nginx-n47r7   0/1     Pending   0          3s
nginx-n7vtq   0/1     Pending   0          3s

3. How It Works

The Coscheduling plugin implements the following scheduler framework extension points:

  • QueueSort: to get a whole group of Pods scheduled as quickly as possible, Pods in the scheduling queue are ordered as follows:
    • First by Pod priority, higher priority first
    • If the priorities are equal, by the creation time of the Pod or of its PodGroup, earlier first (Pods of the same PodGroup share the group's creation timestamp, so the whole group lines up back to back in the queue; see the sketch after the code)
// Less is used to sort pods in the scheduling queue in the following order.
// 1. Compare the priorities of Pods.
// 2. Compare the initialization timestamps of PodGroups or Pods.
// 3. Compare the keys of PodGroups/Pods: <namespace>/<podname>.
func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
	prio1 := corev1helpers.PodPriority(podInfo1.Pod)
	prio2 := corev1helpers.PodPriority(podInfo2.Pod)
	if prio1 != prio2 {
		return prio1 > prio2
	}
	creationTime1 := cs.pgMgr.GetCreationTimestamp(podInfo1.Pod, podInfo1.InitialAttemptTimestamp)
	creationTime2 := cs.pgMgr.GetCreationTimestamp(podInfo2.Pod, podInfo2.InitialAttemptTimestamp)
	if creationTime1.Equal(creationTime2) {
		return core.GetNamespacedName(podInfo1.Pod) < core.GetNamespacedName(podInfo2.Pod)
	}
	return creationTime1.Before(creationTime2)
}
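
The Less function above relies on GetCreationTimestamp, which is not shown here. Below is a minimal sketch of the idea behind it, using a simplified, hypothetical signature (the real method looks the PodGroup up through a lister):

// groupAwareCreationTime is a hypothetical, simplified stand-in for pgMgr.GetCreationTimestamp
// (assumed imports: "time" and corev1 "k8s.io/api/core/v1").
// A Pod carrying the pod-group label sorts by its PodGroup's creation time, so every member of
// the group shares the same sort key and the whole group ends up adjacent in the scheduling
// queue; an ordinary Pod falls back to the timestamp of its first scheduling attempt.
func groupAwareCreationTime(pod *corev1.Pod, pgCreationTime *time.Time, initialAttempt time.Time) time.Time {
	if pod.Labels["pod-group.scheduling.sigs.k8s.io"] == "" || pgCreationTime == nil {
		return initialAttempt
	}
	return *pgCreationTime
}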
  • PreFilter: estimates whether the cluster has the resources needed to schedule the batch of Pods in the PodGroup
    • If the Pod does not belong to any PodGroup (ordinary Pod scheduling), it is not intercepted and simply continues through the normal scheduling flow
    • If the total number of Pods in the PodGroup is still smaller than the group's MinMember, the Pod is rejected
    • Otherwise, check whether the cluster's nodes have enough resources to schedule at least MinMember Pods; if not, the Pod is intercepted, scheduling fails, and it goes into the backoff queue to wait for the next retry (a sketch of this resource check follows the code below)
// PreFilter filters out a pod if it
// 1. belongs to a podgroup that was recently denied or
// 2. the total number of pods in the podgroup is less than the minimum number of pods
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
	klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
	...
	
	// If this Pod's PodGroup was denied only a few seconds ago (3s by default), reject the Pod right away instead of evaluating it again
	if _, ok := pgMgr.lastDeniedPG.Get(pgFullName); ok {
		return fmt.Errorf("pod with pgName: %v last failed in 3s, deny", pgFullName)
	}
	...
	
	// Reject the Pod if the total number of Pods in its PodGroup is smaller than the group's MinMember
	if len(pods) < int(pg.Spec.MinMember) {
		return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+
			"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
	}
	...
	
	// Check whether the cluster's nodes have enough resources to run at least MinMember Pods; if not, deny the PodGroup and fail this Pod so it goes to the backoff queue for a later retry
	err = CheckClusterResource(nodes, minResources, pgFullName)
	if err != nil {
		klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
		pgMgr.AddDeniedPodGroup(pgFullName)
		return err
	}
	pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
	return nil
}
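
CheckClusterResource itself is not shown above. The sketch below illustrates the kind of estimate it makes, under the simplifying assumption that it only compares the group's minimum total request against the idle resources left across all nodes (simplified types; not the plugin's actual code):

package main

import "fmt"

// nodeIdle is a simplified view of one node: allocatable minus what is already requested.
type nodeIdle struct {
	milliCPU int64
	memory   int64
}

// enoughClusterResource returns nil when the idle resources of the cluster can cover the
// total request of minMember Pods, and an error describing the gap otherwise. It is only
// an estimate: it ignores whether each individual Pod actually fits on a single node.
func enoughClusterResource(nodes []nodeIdle, podMilliCPU, podMemory int64, minMember int32) error {
	needCPU := podMilliCPU * int64(minMember)
	needMem := podMemory * int64(minMember)
	var idleCPU, idleMem int64
	for _, n := range nodes {
		idleCPU += n.milliCPU
		idleMem += n.memory
	}
	if idleCPU < needCPU || idleMem < needMem {
		return fmt.Errorf("insufficient cluster resources: need %dm CPU / %dMi, idle %dm CPU / %dMi",
			needCPU, needMem>>20, idleCPU, idleMem>>20)
	}
	return nil
}

func main() {
	// Two nodes with 8 and 2 CPUs of headroom: the example PodGroup from section 2
	// (minMember 4, 3000m CPU per Pod) needs 12000m in total and would be denied.
	nodes := []nodeIdle{{milliCPU: 8000, memory: 16 << 30}, {milliCPU: 2000, memory: 8 << 30}}
	fmt.Println(enoughClusterResource(nodes, 3000, 500<<20, 4))
}

The real CheckClusterResource works on the scheduler's node snapshot and resource types; the point here is only the aggregate feasibility check that lets the whole group be denied up front instead of leaving part of it Pending.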
  • PostFilter: runs when a Pod fails in the PreFilter or Filter phase
    • If the Pods of this PodGroup that have already been assigned to nodes reach MinMember, this single failure is acceptable: the Pod is just marked Unschedulable and PostFilter returns
    • If the assigned Pods are short of MinMember by no more than 10%, the Pod is likewise marked Unschedulable and retried later via the backoff queue
    • If the assigned Pods are short of MinMember by more than 10%, the scheduling request of the whole PodGroup is rejected: its Waiting Pods are rejected and its remaining Pods will not go through the scheduling flow (they would fail anyway, so there is no point retrying them)
// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter.
func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
	filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	...
	// If the Pods of this PodGroup that are already assigned reach MinMember, this Pod's failure is acceptable: mark it Unschedulable and return
	assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace)
	if assigned >= int(pg.Spec.MinMember) {
		klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
	}

	// If the assigned Pods are short of MinMember by no more than 10%, let this Pod go to the backoff queue and wait for the next scheduling attempt
	notAssignedPercentage := float32(int(pg.Spec.MinMember)-assigned) / float32(pg.Spec.MinMember)
	if notAssignedPercentage <= 0.1 {
		klog.V(4).InfoS("A small gap of pods to reach the quorum", "podGroup", klog.KObj(pg), "percentage", notAssignedPercentage)
		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
	}

	// Otherwise (more than 10% short of MinMember), reject the scheduling request of the whole PodGroup: reject all of its Waiting Pods, and its remaining Pods will not enter the scheduling flow either (they would fail anyway, so there is no point retrying them)
	cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
		if waitingPod.GetPod().Namespace == pod.Namespace && waitingPod.GetPod().Labels[util.PodGroupLabel] == pg.Name {
			klog.V(3).InfoS("PostFilter rejects the pod", "podGroup", klog.KObj(pg), "pod", klog.KObj(waitingPod.GetPod()))
			waitingPod.Reject(cs.Name(), "optimistic rejection in PostFilter")
		}
	})
	cs.pgMgr.AddDeniedPodGroup(pgName)
	cs.pgMgr.DeletePermittedPodGroup(pgName)
	return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable,
		fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name))
}
  • Permit: checks whether the PodGroup of the current Pod already has MinMember assignable Pods; if so, the group can proceed, otherwise the Pod is parked in the Waiting state (if the group still has not reached the quorum after the scheduling timeout, the scheduling requests of the whole PodGroup are rejected); a sketch of the plugin-level wrapper follows the code below
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
	pgFullName, pg := pgMgr.GetPodGroup(pod)
	if pgFullName == "" {
		return PodGroupNotSpecified
	}
	if pg == nil {
		// A Pod with a podGroup name but without a PodGroup found is denied.
		return PodGroupNotFound
	}

	assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
	// The number of pods that have been assigned nodes is calculated from the snapshot.
	// The current pod is not included in the snapshot during the current scheduling cycle.
	if int32(assigned)+1 >= pg.Spec.MinMember {
		return Success
	}
	return Wait
}
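
PodGroupManager.Permit only returns a status; the plugin-level Permit has to translate it into framework codes and, once the quorum is reached, release the sibling Pods parked in Waiting. Below is a rough sketch of that wrapper, assuming the Status values live in the core package, a scheduleTimeout field on the plugin (mirroring the one on pgMgr in PreFilter), and the same IterateOverWaitingPods / Allow mechanics used in the PostFilter code above (simplified; not the verbatim upstream code):

// Sketch of the plugin-level Permit wrapper.
func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	switch cs.pgMgr.Permit(ctx, pod) {
	case core.PodGroupNotSpecified:
		// Ordinary Pod: nothing to wait for, let it go to Bind immediately.
		return framework.NewStatus(framework.Success, ""), 0
	case core.PodGroupNotFound:
		return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0
	case core.Wait:
		// Quorum not reached yet: park this Pod in Waiting until enough siblings
		// arrive, at most for the configured scheduling timeout (assumed field).
		return framework.NewStatus(framework.Wait, ""), *cs.scheduleTimeout
	case core.Success:
		// Quorum reached: every Waiting Pod of the same PodGroup may proceed to Bind.
		cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
			if waitingPod.GetPod().Namespace == pod.Namespace &&
				waitingPod.GetPod().Labels[util.PodGroupLabel] == pod.Labels[util.PodGroupLabel] {
				waitingPod.Allow(cs.Name())
			}
		})
		return framework.NewStatus(framework.Success, ""), 0
	}
	return framework.NewStatus(framework.Unschedulable, ""), 0
}

This is what makes co-scheduling all-or-nothing: no Pod of the group is bound until the group as a whole has enough assignable members, and the timeout bounds how long a partially scheduled group can hold on to reserved nodes.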
  • PostBind: after a Pod of the PodGroup has gone through the scheduling flow, record the result in the PodGroup's status
func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nodeName string) {
	pgMgr.Lock()
	defer pgMgr.Unlock()
	pgFullName, pg := pgMgr.GetPodGroup(pod)
	if pgFullName == "" || pg == nil {
		return
	}
	pgCopy := pg.DeepCopy()
	pgCopy.Status.Scheduled++

	if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember {
		pgCopy.Status.Phase = v1alpha1.PodGroupScheduled
	} else {
		pgCopy.Status.Phase = v1alpha1.PodGroupScheduling
		if pgCopy.Status.ScheduleStartTime.IsZero() {
			pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
		}
	}
	if pgCopy.Status.Phase != pg.Status.Phase {
		pg, err := pgMgr.pgLister.PodGroups(pgCopy.Namespace).Get(pgCopy.Name)
		if err != nil {
			klog.ErrorS(err, "Failed to get PodGroup", "podGroup", klog.KObj(pgCopy))
			return
		}
		patch, err := util.CreateMergePatch(pg, pgCopy)
		if err != nil {
			klog.ErrorS(err, "Failed to create merge patch", "podGroup", klog.KObj(pg), "podGroup", klog.KObj(pgCopy))
			return
		}
		if err := pgMgr.PatchPodGroup(pg.Name, pg.Namespace, patch); err != nil {
			klog.ErrorS(err, "Failed to patch", "podGroup", klog.KObj(pg))
			return
		}
		pg.Status.Phase = pgCopy.Status.Phase
	}
	pg.Status.Scheduled = pgCopy.Status.Scheduled
	return
}

4. Usage

Sample scheduler plugin configuration:

apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: false
clientConnection:
  kubeconfig: "REPLACE_ME_WITH_KUBE_CONFIG_PATH"
profiles:
  - schedulerName: default-scheduler
    plugins:
      queueSort:
        enabled:
          - name: Coscheduling
        disabled:
          - name: "*"
      preFilter:
        enabled:
          - name: Coscheduling
      postFilter:
        enabled:
          - name: Coscheduling
      permit:
        enabled:
          - name: Coscheduling
      reserve:
        enabled:
          - name: Coscheduling
      postBind:
        enabled:
          - name: Coscheduling