Client-go code breakdown (2): Resync mechanism analysis in Informer

Posted by Hao Liang's Blog on Sunday, August 9, 2020

1. Informer workflow diagram in Client-go

The Reflector in the Informer uses List/Watch to obtain change events for the watched resource objects from the apiserver and puts them into the Delta FIFO queue, where they are stored as key/value pairs. As events are popped from the queue, the Indexer local cache is updated and the onAdd, onUpdate, and onDelete callbacks are triggered; these callbacks put the object's Key into the WorkQueue. The Control Loop takes a Key from the WorkQueue, looks up the corresponding Value in the Indexer, and processes it.
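
The data path described above can be sketched with a toy model. This is not client-go code: the `pipeline` type and its `enqueue`, `drain`, and `reconcile` methods are hypothetical names invented for illustration, and the sketch assumes a single goroutine, plain slices and maps in place of the real queues, and no deduplication of keys in the work queue.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for a change event.
type event struct {
	key   string // e.g. "default/nginx"
	value string // stand-in for the full resource object
}

// pipeline models the path FIFO -> Indexer + WorkQueue -> Control Loop.
type pipeline struct {
	fifo      []event           // stand-in for the Delta FIFO queue
	indexer   map[string]string // stand-in for the Indexer local cache
	workQueue []string          // stand-in for the WorkQueue (keys only)
}

func newPipeline() *pipeline {
	return &pipeline{indexer: map[string]string{}}
}

// enqueue plays the Reflector: it stores an event received via List/Watch.
func (p *pipeline) enqueue(e event) {
	p.fifo = append(p.fifo, e)
}

// drain pops every queued event, updates the cache, and lets the
// "callback" put the key into the work queue.
func (p *pipeline) drain() {
	for _, e := range p.fifo {
		p.indexer[e.key] = e.value
		p.workQueue = append(p.workQueue, e.key)
	}
	p.fifo = nil
}

// reconcile plays the Control Loop: it takes each key from the work queue
// and reads the current value from the cache.
func (p *pipeline) reconcile() []string {
	var processed []string
	for _, key := range p.workQueue {
		processed = append(processed, key+"="+p.indexer[key])
	}
	p.workQueue = nil
	return processed
}

func main() {
	p := newPipeline()
	p.enqueue(event{key: "default/nginx", value: "v1"})
	p.enqueue(event{key: "default/redis", value: "v1"})
	p.drain()
	fmt.Println(p.reconcile()) // prints [default/nginx=v1 default/redis=v1]
}
```

The point of the sketch is that the work queue carries only keys, so the Control Loop always reads the latest cached value rather than the value that was current when the event arrived.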

2. Introduction of Resync mechanism

When we use SharedInformerFactory to create a SharedInformer, we need to pass a resync period: the defaultResync parameter, of type time.Duration.

// k8s.io/client-go/informers/factory.go
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

This parameter sets how often the objects in the Indexer cache are put back into the Delta FIFO queue; the resynchronization repeats at this interval.
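
To make the role of the period concrete, here is a minimal sketch of a timer that drives a periodic resync. The `resyncChan` helper below is a hypothetical illustration built only on the standard library; it is not the client-go implementation. It also models one assumption worth stating: a period of zero is treated as "never resync".

```go
package main

import (
	"fmt"
	"time"
)

// resyncChan returns a channel that fires once per period and a function
// that releases the underlying ticker. For a zero period it returns a nil
// channel, which never delivers a value, so no resync ever happens.
func resyncChan(period time.Duration) (<-chan time.Time, func()) {
	if period == 0 {
		return nil, func() {}
	}
	t := time.NewTicker(period)
	return t.C, t.Stop
}

func main() {
	// Zero period: the receive can never succeed, so the timeout wins.
	never, stopNever := resyncChan(0)
	defer stopNever()
	select {
	case <-never:
		fmt.Println("resync")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("no resync with a zero period")
	}

	// Non-zero period: each tick is the moment a resync would run.
	ticks, stopTicks := resyncChan(10 * time.Millisecond)
	defer stopTicks()
	<-ticks
	fmt.Println("resync tick received")
}
```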

// k8s.io/client-go/tools/cache/delta_fifo.go
// Resynchronize the Indexer cache data into the Delta FIFO queue
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}
	// Traverse the keys in indexer and pass them into syncKeyLocked for processing
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}
	// If the FIFO queue already holds an event for the same key, the resource object has a newer
	// event pending and the copy in the Indexer is stale, so skip Resync for this key and return nil.
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}
	// Put the object back into the FIFO queue as a Sync event
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
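
The skip rule in syncKeyLocked can be illustrated with a small standalone model. The `toyFIFO` type below is a hypothetical simplification (no locking, string objects, maps in place of the real stores), not the client-go type; it only shows that a key with a pending event is left alone while every other cached key gets a Sync event.

```go
package main

import "fmt"

type deltaType string

const (
	typeUpdated deltaType = "Updated"
	typeSync    deltaType = "Sync"
)

// delta is a simplified pending event: its type and the object it carries.
type delta struct {
	typ deltaType
	obj string
}

// toyFIFO is a hypothetical model of the two stores Resync touches.
type toyFIFO struct {
	items        map[string][]delta // pending events per key
	knownObjects map[string]string  // stand-in for the Indexer cache
}

// resync queues a Sync event for every cached key, except keys that
// already have a pending event: their cached copy would be stale.
func (f *toyFIFO) resync() {
	for key, obj := range f.knownObjects {
		if len(f.items[key]) > 0 {
			continue
		}
		f.items[key] = append(f.items[key], delta{typ: typeSync, obj: obj})
	}
}

func main() {
	f := &toyFIFO{
		// "default/a" has a newer event waiting to be processed.
		items: map[string][]delta{
			"default/a": {{typ: typeUpdated, obj: "a-v2"}},
		},
		knownObjects: map[string]string{
			"default/a": "a-v1",
			"default/b": "b-v1",
		},
	}
	f.resync()
	fmt.Println(f.items["default/a"]) // prints [{Updated a-v2}]
	fmt.Println(f.items["default/b"]) // prints [{Sync b-v1}]
}
```

Without the check, the stale "a-v1" would be queued behind "a-v2", and the consumer would see the old value after the new one.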

Why is the Resync mechanism needed? Handling an event in a SharedInformer callback can fail, and a periodic Resync gives those failed events a chance to be reprocessed. So how do events put back into the Delta FIFO queue by Resync differ from events obtained directly from the apiserver through watch?

// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		// Check the event type: added, updated, replaced, deleted, or resynchronized by Resync
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				
				isSync := false
				switch {
				case d.Type == Sync:
					// If the event is resynchronized through Resync, mark it
					isSync = true
				case d.Type == Replaced:
					...
				}
				// Distribute an update notification, which triggers onUpdate; isSync marks it as coming from Resync
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

As the HandleDeltas source code above shows, an event that Resync put back into the Delta FIFO queue is distributed as an updateNotification, which triggers the onUpdate callback.
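
Because a Resync event carries the object already held in the Indexer, the old and new objects passed to onUpdate are normally the same version. A handler can use this to tell a resync from a real change. The sketch below is a hypothetical illustration: the `object` type, `isResync`, and `onUpdate` are invented names, and only the ResourceVersion comparison reflects a pattern commonly seen in controllers.

```go
package main

import "fmt"

// object is a hypothetical stand-in for a Kubernetes resource, reduced
// to the two fields this example needs.
type object struct {
	Name            string
	ResourceVersion string
}

// isResync reports whether an update callback was most likely triggered
// by a periodic resync: a real change always produces a new ResourceVersion.
func isResync(oldObj, newObj object) bool {
	return oldObj.ResourceVersion == newObj.ResourceVersion
}

// onUpdate mimics an update handler that only enqueues real changes.
// enqueue is a caller-supplied function that adds a key to a work queue.
func onUpdate(oldObj, newObj object, enqueue func(key string)) {
	if isResync(oldObj, newObj) {
		return // nothing changed; skip the resync notification
	}
	enqueue(newObj.Name)
}

func main() {
	var queue []string
	enqueue := func(key string) { queue = append(queue, key) }

	cached := object{Name: "nginx", ResourceVersion: "100"}
	changed := object{Name: "nginx", ResourceVersion: "101"}

	onUpdate(cached, cached, enqueue)  // resync: same version, skipped
	onUpdate(cached, changed, enqueue) // real update: enqueued
	fmt.Println(queue)                 // prints [nginx]
}
```

Skipping resync notifications is a design choice, not a requirement. A controller that relies on Resync to retry failed work would enqueue the key in both cases instead of returning early.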

3. Summary

The Resync mechanism periodically puts the objects in the Indexer cache back into the Delta FIFO queue, so that events whose SharedInformer callbacks failed can be processed again. Before an object is queued, Resync checks whether the FIFO queue already holds a newer event for the same key; if it does, the cached copy is stale and is not queued. When a Resync event is popped from the Delta FIFO queue, the onUpdate callback is triggered so that the object is reprocessed.