11. Informer 机制总结

Informer 机制

在 Kubernetes 系统中,组件之间通过 HTTP 协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等。那么 Kubernetes 是如何做到的呢?答案就是 Informer 机制。Kubernetes 的其他组件都是通过 client-go 的 Informer 机制与 Kubernetes API Server 进行通信的。

Informer 机制架构设计

本节介绍 Informer 机制架构设计,Informer 运行原理如图

在这里插入图片描述

在 Informer 架构设计中,有多个核心组件,分别介绍如下:

1.Reflector

Reflector 用于监控(Watch)指定的 Kubernetes 资源,当监控的资源发生变化时,触发相应的变更事件,例如 Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,并将其资源对象存放到本地缓存 DeltaFIFO 中。

2.DeltaFIFO

DeltaFIFO 可以分开理解,FIFO 是一个先进先出的队列,它拥有队列操作的基本方法,例如 Add、Update、Delete、List、Pop、Close等,而 Delta 是一个资源对象存储,它可以保存资源对象的操作类型,例如 Added(添加)操作类型、Updated(更新)操作类型、Deleted(删除)操作类型、Sync(同步)操作类型等。

3.Indexer

Indexer 是 client-go 用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储至 Indexer。Indexer 与 Etcd 集群中的数据完全保持一致。client-go 可以很方便地从本地存储中读取相应的资源对象数据,而无须每次从远程 Etcd 集群中读取,以减轻 Kubernetes API Server 和 Etcd 集群的压力

直接阅读 Informer 机制代码会比较晦涩,通过 Informers Example 代码示例来理解 Informer,印象会更深刻。Informers Example 代码示例如下:

func main() {config, err := clientcmd.BuildConfigFromFlags(masterUrl:"", kubeconfigPath: "/root/.kube/config")if err != nil {panic(err)}clientSet, err := kubernetes.NewForConfig(config)if err != nil {panic(err)}stopCh := make(chan struct{})defer close(stopCh)shardInformer := informers.NewSharedInformerFactory(clientSet, time.Minute)informer := shardInformer.Core().V1().Pods().Informer()informer. AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {mObj := obj.(v1.Object)fmt.Println("New Pod Added to Store: %s", mObj.GetName())},UpdateFunc: func(obj interface{}) {oObj := oldObj.(v1.Object)nObj := newObj.(v1.Object)fmt.Println("%s Pod Update to %s", oObj.GetName(), nObj.GetName())},DeleteFunc: func(obj interface{}) {mObj := obj.(v1.Object)fmt.Println("Pod delete from Store: %s", mObj.GetName())},})informer.Run(stopCh)
}

首先通过 kubernetes.NewForConfig 创建 clientset 对象,Informer 需要通过 ClientSet 与 Kubernetes API Server 进行交互。另外,创建 stopCh 对象,该对象用于在程序进程退出之前通知 Informer 提前退出,因为 Informer 是一个持久运行的 goroutine。

informers.NewSharedInformerFactory 函数实例化了 SharedInformer 对象,它接收两个参数:第 1 个参数 clientset 是用于与 Kubernetes API Server 交互的客户端,第 2 个参数 time.Minute 用于设置多久进行一次 resync(重新同步),resync 会周期性地执行 List 操作,将所有的资源存放在 Informer Store 中,如果该参数为 0,则禁用 resync 功能

在 Informers Example 代码示例中,通过 sharedInformers.Core().V1().Pods().Informer 可以得到具体 Pod 资源的 informer 对象。通过 informer.AddEventHandler 函数可以为 Pod 资源添加资源事件回调方法,支持 3 种资源事件回调方法,分别介绍如下。

  • AddFunc:当创建 Pod 资源对象时触发的事件回调方法。
  • UpdateFunc:当更新 Pod 资源对象时触发的事件回调方法。
  • DeleteFunc:当删除 Pod 资源对象时触发的事件回调方法。

在正常的情况下,Kubernetes 的其他组件在使用 Informer 机制时触发资源事件回调方法,将资源对象推送到 WorkQueue 或其他队列中,在 Informers Example 代码示例中,我们直接输出触发的资源事件。最后通过 informer.Run 函数运行当前的 Informer,内部为 Pod 资源类型创建 Informer。

通过 Informer 机制可以很容易地监控我们所关心的资源事件,例如,当监控 Kubernetes Pod 资源时,如果 Pod 资源发生了 Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,就通知 client-go,告知 Kubernetes 资源事件变更了并且需要进行相应的处理。

1.资源 Informer

每一个 Kubernetes 资源上都实现了 Informer 机制。每一个 Informer 上都会实现 Informer 和 Lister 方法,例如 PodInformer,代码示例如下:

vendor/k8s.io/client-go/informers/core/v1/pod.gotype PodInformer interface {Informer() cache.SharedIndexInformerLister() v1.PodLister
}type podInformer struct {factory          internalinterfaces.SharedInformerFactorytweakListOptions internalinterfaces.TweakListOptionsFuncnamespace        string
}

调用不同资源的 Informer,代码示例如下:

informer := shardInformer.Core().V1().Pods().Informer()
nodeinformer := shardInformer.Node().V1beta1().RuntimeClasses().Informer()

定义不同资源的 Informer,允许监控不同资源的资源事件,例如,监听 Node 资源对象,当 Kubernetes 集群中有新的节点(Node)加入时,client-go 能够及时收到资源对象的变更信息。

2. Shared Informer 共享机制

Informer 也被称为 Shared Informer,它是可以共享使用的。在用 client-go 编写代码程序时,若同一资源的 Informer 被实例化了多次,每个 Informer 使用一个 Reflector,那么会运行过多相同的 ListAndWatch,太多重复的序列化和反序列化操作会导致 Kubernetes API Server 负载过重。

Shared Informer 可以使同一类资源 Informer 共享一个 Reflector,这样可以节约很多资源。通过 map 数据结构实现共享的 Informer 机制。Shared Informer 定义了一个 map 数据结构,用于存放所有 Informer 的字段,代码示例如下:

vendor/k8s.io/client-go/informers/factory.gotype sharedInformerFactory struct {client           kubernetes.Interfacenamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunclock             sync.MutexdefaultResync    time.DurationcustomResync     map[reflect.Type]time.Durationinformers map[reflect.Type]cache.SharedIndexInformer// startedInformers is used for tracking which informers have been started.// This allows Start() to be called multiple times safely.startedInformers map[reflect.Type]bool
}// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer
}

informers 字段中存储了资源类型和对应于 SharedIndexInformer 的映射关系。InformerFor 函数添加了不同资源的 Informer,在添加过程中如果已经存在同类型的资源 Informer,则返回当前 Informer,不再继续添加。

最后通过 Shared Informer 的 Start 方法使 f.informers 中的每个 informer 通过 goroutine 持久运行。

Reflector

Informer 可以对 Kubernetes API Server 的资源执行监控(Watch)操作,资源类型可以是 Kubernetes 内置资源,也可以是 CRD 自定义资源,其中最核心的功能是 Reflector。Reflector 用于监控指定资源的 Kubernetes 资源,当监控的资源发生变化时,触发相应的变更事件,例如 Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,并将其资源对象存放到本地缓存 DeltaFIFO 中

通过 NewReflector 实例化 Reflector 对象,实例化过程中须传入 ListerWatcher 数据接口对象,它拥有 List 和 Watch 方法,用于获取及监控资源列表。只要实现了 List 和 Watch 方法的对象都可以称为 ListerWatcher。Reflector 对象通过 Run 函数启动监控并处理监控事件。而在 Reflector 源码实现中,其中最主要的是 ListAndWatch 函数,它负责获取资源列表(List)和监控(Watch)指定的 Kubernetes API Server 资源

ListAndWatch 函数实现可分为两部分:第 1 部分获取资源列表数据,第 2 部分监控资源对象。

1. 获取资源列表数据

ListAndWatch List 在程序第一次运行时获取该资源下所有的对象数据并将其存储至 DeltaFIFO 中。以 Informers Example 代码示例为例,在其中,我们获取的是所有 Pod 的资源数据。ListAndWatch List 流程图如图所示。

在这里插入图片描述

(1)r.listerWatcher.List 用于获取资源下的所有对象的数据,例如,获取所有 Pod 的资源数据。获取资源数据是由 options 的 ResourceVersion(资源版本号)参数控制的,如果 ResourceVersion 为 0,则表示获取所有 Pod 的资源数据;如果 ResourceVersion 非 0,则表示根据资源版本号继续获取,功能有些类似于文件传输过程中的“断点续传”,当传输过程中遇到网络故障导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。可以使本地缓存中的数据与 Etcd 集群中的数据保持一致

(2)listMetaInterface.GetResourceVersion 用于获取资源版本号, ResourceVersion (资源版本号)非常重要,Kubernetes 中所有的资源都拥有该字段,它标识当前资源对象的版本号。每次修改当前资源对象时,Kubernetes API Server 都会更改 ResourceVersion,使得 client-go 执行 Watch 操作时可以根据 ResourceVersion 来确定当前资源对象是否发生变化。

(3)meta.ExtractList 用于将资源数据转换成资源对象列表,将 runtime.Object 对象转换成 []runtime.Object 对象。因为 r.listerWatcher.List 获取的是资源下的所有对象的数据,例如所有的 Pod 资源数据,所以它是一个资源列表。

(4) r.syncWith 用于将资源对象列表中的资源对象和资源版本号存储至 DeltaFIFO 中,并会替换已存在的对象。

(5)r.setLastSyncResourceVersion 用于设置最新的资源版本号。

ListAndWatch List 代码示例如下:

vendor/k8s.io/client-go/tools/cache/reflector.go// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)err := r.list(stopCh)if err != nil {return err}resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup() // Call the last one written into cleanup}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()resyncCh, cleanup = r.resyncChan()}}()retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)for {// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect {case <-stopCh:return nildefault:}timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options := metav1.ListOptions{ResourceVersion: r.LastSyncResourceVersion(),// We want to avoid situations of hanging watchers. Stop any watchers that do not// receive any events within the timeout window.TimeoutSeconds: &timeoutSeconds,// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.// Reflector doesn't assume bookmarks are returned at all (if the server do not support// watch bookmarks, it will ignore this field).AllowWatchBookmarks: true,}// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sentstart := r.clock.Now()w, err := r.listerWatcher.Watch(options)if err != nil {// If this is "connection refused" error, it means that most likely apiserver is not responsive.// It doesn't make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If that's the case begin exponentially backing off and resend watch request.// Do the same for "429" errors.if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {<-r.initConnBackoffManager.Backoff().C()continue}return err}err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)retry.After(err)if err != nil {if err != errorStopRequested {switch {case isExpiredError(err):// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already// has a semantic that it returns data at least as fresh as provided RV.// So first try to LIST with setting RV to resource version of last observed object.klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)case apierrors.IsTooManyRequests(err):klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)<-r.initConnBackoffManager.Backoff().C()continuecase apierrors.IsInternalError(err) && retry.ShouldRetry():klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)continuedefault:klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)}}return nil}}
}

r.listerWatcher.List 函数实际调用了 Pod Informer 下的 ListFunc 函数,它通过 ClientSet 客户端与 Kubernetes API Server 交互并获取 Pod 资源列表数据,代码示例如下:

vendor/k8s.io/client-go/informers/core/v1/pod.gofunc NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.CoreV1().Pods(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)},},&corev1.Pod{},resyncPeriod,indexers,)
}

2. 监控资源对象

Watch(监控)操作通过 HTTP 协议与 Kubernetes API Server 建立长连接,接收 Kubernetes API Server 发来的资源变更事件。Watch 操作的实现机制使用 HTTP 协议的分块传输编码(Chunked Transfer Encoding)。当 client-go 调用 Kubernetes API Server 时,Kubernetes API Server 在 Response 的 HTTP Header 中设置 Transfer-Encoding 的值为 chunked,表示采用分块传输编码,客户端收到该信息后,便与服务端进行连接,并等待下一个数据块(即资源的事件信息)

ListAndWatch Watch 代码示例如下:

vendor\k8s.io\client-go\tools\cache\reflector.gofunc (r *Reflector) list(stopCh <-chan struct{}) error {var resourceVersion stringoptions := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar paginatedResult boolvar err errorlistCh := make(chan struct{}, 1)panicCh := make(chan interface{}, 1)go func() {defer func() {if r := recover(); r != nil {panicCh <- r}}()// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first// list request will return the full response.pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))switch {case r.WatchListPageSize != 0:pager.PageSize = r.WatchListPageSizecase r.paginatedResult:// We got a paginated result initially. Assume this resource and server honor// paging requests (i.e. watch cache is probably disabled) and leave the default// pager size set.case options.ResourceVersion != "" && options.ResourceVersion != "0":// User didn't explicitly request pagination.//// With ResourceVersion != "", we have a possibility to list from watch cache,// but we do that (for ResourceVersion != "0") only if Limit is unset.// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly// switch off pagination to force listing from watch cache (if enabled).// With the existing semantic of RV (result is at least as fresh as provided RV),// this is correct and doesn't lead to going back in time.//// We also don't turn off pagination for ResourceVersion="0", since watch cache// is ignoring Limit in that case anyway, and if watch cache is not enabled// we don't introduce regression.pager.PageSize = 0}list, paginatedResult, err = pager.List(context.Background(), options)if isExpiredError(err) || isTooLargeResourceVersionError(err) {r.setIsLastSyncResourceVersionUnavailable(true)// Retry immediately if the resource version used to list is unavailable.// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on// continuation pages, but the pager might not be enabled, the full list might fail because the// resource version it is listing at is expired or the cache may not yet be synced to the provided// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure// the reflector makes forward progress.list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})}close(listCh)}()select {case <-stopCh:return nilcase r := <-panicCh:panic(r)case <-listCh:}initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})if err != nil {klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)}// We check if the list was paginated and if so set the paginatedResult based on that.// However, we want to do that only for the initial list (which is the only case// when we set ResourceVersion="0"). The reasoning behind it is that later, in some// situations we may force listing directly from etcd (by setting ResourceVersion="")// which will return paginated result, even if watch cache is enabled. However, in// that case, we still want to prefer sending requests to watch cache if possible.//// Paginated result returned for request with ResourceVersion="0" mean that watch// cache is disabled and there are a lot of objects of a given type. In such case,// there is no need to prefer listing from watch cache.if options.ResourceVersion == "0" && paginatedResult {r.paginatedResult = true}r.setIsLastSyncResourceVersionUnavailable(false) // list was successfullistMetaInterface, err := meta.ListAccessor(list)if err != nil {return fmt.Errorf("unable to understand list result %#v: %v", list, err)}resourceVersion = listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")items, err := meta.ExtractList(list)if err != nil {return fmt.Errorf("unable to understand list result %#v (%v)", list, err)}initTrace.Step("Objects extracted")if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("unable to sync list result: %v", err)}initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil
}

r.listerWatcher.Watch 函数实际调用了 Pod Informer 下的 WatchFunc 函数,它通过 ClientSet 客户端与 Kubernetes API Server 建立长连接,监控指定资源的变更事件,代码示例如下:

vendor/k8s.io/client-go/informers/core/v1/pod.gofunc NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.CoreV1().Pods(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)},},&corev1.Pod{},resyncPeriod,indexers,)
}

r.watchHandler 用于处理资源的变更事件。当触发 Added(资源添加)事件、Updated (资源更新)事件、Deleted(资源删除)事件时,将对应的资源对象更新到本地缓存 DeltaFIFO 中并更新 ResourceVersion 资源版本号。r.watchHandler 代码示例如下:

k8s.io/client-go/tools/cache/reflector.go// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,w watch.Interface,store Store,expectedType reflect.Type,expectedGVK *schema.GroupVersionKind,name string,expectedTypeName string,setLastSyncResourceVersion func(string),clock clock.Clock,errc chan error,stopCh <-chan struct{},
) error {eventCount := 0// Stopping the watcher should be idempotent and if we return from this function there's no way// we're coming back in with the same watch interface.defer w.Stop()loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan():if !ok {break loop}if event.Type == watch.Error {return apierrors.FromObject(event.Object)}if expectedType != nil {if e, a := expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))continue}}if expectedGVK != nil {if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))continue}}meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))continue}resourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added:err := store.Add(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))}case watch.Modified:err := store.Update(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))}case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this.err := store.Delete(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))}case watch.Bookmark:// A `Bookmark` means watch has synced here, just update the resourceVersiondefault:utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))}setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}eventCount++}}watchDuration := clock.Since(start)if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)}klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)return nil
}

DeltaFIFO

DeltaFIFO 可以分开理解,FIFO 是一个先进先出的队列,它拥有队列操作的基本方法,例如 Add、Update、Delete、List、Pop、Close等,而 Delta 是一个资源对象存储,它可以保存资源对象的操作类型,例如 Added(添加)操作类型、Updated(更新)操作类型、Deleted(删除)操作类型、Sync(同步)操作类型等。DeltaFIFO 结构代码示例如下:

vendor/k8s.io/client-go/tools/cache/delta_fifo.gotype DeltaFIFO struct {// lock/cond protects access to 'items' and 'queue'.lock sync.RWMutexcond sync.Cond// `items` maps a key to a Deltas.// Each such Deltas has at least one Delta.items map[string]Deltas// `queue` maintains FIFO order of keys for consumption in Pop().// There are no duplicates in `queue`.// A key is in `queue` if and only if it is in `items`.queue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update/AddIfNotPresent was called first.populated bool// initialPopulationCount is the number of items inserted by the first call of Replace()initialPopulationCount int// keyFunc is used to make the key used for queued item// insertion and retrieval, and should be deterministic.keyFunc KeyFunc// knownObjects list keys that are "known" --- affecting Delete(),// Replace(), and Resync()knownObjects KeyListerGetter// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRUD operations.closed bool// emitDeltaTypeReplaced is whether to emit the Replaced or Sync// DeltaType when Replace() is called (to preserve backwards compat).emitDeltaTypeReplaced bool// Called with every object if non-nil.transformer TransformFunc
}

DeltaFIFO 与其他队列最大的不同之处是,它会保留所有关于资源对象(obj)的操作类型,队列中会存在拥有不同操作类型的同一个资源对象,消费者在处理该资源对象时能够了解该资源对象所发生的事情。queue 字段存储资源对象的 key,该 key 通过 KeyOf 函数计算得到。items 字段通过 map 数据结构的方式存储,value 存储的是对象的 Deltas 数组。DeltaFIFO 存储结构如图所示。

在这里插入图片描述

DeltaFIFO 本质上是一个先进先出的队列,有数据的生产者和消费者,其中生产者是 Reflector 调用的 Add 方法,消费者是 Controller 调用的 Pop 方法。下面分析 DeltaFIFO 的核心功能:生产者方法、消费者方法及 Resync 机制。

1. 生产者方法

DeltaFIFO 队列中的资源对象在 Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件中都调用了 queueActionLocked 函数,它是 DeltaFIFO 实现的关键,代码示例如下:

vendor\k8s.io\client-go\tools\cache\delta_fifo.go// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}// Every object comes through this code path once, so this is a good// place to call the transform func.  If obj is a// DeletedFinalStateUnknown tombstone, then the containted inner object// will already have gone through the transformer, but we document that// this can happen. In cases involving Replace(), such an object can// come through multiple times.if f.transformer != nil {var err errorobj, err = f.transformer(obj)if err != nil {return err}}oldDeltas := f.items[id]newDeltas := append(oldDeltas, Delta{actionType, obj})newDeltas = dedupDeltas(newDeltas)if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()} else {// This never happens, because dedupDeltas never returns an empty list// when given a non-empty list (as it is here).// If somehow it happens anyway, deal with it but complain.if oldDeltas == nil {klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)return nil}klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)f.items[id] = newDeltasreturn fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)}return nil
}

queueActionLocked 代码执行流程如下。

(1)通过 f.KeyOf 函数计算出资源对象的 key。

(2)如果操作类型为 Sync,则标识该数据来源于 Indexer(本地存储)。如果 Indexer 中的资源对象已经被删除,则直接返回。

(3)将 actionType 和资源对象构造成 Delta,添加到 items 中,并通过 dedupDeltas 函数进行去重操作。

(4)更新构造后的 Delta 并通过 cond.Broadcast 通知所有消费者解除阻塞。

2. 消费者方法

Pop 方法作为消费者方法使用,从 DeltaFIFO 的头部取出最早进入队列中的资源对象数据。Pop 方法须传入 process 函数,用于接收并处理对象的回调方法,代码示例如下:

vendor\k8s.io\client-go\tools\cache\delta_fifo.gofunc (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}id := f.queue[0]f.queue = f.queue[1:]depth := len(f.queue)if f.initialPopulationCount > 0 {f.initialPopulationCount--}item, ok := f.items[id]if !ok {// This should never happenklog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)continue}delete(f.items, id)// Only log traces if the queue depth is greater than 10 and it takes more than// 100 milliseconds to process one item from the queue.// Queue depth never goes high because processing an item is locking the queue,// and new items can't be added until processing finish.// https://github.com/kubernetes/kubernetes/issues/103789if depth > 10 {trace := utiltrace.New("DeltaFIFO Pop Process",utiltrace.Field{Key: "ID", Value: id},utiltrace.Field{Key: "Depth", Value: depth},utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})defer trace.LogIfLong(100 * time.Millisecond)}err := process(item)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err}
}

当队列中没有数据时,通过 f.cond.wait 阻塞等待数据,只有收到 cond.Broadcast 时才说明有数据被添加,解除当前阻塞状态。如果队列中不为空,取出 f.queue 的头部数据,将该对象传入 process 回调函数,由上层消费者进行处理。如果 process 回调函数处理出错,则将该对象重新存入队列。

Controller 的 processLoop 方法负责从 DeltaFIFO 队列中取出数据传递给 process 回调函数。process 回调函数代码示例如下:

vendor\k8s.io\client-go\tools\cache\shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()if deltas, ok := obj.(Deltas); ok {return processDeltas(s, s.indexer, deltas)}return errors.New("object given as Process argument is not Deltas")
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {// Invocation of this function is locked under s.blockDeltas, so it is// save to distribute the notifications.cacheMutationDetector.AddObject(obj)s.processor.distribute(addNotification{newObj: obj}, false)
}// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {isSync := false// If is a Sync event, isSync should be true// If is a Replaced event, isSync is true if resource version is unchanged.// If RV is unchanged: this is a Sync/Replaced event, so isSync is trueif accessor, err := meta.Accessor(new); err == nil {if oldAccessor, err := meta.Accessor(old); err == nil {// Events that didn't change resourceVersion are treated as resync events// and only propagated to listeners that requested resyncisSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()}}// Invocation of this function is locked under s.blockDeltas, so it is// save to distribute the notifications.cacheMutationDetector.AddObject(new)s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {// Invocation of this function is locked under s.blockDeltas, so it is// save to distribute the notifications.processor.distribute(deleteNotification{oldObj: old}, false)
}vendor\k8s.io\client-go\tools\cache\controller.gofunc processDeltas(// Object which receives event notifications from the given deltashandler ResourceEventHandler,clientState Store,deltas Deltas,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Objectswitch d.Type {case Sync, Replaced, Added, Updated:if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}handler.OnUpdate(old, obj)} else {if err := clientState.Add(obj); err != nil {return err}handler.OnAdd(obj)}case Deleted:if err := clientState.Delete(obj); err != nil {return err}handler.OnDelete(obj)}}return nil
}

HandleDeltas 函数作为 process 回调函数,当资源对象的操作类型为 Added、Updated、Deleted 时,将该资源对象存储至 Indexer(它是并发安全的存储),并通过 distribute 函数将资源对象分发至 SharedInformer。还记得 Informers Example 代码示例吗?在 Informers Example 代码示例中,我们通过 informer.AddEventHandler 函数添加了对资源事件进行处理的函数,distribute 函数则将资源对象分发到该事件处理函数中。

3. Resync 机制

Resync 机制会将 Indexer 本地存储中的资源对象同步到 DeltaFIFO 中,并将这些资源对象设置为 Sync 的操作类型。Resync 函数在 Reflector 中定时执行,它的执行周期由 NewReflector 函数传入的 resyncPeriod 参数设定。

vendor\k8s.io\client-go\tools\cache\delta_fifo.gofunc (f *DeltaFIFO) Resync() error {f.lock.Lock()defer f.lock.Unlock()if f.knownObjects == nil {return nil}keys := f.knownObjects.ListKeys()for _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}}return nil
}func (f *DeltaFIFO) syncKeyLocked(key string) error {obj, exists, err := f.knownObjects.GetByKey(key)if err != nil {klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)return nil} else if !exists {klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)return nil}// If we are doing Resync() and there is already an event queued for that object,// we ignore the Resync for it. This is to avoid the race, in which the resync// comes with the previous value of object (since queueing an event for the object// doesn't trigger changing the underlying store <knownObjects>.id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}if len(f.items[id]) > 0 {return nil}if err := f.queueActionLocked(Sync, obj); err != nil {return fmt.Errorf("couldn't queue object: %v", err)}return nil
}

Indexer

Indexer 是 client-go 用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储至 Indexer。Indexer 中的数据与 Etcd 集群中的数据保持完全一致。client-go 可以很方便地从本地存储中读取相应的资源对象数据,而无须每次都从远程 Etcd 集群中读取,这样可以减轻 Kubernetes API Server 和 Etcd 集群的压力。

在介绍 Indexer 之前,先介绍一下 ThreadSafeMap。ThreadSafeMap 是实现并发安全的存储。作为存储,它拥有存储相关的增、删、改、查操作方法,例如 Add、Update、Delete、List、Get、Replace、Resync 等。Indexer 在 ThreadSafeMap 的基础上进行了封装,它继承了与 ThreadSafeMap 相关的操作方法并实现了 Indexer Func 等功能,例如 Index、IndexKeys、GetIndexers 等方法,这些方法为 ThreadSafeMap 提供了索引功能。Indexer 存储结构如图所示。

在这里插入图片描述

1. ThreadSafeMap并发安全存储

ThreadSafeMap 是一个内存中的存储,其中的数据并不会写入本地磁盘中,每次的增、删、改、查操作都会加锁,以保证数据的一致性。ThreadSafeMap 将资源对象数据存储于一个 map 数据结构中,ThreadSafeMap 结构代码示例如下:

vendor/k8s.io/client-go/tools/cache/thread_safe_store.go// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {lock  sync.RWMutexitems map[string]interface{}// indexers maps a name to an IndexFuncindexers Indexers// indices maps a name to an Indexindices Indices
}

items 字段中存储的是资源对象数据,其中 items 的 key 通过 keyFunc 函数计算得到,计算默认使用 MetaNamespaceKeyFunc 函数,该函数根据资源对象计算出 <namespace>/<name> 格式的 key,如果资源对象的 <namespace> 为空,则 <name> 作为 key,而 items 的 value 用于存储资源对象。

2. Indexer 索引器

在每次增、删、改 ThreadSafeMap 数据时,都会通过 updateIndices 或 deleteFromIndices 函数变更 Indexer。Indexer 被设计为可以自定义索引函数,这符合 Kubernetes 高扩展性的特点。Indexer 有4个非常重要的数据结构,分别是 Indices、Index、Indexers 及 IndexFunc。直接阅读相关代码会比较晦涩,通过 Indexer Example 代码示例来理解 Indexer,印象会更深刻。Indexer Example 代码示例如下:

在这里插入图片描述

首先定义一个索引器函数 UsersIndexFunc,在该函数中,我们定义查询出所有 Pod 资源下 Annotations 字段的 key 为 users 的 Pod。

cache.NewIndexer 函数实例化了 Indexer 对象,该函数接收两个参数:第 1 个参数是 KeyFunc,它用于计算资源对象的 key,计算默认使用 cache.MetaNamespaceKeyFunc 函数;第 2 个参数是 cache.Indexers,用于定义索引器,其中 key 为索引器的名称(即 byUser),value 为索引器。通过 index.Add 函数添加 3 个 Pod 资源对象。最后通过 index.ByIndex 函数查询 byUser 索引器下匹配 ernie 字段的 Pod 列表。Indexer Example 代码示例最终检索出名称为 one和 three 的 Pod。

现在再来理解 Indexer 的 4 个重要的数据结构就非常容易了,它们分别是 Indexers、IndexFunc、Indices、Index,数据结构如下:

// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index

Indexer 数据结构说明如下。

  • Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数。
  • IndexFunc:索引器函数,定义为接收一个资源对象,返回检索结果列表。
  • Indices:存储缓存器,key 为缓存器名称(在 Indexer Example 代码示例中,缓存器命名与索引器命名相对应),value 为缓存数据。
  • Index:存储缓存数据,其结构为 K/V。

3. Indexer索引器核心实现

index.ByIndex 函数通过执行索引器函数得到索引结果,代码示例如下:

vendor\k8s.io\client-go\tools\cache\thread_safe_store.go// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {c.lock.RLock()defer c.lock.RUnlock()indexFunc := c.indexers[indexName]if indexFunc == nil {return nil, fmt.Errorf("Index with name %s does not exist", indexName)}index := c.indices[indexName]set := index[indexedValue]list := make([]interface{}, 0, set.Len())for key := range set {list = append(list, c.items[key])}return list, nil
}

ByIndex 接收两个参数:IndexName(索引器名称)和 indexKey(需要检索的 key)。首先从 c.indexers 中查找指定的索引器函数,从 c.indices 中查找指定的缓存器函数,然后根据需要检索的 indexKey 从缓存数据中查到并返回数据。

提示:Index 中的缓存数据为 Set 集合数据结构,Set 本质与 Slice 相同,但 Set 中不存在相同元素。由于 Go 语言标准库没有提供 Set 数据结构,Go 语言中的 map 结构类型是不能存在相同 key 的,所以 Kubernetes 将 map 结构类型的 key 作为 Set 数据结构,实现 Set 去重特性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2814809.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

python|闲谈2048小游戏和数组的旋转及翻转和转置

目录 2048 生成数组 n阶方阵 方阵旋转 顺时针旋转 逆时针旋转 mxn矩阵 矩阵旋转 测试代码 测试结果 翻转和转置 2048 《2048》是一款比较流行​的数字游戏​&#xff0c;最早于2014年3月20日发行。原版2048由Gabriele Cirulli首先在GitHub上发布&#xff0c;后被移…

华为手动ipv6-to-ipv4隧道

中间r2的两个接口配置两个地址就行了&#xff0c;其它什么都不用配置 两边出接口R1和R3手动隧道建立&#xff1a;先把IPV4打通&#xff0c;并配置默认路由 再起隧道接口上进行配置&#xff0c;再配置带隧道的默认路由 PC上和上联接口网关只有IPV6地址 最终两个PC可以ping通 …

node 之 http模块

1.什么是http模块 在网络节点中&#xff0c;负责消费资源的电脑叫做客户端&#xff1b;负责对外提供网络资源的电脑&#xff0c;叫做服务器 http模块是node.js官方提供的&#xff0c;用来创建web服务器的模块&#xff0c;通过http模块提供的http.createServer()方法&#xff0c…

武器大师——操作符详解(上)

目录 一、操作符的分类 二、二进制和进制转换 2.1.二进制与十进制的互相转化 2.1.1 二进制转十进制 2.1.2 十进制转二进制 ​编辑 2.2.二进制转8进制和16进制 2.2.1 转8进制 2.2.2 转16进制 三、原码、反码、补码 四、移位操作符 4.1.左移操作符&#xff08;<…

【C语言】linux内核netdev_start_xmit函数

一、中文注释 static inline netdev_tx_t netdev_start_xmit(struct sk_buff *skb, struct net_device *dev, struct netdev_queue *txq, bool more) {// 获取网络设备操作集合const struct net_device_ops *ops dev->netdev_ops;int rc;// 调用实际发送数据包的函数&…

【UE 材质】水晶材质

效果 步骤 1. 先在Quixel Bridge上下载冰纹理 2. 新建一个材质&#xff0c;这里命名为“M_Ice”并打开&#xff0c;添加如下纹理采样节点 继续添加如下节点 此时效果如下&#xff1a; 可以看到此时的材质颜色比较浅&#xff0c;如果希望颜色深一点可以继续添加如下节点 此时效…

基于springboot+vue的大学城水电管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

Leetcoder Day25| 回溯part05:子集+排列

491.递增子序列 给定一个整型数组, 你的任务是找到所有该数组的递增子序列&#xff0c;递增子序列的长度至少是2。 示例: 输入:[4, 7, 6, 7]输出: [[4, 6], [4, 7], [4, 6, 7], [6, 7], [7,7], [4,7,7]] 说明: 给定数组的长度不会超过15。数组中的整数范围是 [-100,100]。给定数…

智慧物流之道:数据可视化引领全局监控

在智慧物流的背景下&#xff0c;数据可视化催生了物流管理的全新范式。首先&#xff0c;通过数据可视化&#xff0c;物流企业可以实现对整个供应链的全景式监控。下面我就可以可视化从业者的角度&#xff0c;简单聊聊这个话题。 首先&#xff0c;图表和地图的直观展示使决策者能…

前后端项目-part03

文章目录 5.4.4 机构名称5.4.4.1 创建实体类Company5.4.4.2 创建实体类CompanyMapper5.4.4.3 创建实体类CompanyService5.4.4.4 创建实体类CompanyController5.4.4.5 后端测试5.4.4.6 修改basic.js5.4.4.7 修改course.vue5.4.4.8 测试5.4.5 课程标签5.4.5.1 效果5.4.5.2 修改co…

用html实现一个简易的百度热榜

用html实现一个简易的百度热榜 相关代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document…

轻松统一图片格式,提升工作效率,高效管理不同格式图片

在数字创意的海洋中&#xff0c;图片是最具表现力的元素之一。但你是否曾因不同格式的图片而头疼&#xff1f;格式不统一&#xff0c;不仅影响管理效率&#xff0c;还可能影响作品的最终呈现效果。今天&#xff0c;我们为您带来了一款高效、便捷的图片批量转换工具&#xff0c;…

【Simulink系列】——Simulink子系统子系统封装模块库技术

声明&#xff1a;本系列博客参考有关专业书籍&#xff0c;截图均为自己实操&#xff0c;仅供交流学习&#xff01; 引入 前面对于简单的动态系统仿真&#xff0c;可以直接建立模型&#xff0c;然后仿真。但是对于复杂的系统&#xff0c;直接建立系统会显得杂乱无章&#xff0…

C语言 int和unsigned int逻辑比较

文章目录 测试1、测试 CMP (int,int)2、测试 CMP (int ,unsigned int)3、测试 CMP (unsigned int ,unsigned int)4、测试 CMP(int ,常量&#xff09; 总结 测试 在IAR(8.40.2)平台下测试单片机为STM32F103ZET6 1、测试 CMP (int,int) //a -2,b 3 int test_fun(int a, int…

二次供水物联网:HiWoo Cloud助力城市水务管理升级

随着城市化的快速推进&#xff0c;二次供水系统作为城市基础设施的重要组成部分&#xff0c;其稳定运行和高效管理显得至关重要。然而&#xff0c;传统的二次供水管理方式在应对复杂多变的城市供水需求时&#xff0c;显得力不从心。为了破解这一难题&#xff0c;HiWoo Cloud平台…

StarRocks实战——携程酒店实时数仓

目录 一、实时数仓 二、实时数仓架构介绍 2.1 Lambda架构 2.2 Kappa架构 三、携程酒店实时数仓架构 3.1 架构选型 3.2 实时计算引擎选型 3.3 OLAP选型 四、携程酒店实时订单 4.1 数据源 4.2 ETL数据处理 4.3 应用效果 4.4 总结 原文大佬的这篇实时数仓建设案例有借…

龙蜥OS 尝试

> 尝试一下龙蜥OS&#xff0c;和Centos8应该没什么区别。 阿里云版本龙蜥 https://alinux3.oss-cn-hangzhou.aliyuncs.com/aliyun_3_x64_20G_nocloud_alibase_20230727.vhd Index of /anolis/8.8/isos/GA/x86_64/ (openanolis.cn) 网卡 我在虚拟机上安装完后&#xff0c;…

Spring常见面试题知识点总结(二)

4. 面向切面编程&#xff08;AOP&#xff09;&#xff1a; AOP的基本概念。 AOP&#xff08;Aspect-Oriented Programming&#xff0c;面向切面编程&#xff09;是一种编程范式&#xff0c;旨在通过切面&#xff08;Aspect&#xff09;将横切关注点与核心业务逻辑分离&#x…

Chondrex--Mouse Anti-OVA IgG1 Antibody Assay Kit

卵清蛋白&#xff08;ovalbumin, OVA&#xff09;是鸡蛋清中的主要蛋白成分&#xff0c;是一种大而复杂的糖蛋白&#xff0c;能够引起机体适度的免疫性&#xff0c;常用作各种免疫学研究的抗原。OVA已被用于诱导IgE介导的疾病&#xff0c;评估疫苗递送方法&#xff0c;以及通过…

动态规划|【斐波那契数列模型 】|面试题08.01三步问题

目录 题目 思路 普通思路 动态规划思路 1.状态表示 2.状态转移方程 3.初始化 4.填表顺序 5.返回值 代码 空间优化 题目 题目链接 面试题 08.01. 三步问题https://leetcode.cn/problems/three-steps-problem-lcci/ 三步问题。有个小孩正在上楼梯&#xff0c;楼梯有n…