【Troubleshooting】Summary of kube-apiserver troubleshooting techniques (analysis of logs and caching principles)

Posted by Hao Liang's Blog on Sunday, May 15, 2022

When troubleshooting apiserver issues, monitoring usually tells us which nodes may have a performance bottleneck. The next step is to dig further by analyzing the apiserver logs on those nodes.

2. APIServer log analysis skills

Trace log

Log printing conditions

When the total request time exceeds the threshold (500ms by default), apiserver prints a trace log. It also computes a per-step threshold (threshold / number of steps); any step whose duration exceeds that per-step threshold is printed as well.

Put simply, a step is printed when it takes longer than its average share of the threshold. For example:

With the default 500ms threshold, for a trace containing 4 steps a single step is only printed when it exceeds 500/(4+1) = 100ms (at log level 4 or higher, the duration of every step is printed whenever the total request time exceeds the threshold).

// k8s.io/utils/trace/trace.go

// LogIfLong is used to dump steps that took longer than its share
// Generally, the input parameter is 500 ms, that is, if the entire request exceeds 500 ms, the Trace log will be printed.
func (t *Trace) LogIfLong(threshold time.Duration) {
    if time.Since(t.startTime) >= threshold {
        // if any step took more than it's share of the total allowed time, it deserves a higher log level
        //The timeout threshold for each step is: total threshold (500ms)/number of steps, that is, the average value
        stepThreshold := threshold / time.Duration(len(t.steps)+1)
        t.logWithStepThreshold(stepThreshold)
    }
}

func (t *Trace) logWithStepThreshold(stepThreshold time.Duration) {
    var buffer bytes.Buffer
    tracenum := rand.Int31()
    endTime := time.Now()

    totalTime := endTime.Sub(t.startTime)
    buffer.WriteString(fmt.Sprintf("Trace[%d]: %q ", tracenum, t.name))
    if len(t.fields) > 0 {
        writeFields(&buffer, t.fields)
        buffer.WriteString(" ")
    }
    buffer.WriteString(fmt.Sprintf("(started: %v) (total time: %v):\n", t.startTime, totalTime))
    lastStepTime := t.startTime
    for _, step := range t.steps {
        stepDuration := step.stepTime.Sub(lastStepTime)
        // If a single step exceeds stepThreshold or the log level is greater than 4, print the log of each Step in Trace
        if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
            buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] ", tracenum, step.stepTime.Sub(t.startTime), stepDuration))
            buffer.WriteString(step.msg)
            if len(step.fields) > 0 {
                buffer.WriteString(" ")
                writeFields(&buffer, step.fields)
            }
            buffer.WriteString("\n")
        }
        lastStepTime = step.stepTime
    }
    stepDuration := endTime.Sub(lastStepTime)
    if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
        buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] END\n", tracenum, endTime.Sub(t.startTime), stepDuration))
    }

    klog.Info(buffer.String())
}
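For reference, here is a minimal sketch of how request-handling code drives this package (not apiserver code: the step names and the placeholder functions are hypothetical):

package main

import (
	"time"

	utiltrace "k8s.io/utils/trace"
)

// Hypothetical placeholders standing in for real request-handling phases.
func authenticate()  { time.Sleep(50 * time.Millisecond) }
func admit()         { time.Sleep(100 * time.Millisecond) }
func writeResponse() { time.Sleep(400 * time.Millisecond) }

func main() {
	trace := utiltrace.New("Get", utiltrace.Field{Key: "url", Value: "/api/v1/namespaces/default/pods/foo"})
	// Print the trace (and every step above threshold/(len(steps)+1)) if the request exceeds 500ms.
	defer trace.LogIfLong(500 * time.Millisecond)

	authenticate()
	trace.Step("finished authentication")
	admit()
	trace.Step("finished admission")
	writeResponse()
	trace.Step("wrote response")
}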

The trace log format is illustrated below.
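The example is assembled from the format strings in logWithStepThreshold above; the request and timings are made up. At log levels below 4, only the steps exceeding the per-step threshold (here the 520.2ms one) would be printed:

Trace[1298498081]: "Get" url:/api/v1/namespaces/default/pods/foo (started: 2022-05-15 10:00:00.123 +0800 CST) (total time: 620.7ms):
Trace[1298498081]: [80.1ms] [80.1ms] About to check admission control
Trace[1298498081]: [600.3ms] [520.2ms] About to write a response
Trace[1298498081]: [619.5ms] [19.2ms] Transformed response object
Trace[1298498081]: [620.7ms] [1.2ms] END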

Notes on log printing

The current trace implementation of kube-apiserver is not perfect: a single request is not always captured in one trace. Several new traces may be created while handling one request, so you cannot filter out the duration of every step with a single trace ID:

  • For example, a new trace is created when calling a webhook, so the log contains a separate trace entry such as 'call webhook'. The webhook-related trace ID therefore cannot be correlated with the originating request through a shared trace ID.

  • Similarly, apiserver starts a separate trace for a cacher list (data read directly from the cacher cache rather than the etcd backend), which also cannot be correlated with the originating request through a shared trace ID.

  • When you run into this, check a few surrounding trace lines to work out which request the trace belongs to.

The meaning of some key log steps

  • About to check admission control: admission control is about to run. The duration recorded for this step does not include the time spent in admission control itself.

  • About to write a response: the response is about to be written back to the client.

  • Transformed response object: the response object has been converted to the wire format and written back to the client. The end of this step marks the end of the response.

Dynamically adjust the log level:

If you need to locate the cause of an exception in a component such as apiserver, but the log level is too low and restarting the component to raise it would destroy the evidence, you can adjust the log level dynamically through the debug flags exposed by Kubernetes. Usage:

  • Get token
TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='kube-admin')].data.token}" -n kube-system|base64 --decode)
  • Send debug flags request to modify log level
curl -XPUT -k -H"Authorization: Bearer $TOKEN" https://127.0.0.1:60002/debug/flags/v -d '4'

The same method works for kubelet and kube-controller-manager (replace the listening port accordingly).

Audit log

In newer apiserver versions the community removed the user-agent label (i.e. the client information) from the apiserver_request_total metric, so monitoring alone can no longer pinpoint the client that issued an abnormal request. If the cluster's apiserver has audit logging enabled, the audit log can be used to locate that client.

Audit log format:

{
    "kind": "Event",
    "apiVersion": "audit.k8s.io/v1",
    "level": "Request",
    "auditID": "ceffa547-327f-4afe-8d8e-b6e33d4e4e41",
    "stage": "ResponseComplete",
    "requestURI": "/api/v1/namespaces/xxx/configmaps/xxx",
    "verb": "update",
    "user": {
        "username": "user1",
        "groups": [
            "tenant:default"
        ]
    },
    "sourceIPs": [
        "11.11.11.100"
    ],
    // With client fields, it is easy to find the source of the request
    "userAgent": "test-operator/v0.0.0 (linux/amd64) kubernetes/$Format/platform.test_operator",
    "objectRef": {
        "resource": "configmaps",
        "namespace": "xxx",
        "name": "test-operator",
        "uid": "ca529b31-8b62-4163-9974-136246e39024",
        "apiVersion": "v1",
        "resourceVersion": "18892810911"
    },
    "responseStatus": {
        "metadata": {},
        "code": 200
    },
    "requestObject": {
        ...
    }
}

3. Client analysis

About list requests

The apiserver has a built-in ring-buffer cacher that keeps etcd data in memory. The cache size defaults to 100 (the latest 100 event changes are cached per resource type; per-resource sizes can be overridden via NewHeuristicWatchCacheSizes):

// staging/src/k8s.io/apiserver/pkg/server/options/etcd.go

func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
	options := &EtcdOptions{
		StorageConfig:           *backendConfig,
		DefaultStorageMediaType: "application/json",
		DeleteCollectionWorkers: 1,
		EnableGarbageCollection: true,
		EnableWatchCache:        true,
		// default cache of 100 event changes
		DefaultWatchCacheSize:   100,
	}
	options.StorageConfig.CountMetricPollPeriod = time.Minute
	return options
}

watchCache data structure (the cache is a sliding window of capacity events; positions in the slice are obtained by taking startIndex/endIndex modulo capacity):

type watchCache struct {
	
	// Maximum size of history window.
	capacity int

	// cache is used a cyclic buffer - its first element (with the smallest
	// resourceVersion) is defined by startIndex, its last element is defined
	// by endIndex (if cache is full it will be startIndex + capacity).
	// Both startIndex and endIndex can be greater than buffer capacity -
	// you should always apply modulo capacity to get an index in cache array.
	cache      []*watchCacheEvent
	
    ...
}
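The comment above is the key point: startIndex and endIndex only ever grow, and a position in the underlying slice is obtained by taking them modulo capacity. A toy illustration of that indexing (not apiserver code):

package main

import "fmt"

func main() {
	// A fixed-capacity cyclic buffer indexed the way the watchCache comment describes:
	// counters grow monotonically, positions wrap with modulo capacity.
	capacity := 5
	cache := make([]string, capacity)
	startIndex, endIndex := 7, 10 // 3 events currently buffered

	for i := startIndex; i < endIndex; i++ {
		cache[i%capacity] = fmt.Sprintf("event-%d", i)
	}
	fmt.Println("oldest:", cache[startIndex%capacity])   // event-7
	fmt.Println("newest:", cache[(endIndex-1)%capacity]) // event-9
}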

Each resource type has its own cacheSize and gets its own watcher, and each watcher maintains a separate ring-buffer cache queue (resources not listed below use the default of 100):

// pkg/registry/cachesize/cachesize.go

// NewHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
// memory.
func NewHeuristicWatchCacheSizes(expectedRAMCapacityMB int) map[schema.GroupResource]int {
	// From our documentation, we officially recommend 120GB machines for
	// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
	// capacity per node.
	clusterSize := expectedRAMCapacityMB / 60

	// We should specify cache size for a given resource only if it
	// is supposed to have non-default value.
	watchCacheSizes := make(map[schema.GroupResource]int)
	watchCacheSizes[schema.GroupResource{Resource: "replicationcontrollers"}] = maxInt(5*clusterSize, 100)
	watchCacheSizes[schema.GroupResource{Resource: "endpoints"}] = maxInt(10*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "endpointslices", Group: "discovery.k8s.io"}] = maxInt(10*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "nodes"}] = maxInt(5*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "pods"}] = maxInt(50*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "services"}] = maxInt(5*clusterSize, 1000)
	// events have a cacheSize of 0: they bypass the cache and are queried from etcd directly
	watchCacheSizes[schema.GroupResource{Resource: "events"}] = 0
	watchCacheSizes[schema.GroupResource{Resource: "events", Group: "events.k8s.io"}] = 0
	watchCacheSizes[schema.GroupResource{Resource: "apiservices", Group: "apiregistration.k8s.io"}] = maxInt(5*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "leases", Group: "coordination.k8s.io"}] = maxInt(5*clusterSize, 1000)
	return watchCacheSizes
}

This means that when a client requests the full list of a resource from apiserver, the request does not necessarily go to etcd; the data may be returned directly from the apiserver's cacher.

So for a list request to apiserver, under what circumstances is the cache read directly, and under what circumstances does the request bypass the cache and go straight to the backend etcd?

  • If update events are generated too quickly (for example on a heavily loaded node) and the apiserver's cache buffer is too small, clients hit 'too old resource version' and then relist from etcd.

  • If the client does not specify a ResourceVersion, or specifies the paging Continue parameter, the request is passed straight through to the backend etcd.

    • kubectl get po -l app=nginx issues a full paged query with limit 500 that bypasses the cache: the apiserver fetches directly from etcd and then does the label filtering itself.

    • When querying with client-go, for example k8sClient.CoreV1().Pods("").List(metav1.ListOptions{}) does not go through the cache and performs a full list from etcd. Set ResourceVersion to go through the cache: k8sClient.CoreV1().Pods("").List(metav1.ListOptions{ResourceVersion: "0"}) (see the client-go sketch below).

  • If the client specifies ResourceVersion "0" with no Continue parameter and a Limit paging parameter, the Limit is ignored and the data is read directly from the cache. If ResourceVersion is non-zero and a Limit is specified, the request is passed through to the backend etcd.

  • Without a Limit paging parameter, if ResourceVersion is "0" but the cacher is not yet ready, the request is passed through to the backend etcd.

  • Without a Limit paging parameter, if a non-zero ResourceVersion is specified, the data is served from the cache:

    • If the specified ResourceVersion is newer than the latest ResourceVersion currently in the cache, the request blocks for up to 3 seconds waiting for the watchCache to catch up; if the data still has not reached that ResourceVersion after 3 seconds, a 'too large resource version' error is returned.

    • If the specified ResourceVersion is older than the latest ResourceVersion currently in the cache, the data is fetched directly from the cache and the index can be used (the semantics are that the returned data is at least as fresh as the specified ResourceVersion).

    • If the specified ResourceVersion is older than the oldest ResourceVersion still held in the cache, a 'too old resource version' error is returned and the data ends up being read from the backend etcd (relist).

// staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go

// When specified with a watch call, shows changes that occur after that particular version of a resource.
// Defaults to changes from the beginning of history.
// When specified for list:
// - if unset, then the result is returned from remote storage based on quorum-read flag;
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
// - if set to non zero, then the result is at least as fresh as given rv.
ResourceVersion string
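To make these rules concrete, here is a hedged client-go sketch (assuming an in-cluster config and a client-go version whose List takes a context; the exact routing also depends on the apiserver version and feature gates):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	ctx := context.TODO()

	// ResourceVersion unset: passed through to etcd (quorum read), bypassing the watch cache.
	fromEtcd, err := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	// ResourceVersion "0": served from the apiserver watch cache; a Limit would be ignored here.
	fromCache, err := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{ResourceVersion: "0"})
	if err != nil {
		panic(err)
	}

	// Limit with ResourceVersion unset (or non-zero): paged directly against etcd.
	paged, err := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{Limit: 500})
	if err != nil {
		panic(err)
	}

	fmt.Println(len(fromEtcd.Items), len(fromCache.Items), len(paged.Items))
}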

Code related to the list processing logic:

// k8s.io/apiserver/pkg/storage/cacher/cacher.go

// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	hasContinuation := pagingEnabled && len(pred.Continue) > 0
	// Limit paging takes effect only when: the APIListChunking feature is enabled, the request carries a limit, and resourceVersion is not "0"
	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
	// Directly request the backend etcd when the following conditions are met:
	if resourceVersion == "" || hasContinuation || hasLimit {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility). If a continuation is
		// requested, serve it from the underlying storage as well.
		// Limits are only sent to storage when resourceVersion is non-zero
		// since the watch cache isn't able to perform continuations, and
		// limits are ignored when resource version is zero.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least that
	// fresh as the given resourceVersion.
	listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	// The cache is not ready yet, go straight to the backend etcd
	if listRV == 0 && !c.ready.check() {
		// If Cacher is not yet initialized and we don't require any specific
		// minimal resource version, simply forward the request to storage.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
	defer trace.LogIfLong(500 * time.Millisecond)

	c.ready.wait()
	trace.Step("Ready")

	// List elements with at least 'listRV' from cache.
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil {
		return err
	}
	if listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunction(key, pred)

	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
	if err != nil {
		return err
	}
	trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
		// Resize the slice appropriately, since we already know that none
		// of the elements will be filtered out.
		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
		trace.Step("Resized result")
	}
	for _, obj := range objs {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}
	trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
			return err
		}
	}
	return nil
}

About informer

The relationship between informer and apiserver:

  • When an informer starts, it lists the resource data from the apiserver cache by default. Even if the connection is broken, it keeps trying to request the apiserver cache from the locally stored ResourceVersion. Only when apiserver returns a 'too old resourceVersion' error does it give up on the cache and request the latest data from etcd.

  • At startup, the informer performs a ListWatch against apiserver, which triggers Add operations (an object missing from the local cache produces Add, an existing one produces Update; the local cache is empty at startup, so everything is Add).

  • The resync operation periodically re-enqueues the data in the indexer's local cache (every 30s by default). It usually surfaces as Update operations, so the update handler needs to decide whether reconciliation is actually required.

Reasons why an informer processes events slowly / the watch appears slow:

  • In most cases the cause is the business implementation, such as time-consuming operations in the eventHandler (eventHandler calls are serial; once there is a slow operation, subsequent events are blocked, and the queue backlog gets much worse during resync). Therefore do not put any blocking operations in the eventHandler, especially IO (API calls, etc.).

  • The business code uses client-go without adjusting the default QPS (client-go defaults to a QPS of 5, which easily triggers client-side rate limiting). A sketch covering this and the previous point follows the list.

  • To distinguish whether a slow watch is caused by the business itself or by apiserver, run kubectl get po -w and see whether events arrive with a delay. If kubectl is not delayed, the problem is most likely in the business code; if kubectl also receives events slowly, check whether the apiserver and etcd loads are abnormal.
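A minimal sketch of both mitigations mentioned above: raise the client-side rate limits and keep event handlers non-blocking by only enqueueing keys. The QPS/Burst values and the 30s resync period are illustrative, not recommendations:

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	// client-go defaults to QPS=5/Burst=10, which busy controllers easily exceed.
	cfg.QPS = 50
	cfg.Burst = 100

	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 30s resync: the indexer's local cache is periodically re-delivered as Update events.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	// Only compute a key and enqueue it; do all IO in worker goroutines,
	// never inside the event handler itself.
	enqueue := func(obj interface{}) {
		if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	}
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueue,
		UpdateFunc: func(_, newObj interface{}) { enqueue(newObj) },
		DeleteFunc: enqueue,
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.HasSynced)

	// Worker goroutines would consume keys from `queue` and do the real reconciliation here.
	<-stopCh
}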

About the bookmark mechanism

Reference: Watch Bookmark

apiserver introduced the bookmark event mechanism to mark that all changes up to a given resourceVersion requested by the client have been sent. When the client starts a watch with the allowWatchBookmarks=true parameter, it receives a BOOKMARK event once the watch has caught up with the latest resourceVersion:

GET /api/v1/namespaces/test/pods?watch=1&resourceVersion=10245&allowWatchBookmarks=true
---
200 OK
Transfer-Encoding: chunked
Content-Type: application/json

{
  "type": "ADDED",
  "object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "10596", ...}, ...}
}
...
{
  "type": "BOOKMARK",
  "object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "12746"} }
}
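For reference, the same watch issued through client-go might look like the sketch below (assuming an in-cluster config and a client-go version whose Watch takes a context; only the bookmark handling is shown):

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	lastRV := "10245"
	w, err := client.CoreV1().Pods("test").Watch(context.TODO(), metav1.ListOptions{
		ResourceVersion:     lastRV,
		AllowWatchBookmarks: true,
	})
	if err != nil {
		panic(err)
	}
	defer w.Stop()

	for ev := range w.ResultChan() {
		switch ev.Type {
		case watch.Bookmark:
			// Only metadata.resourceVersion is meaningful on a bookmark object;
			// remember it so a dropped watch can resume from here without a relist.
			if m, err := meta.Accessor(ev.Object); err == nil {
				lastRV = m.GetResourceVersion()
			}
		case watch.Added, watch.Modified, watch.Deleted:
			// Handle the events you care about (a real client would also record their resourceVersion).
			fmt.Println("event:", ev.Type)
		}
	}
}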

The benefits of introducing the bookmark mechanism:

  • Before bookmarks were introduced: a client only updates its ResourceVersion from the watched events it cares about (for example, kubelet only watches update events of pods on its own node). If the cluster's events are consistently uninteresting to the client (for example, pods on other nodes keep updating), the client's local ResourceVersion never advances. If the watch then disconnects and is re-established from that stale local ResourceVersion, a 'too old resource version' error is triggered and a relist follows.

  • After bookmarks were introduced: the client still only processes the watched events it cares about, but it also receives the latest ResourceVersion via bookmark events (which need no special handling beyond recording the version). If the watch disconnects, it can resume from the ResourceVersion carried by the last bookmark event, and no relist is triggered.

Handling of bookmark events in the reflector package of client-go

  • On receiving a bookmark event, the reflector does nothing else; it only updates the watcher's lastSyncResourceVersion to the latest ResourceVersion.

  • When a relist is required, watching resumes from that latest ResourceVersion; there is no need to do a full list from ResourceVersion=0.

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			...
			switch event.Type {
			case watch.Added:
				...
			case watch.Modified:
				...
			case watch.Deleted:
				...
			case watch.Bookmark:
				// Nothing is done on receiving a bookmark event; only the watcher's lastSyncResourceVersion is updated to the latest ResourceVersion
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}
    ...
	return nil
}

// update the latest resourceversion
func (r *Reflector) setLastSyncResourceVersion(v string) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
}

// When relisting is required, start watching from the latest ResourceVersion. There is no need to start the full list from ResourceVersion=0.
func (r *Reflector) relistResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()

    if r.isLastSyncResourceVersionUnavailable {
        // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
        // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
        // to the latest available ResourceVersion, using a consistent read from etcd.
        return ""
    }
    if r.lastSyncResourceVersion == "" {
        // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
        // be served from the watch cache if it is enabled.
        return "0"
    }
    return r.lastSyncResourceVersion
}

After a component using reflector restarts, its list request is passed through to etcd

By default, the reflector's list request to apiserver carries resourceVersion = 0, but it also carries the Limit = 500 paging parameter, which causes a restarted component to do a full list against etcd and miss the apiserver cache.

// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	// The ResourceVersion of the first list here defaults to 0
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		...
		go func() {
			...
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
		}()
		...
    }
    ...
}
// staging/src/k8s.io/client-go/tools/pager/pager.go

// The default paging parameter is limit 500
const defaultPageSize = 500

func New(fn ListPageFunc) *ListPager {
	return &ListPager{
		PageSize:          defaultPageSize,
		PageFn:            fn,
		FullListIfExpired: true,
		PageBufferSize:    defaultPageBufferSize,
	}
}

4. Relevant optimizations from the community

Building a nodeName-to-pod index in the apiserver's indexer

Before version 1.20, when kubelet or kubectl describe node fetched the list of pods bound to a node, apiserver had to do a full list of all pods in the cluster and then use its fieldSelector filtering logic to pick out the pods whose .spec.nodeName matched that node, forming the node's pod list:

GET https://kube-apiserver/api/v1/pods?fieldSelector=spec.nodeName=node1,status.phase!=Failed,status.phase!=Succeeded
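For illustration, the same request built with client-go might look like this (a sketch; the node name is a placeholder):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// All non-terminal pods bound to "node1" (placeholder node name),
	// filtered server-side via the field selector.
	sel := fields.AndSelectors(
		fields.OneTermEqualSelector("spec.nodeName", "node1"),
		fields.OneTermNotEqualSelector("status.phase", string(corev1.PodFailed)),
		fields.OneTermNotEqualSelector("status.phase", string(corev1.PodSucceeded)),
	)
	pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
		FieldSelector: sel.String(),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("pods on node1:", len(pods.Items))
}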

When the cluster reaches 50,000+ pods and 5,000+ nodes, the long-lived watch connections maintained between kubelets and apiserver all need to be re-established every time apiserver restarts, resulting in a large number of full pod lists and easily driving up apiserver load. In fact, a kubelet only needs the list of pods bound to its own node and does not care about other pods.

Therefore, in version 1.20, the apiserver's local indexer cache was optimized:

A nodeName index is created, building a mapping from nodeName to pods, so the pod list for a node can be found directly through the nodeName index.

Related code:

// pkg/registry/core/pod/strategy.go

// Indexers returns the indexers for pod storage.
func Indexers() *cache.Indexers {
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.SelectorIndex) {
		return &cache.Indexers{
			storage.FieldIndex("spec.nodeName"): NodeNameIndexFunc,
		}
	}
	return nil
}
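NodeNameIndexFunc maps a pod to its spec.nodeName. A simplified sketch of the idea (the real function, referenced by Indexers() above, works on the apiserver's internal pod type rather than corev1.Pod):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// nodeNameIndexFunc is a simplified sketch of the idea behind NodeNameIndexFunc.
func nodeNameIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return nil, fmt.Errorf("unexpected object type: %T", obj)
	}
	// The index key is spec.nodeName, so "all pods on node X" becomes a direct
	// index lookup in the watch cache instead of a full scan plus fieldSelector filtering.
	return []string{pod.Spec.NodeName}, nil
}

func main() {
	p := &corev1.Pod{Spec: corev1.PodSpec{NodeName: "node1"}}
	keys, _ := nodeNameIndexFunc(p)
	fmt.Println(keys) // [node1]
}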

Sending watch progress requests to etcd to proactively sync the latest revision, enabling consistent reads from the apiserver cache

reference

With this optimization, even a full list from apiserver with resourceVersion="" can be served from the cache. Before the data is returned (a rough sketch follows the steps below):

  1. Check whether the resourceVersion cached by the reflector equals etcd's latest revision (a get request is sent to etcd to obtain the latest revision).
  2. If the cached resourceVersion < etcd's latest revision, the cache is lagging behind, so a progress request is proactively sent to etcd.
  3. When etcd receives the progress request, it sends the latest revision information to the watcher.
  4. The apiserver's watcher receives the latest revision information and brings the reflector's cached resourceVersion up to date.
  5. The full list data is returned from the cache.
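The etcd side of these steps can be sketched roughly with the etcd clientv3 API (illustrative only, not the apiserver's actual implementation; the key name is a placeholder):

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	ctx := context.TODO()

	// Step 1: any Get returns etcd's latest revision in the response header
	// ("/registry/health" is just a placeholder key).
	resp, err := cli.Get(ctx, "/registry/health")
	if err != nil {
		panic(err)
	}
	latestRev := resp.Header.Revision

	cachedRV := int64(0) // placeholder for the watch cache's current resourceVersion

	// Steps 2-3: if the cache lags behind, ask etcd to emit a progress notification
	// on the already-open watch stream; it carries the current revision and lets
	// the watch cache catch up.
	if cachedRV < latestRev {
		if err := cli.RequestProgress(ctx); err != nil {
			panic(err)
		}
	}
	// Steps 4-5: once the watch cache has observed latestRev, serve the list from it.
	fmt.Println("latest etcd revision:", latestRev)
}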