Client-go code breakdown (3): Informer mechanism

Posted by Hao Liang's Blog on Wednesday, August 12, 2020

1. Introduction

In Kubernetes, the controller needs to monitor the status of resource objects in the cluster to coordinate the actual status of the resource objects with the desired status defined through yaml. So how does the controller monitor the resource object and make corresponding processing based on the actual status changes of the object? In fact, it is implemented through the Informer mechanism in the Client-go package. Informer workflow architecture diagram Image source: Geek Time – “Kubernetes in a Simple and In-depth manner” From the picture above, we can roughly understand the entire process of the Informer mechanism:

  • The Reflector component initiates an HTTP request to the apiserver, and List obtains all the resource objects in the cluster. It then continuously monitors the etcd changing data through HTTP long connection Watch, encapsulates it into events (Events), and then puts the resource objects corresponding to the events into Delta. In FIFO queue
  • The Delta FIFO queue stores the resource objects that need to be operated and what operations are to be performed on the resource objects (Add, Updated, Deleted, Sync)
  • Indexer is a local cache with an index. It takes out the index of the resource object (consistent with the key of etcd) and the specific data from the Delta FIFO queue and saves it.
  • The resource object taken out from the Delta FIFO queue will also trigger callbacks of three methods: Add, Delete, and Update, which respectively correspond to the add, delete, and modify operations of resource objects in etcd. The callback method will put the index (Index) of the resource object that needs to be processed into the WorkQueue work queue.
  • The Control Loop control loop continuously retrieves the index of the object to be processed from the WorkQueue, obtains the specific resource object data from the local cache maintained by the Indexer through the Lister, and performs corresponding processing.

2. Analysis of the underlying principles of Reflector

Simply put, Reflector does two things: List and Watch

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
	if err := func() error {
		...
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
		...
		w, err := r.listerWatcher.Watch(options)
		...
}

List refers to sending a GET request to the apiserver to obtain the data of all resources in the cluster. Watch monitors the change events of resource objects by establishing a long HTTP connection with the apiserver and transmitting it in chunked encoding (chunked). These requests are sent through the ClientSet client we mentioned earlier. Introduction to chunked encoding transfer

// Provides List and Watch interfaces to allow resource objects to implement specific interfaces
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// Whether to disable chunked encoding transfers when watching
	DisableChunking bool
}

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// Taking the Pod resource object as an example, the bottom layer sends a request to the apiserver through the ClientSet client.
// k8s.io/client-go/informers/core/v1/pod.go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

When Watch monitors an event that a resource object changes in the cluster, it will trigger the watchHandler callback, put the changed resource object into the Delta FIFO, and update the local cache at the same time.

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
		...
		select {
		...
		// When there is a resource object change event, the channel is triggered
		case event, ok := <-w.ResultChan():
			...
			// Update local cache here
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				...
			case watch.Modified:
				err := r.store.Update(event.Object)
				...
			case watch.Deleted:
				err := r.store.Delete(event.Object)
				...
			}
			*resourceVersion = newResourceVersion
			// Update the resource version number here
			r.setLastSyncResourceVersion(newResourceVersion)
		}
	}
	return nil
}

3. Analysis of the underlying principles of Delta FIFO

Delta FIFO data structure:

// k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
	...
	items map[string]Deltas
	queue []string
	...
}

You can see that it is mainly composed of two parts: FIFO queue (slice) and Delta object collection. The Delta object saves the resource object and the object’s operation type (Add, Update, Delete, Sync) The FIFO queue is responsible for storing the data obtained from the data producer Reflector, and then the consumer Controller takes the data from the head of the queue.

Enqueue:

// k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// Calculate the key for the resource object and use it as the index of the indexer local cache object
	id, err := f.KeyOf(obj)
	...
	// Encapsulate the resource object obj and the object's operation type actionType into a complete Delta object
    // Add to FIFO queue
	newDeltas := append(f.items[id], Delta{actionType, obj})
	// Tthe last two consecutive Delta objects with the same operation in the queue are deduplicated.
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		// Broadcast to notify all consumers to unblock (use runtime_notifyListNotifyAll of the runtime package to implement wake-up)
		f.cond.Broadcast()
	}
	...
	return nil
}

Dequeue:

// k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	// Locking to ensure thread safety of concurrent operations on the queue
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			...
			// If the FIFO queue is empty, block waiting
			f.cond.Wait()
		}
		// The head object is dequeued
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		// Determine whether the object in the Delta FIFO queue has been deleted, and ignore it if deleted.
		item, ok := f.items[id]
		if !ok {
			continue
		}
		delete(f.items, id)
		// If the object still exists in the Delta FIFO queue, call the process function to trigger the callback method of the object.
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}

Resync mechanism: (a synchronization mechanism that regularly puts data into the FIFO queue again from the Indexer’s local cache) Resync is triggered by the pipeline in Reflector, because when creating Reflector, the NewRelector function passes in the resyncPeriod parameter, which sets the trigger time period of Resync.

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    ...
    go func() {
            resyncCh, cleanup := r.resyncChan()
            defer func() {
                cleanup()
            }()
            for {
                select {
                // Resync pipeline trigger
                case <-resyncCh:
                case <-stopCh:
                    return
                case <-cancelCh:
                    return
                }
                if r.ShouldResync == nil || r.ShouldResync() {
                    klog.V(4).Infof("%s: forcing resync", r.name)
                    // Execute Resync
                    if err := r.store.Resync(); err != nil {
                        resyncerrc <- err
                        return
                    }
                }
                cleanup()
                resyncCh, cleanup = r.resyncChan()
            }
        }()
    ......
}

The logic in Resyn that handles Indexer cache and Delta FIFO queue synchronization:

// k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) syncKeyLocked(key string) error {
	// Retrieve objects based on index from Indexer's local storage
	obj, exists, err := f.knownObjects.GetByKey(key)
	...

	// In order to prevent the old version of the object from overwriting the new version of the object that is still in the Delta FIFO queue during Resync
    // Therefore, we need to first check if there are any new objects competing with it in the Delta FIFO queue. If there are any, just ignore it and do not do this.
    //Resync synchronization of objects
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}
	// Enter the FIFO queue again, but the actionType is marked as Sync
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

4. Indexer underlying implementation

Indexer is a local cache used to store Kubernetes resource objects consumed in the Delta FIFO queue. It saves each dequeued resource object through the index to facilitate retrieval when the Controller calls back. Since it is a data structure that is searched through an index, we can easily think of Map as a data structure. That’s right, the bottom layer of Indexer is implemented through a structure called ThreadSafeMap. There is a map field in this structure to store the data of resource objects.

// k8s.io/client-go/tools/cache/thread_safe_store_test.go
type threadSafeMap struct {
	...
	items map[string]interface{}
	// Stored indexer, key is the indexer name, value is the implementation of the indexer
	indexers Indexers
	// Stored cache, key is the cache name, value is the cached object data
	indices Indices
}

// k8s.io/client-go/tools/cache/index.go
// Data structure (K/V) to store cached data
type Index map[string]sets.String

// Stored indexer, key is the indexer name, value is the implementation of the indexer
type Indexers map[string]IndexFunc

// Stored cache, key is the cache name, value is the cached object data
type Indices map[string]Index

How to get index data through Indexer? threadSafeMap has a ByIndex method. Pass in the indexer name (indexName) and the index of the data (indexKey) to filter out the data from the cache:

// k8s.io/client-go/tools/cache/thread_safe_store_test.go
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	...
	// Get the specific implementation of the indexer from indexers through the indexer name (indexName)
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
	// Get the cached object data from indices through the cache name (indexName)
	index := c.indices[indexName]
	// Then get the specific cache object data from index through the data index (indexedValue)
	set := index[indexedValue]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}

Optimization mechanism in Informer

  1. Shared Informer sharing mechanism Resource objects of the same type share an Informer, so after obtaining resource objects of the same type, you only need to make one List and Watch request to the apiserver, which greatly reduces the burden on the apiserver and etcd.
  2. Indexer caching mechanism Indexer maintains a local cache of the Delta FIFO queue. Control Loop can directly obtain resource object data from the cache through the index. There is no need to interact with apiserver, which reduces the burden on apiserver and etcd.
  3. Resync mechanism In order to prevent the processing from failing when processing the Informer event callback, the data in the Delta FIFO is taken away but is not executed normally. The resync mechanism will periodically resynchronize Indexer’s cached data into the Delta FIFO queue and trigger the update callback again. (Edge drive trigger + horizontal drive trigger + timing synchronization)
  4. Only enqueue the index in the WorkQueue instead of putting the entire resource object into the work queue. On the one hand, it saves the consumption of memory space. On the other hand, when Control Loop accesses the local cache through the index later, it can perform real-time verification of the data of the resource object (if it is found that the index cannot find the corresponding object in the Indexer cache, it means The resource object has currently been deleted. Control Loop can obtain the latest status of the object in real time), and there will be no large delay in status changes due to the lag in Control Loop execution time.