informer中DeltaFIFO机制的实现分析与源码解读
informer中的DeltaFIFO机制的实现分析与源码解读
DeltaFIFO作为informer中重要组件,本文从源码层面了解是如何DelatFIFO是实现的。
DeltaFIFO的定义
找到delta_fifo.go的源码,位于client-go/tools/cache/delta_fifo.go
代码结构大致如下:
store定义了一个通用的存储接口
type Store interface {Add(obj interface{}) errorUpdate(obj interface{}) errorDelete(obj interface{}) errorList() []interface{}ListKeys() []stringGet(obj interface{}) (item interface{}, exists bool, err error)GetByKey(key string) (item interface{}, exists bool, err error)// Replace will delete the contents of the store, using instead the// given list. Store takes ownership of the list, you should not reference// it after calling this function.Replace([]interface{}, string) errorResync() error
}
Queue接口继承了store,但添加了Pop()重要方法,实现了队列的能力
// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {Store// Pop blocks until it has something to process.// It returns the object that was process and the result of processing.// The PopProcessFunc may return an ErrRequeue{...} to indicate the item// should be requeued before releasing the lock on the queue.Pop(PopProcessFunc) (interface{}, error)// AddIfNotPresent adds a value previously// returned by Pop back into the queue as long// as nothing else (presumably more recent)// has since been added.AddIfNotPresent(interface{}) error// HasSynced returns true if the first batch of items has been poppedHasSynced() bool// Close queueClose()
}
FIFO类型实现了Queue接口
type FIFO struct {lock sync.RWMutexcond sync.Cond// We depend on the property that items in the set are in the queue and vice versa.items map[string]interface{}queue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first.populated bool// initialPopulationCount is the number of items inserted by the first call of Replace()initialPopulationCount int// keyFunc is used to make the key used for queued item insertion and retrieval, and// should be deterministic.keyFunc KeyFunc// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations.closed boolclosedLock sync.Mutex
}var (_ = Queue(&FIFO{}) // FIFO is a Queue
)
DeltaFIFO类型也实现了Queue接口,与FIFO主要的区别是有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。
type DeltaFIFO struct {// lock/cond protects access to 'items' and 'queue'.lock sync.RWMutexcond sync.Cond// We depend on the property that items in the set are in// the queue and vice versa, and that all Deltas in this// map have at least one Delta.items map[string]Deltasqueue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first.populated bool// initialPopulationCount is the number of items inserted by the first call of Replace()initialPopulationCount int// keyFunc is used to make the key used for queued item// insertion and retrieval, and should be deterministic.keyFunc KeyFunc// knownObjects list keys that are "known", for the// purpose of figuring out which items have been deleted// when Replace() or Delete() is called.knownObjects KeyListerGetter// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations.closed boolclosedLock sync.Mutex
}var (_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)
DeltaFIFO的构造函数
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {return NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc,KnownObjects: knownObjects,})
}func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {if opts.KeyFunction == nil {opts.KeyFunction = MetaNamespaceKeyFunc // 如果不指定keyFunc,默认就是MetaNamespaceKeyFunc}f := &DeltaFIFO{items: map[string]Deltas{}, // 存放[]Delta的数组,queue: []string{}, // 存储obj的key,key通常是ns/name格式的字符串keyFunc: opts.KeyFunction, // 由obj生成key的函数knownObjects: opts.KnownObjects,emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,transformer: opts.Transformer,}f.cond.L = &f.lockreturn f
}var (_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)
Delta的定义
根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。
// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {Type DeltaType // 表示对obj的操作类型"Added/Updated/Deleted/Replaced/Sync"Object interface{} // 表示某个资源对象,比如命名为"one"的pod
}// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string// Change type definition
const (Added DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"// Replaced is emitted when we encountered watch errors and had to do a// relist. We don't know if the replaced object has changed.//// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events// as well. Hence, Replaced is only emitted when the option// EmitDeltaTypeReplaced is true.Replaced DeltaType = "Replaced"// Sync is for synthetic events during a periodic resync.Sync DeltaType = "Sync"
)
Deletas的ADD()入队分析
watch机制监控到事件后,会把事件入队操作。
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {f.lock.Lock()defer f.lock.Unlock()f.populated = truereturn f.queueActionLocked(Added, obj) // 实际调用的是函数queueActionLocked()
}
queueActionLocked的逻辑主要包括从obj生产key(代码中是id),再有actionType和Obj构建一个新的Delta, 再把Delta加入Deltas切片中,之后,把Deltas放入items哈希表,key放入Queue队列中去。要注意Delta加入Deltas时需要进行出重。
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj) // 先用keyFunc,通过obj获取到对应的keyif err != nil {return KeyError{obj, err}}newDeltas := append(f.items[id], Delta{actionType, obj}) // 用actionType和Obj构建一个新的Delta,再把Delta追加到(f.items[id]返回的)Deltas切片newDeltas = dedupDeltas(newDeltas) // 对新的Deltas切去去重if len(newDeltas) > 0 { // 如果newDeltas切片中存在Deltaif _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) // 将key放到queue中}f.items[id] = newDeltas // 把新的Deltes切片,放到items哈希表f.cond.Broadcast()}return nil
}
Delta去重
Delta进行Add()操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个
。
// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {d := append(Deltas{}, deltas[:n-2]...)return append(d, *out)}return deltas
}// If a & b represent the same event, returns the delta that ought to be kept.
// Otherwise, returns nil.
// TODO: is there anything other than deletions that need deduping?
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}// TODO: Detect other duplicate situations? Are there any?return nil
}// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted { // 仅处理a,b都是"Deleted"类型的事件;return nil}// Do more sophisticated checks, or is this sufficient?if _, ok := b.Object.(DeletedFinalStateUnknown); ok { // 如果a,b都是"Deleted",就只返回一个Deltareturn a}return b
}
Deltas的pop出队
deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。
pop出队时,会调用传参PopProcessFunc对出队元素进行处理。
// Pop blocks until an item is added to the queue, and then returns it. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {// 如果队列为空,就f.cond.Wait阻塞等待for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}f.cond.Wait() }// 从f.queue中去重第一个元素id := f.queue[0]f.queue = f.queue[1:]if f.initialPopulationCount > 0 {f.initialPopulationCount--}// 从items哈希表中根据id,取出Deltasitem, ok := f.items[id]// 如果itmes哈希表中差不到id对应的Deltas,就结束进入下次循环if !ok {// Item may have been deleted subsequently.continue}// 从items哈希表中删除id对应的Deltasdelete(f.items, id)// process()函数来处理从items哈希表中取出的Deltaserr := process(item)// 如果出现错误,就把id加回queue,同时把Deltas加回itemsif e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err}
接着我们看看process()函数具体是什么。
如果对informer启动比较熟悉的话,可以知道在创建informer时,newInformer()函数需要指定ProcessFunc。这个处理函数包括数据同步到存储,以及调用注册的用户函数两个操作。
func newInformer(lw ListerWatcher,objType runtime.Object,resyncPeriod time.Duration,h ResourceEventHandler,clientState Store,
) Controller {// This will hold incoming changes. Note how we pass clientState in as a// KeyLister, that way resync operations will result in the correct set// of update/delete deltas.fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects: clientState,EmitDeltaTypeReplaced: true,})cfg := &Config{Queue: fifo,ListerWatcher: lw,ObjectType: objType,FullResyncPeriod: resyncPeriod,RetryOnError: false,// 指定处理从deltaFIFO队列pop处理的数据的处理函数ProcessFuncProcess: func(obj interface{}) error {// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Replaced, Added, Updated:// 同步存储数据,clientState是一个storeif old, exists, err := clientState.Get(d.Object); err == nil && exists {if err := clientState.Update(d.Object); err != nil {return err}// 回调用户定义的hander函数h.OnUpdate(old, d.Object)} else {// 同步存储数据if err := clientState.Add(d.Object); err != nil {return err}// 回调用户定义的hander函数h.OnAdd(d.Object)}case Deleted:// 同步存储数据if err := clientState.Delete(d.Object); err != nil {return err}// 回调用户定义的hander函数h.OnDelete(d.Object)}}return nil},}return New(cfg)
}
进一步探究一下,informer启动run()后,会调用controller.Run(),最后c.processLoop会循环处理pop出队处理,流程大致如下:
informer.run(stopCh) —> s.controller.Run(stopCh)—>c.processLoop—>c.config.Queue.Pop(PopProcessFunc(c.config.Process))
源码位于:vender/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// 构建一个Reflectorr := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = rc.reflectorMutex.Unlock()var wg wait.Groupwg.StartWithChannel(stopCh, r.Run)// 调用c.processLoop函数wait.Until(c.processLoop, time.Second, stopCh)wg.Wait()
}func (c *controller) processLoop() {for {// 循环的Pop出队,把出队的事件交给PopProcessFunc函数处理obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}
DeltaFIFO出队入队的流程图
相关文章:

informer中DeltaFIFO机制的实现分析与源码解读
informer中的DeltaFIFO机制的实现分析与源码解读 DeltaFIFO作为informer中重要组件,本文从源码层面了解是如何DelatFIFO是实现的。 DeltaFIFO的定义 找到delta_fifo.go的源码,位于client-go/tools/cache/delta_fifo.go 代码结构大致如下: store定义…...
树莓派下,centos7amr64下,搭建目标检测开发环境,java语言
在树莓派(Raspberry Pi)上使用CentOS 7 ARM64搭建基于Java的目标检测开发环境,可以按照以下步骤进行。需要注意的是,CentOS 7 ARM64的官方镜像可能不支持树莓派的某些硬件,因此你可能需要寻找第三方镜像或进行一些额外的配置。 1. 安装CentOS 7 ARM64 首先,确保你已经正…...

SpringBoot+Redis 发布与订阅
两个应用都引入 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artif…...
huggingface无法访问怎么办?一招教你解决,使用hf-mirror.com镜像站快速下载各种大模型
huggingface.co无法访问怎么办? 请访问 https://hf-mirror.com/ hf-mirror.com是一个旨在为中国国内的人工智能开发者提供更快更稳定下载服务的镜像站点,它镜像了Hugging Face的huggingface.co网站上的模型和数据集资源。由于网络环境和地理限制…...

如何用密码保护你的 WordPress 管理员 (wp-admin) 目录
在维护 WordPress 网站时,确保 wp-admin 目录安全是非常重要的。为该目录添加密码保护可以有效提高网站安全性,防止未经授权的访问。这篇文章将介绍实现这一目标的两种方法。 1.为什么要为 wp-admin 目录添加密码保护 WordPress 管理员后台是网站的核心…...
java 程序包org.junit.jupiter.api不存在
问题描述 正常启动springboot报错java 程序包org.junit.jupiter.api不存在。 问题分析 pom文件中缺少junit依赖,但是添加后问题仍然没解决,后面把test部分删掉解决问题。 解决方法 添加junit依赖 <dependency><groupId>junit</group…...

简单的docker学习 第4章 docker容器
第4章 Docker容器 4.1 容器基础 4.1.1 容器启动流程 通过 docker run 命令可以启动运行一个容器。该命令在执行时首先会在本地查找指定的镜像,如果找到了,则直接启动,否则会到镜像中心查找。如果镜像中心存在该镜像,则会下载到…...

零基础入门转录组数据分析——机器学习算法之SVM-RFE(筛选特征基因)
零基础入门转录组数据分析——机器学习算法之SVM-RFE(筛选特征基因) 目录 零基础入门转录组数据分析——机器学习算法之SVM-RFE(筛选特征基因)1. SVM-RFE基础知识2. SVM-RFE(Rstudio)——代码实操2. 1 数据…...

Python酷库之旅-第三方库Pandas(067)
目录 一、用法精讲 266、pandas.Series.dt.second属性 266-1、语法 266-2、参数 266-3、功能 266-4、返回值 266-5、说明 266-6、用法 266-6-1、数据准备 266-6-2、代码示例 266-6-3、结果输出 267、pandas.Series.dt.microsecond属性 267-1、语法 267-2、参数 …...

Spring快速学习
目录 IOC控制反转 引言 IOC案例 Bean的作用范围 Bean的实例化 bean生命周期 DI 依赖注入 setter注入 构造器注入 自动装配 自动装配的方式 注意事项; 集合注入 核心容器 容器的创建方式 Bean的三种获取方式 Bean和依赖注入相关总结 IOC/DI注解开发 注解开发…...

【Web开发手礼】探索Web开发的秘密(十五)-Vue2(2)AJAX、前后端分离、前端工程化
主要介绍了AJAX、前后端分离所需的YApi、前端工程化所需要的环境安装!!! 目录 前言 AJAX 原生Ajax Axios Axios入门 案例 前后端分离开发 YApi 前端工程化 环境准备 总结 前言 主要介绍了AJAX、前后端分离所需的YApi、前端工…...

Phalco安装过程以及踩的一些坑(mac环境)
一 背景 公司用Phalcon框架好长时间了,中途发现了一些Phalcon使用的上的问题,于是想在本地搭建一套Phalcon的环境,方便排查问题使用。 二 Mac系统下的安装 看了很多说法,最终发现还是官网给力,安装Phalcon使用下列命令即可(前提条件是PHP已安装好,工具pecl也安装好了):…...
Ubuntu修改双系统默认启动顺序
1.打开grub的默认启动配置文件 sudo gedit /etc/default/grub# If you change this file, run update-grub afterwards to update # /boot/grub/grub.cfg. # For full documentation of the options in this file, see: # info -f grub -n Simple configurationGRUB_DEFAULT…...

高仲富:49岁搞AI,白天种菜卖菜,晚上学数学搞程序
这是《开发者说》的第13期,本期我们邀请的开发者是高仲富,曾是一位数学老师,自学成为一名程序员,在北京漂过,后逃回了成都,一边与病魔抗争,一边写代码,一写就是15年,制作…...

光线追踪(纹理映射)
最近在跟着ray trace in one week来学习光线追踪(很多概念茅塞顿开)做到一半想着记录一下(比较随心)上面是之前的效果。ray trace in one week Texture Coordinates for Spheres(球体纹理坐标) u, v 纹理…...

传统产品经理VS现在AI产品经理,你要学习的太多了,超详细收藏我这一篇就够了
传统产品经理想要转行成为AI产品经理,需要经历一系列的学习和实践过程。下面是一份详细的学习路线图,旨在帮助你顺利转型。 学习路线图 了解AI基础知识 AI概览:阅读《人工智能:一种现代的方法》这样的书籍,以获得对AI…...

C#使用Socket实现TCP服务器端
1、TCP服务器实现代码 using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks;namespace PtLib.TcpServer {public delegate void Tcp…...

MTK联发科MT8766/MT8166安卓核心板性能参数对
MT8766核心板 采用联发科四核2G主频芯片方案,国内4G全网通。12nm先进工艺,支持 Android9.0系统。 GPU 采用超强 IMG GE8300 ,主频600MHz。支持高速LPDDR4/X,主频高达1600MHz。支持EMMC5.1。标配 WIFI 802.11 ac/abgn,BT 5.0。 支持…...

ps绘制动图
ps绘制动图教程(简易版)-直播gif动态效果图 第一步 打开ps绘制几个简单的长方形 第二步 将图层转化为智能图层 第三部 在窗口找到时间轴创建时间轴 第五步 通过变换来鼠标控制图像的变化并打下结束点 第六部 通过图像中的图像大小控制gif的大小 第七部 …...
AI学习指南机器学习篇-强化学习和深度学习简介
AI学习指南机器学习篇-强化学习和深度学习简介 强化学习和深度学习基本概念回顾 强化学习是一种机器学习方法,其目标是让智能体通过与环境的交互来学习最优的行为策略。在强化学习中,智能体不需要标记的训练数据,而是通过试错来提升自己的表…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...

跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
Redis:现代应用开发的高效内存数据存储利器
一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发,其初衷是为了满足他自己的一个项目需求,即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源,Redis凭借其简单易用、…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧
上周三,HubSpot宣布已构建与ChatGPT的深度集成,这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋,但同时也存在一些关于数据安全的担忧。 许多网络声音声称,这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...