【Scheduling】Co-Scheduling Grouped Batch Pod Scheduler Plugin

Posted by Hao Liang's Blog on Sunday, November 21, 2021

1. Background

Related proposal: KEP: Coscheduling based on PodGroup CRD

Source code address: Coscheduling

  • In some scenarios (batch workloads such as Spark jobs and TensorFlow jobs), a group of Pods must all be scheduled successfully at the same time before the workload can run; if only part of the group is scheduled, the workload still cannot run normally.
  • The native Kubernetes scheduler schedules Pods one by one and cannot wait until the whole group has been created before it starts scheduling.
  • A mechanism is therefore needed that, once the batch of Pods has been created, estimates whether the cluster has enough resources for the group and only then lets the Pods enter the scheduling cycle.
  • Without such an estimate, only some of the Pods may be scheduled successfully; the workload cannot run, and the resources held by the scheduled Pods are wasted.

2. Features

The Coscheduling scheduler plugin introduces a PodGroup CRD:

# PodGroup CRD spec
apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
  name: nginx
spec:
  scheduleTimeoutSeconds: 10
  minMember: 3
---
# Add the label `pod-group.scheduling.sigs.k8s.io` to mark that a Pod belongs to a group
labels:
  pod-group.scheduling.sigs.k8s.io: nginx
  • minMember: the minimum number of Pods in the PodGroup that must be able to run (the number of Running Pods plus the number of Waiting Pods, where Waiting means the Pod has completed the scheduling cycle but has not yet been bound). If fewer than this number can run, the workload cannot work properly.
  • scheduleTimeoutSeconds: the scheduling timeout for Pods in this PodGroup. If it is exceeded, the whole PodGroup is marked as unschedulable.

For example:

Suppose you create a PodGroup with minMember set to 4:

apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
  name: nginx
spec:
  scheduleTimeoutSeconds: 10
  minMember: 4

Then create a ReplicaSet with 6 replicas whose Pod template carries the PodGroup label:

apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  replicas: 6
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      name: nginx
      labels:
        app: nginx
        pod-group.scheduling.sigs.k8s.io: nginx
    spec:
      containers:
      - name: nginx
        image: nginx
        resources:
          limits:
            cpu: 3000m
            memory: 500Mi
          requests:
            cpu: 3000m
            memory: 500Mi

If the cluster has enough resources to run at least 4 Pods, scheduling is allowed (4 Pods are scheduled and run successfully, while the other 2 stay Pending):

$ kubectl get pods
NAME          READY   STATUS    RESTARTS   AGE
nginx-4jw2m   1/1     Running   0          55s
nginx-4mn52   1/1     Running   0          55s
nginx-c9gv8   1/1     Running   0          55s
nginx-frm24   1/1     Running   0          55s
nginx-hsflk   0/1     Pending   0          55s
nginx-qtj5f   0/1     Pending   0          55s

If the cluster does not have enough resources to run 4 Pods, none of the Pods in the PodGroup is allowed to be scheduled (all 6 stay Pending):

$ kubectl get pods
NAME          READY   STATUS    RESTARTS   AGE
nginx-4vqrk   0/1     Pending   0          3s
nginx-bw9nn   0/1     Pending   0          3s
nginx-gnjsv   0/1     Pending   0          3s
nginx-hqhhz   0/1     Pending   0          3s
nginx-n47r7   0/1     Pending   0          3s
nginx-n7vtq   0/1     Pending   0          3s

3. Implementation

The Coscheduling plugin implements the following scheduler framework extension points:

  • QueueSort: to give a group of Pods the best chance of being scheduled together as quickly as possible, Pods are ordered before they enter the scheduling queue, as the Less function below shows:
    • Sort by Pod priority, with higher-priority Pods first.
    • If priorities are equal, sort by the creation time of the Pod or its PodGroup, with the earliest first.
// Less is used to sort pods in the scheduling queue in the following order.
// 1. Compare the priorities of Pods.
// 2. Compare the initialization timestamps of PodGroups or Pods.
// 3. Compare the keys of PodGroups/Pods: <namespace>/<podname>.
func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
	prio1 := corev1helpers.PodPriority(podInfo1.Pod)
	prio2 := corev1helpers.PodPriority(podInfo2.Pod)
	if prio1 != prio2 {
		return prio1 > prio2
	}
	creationTime1 := cs.pgMgr.GetCreationTimestamp(podInfo1.Pod, podInfo1.InitialAttemptTimestamp)
	creationTime2 := cs.pgMgr.GetCreationTimestamp(podInfo2.Pod, podInfo2.InitialAttemptTimestamp)
	if creationTime1.Equal(creationTime2) {
		return core.GetNamespacedName(podInfo1.Pod) < core.GetNamespacedName(podInfo2.Pod)
	}
	return creationTime1.Before(creationTime2)
}
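The detail that keeps a group together in the queue is GetCreationTimestamp: for a Pod that carries the PodGroup label it returns the PodGroup's creation time rather than the Pod's own, so all members of a group end up adjacent in the queue. Below is a minimal sketch of that idea; the pgLister field and the GetPodGroupLabel helper are assumptions based on the plugin's structure, not necessarily its exact code.

// GetCreationTimestamp (sketch): Pods that carry the PodGroup label inherit the
// PodGroup's creation time so that the whole group sorts together; ordinary Pods
// keep their own initial-attempt timestamp.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
	pgName := util.GetPodGroupLabel(pod) // assumed helper that reads the pod-group label
	if len(pgName) == 0 {
		return ts
	}
	pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName) // assumed lister field
	if err != nil {
		return ts
	}
	return pg.CreationTimestamp.Time
}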
  • PreFilter: estimate whether the cluster has enough resources to schedule the Pods of the PodGroup as a batch:
    • If the Pod does not belong to any PodGroup (an ordinary Pod), it passes straight through to the rest of the scheduling cycle without interception.
    • Reject the Pod if the total number of Pods in its PodGroup is smaller than the group's MinMember.
    • Check whether the nodes in the cluster have enough free resources to satisfy MinMember; if not, the Pod is rejected, enters the backoff queue, and waits for the next retry (a sketch of this resource check follows the code below).
// PreFilter filters out a pod if it
// 1. belongs to a podgroup that was recently denied or
// 2. the total number of pods in the podgroup is less than the minimum number of pods
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
	klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
	...
	
	// If this PodGroup was denied recently, reject the Pod directly.
	if _, ok := pgMgr.lastDeniedPG.Get(pgFullName); ok {
		return fmt.Errorf("pod with pgName: %v last failed in 3s, deny", pgFullName)
	}
	...
	
	// Reject the Pod if the PodGroup has fewer Pods than its MinMember.
	if len(pods) < int(pg.Spec.MinMember) {
		return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+
			"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
	}
	...
	
	// Check whether the cluster's nodes have enough resources to satisfy the PodGroup's MinMember; if not, deny the group and let the Pod back off until the next retry.
	err = CheckClusterResource(nodes, minResources, pgFullName)
	if err != nil {
		klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
		pgMgr.AddDeniedPodGroup(pgFullName)
		return err
	}
	pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
	return nil
}
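CheckClusterResource itself is not shown above. Conceptually, it adds up the spare (allocatable minus already-requested) resources of the candidate nodes and checks that the total can cover the minimum resource demand of the PodGroup. The following is a rough, simplified sketch of that logic (CPU and memory only); it is an illustration, not the plugin's actual implementation, which also credits resources already assigned to the group.

// checkClusterResourceSketch: can the nodes' combined spare capacity cover the
// PodGroup's minimum demand? Hypothetical helper for illustration only.
func checkClusterResourceSketch(nodes []*framework.NodeInfo, minCPUMilli, minMemBytes int64, pgFullName string) error {
	var spareCPU, spareMem int64
	for _, n := range nodes {
		// Spare capacity of this node = allocatable - already requested.
		spareCPU += n.Allocatable.MilliCPU - n.Requested.MilliCPU
		spareMem += n.Allocatable.Memory - n.Requested.Memory
	}
	if spareCPU < minCPUMilli || spareMem < minMemBytes {
		return fmt.Errorf("PodGroup %q: cluster does not have enough resources to satisfy minMember", pgFullName)
	}
	return nil
}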
  • PostFilter: runs when a Pod fails in the PreFilter or Filter stage:
    • If the number of already-assigned Pods in the PodGroup has reached MinMember, the failure of this single Pod is acceptable: it is marked Unschedulable and returned directly.
    • If the gap to MinMember is small (10% or less of MinMember), only this Pod is marked Unschedulable; it backs off and waits for the next scheduling attempt.
    • Otherwise the whole PodGroup is rejected: all of its waiting Pods are denied and do not re-enter the scheduling cycle, since the rest of the group would fail to schedule anyway.
// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter.
func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
	filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	...
	// If the number of assigned Pods in the PodGroup already meets MinMember, this Pod's failure is acceptable: mark it Unschedulable and return directly.
	assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace)
	if assigned >= int(pg.Spec.MinMember) {
		klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
	}

	// If the gap to MinMember is small (10% or less), only this Pod backs off and waits for the next scheduling attempt.
	notAssignedPercentage := float32(int(pg.Spec.MinMember)-assigned) / float32(pg.Spec.MinMember)
	if notAssignedPercentage <= 0.1 {
		klog.V(4).InfoS("A small gap of pods to reach the quorum", "podGroup", klog.KObj(pg), "percentage", notAssignedPercentage)
		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
	}

	// Otherwise reject every waiting Pod of the PodGroup: the rest of the group would fail to schedule too, so there is no point in letting them keep trying.
	cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
		if waitingPod.GetPod().Namespace == pod.Namespace && waitingPod.GetPod().Labels[util.PodGroupLabel] == pg.Name {
			klog.V(3).InfoS("PostFilter rejects the pod", "podGroup", klog.KObj(pg), "pod", klog.KObj(waitingPod.GetPod()))
			waitingPod.Reject(cs.Name(), "optimistic rejection in PostFilter")
		}
	})
	cs.pgMgr.AddDeniedPodGroup(pgName)
	cs.pgMgr.DeletePermittedPodGroup(pgName)
	return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable,
		fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name))
}
  • Permit: check whether enough Pods of the PodGroup have already been assigned to satisfy MinMember. If so, the Pod is allowed; if not, it is marked Waiting and enters the waiting state (the scheduling requests of the whole PodGroup are rejected if it still has not succeeded when the scheduling timeout expires).
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
	pgFullName, pg := pgMgr.GetPodGroup(pod)
	if pgFullName == "" {
		return PodGroupNotSpecified
	}
	if pg == nil {
		// A Pod with a podGroup name but without a PodGroup found is denied.
		return PodGroupNotFound
	}

	assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
	// The number of pods that have been assigned nodes is calculated from the snapshot.
	// The current pod is not included in the snapshot during the current scheduling cycle.
	if int32(assigned)+1 >= pg.Spec.MinMember {
		return Success
	}
	return Wait
}
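The Status returned by the manager is translated by the plugin's Permit extension point into a framework decision: Wait holds the Pod in the waiting state up to the group's scheduling timeout, and Success releases every waiting Pod of the same PodGroup. Below is a simplified sketch of that mapping, reusing the helpers shown earlier; the scheduleTimeout field and the exact wiring are assumptions, not the plugin's verbatim code.

func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	switch cs.pgMgr.Permit(ctx, pod) {
	case core.PodGroupNotSpecified:
		// Ordinary Pod: allow immediately.
		return framework.NewStatus(framework.Success, ""), 0
	case core.PodGroupNotFound:
		return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0
	case core.Wait:
		// Not enough siblings assigned yet: wait up to the group's scheduling timeout.
		return framework.NewStatus(framework.Wait, ""), *cs.scheduleTimeout // field name assumed
	case core.Success:
		// Quorum reached: release all waiting Pods that belong to the same PodGroup.
		_, pg := cs.pgMgr.GetPodGroup(pod)
		cs.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
			if wp.GetPod().Namespace == pod.Namespace && wp.GetPod().Labels[util.PodGroupLabel] == pg.Name {
				wp.Allow(cs.Name())
			}
		})
		return framework.NewStatus(framework.Success, ""), 0
	}
	return framework.NewStatus(framework.Unschedulable, ""), 0
}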
  • PostBind: after a Pod of the PodGroup completes the scheduling cycle, record the result in the PodGroup's status:
func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nodeName string) {
	pgMgr.Lock()
	defer pgMgr.Unlock()
	pgFullName, pg := pgMgr.GetPodGroup(pod)
	if pgFullName == "" || pg == nil {
		return
	}
	pgCopy := pg.DeepCopy()
	pgCopy.Status.Scheduled++

	if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember {
		pgCopy.Status.Phase = v1alpha1.PodGroupScheduled
	} else {
		pgCopy.Status.Phase = v1alpha1.PodGroupScheduling
		if pgCopy.Status.ScheduleStartTime.IsZero() {
			pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
		}
	}
	if pgCopy.Status.Phase != pg.Status.Phase {
		pg, err := pgMgr.pgLister.PodGroups(pgCopy.Namespace).Get(pgCopy.Name)
		if err != nil {
			klog.ErrorS(err, "Failed to get PodGroup", "podGroup", klog.KObj(pgCopy))
			return
		}
		patch, err := util.CreateMergePatch(pg, pgCopy)
		if err != nil {
			klog.ErrorS(err, "Failed to create merge patch", "podGroup", klog.KObj(pg), "podGroup", klog.KObj(pgCopy))
			return
		}
		if err := pgMgr.PatchPodGroup(pg.Name, pg.Namespace, patch); err != nil {
			klog.ErrorS(err, "Failed to patch", "podGroup", klog.KObj(pg))
			return
		}
		pg.Status.Phase = pgCopy.Status.Phase
	}
	pg.Status.Scheduled = pgCopy.Status.Scheduled
	return
}

4. Usage

Scheduler plugin configuration example:

apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: false
clientConnection:
  kubeconfig: "REPLACE_ME_WITH_KUBE_CONFIG_PATH"
profiles:
  - schedulerName: default-scheduler
    plugins:
      queueSort:
        enabled:
          - name: Coscheduling
        disabled:
          - name: "*"
      preFilter:
        enabled:
          - name: Coscheduling
      postFilter:
        enabled:
          - name: Coscheduling
      permit:
        enabled:
          - name: Coscheduling
      reserve:
        enabled:
          - name: Coscheduling
      postBind:
        enabled:
          - name: Coscheduling
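Note that this configuration only takes effect in a scheduler binary that actually compiles the Coscheduling plugin in. Below is a minimal sketch of such a binary, following the usual out-of-tree plugin registration pattern; the import paths match the scheduler-plugins repository layout, but the exact details may vary between releases.

package main

import (
	"os"

	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	"sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
)

func main() {
	// Register Coscheduling with the scheduler command so the profile above can
	// enable it by name; start the scheduler with --config pointing at the file.
	command := app.NewSchedulerCommand(
		app.WithPlugin(coscheduling.Name, coscheduling.New),
	)
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}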