【APIServer】APIServer 问题排查技巧总结(日志、缓存原理分析)

Posted by Hao Liang's Blog on Sunday, May 15, 2022

1、相关背景

在排查 apiserver 问题时,我们通过监控找到了可能存在性能瓶颈的节点,下一步就是在节点上通过 apiserver 日志进一步分析。

2、APIServer 日志分析技巧

Trace 日志

日志打印条件

apiserver 在请求总耗时超过阈值时(默认500ms),会打印 trace 日志,同时在 trace 的每一个 step 会计算一个 step 耗时的阈值(threshold/len(steps)),在 step 耗时超过阈值时也会打印。

可以简单理解为步骤耗时超过平均 step 耗时的时候会打印,举个例子:

一个耗时800ms的请求,一共分4步,那么单个步骤只有在超过 800/4=200ms 时才会打印(日志等级在4以上时,如果请求总耗时超过阈值,则会打印每个步骤耗时)

// k8s.io/utils/trace/trace.go

// LogIfLong is used to dump steps that took longer than its share
// 一般情况下入参为 500 ms,即整个请求超过 500 ms 就会打印 Trace 日志
func (t *Trace) LogIfLong(threshold time.Duration) {
    if time.Since(t.startTime) >= threshold {
        // if any step took more than it's share of the total allowed time, it deserves a higher log level
        // 每个 Step 的超时阈值为:总阈值(500ms)/ Step 数量,即平均值
    	stepThreshold := threshold / time.Duration(len(t.steps)+1)
        t.logWithStepThreshold(stepThreshold)
    }
}

func (t *Trace) logWithStepThreshold(stepThreshold time.Duration) {
    var buffer bytes.Buffer
    tracenum := rand.Int31()
    endTime := time.Now()

    totalTime := endTime.Sub(t.startTime)
    buffer.WriteString(fmt.Sprintf("Trace[%d]: %q ", tracenum, t.name))
    if len(t.fields) > 0 {
        writeFields(&buffer, t.fields)
        buffer.WriteString(" ")
    }
    buffer.WriteString(fmt.Sprintf("(started: %v) (total time: %v):\n", t.startTime, totalTime))
    lastStepTime := t.startTime
    for _, step := range t.steps {
        stepDuration := step.stepTime.Sub(lastStepTime)
        // 如果单步超过 stepThreshold 或日志级别大于 4 则打印 Trace 中每个 Step 的日志
        if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
            buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] ", tracenum, step.stepTime.Sub(t.startTime), stepDuration))
            buffer.WriteString(step.msg)
            if len(step.fields) > 0 {
                buffer.WriteString(" ")
                writeFields(&buffer, step.fields)
            }
            buffer.WriteString("\n")
        }
        lastStepTime = step.stepTime
    }
    stepDuration := endTime.Sub(lastStepTime)
    if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
        buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] END\n", tracenum, endTime.Sub(t.startTime), stepDuration))
    }

    klog.Info(buffer.String())
}

下图为trace日志的具体格式:

日志打印注意事项

当前 kube-apiserver 的 trace 实现还不太完善,即没有完全将整个流程打印到一个 trace 里,一个流程里可能 new 了多个 trace,导致通过一个 traceID 无法过滤出所有步骤的耗时:

  • 比如调用 webhook 时的 trace 就是新 new 的,日志中会有 call webhook 之类的单独 trace 日志,导致日志中的 webhook 相关 traceId 无法和源请求通过相同的 traceId 关联

  • 比如 apiserver 在 cacher list (直接从 cacher 缓存读取数据,不走 etcd 后端)时也是走的单独的 trace,无法和源请求通过相同的 traceId 关联

  • 遇到这类情况可以通过多查几行trace的上下文来确定范围。

一些日志关键步骤含义

  • About to check admission control:将要检查 admission control,此步骤耗时不包含检查 admission contorl 的时间

  • About to write a response:将要开始回包

  • Transformed response object:回包格式转换,同时将回包回写客户端,该步骤结束代表回包结束。

动态调整日志级别:

如需定位apiserver等组件异常原因,同时日志级别较低,担心重建之后没有现场的情况下,可通过 K8s 暴露的 debug 相关 flag 来动态调整日志级别,具体使用方式如下:

  • 获取token
TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='kube-admin')].data.token}" -n kube-system|base64 --decode)
  • 发送 debug flags 请求修改日志级别
curl -XPUT -k -H"Authorization: Bearer $TOKEN" https://127.0.0.1:60002/debug/flags/v -d '4'

kubelet 和 kcm 也可以用该方式(需替换对应的监听端口)

Audit 审计日志

由于高版本 apiserver 中社区移除了 apiserver_request_total 指标中的 user agent 相关label(即client信息),导致只通过监控无法准确定位到异常请求发起的客户端,此时如果集群的 apiserver 开启了 audit 审计日志,则可以通过审计日志来定位异常请求发起的客户端。

审计日志格式:

{
    "kind": "Event",
    "apiVersion": "audit.k8s.io/v1",
    "level": "Request",
    "auditID": "ceffa547-327f-4afe-8d8e-b6e33d4e4e41",
    "stage": "ResponseComplete",
    "requestURI": "/api/v1/namespaces/xxx/configmaps/xxx",
    "verb": "update",
    "user": {
        "username": "user1",
        "groups": [
            "tenant:default"
        ]
    },
    "sourceIPs": [
        "11.11.11.100"
    ],
    // 带有客户端字段,方便找到请求来源
    "userAgent": "test-operator/v0.0.0 (linux/amd64) kubernetes/$Format/platform.test_operator",
    "objectRef": {
        "resource": "configmaps",
        "namespace": "xxx",
        "name": "test-operator",
        "uid": "ca529b31-8b62-4163-9974-136246e39024",
        "apiVersion": "v1",
        "resourceVersion": "18892810911"
    },
    "responseStatus": {
        "metadata": {},
        "code": 200
    },
    "requestObject": {
        ...
    }
}

3、客户端分析

关于 list 请求

apiserver 本身自带了 ringbuffer cacher,将 etcd 的数据在内存中做缓存,缓存大小默认为 100(为每种类型的资源缓存最近 100 个事件变更,可通过 NewHeuristicWatchCacheSizes 参数指定):

// staging/src/k8s.io/apiserver/pkg/server/options/etcd.go

func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
	options := &EtcdOptions{
		StorageConfig:           *backendConfig,
		DefaultStorageMediaType: "application/json",
		DeleteCollectionWorkers: 1,
		EnableGarbageCollection: true,
		EnableWatchCache:        true,
		// 默认缓存 100 个事件变更
		DefaultWatchCacheSize:   100,
	}
	options.StorageConfig.CountMetricPollPeriod = time.Minute
	return options
}

watchCache 数据结构(缓存 capacity 个事件的滑动窗口切片,切片下标为 resourceVersion % capacity):

type watchCache struct {
	
	// Maximum size of history window.
	capacity int

	// cache is used a cyclic buffer - its first element (with the smallest
	// resourceVersion) is defined by startIndex, its last element is defined
	// by endIndex (if cache is full it will be startIndex + capacity).
	// Both startIndex and endIndex can be greater than buffer capacity -
	// you should always apply modulo capacity to get an index in cache array.
	cache      []*watchCacheEvent
	
    ...
}

每种资源类型都有对应的 cacheSize,都会创建单独 watcher,每个 watcher 创建单独的 ringbuffer 缓存队列(其它资源默认为 100)

// pkg/registry/cachesize/cachesize.go

// NewHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
// memory.
func NewHeuristicWatchCacheSizes(expectedRAMCapacityMB int) map[schema.GroupResource]int {
	// From our documentation, we officially recommend 120GB machines for
	// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
	// capacity per node.
	clusterSize := expectedRAMCapacityMB / 60

	// We should specify cache size for a given resource only if it
	// is supposed to have non-default value.
	watchCacheSizes := make(map[schema.GroupResource]int)
	watchCacheSizes[schema.GroupResource{Resource: "replicationcontrollers"}] = maxInt(5*clusterSize, 100)
	watchCacheSizes[schema.GroupResource{Resource: "endpoints"}] = maxInt(10*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "endpointslices", Group: "discovery.k8s.io"}] = maxInt(10*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "nodes"}] = maxInt(5*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "pods"}] = maxInt(50*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "services"}] = maxInt(5*clusterSize, 1000)
	// event 的 cacheSize 为0,不走缓存,直接查 etcd
	watchCacheSizes[schema.GroupResource{Resource: "events"}] = 0
	watchCacheSizes[schema.GroupResource{Resource: "events", Group: "events.k8s.io"}] = 0
	watchCacheSizes[schema.GroupResource{Resource: "apiservices", Group: "apiregistration.k8s.io"}] = maxInt(5*clusterSize, 1000)
	watchCacheSizes[schema.GroupResource{Resource: "leases", Group: "coordination.k8s.io"}] = maxInt(5*clusterSize, 1000)
	return watchCacheSizes
}

这也就意味着当客户端请求 apiserver 全量 list 某种资源时,并不一定会直接请求 etcd,而是直接通过 apiserver 的 cacher 将数据返回了。

那么向 apiserver 发起 list 请求时,什么情况下会直接读缓存?什么情况下会直接请求后端 etcd,不读缓存呢?

  • 节点高负载、update 事件产生太快、apiserver 的 cache buffer 过小,引发 too old resourceversion 后 relist etcd。

  • 客户端不指定 ResourceVersion 或者指定了分页的 Continue 参数,则直接透传给后端 etcd

    • kubectl get po -l app=nginx 全量从 apiserver 做 limit 500 的分页查询,不走 cache,后端直接请求 etcd,在 apiserver 中再做标签过滤

    • 使用client-go查询时,例如:k8sClient.CoreV1().Pods("").List(metav1.ListOptions{}) 是不走 cache 的,会全量从 etcd 进行 List。加上 ResourceVersion 后才会走 cache: k8sClient.CoreV1().Pods("").List(metav1.ListOptions{ResourceVersion: “0”})

  • 客户端指定的 ResourceVersion 为 0 、没有 Continue 参数、有 Limit 分页参数,则忽略 Limit,直接从缓存读取。如果 ResourceVersion 不为0、指定了 Limit 分页参数,则透传给后端 etcd。

  • 没有 Limit 分页参数的情况下,如果指定了 ResourceVersion 为 0 的情况下,如果 cacher 当前没 ready,则透传给后端 etcd

  • 没有 Limit 分页参数的情况下,如果指定了 ResourceVersion 且不为0,则从缓存中取数据。

    • 如果指定的 ResourceVersion 比当前缓存最新的 ResourceVersion 要大,则会阻塞3s等待 watchCache 同步数据,如果 3s 后数据仍未满足 ResourceVersion,则返回too large resourceverison。

    • 如果指定的 ResourceVersion 比当前缓存最新的 ResourceVersion 要小,则直接从缓存中取数据,并且可以利用索引(语义为获取到的数据至少比指定的ResourceVersion要新)

    • 如果指定的 ResourceVersion 比当前缓存最老的 ResourceVersion 要小,则直接从后端 etcd 读取,返回 too old resourceversion。

// staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go

// When specified with a watch call, shows changes that occur after that particular version of a resource.
// Defaults to changes from the beginning of history.
// When specified for list:
// - if unset, then the result is returned from remote storage based on quorum-read flag;
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
// - if set to non zero, then the result is at least as fresh as given rv.
ResourceVersion string

list 处理逻辑相关代码:

// k8s.io/apiserver/pkg/storage/cacher/cacher.go

// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	hasContinuation := pagingEnabled && len(pred.Continue) > 0
	// 开启 limit 分页条件:开启 APIListChunking功能、请求参数带 limit、resourceVersion 不为 0
	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
	// 满足以下条件直接请求后端 etcd
	if resourceVersion == "" || hasContinuation || hasLimit {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility). If a continuation is
		// requested, serve it from the underlying storage as well.
		// Limits are only sent to storage when resourceVersion is non-zero
		// since the watch cache isn't able to perform continuations, and
		// limits are ignored when resource version is zero.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least that
	// fresh as the given resourceVersion.
	listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	// 缓存还没有 ready,直接请求后端 etcd
	if listRV == 0 && !c.ready.check() {
		// If Cacher is not yet initialized and we don't require any specific
		// minimal resource version, simply forward the request to storage.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

	trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
	defer trace.LogIfLong(500 * time.Millisecond)

	c.ready.wait()
	trace.Step("Ready")

	// List elements with at least 'listRV' from cache.
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	if err != nil {
		return err
	}
	if listVal.Kind() != reflect.Slice {
		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunction(key, pred)

	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
	if err != nil {
		return err
	}
	trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
		// Resize the slice appropriately, since we already know that none
		// of the elements will be filtered out.
		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
		trace.Step("Resized result")
	}
	for _, obj := range objs {
		elem, ok := obj.(*storeElement)
		if !ok {
			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}
	trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
	if c.versioner != nil {
		if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
			return err
		}
	}
	return nil
}

关于 informer

informer 和 apiserver 的关系:

  • informer 启动时默认会从 apiserver cache list 对应资源的数据,即使连接断开的情况下,仍然会尝试从本地存储的 ResourceVersion 来请求 apiserver cache,只有在 apiserver 返回 too old resourceVersion 报错时,才会不走缓存,从 etcd 请求最新数据。

  • informer 启动的时候,会从 apiserver 进行 ListWatch,触发 Add 操作(本地 cache 没有就是Add,有就是 Update,启动时本地什么都没有,因此是Add)

  • resync 操作是定期(默认30s)将 indexer 的本地缓存数据重新入队,一般会触发 update 操作,需要在 update 中判断是否需要 reconcile。

informer处理事件慢/watch慢的原因:

  • 大多数情况是因为业务实现有问题,比如在 eventHandler 中作了一些耗时操作(eventHandler的调用是串行的,一旦有耗时操作则会阻塞后边的事件,尤其是在有resync的情况下,队列阻塞会更为严重)。因此不要在eventHandler中有任何阻塞操作,尤其是IO操作(接口调用等)。

  • 业务自己有调用client-go,没有调整默认QPS(client-go默认qps 5,很容易触发限速)

  • 为了区分是业务自身问题还是 apiserver 问题导致的 watch 慢,可通过kubectl get po -w 参数来确定收到事件的时间是否有延迟。如果kubectl没延迟,大概率是业务自身问题;如果kubectl收到事件也慢,则可以排查 apiserver 和 etcd 负载是否异常

关于 BookMark 机制

参考:监视书签

apiserver 引入了 bookmark 事件机制,用于标记客户端请求的给定 resourceVersion 的所有更改都已发送。 当客户端发起 watch 请求时,带上 allowWatchBookmarks=true 参数,即可在 watch 到当前最新的 resourceVersion 时收到一个 bookmark 类型的事件:

GET /api/v1/namespaces/test/pods?watch=1&resourceVersion=10245&allowWatchBookmarks=true
---
200 OK
Transfer-Encoding: chunked
Content-Type: application/json

{
  "type": "ADDED",
  "object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "10596", ...}, ...}
}
...
{
  "type": "BOOKMARK",
  "object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "12746"} }
}

引入 bookmark 机制的好处:

  • 引入 bookmark 前:客户端只对 watch 到的某些感兴趣的事件做更新 ResourceVersion 的操作(比如 kubelet 只 watch 本节点的 pod 的更新事件),如果集群里的事件一直都是客户端不感兴趣的(比如其它节点的 pod 不断更新),就会导致客户端本地的 ResourceVersion 一直不更新。 这时如果 watch 断连了,拿本地的 ResourceVersion 重新 watch 会触发 too old resourceversion,导致 relist。

  • 引入 bookmark 后:客户端只对 watch 到的某些感兴趣的事件做处理,同时能收到 bookmark 事件推送的最新 ResourceVersion(服务端推送的事件,不做特殊处理)。 这时如果 watch 断连了,就能拿到上次 bookmark 事件推送的最新 ResourceVersion 来重新 watch,不会触发 relist。

client-go 的 reflector 包中对 bookmark 事件的处理

  • 接收到 bookmark 事件不做任何处理,只更新 watcher 的 lastSyncResourceVersion 为最新的 ResourceVersion

  • 当需要 relist 时从最新的 ResourceVersion 开始 watch,不需要再从 ResourceVersion=0 开始全量 list

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			...
			switch event.Type {
			case watch.Added:
				...
			case watch.Modified:
				...
			case watch.Deleted:
				...
			case watch.Bookmark:
				// 接收到 bookmark 事件不做任何处理,只更新 watcher 的 lastSyncResourceVersion 为最新的 ResourceVersion
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}
    ...
	return nil
}

// 更新最新的 ResourceVersion
func (r *Reflector) setLastSyncResourceVersion(v string) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
}

// 当需要 relist 时从最新的 ResourceVersion 开始 watch,不需要再从 ResourceVersion=0 开始全量 list
func (r *Reflector) relistResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()

    if r.isLastSyncResourceVersionUnavailable {
        // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
        // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
        // to the latest available ResourceVersion, using a consistent read from etcd.
        return ""
    }
    if r.lastSyncResourceVersion == "" {
        // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
        // be served from the watch cache if it is enabled.
        return "0"
    }
    return r.lastSyncResourceVersion
}

关于使用 reflector 的组件重启后 list 请求会透传到 etcd

reflector 请求 apiserver 默认会带 resourceVersion = 0,但同时也带了 Limit = 500 的分页参数,导致组件重启会全量 list etcd,无法走 apiserver 缓存。

// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	// 这里第一次 list 的 ResourceVersion 默认为0
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		...
		go func() {
			...
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
		}
		...
    }
    ...
}
// staging/src/k8s.io/client-go/tools/pager/pager.go

// 默认带 limit = 500 分页参数
const defaultPageSize = 500

func New(fn ListPageFunc) *ListPager {
	return &ListPager{
		PageSize:          defaultPageSize,
		PageFn:            fn,
		FullListIfExpired: true,
		PageBufferSize:    defaultPageBufferSize,
	}
}

4、社区相关优化

nodeName 到 pod 的 indexer 索引构建

在 1.20 版本之前,kubelet 或 kubectl describe node 获取节点上绑定的 pod 列表时需要对集群中的所有 pod 做一次全量 list, 再通过 apiserver 中的 fieldSelector 过滤逻辑将 pod 的 .spec.nodeName 为该节点的 pod 筛选出来组成该节点上绑定的 pod 列表:

GET https://kube-apiserver/api/v1/pods?fieldSelector=spec.nodeName=node1,status.phase!=Failed,status.phase!=Succeeded

当集群规模达到 5w+ pod 数、5000+ 节点数时,每次重启 apiserver 时,kubelet 和 apiserver 保持的 watch 长链接需要重连,导致出现大量的全量 list pod,容易引发 apiserver 高负载。 而实际上 kubelet 只需要拿到自己节点所绑定的 pod 列表即可,不需要关心其它的 pod.

因此 1.20 版本中,对 apiserver 的 本地 indexer 缓存做了优化:

通过创建 nodeName 索引,构建了 nodeName 和 pod 之间的映射关系,也就是说可以直接通过 nodeName 索引查到对应的 pod 列表了

相关代码:

// pkg/registry/core/pod/strategy.go

// Indexers returns the indexers for pod storage.
func Indexers() *cache.Indexers {
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.SelectorIndex) {
		return &cache.Indexers{
			storage.FieldIndex("spec.nodeName"): NodeNameIndexFunc,
		}
	}
	return nil
}

向 etcd 发送 watch progress event 主动同步最新 revision 实现 apiserver 缓存的一致性读

参考:

在从 apiserver 全量 list 数据时,当 resourceVersion="",仍然可以从缓存读取数据。返回数据前:

1、检查 reflector 缓存的 resourceVersion 是否等于 etcd 最新的 revision(往 etcd 发送 get key 请求即可拿到最新的 revision) 2、如果缓存的 resourceVersion < etcd 最新的 revision,说明缓存没有及时同步,主动往 etcd 发送 progress event 3、etcd 收到 progress event,会主动往 watcher 同步最新的 revision 信息 4、apiserver 的 watcher 收到最新的 revision 信息,同步最新的 reflector 缓存的 resourceVersion 5、从缓存中返回全量 list 数据