1、相关背景
在排查 apiserver 问题时,我们通过监控找到了可能存在性能瓶颈的节点,下一步就是在节点上通过 apiserver 日志进一步分析。
2、APIServer 日志分析技巧
Trace 日志
日志打印条件
apiserver 在请求总耗时超过阈值时(默认500ms),会打印 trace 日志,同时在 trace 的每一个 step 会计算一个 step 耗时的阈值(threshold/len(steps)),在 step 耗时超过阈值时也会打印。
可以简单理解为步骤耗时超过平均 step 耗时的时候会打印,举个例子:
一个耗时800ms的请求,一共分4步,那么单个步骤只有在超过 800/4=200ms 时才会打印(日志等级在4以上时,如果请求总耗时超过阈值,则会打印每个步骤耗时)
// k8s.io/utils/trace/trace.go
// LogIfLong is used to dump steps that took longer than its share
// 一般情况下入参为 500 ms,即整个请求超过 500 ms 就会打印 Trace 日志
func (t *Trace) LogIfLong(threshold time.Duration) {
if time.Since(t.startTime) >= threshold {
// if any step took more than it's share of the total allowed time, it deserves a higher log level
// 每个 Step 的超时阈值为:总阈值(500ms)/ Step 数量,即平均值
stepThreshold := threshold / time.Duration(len(t.steps)+1)
t.logWithStepThreshold(stepThreshold)
}
}
func (t *Trace) logWithStepThreshold(stepThreshold time.Duration) {
var buffer bytes.Buffer
tracenum := rand.Int31()
endTime := time.Now()
totalTime := endTime.Sub(t.startTime)
buffer.WriteString(fmt.Sprintf("Trace[%d]: %q ", tracenum, t.name))
if len(t.fields) > 0 {
writeFields(&buffer, t.fields)
buffer.WriteString(" ")
}
buffer.WriteString(fmt.Sprintf("(started: %v) (total time: %v):\n", t.startTime, totalTime))
lastStepTime := t.startTime
for _, step := range t.steps {
stepDuration := step.stepTime.Sub(lastStepTime)
// 如果单步超过 stepThreshold 或日志级别大于 4 则打印 Trace 中每个 Step 的日志
if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] ", tracenum, step.stepTime.Sub(t.startTime), stepDuration))
buffer.WriteString(step.msg)
if len(step.fields) > 0 {
buffer.WriteString(" ")
writeFields(&buffer, step.fields)
}
buffer.WriteString("\n")
}
lastStepTime = step.stepTime
}
stepDuration := endTime.Sub(lastStepTime)
if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] END\n", tracenum, endTime.Sub(t.startTime), stepDuration))
}
klog.Info(buffer.String())
}
下图为trace日志的具体格式:
日志打印注意事项
当前 kube-apiserver 的 trace 实现还不太完善,即没有完全将整个流程打印到一个 trace 里,一个流程里可能 new 了多个 trace,导致通过一个 traceID 无法过滤出所有步骤的耗时:
-
比如调用 webhook 时的 trace 就是新 new 的,日志中会有 call webhook 之类的单独 trace 日志,导致日志中的 webhook 相关 traceId 无法和源请求通过相同的 traceId 关联
-
比如 apiserver 在 cacher list (直接从 cacher 缓存读取数据,不走 etcd 后端)时也是走的单独的 trace,无法和源请求通过相同的 traceId 关联
-
遇到这类情况可以通过多查几行trace的上下文来确定范围。
一些日志关键步骤含义
- About to check admission control:将要检查 admission control,此步骤耗时不包含检查 admission contorl 的时间
- About to write a response:将要开始回包
- Transformed response object:回包格式转换,同时将回包回写客户端,该步骤结束代表回包结束。
动态调整日志级别:
如需定位apiserver等组件异常原因,同时日志级别较低,担心重建之后没有现场的情况下,可通过 K8s 暴露的 debug 相关 flag 来动态调整日志级别,具体使用方式如下:
- 获取token
TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='kube-admin')].data.token}" -n kube-system|base64 --decode)
- 发送 debug flags 请求修改日志级别
curl -XPUT -k -H"Authorization: Bearer $TOKEN" https://127.0.0.1:60002/debug/flags/v -d '4'
kubelet 和 kcm 也可以用该方式(需替换对应的监听端口)
Audit 审计日志
由于高版本 apiserver 中社区移除了 apiserver_request_total 指标中的 user agent 相关label(即client信息),导致只通过监控无法准确定位到异常请求发起的客户端,此时如果集群的 apiserver 开启了 audit 审计日志,则可以通过审计日志来定位异常请求发起的客户端。
审计日志格式:
{
"kind": "Event",
"apiVersion": "audit.k8s.io/v1",
"level": "Request",
"auditID": "ceffa547-327f-4afe-8d8e-b6e33d4e4e41",
"stage": "ResponseComplete",
"requestURI": "/api/v1/namespaces/xxx/configmaps/xxx",
"verb": "update",
"user": {
"username": "user1",
"groups": [
"tenant:default"
]
},
"sourceIPs": [
"11.11.11.100"
],
// 带有客户端字段,方便找到请求来源
"userAgent": "test-operator/v0.0.0 (linux/amd64) kubernetes/$Format/platform.test_operator",
"objectRef": {
"resource": "configmaps",
"namespace": "xxx",
"name": "test-operator",
"uid": "ca529b31-8b62-4163-9974-136246e39024",
"apiVersion": "v1",
"resourceVersion": "18892810911"
},
"responseStatus": {
"metadata": {},
"code": 200
},
"requestObject": {
...
}
}
3、客户端分析
关于 list 请求
apiserver 本身自带了 ringbuffer cacher,将 etcd 的数据在内存中做缓存,缓存大小默认为 100(为每种类型的资源缓存最近 100 个事件变更,可通过 NewHeuristicWatchCacheSizes 参数指定):
// staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
options := &EtcdOptions{
StorageConfig: *backendConfig,
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableWatchCache: true,
// 默认缓存 100 个事件变更
DefaultWatchCacheSize: 100,
}
options.StorageConfig.CountMetricPollPeriod = time.Minute
return options
}
watchCache 数据结构(缓存 capacity 个事件的滑动窗口切片,切片下标为 resourceVersion % capacity):
type watchCache struct {
// Maximum size of history window.
capacity int
// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
cache []*watchCacheEvent
...
}
每种资源类型都有对应的 cacheSize,都会创建单独 watcher,每个 watcher 创建单独的 ringbuffer 缓存队列(其它资源默认为 100)
// pkg/registry/cachesize/cachesize.go
// NewHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
// memory.
func NewHeuristicWatchCacheSizes(expectedRAMCapacityMB int) map[schema.GroupResource]int {
// From our documentation, we officially recommend 120GB machines for
// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
// capacity per node.
clusterSize := expectedRAMCapacityMB / 60
// We should specify cache size for a given resource only if it
// is supposed to have non-default value.
watchCacheSizes := make(map[schema.GroupResource]int)
watchCacheSizes[schema.GroupResource{Resource: "replicationcontrollers"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Resource: "endpoints"}] = maxInt(10*clusterSize, 1000)
watchCacheSizes[schema.GroupResource{Resource: "endpointslices", Group: "discovery.k8s.io"}] = maxInt(10*clusterSize, 1000)
watchCacheSizes[schema.GroupResource{Resource: "nodes"}] = maxInt(5*clusterSize, 1000)
watchCacheSizes[schema.GroupResource{Resource: "pods"}] = maxInt(50*clusterSize, 1000)
watchCacheSizes[schema.GroupResource{Resource: "services"}] = maxInt(5*clusterSize, 1000)
// event 的 cacheSize 为0,不走缓存,直接查 etcd
watchCacheSizes[schema.GroupResource{Resource: "events"}] = 0
watchCacheSizes[schema.GroupResource{Resource: "events", Group: "events.k8s.io"}] = 0
watchCacheSizes[schema.GroupResource{Resource: "apiservices", Group: "apiregistration.k8s.io"}] = maxInt(5*clusterSize, 1000)
watchCacheSizes[schema.GroupResource{Resource: "leases", Group: "coordination.k8s.io"}] = maxInt(5*clusterSize, 1000)
return watchCacheSizes
}
这也就意味着当客户端请求 apiserver 全量 list 某种资源时,并不一定会直接请求 etcd,而是直接通过 apiserver 的 cacher 将数据返回了。
那么向 apiserver 发起 list 请求时,什么情况下会直接读缓存?什么情况下会直接请求后端 etcd,不读缓存呢?
-
节点高负载、update 事件产生太快、apiserver 的 cache buffer 过小,引发 too old resourceversion 后 relist etcd。
-
客户端不指定 ResourceVersion 或者指定了分页的 Continue 参数,则直接透传给后端 etcd
-
kubectl get po -l app=nginx 全量从 apiserver 做 limit 500 的分页查询,不走 cache,后端直接请求 etcd,在 apiserver 中再做标签过滤
-
使用client-go查询时,例如:k8sClient.CoreV1().Pods("").List(metav1.ListOptions{}) 是不走 cache 的,会全量从 etcd 进行 List。加上 ResourceVersion 后才会走 cache: k8sClient.CoreV1().Pods("").List(metav1.ListOptions{ResourceVersion: “0”})
-
-
客户端指定的 ResourceVersion 为 0 、没有 Continue 参数、有 Limit 分页参数,则忽略 Limit,直接从缓存读取。如果 ResourceVersion 不为0、指定了 Limit 分页参数,则透传给后端 etcd。
-
没有 Limit 分页参数的情况下,如果指定了 ResourceVersion 为 0 的情况下,如果 cacher 当前没 ready,则透传给后端 etcd
-
没有 Limit 分页参数的情况下,如果指定了 ResourceVersion 且不为0,则从缓存中取数据。
-
如果指定的 ResourceVersion 比当前缓存最新的 ResourceVersion 要大,则会阻塞3s等待 watchCache 同步数据,如果 3s 后数据仍未满足 ResourceVersion,则返回too large resourceverison。
-
如果指定的 ResourceVersion 比当前缓存最新的 ResourceVersion 要小,则直接从缓存中取数据,并且可以利用索引(语义为获取到的数据至少比指定的ResourceVersion要新)
-
如果指定的 ResourceVersion 比当前缓存最老的 ResourceVersion 要小,则直接从后端 etcd 读取,返回 too old resourceversion。
-
// staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go
// When specified with a watch call, shows changes that occur after that particular version of a resource.
// Defaults to changes from the beginning of history.
// When specified for list:
// - if unset, then the result is returned from remote storage based on quorum-read flag;
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
// - if set to non zero, then the result is at least as fresh as given rv.
ResourceVersion string
list 处理逻辑相关代码:
// k8s.io/apiserver/pkg/storage/cacher/cacher.go
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
// 开启 limit 分页条件:开启 APIListChunking功能、请求参数带 limit、resourceVersion 不为 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
// 满足以下条件直接请求后端 etcd
if resourceVersion == "" || hasContinuation || hasLimit {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
// 缓存还没有 ready,直接请求后端 etcd
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
if err != nil {
return err
}
trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
trace.Step("Resized result")
}
for _, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
return nil
}
关于 informer
informer 和 apiserver 的关系:
-
informer 启动时默认会从 apiserver cache list 对应资源的数据,即使连接断开的情况下,仍然会尝试从本地存储的 ResourceVersion 来请求 apiserver cache,只有在 apiserver 返回 too old resourceVersion 报错时,才会不走缓存,从 etcd 请求最新数据。
-
informer 启动的时候,会从 apiserver 进行 ListWatch,触发 Add 操作(本地 cache 没有就是Add,有就是 Update,启动时本地什么都没有,因此是Add)
-
resync 操作是定期(默认30s)将 indexer 的本地缓存数据重新入队,一般会触发 update 操作,需要在 update 中判断是否需要 reconcile。
informer处理事件慢/watch慢的原因:
-
大多数情况是因为业务实现有问题,比如在 eventHandler 中作了一些耗时操作(eventHandler的调用是串行的,一旦有耗时操作则会阻塞后边的事件,尤其是在有resync的情况下,队列阻塞会更为严重)。因此不要在eventHandler中有任何阻塞操作,尤其是IO操作(接口调用等)。
-
业务自己有调用client-go,没有调整默认QPS(client-go默认qps 5,很容易触发限速)
-
为了区分是业务自身问题还是 apiserver 问题导致的 watch 慢,可通过kubectl get po -w 参数来确定收到事件的时间是否有延迟。如果kubectl没延迟,大概率是业务自身问题;如果kubectl收到事件也慢,则可以排查 apiserver 和 etcd 负载是否异常
关于 BookMark 机制
参考:监视书签
apiserver 引入了 bookmark 事件机制,用于标记客户端请求的给定 resourceVersion 的所有更改都已发送。 当客户端发起 watch 请求时,带上 allowWatchBookmarks=true 参数,即可在 watch 到当前最新的 resourceVersion 时收到一个 bookmark 类型的事件:
GET /api/v1/namespaces/test/pods?watch=1&resourceVersion=10245&allowWatchBookmarks=true
---
200 OK
Transfer-Encoding: chunked
Content-Type: application/json
{
"type": "ADDED",
"object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "10596", ...}, ...}
}
...
{
"type": "BOOKMARK",
"object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "12746"} }
}
引入 bookmark 机制的好处:
-
引入 bookmark 前:客户端只对 watch 到的某些感兴趣的事件做更新 ResourceVersion 的操作(比如 kubelet 只 watch 本节点的 pod 的更新事件),如果集群里的事件一直都是客户端不感兴趣的(比如其它节点的 pod 不断更新),就会导致客户端本地的 ResourceVersion 一直不更新。 这时如果 watch 断连了,拿本地的 ResourceVersion 重新 watch 会触发 too old resourceversion,导致 relist。
-
引入 bookmark 后:客户端只对 watch 到的某些感兴趣的事件做处理,同时能收到 bookmark 事件推送的最新 ResourceVersion(服务端推送的事件,不做特殊处理)。 这时如果 watch 断连了,就能拿到上次 bookmark 事件推送的最新 ResourceVersion 来重新 watch,不会触发 relist。
client-go 的 reflector 包中对 bookmark 事件的处理
-
接收到 bookmark 事件不做任何处理,只更新 watcher 的 lastSyncResourceVersion 为最新的 ResourceVersion
-
当需要 relist 时从最新的 ResourceVersion 开始 watch,不需要再从 ResourceVersion=0 开始全量 list
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
...
switch event.Type {
case watch.Added:
...
case watch.Modified:
...
case watch.Deleted:
...
case watch.Bookmark:
// 接收到 bookmark 事件不做任何处理,只更新 watcher 的 lastSyncResourceVersion 为最新的 ResourceVersion
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
return nil
}
// 更新最新的 ResourceVersion
func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
}
// 当需要 relist 时从最新的 ResourceVersion 开始 watch,不需要再从 ResourceVersion=0 开始全量 list
func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable {
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
if r.lastSyncResourceVersion == "" {
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
// be served from the watch cache if it is enabled.
return "0"
}
return r.lastSyncResourceVersion
}
关于使用 reflector 的组件重启后 list 请求会透传到 etcd
reflector 请求 apiserver 默认会带 resourceVersion = 0,但同时也带了 Limit = 500 的分页参数,导致组件重启会全量 list etcd,无法走 apiserver 缓存。
// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// 这里第一次 list 的 ResourceVersion 默认为0
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
...
go func() {
...
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
}
...
}
...
}
// staging/src/k8s.io/client-go/tools/pager/pager.go
// 默认带 limit = 500 分页参数
const defaultPageSize = 500
func New(fn ListPageFunc) *ListPager {
return &ListPager{
PageSize: defaultPageSize,
PageFn: fn,
FullListIfExpired: true,
PageBufferSize: defaultPageBufferSize,
}
}
4、社区相关优化
nodeName 到 pod 的 indexer 索引构建
在 1.20 版本之前,kubelet 或 kubectl describe node 获取节点上绑定的 pod 列表时需要对集群中的所有 pod 做一次全量 list,
再通过 apiserver 中的 fieldSelector 过滤逻辑将 pod 的 .spec.nodeName
为该节点的 pod 筛选出来组成该节点上绑定的 pod 列表:
GET https://kube-apiserver/api/v1/pods?fieldSelector=spec.nodeName=node1,status.phase!=Failed,status.phase!=Succeeded
当集群规模达到 5w+ pod 数、5000+ 节点数时,每次重启 apiserver 时,kubelet 和 apiserver 保持的 watch 长链接需要重连,导致出现大量的全量 list pod,容易引发 apiserver 高负载。 而实际上 kubelet 只需要拿到自己节点所绑定的 pod 列表即可,不需要关心其它的 pod.
因此 1.20 版本中,对 apiserver 的 本地 indexer 缓存做了优化:
通过创建 nodeName 索引,构建了 nodeName 和 pod 之间的映射关系,也就是说可以直接通过 nodeName 索引查到对应的 pod 列表了
相关代码:
// pkg/registry/core/pod/strategy.go
// Indexers returns the indexers for pod storage.
func Indexers() *cache.Indexers {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.SelectorIndex) {
return &cache.Indexers{
storage.FieldIndex("spec.nodeName"): NodeNameIndexFunc,
}
}
return nil
}
向 etcd 发送 watch progress event 主动同步最新 revision 实现 apiserver 缓存的一致性读
参考:
在从 apiserver 全量 list 数据时,当 resourceVersion="",仍然可以从缓存读取数据。返回数据前:
1、检查 reflector 缓存的 resourceVersion 是否等于 etcd 最新的 revision(往 etcd 发送 get key 请求即可拿到最新的 revision) 2、如果缓存的 resourceVersion < etcd 最新的 revision,说明缓存没有及时同步,主动往 etcd 发送 progress event 3、etcd 收到 progress event,会主动往 watcher 同步最新的 revision 信息 4、apiserver 的 watcher 收到最新的 revision 信息,同步最新的 reflector 缓存的 resourceVersion 5、从缓存中返回全量 list 数据