Client-go code breakdown (4): Work Queue

Posted by Hao Liang's Blog on Friday, August 21, 2020

1. Introduction to WorkQueue

In an Informer, the Delta FIFO queue triggers the Add, Update, and Delete callbacks. Each callback puts the key of the changed resource object into the WorkQueue. The control loop then takes keys off the work queue and uses the Lister to fetch the complete resource object from the Indexer's local cache for processing.

Informer workflow architecture diagram (image source: Geek Time, “Kubernetes in a Simple and In-depth manner”)

The main functions of WorkQueue are marking and deduplication. It supports the following features:

  • Ordered: Process elements (items) in the order they are added
  • Deduplication: The same element will not be processed repeatedly at the same time. For example, if an element is added multiple times before processing, it will only be processed once.
  • Concurrency: multiple producers and multiple consumers
  • Marking mechanism: an element can be marked to record whether it has been processed, and an element can be requeued while it is being processed.
  • Notification mechanism: The ShutDown method signals the queue to stop accepting new elements and tells the metrics goroutine to exit.
  • Delay: Supports a delay queue, which holds an element for a period of time before storing it in the queue.
  • Rate limit: Supports a rate-limiting queue, which limits the rate at which elements enter the queue and how often the same element is requeued.
  • Metric: Exposes metrics that can be collected by Prometheus.

2. Complete use case of WorkQueue combined with Informer

The Kubernetes project provides an official example of a controller built on the Informer mechanism:

// staging/src/k8s.io/sample-controller/controller.go
// 1. Define the Controller structure
type Controller struct {
	// The official Clientset client obtains the built-in resource object
	kubeclientset kubernetes.Interface
	// Custom Clientset client obtains custom resource objects
	sampleclientset clientset.Interface

	deploymentsLister appslisters.DeploymentLister
	deploymentsSynced cache.InformerSynced
	foosLister        listers.FooLister
	foosSynced        cache.InformerSynced
	// WorkQueue implemented as a rate-limiting queue
	workqueue workqueue.RateLimitingInterface
	recorder record.EventRecorder
}
// 2. Initialize the controller object
func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller {
	...

	controller := &Controller{
		kubeclientset:     kubeclientset,
		sampleclientset:   sampleclientset,
		deploymentsLister: deploymentInformer.Lister(),
		deploymentsSynced: deploymentInformer.Informer().HasSynced,
		foosLister:        fooInformer.Lister(),
		foosSynced:        fooInformer.Informer().HasSynced,
		workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
		recorder:          recorder,
	}

	// Initialize Informer for custom resource Foo and set event callback function
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// If the Add event of the Foo object is obtained from the FIFO queue, it is put into the workQueue
		AddFunc: controller.enqueueFoo,
		// If the Update event of the Foo object is fetched from the FIFO queue, the new object is put into the workQueue
		UpdateFunc: func(old, new interface{}) {
			controller.enqueueFoo(new)
		},
	})
	...
	return controller
}

// 3. WorkQueue’s enqueue logic, put the object’s key into the WorkQueue
func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}
// 4. Start the control loop
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	...
	// Start goroutine running Control Loop
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	...
	return nil
}
// 5. Call processNextWorkItem in a loop to take objects off the WorkQueue for processing
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}
func (c *Controller) processNextWorkItem() bool {
	// Get the element at the head of the WorkQueue
	obj, shutdown := c.workqueue.Get()
	...
	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			// The item is invalid, so call Forget; otherwise we would loop, retrying an item that can never be processed.
			c.workqueue.Forget(obj)
			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Call syncHandler to implement the controller’s real processing logic
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
		}
		// On success, Forget the item to clear its rate-limiter retry count; the deferred Done above is what marks it as processed
		c.workqueue.Forget(obj)
		glog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)
	...
	return true
}
// 6. The core processing logic of the controller
func (c *Controller) syncHandler(key string) error {
	// Separate the namespace and name of the resource object from key
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	...
	// Get the locally cached complete object from Indexer through Lister
	foo, err := c.foosLister.Foos(namespace).Get(name)
	...
	// The following is to write custom control logic for the obtained objects.
	...
	...
	return nil
}

WorkQueue supports 3 types of queues:

  • Interface: the FIFO queue interface, a first-in-first-out queue with deduplication.
  • DelayingInterface: the delay queue interface, which wraps Interface and holds an element for a period of time before storing it in the queue.
  • RateLimitingInterface: the rate-limiting queue interface, which wraps DelayingInterface and limits the rate at which elements are stored in the queue.

The differences between the three queues are introduced below.

3. FIFO queue

FIFO is the most basic first-in-first-out queue and is very simple to use:

// Initialize the FIFO queue
queue := workqueue.NewNamed("Foos")
// Enqueue an item
queue.Add(xxx)
// Take the item at the head of the queue and put it into the processing set (the item is now being processed)
item, shutdown := queue.Get()
// Remove the item from the processing set (processing of the item is finished)
queue.Done(item)

The queue implements the Interface interface:

type Interface interface {
	//Enqueue
	Add(item interface{})
	// Get the queue length
	Len() int
	// Get the element at the head of the queue
	Get() (item interface{}, shutdown bool)
	// Mark the given item as processed
	Done(item interface{})
	//Close the queue
	ShutDown()
	// Check whether the queue is closed
	ShuttingDown() bool
}

FIFO queue data structure:

type Type struct {
	// Slice that actually stores the elements, preserving their order
	queue []t
	// Set used for deduplication: an element added several times before it is processed is processed only once (backed by a map)
	dirty set
	// Set of elements that are currently being processed (backed by a map)
	processing set
	cond *sync.Cond
	shuttingDown bool
	metrics queueMetrics
}

Elements with keys 1, 2, and 3 are added to the queue. When Get is called, 1 is taken from the head of the queue and put into the processing set, indicating that it is being processed. Once 1 has been processed, Done is called to mark it as processed and remove it from the processing set.

(Figure: FIFO storage process)

Now consider the concurrent case. Elements with keys 1, 2, and 3 are added to the queue. When Get is called, 1 is taken from the head of the queue and put into the processing set. While it is being processed, another element 1 is added through Add. Because 1 is already in the processing set, the repeated element does not enter the queue; it is recorded in the dirty set instead. After the old 1 has been processed, Done is called, which removes 1 from the processing set and then puts the new 1 into the queue.

(Figure: FIFO concurrent storage process)
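
The interplay of queue, dirty, and processing described above can be sketched in a few lines of standard-library Go. This is a simplified illustration, not the client-go implementation: miniQueue and its string items are hypothetical names, and the mutex, sync.Cond, shutdown handling, and metrics of the real Type are left out.

```go
package main

import "fmt"

// miniQueue is a hypothetical, single-goroutine sketch of the
// queue/dirty/processing bookkeeping. It is not the client-go type.
type miniQueue struct {
	queue      []string        // ordered items waiting to be processed
	dirty      map[string]bool // items that still need processing
	processing map[string]bool // items currently being processed
}

func newMiniQueue() *miniQueue {
	return &miniQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add enqueues item unless it is already waiting. An item that is
// currently being processed is only recorded in dirty.
func (q *miniQueue) Add(item string) {
	if q.dirty[item] {
		return // already waiting: deduplicated
	}
	q.dirty[item] = true
	if q.processing[item] {
		return // will be requeued by Done
	}
	q.queue = append(q.queue, item)
}

// Get pops the head of the queue and moves it from dirty to processing.
// The second result is false when the queue is empty.
func (q *miniQueue) Get() (string, bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	item := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[item] = true
	delete(q.dirty, item)
	return item, true
}

// Done removes item from processing and requeues it if it was
// added again while it was being processed.
func (q *miniQueue) Done(item string) {
	delete(q.processing, item)
	if q.dirty[item] {
		q.queue = append(q.queue, item)
	}
}

func main() {
	q := newMiniQueue()
	for _, key := range []string{"1", "2", "3", "1"} {
		q.Add(key)
	}
	fmt.Println(q.queue) // [1 2 3]: the second "1" was deduplicated

	item, _ := q.Get() // "1" is now being processed
	q.Add(item)        // re-added during processing: goes to dirty only
	fmt.Println(q.queue) // [2 3]

	q.Done(item)         // finished: the pending "1" is requeued
	fmt.Println(q.queue) // [2 3 1]
}
```

Keeping dirty separate from processing is what guarantees that the same key is never handled by two workers at once, while a change that arrives during processing is still not lost.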

4. Delay Queue

The delay queue extends the FIFO queue interface with the AddAfter method. Calling it with a delay duration makes the element enter the queue only after that delay has passed.

// k8s.io/client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
	Interface
	// Insert the item element into the queue after the specified duration time
	AddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {
	Interface
	clock clock.Clock
	stopCh chan struct{}
	heartbeat clock.Ticker

	// Buffered channel with a capacity of 1000; sends block once it is full
	waitingForAddCh chan *waitFor
	metrics retryMetrics
}

Principle of operation of delay queue

When an element is added with AddAfter, it is first put into the buffered channel waitingForAddCh. The consumer goroutine, waitingLoop, keeps receiving elements from that channel and compares each element's ready time with the current time. If the delay has not yet expired, the element is placed in a priority queue ordered by ready time (shown on the left of the diagram); if it has expired, the element is added to the FIFO queue directly. The loop also keeps checking the priority queue and moves every element whose time has come into the FIFO queue.

**Why is the waitingForAddCh channel needed?** Because waitingLoop runs in its own goroutine, and goroutines communicate through channels.

5. Rate-limiting queue

Building on the delay queue interface DelayingInterface, the rate-limiting queue adds the AddRateLimited, Forget, and NumRequeues methods. It also provides four rate-limiting algorithms that implement the RateLimiter interface, and it uses the delay queue to postpone the moment an element enters the queue, which is how the rate limit is enforced.

// k8s.io/client-go/util/workqueue/rate_limiting_queue.go
type RateLimitingInterface interface {
	// Built on the delay queue interface DelayingInterface
	DelayingInterface
	// Add the item to the queue once the RateLimiter allows it
	AddRateLimited(item interface{})
	// Stop tracking the item in the RateLimiter (clears its requeue count)
	Forget(item interface{})
	// Get the number of times the item has been requeued
	NumRequeues(item interface{}) int
}
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
	// Get how long the given item should wait before it is enqueued
	When(item interface{}) time.Duration
	// Stop tracking the item in the RateLimiter (clears its requeue count)
	Forget(item interface{})
	// Get the number of times the item has been requeued
	NumRequeues(item interface{}) int
}

Token Bucket Algorithm

Before an element is added to the queue, it must obtain a token from the bucket. Tokens are put into the bucket at a limited rate. If elements consume tokens too quickly, the bucket runs dry, and an element must then wait for a new token to be issued before it can be enqueued, which achieves rate limiting.

// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// Initialize token bucket RateLimiter
// Indicates that 10 tokens are put into the bucket every second (10 qps), and the bucket size is 100
BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()
}

// golang.org/x/time/rate/rate.go
// Reserve one token and return a Reservation that records when it may be used
func (lim *Limiter) Reserve() *Reservation {
	return lim.ReserveN(time.Now(), 1)
}
// Calculate how long to wait from now until the reserved token may be used
func (r *Reservation) Delay() time.Duration {
	return r.DelayFrom(time.Now())
}
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
	if !r.ok {
		return InfDuration
	}
	// Calculate time difference
	delay := r.timeToAct.Sub(now)
	if delay < 0 {
		return 0
	}
	return delay
}

Queuing index (exponential backoff) algorithm

The queuing index is the number of times the same element has been enqueued: each call to AddRateLimited counts as one. The delay applied to the same element grows exponentially with each additional enqueue, up to maxDelay, which limits how often the same element can re-enter the queue.

// k8s.io/client-go/util/workqueue/default_rate_limiters.go
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	...
	// Each call to When (made by AddRateLimited) increments the element's enqueue count by one
	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The delay doubles with every enqueue and is capped at maxDelay
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

Counter algorithm

The counter algorithm limits how many elements are allowed through within a period of time. Each time an element is enqueued, its counter is incremented by one. Once the counter exceeds the threshold, the element is enqueued more slowly (with a longer delay). After the rate limit time window has passed, the counter is cleared and new elements continue to be queued. In ItemFastSlowRateLimiter the counter is kept per element and is cleared when Forget is called for that element.

// k8s.io/client-go/util/workqueue/default_rate_limiters.go
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	...
	// Increment the enqueue counter for this element
	r.failures[item] = r.failures[item] + 1
	// While the counter has not exceeded the threshold, enqueue quickly (short delay)
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	// Once the counter exceeds the threshold, enqueue slowly (long delay)
	return r.slowDelay
}

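Mixed mode

Several rate-limiting algorithms can be used together. NewMaxOfRateLimiter combines multiple RateLimiters and, for each element, uses the longest delay that any of them returns.

// Combine the exponential backoff (queuing index) algorithm with the token bucket algorithm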
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		// Exponential backoff (queuing index) algorithm: the per-element delay ranges from 5ms to 1000s
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// Indicates that 10 tokens are put into the bucket every second (10 qps), and the bucket size is 100
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}