深入源码分析kubernetes informer机制(四)DeltaFIFO
[阅读指南]
这是该系列第四篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。
文章目录
- client-go中的存储结构
- DeltaFIFO
- delta
- 索引 key
- queue push操作
- delta push 去重
- queue pop操作
- 总结
client-go中的存储结构
如下图,clinet-go中定义了存储类型接口store,用来提供存储对象的基本能力。

queue继承了store接口,并提供了队列的能力,队列中可以保存需要增删改的存储对象的key,它会取出队头元素,调用PopProcessFunc处理。
queue的实现有两个:FIFO和deltaFIFO。
deltaFIFO的不同点在于,deltaFIFO队列中,key对应的不是对象本身,而是对象的delta。
另外deltaFIFO除了通过add、update、delete添加元素,还有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。
DeltaFIFO
下面是deltaFIFO数据结构的定义
type DeltaFIFO struct {// 并发读写锁lock sync.RWMutexcond sync.Cond// `items` maps a key to a Deltas.// 资源对象的key与对应的delta数组,每个数组至少都会有一个deltaitems map[string]Deltas// 按照FIFO队列顺序存储key,用来给pop()消费。// 该数组不会有重复值,并且所有元素都一定在items中queue []string// 生成key值的函数,默认是 MetaNamespaceKeyFunckeyFunc KeyFunc// 本地缓存中已知的所有资源对象的keyknownObjects KeyListerGetter......
}
delta
如前面所说,deltaFIFO中key映射的不是对象本身,是delta数组。
根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。
type Delta struct {Type DeltaTypeObject interface{}
}type DeltaType string
const (Added DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced"Sync DeltaType = "Sync"
)
举个栗子,本地已经有了一个pod对象,
&Pod{Name: "mypod",Namespace: "default",Labels: map[string]string{"app": "web", "version": "0.0.1"},
}
此时mypod的 lable从web变成了app-server,reflector就会创建一个这样的delta对象放入FIFO队列中。
&Delta{Type: "Updated",Object: &Pod{Name: "mypod",Namespace: "default",Labels: map[string]string{"app": "app-server"},},
}
索引 key
deltaFIFO队列中,存储的是delta的key值,通过key值可以在items map中获取到对应的delta对象。
这个key值在初始化FIFO时通过KeyFunction进行定义,使用者没有指定时,都会使用自带的命名函数 MetaNamespaceKeyFunc 进行命名,命名规则是
- namespace不为空,key为/
- namespace为空,key为
这里的name是在yaml资源配置中的matadata.name,比如上面的mypod。在同一个资源下,name在所有api version都一定是唯一的。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil}return meta.GetName(), nil
}
queue push操作
watcher监控的资源变更时,会调用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法,最终它们都会通过queueActionLocked 方法往deltaFIFO队列中加入对应类型的delta对象。
queueActionLocked 也就是deltaFIFO的入队操作。
和一般的入队不同的是,新加入的delta不是直接加入到队尾,队列queue数组中保存的是delta的key。所以入队的操作是这样的
- 获取delta对应的key值(还记得keyfunc吗,又是它)
- 如果delta所属的资源key已经在队列中,直接将delta添加到key对应到deltas数组末尾。更新已存在的资源delta并不会影响他的key在队列中的位置。
- 如果delta所属的资源key不在队列中,就将key添加到队列末尾,并在items中关联key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)// 自定义的转换函数。可以在delta事件被处理之前完成一些预处理// 常见的用法是用来过滤一些处理程序不关注的资源对象、以及处理数据格式等if f.transformer != nil {obj, err = f.transformer(obj)}// 将新的delta放入资源key对应的delta数组末尾// 如果原本的key不存在,就是创建了一个新的数组,并将新的delta放入其中oldDeltas := f.items[id]newDeltas := append(oldDeltas, Delta{actionType, obj})// 对delta数组中的delta去重newDeltas = dedupDeltas(newDeltas)// 判断key是否已经在队列中,并且更新key对应的delta数组if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()}return nil
}
delta push 去重
上一节提到,delta进行push操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个。
func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {deltas[n-2] = *outreturn deltas[:n-1]}return deltas
}// 判断a、b两个delta是否重复
// 目前暂时只有两个delete类型的delta会被判定为重复。
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}return nil
}// 判定两个delta是否都是deleted类型
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {return nil}if _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}
举个小小的例子来回顾一下delta push操作。假设queue中有3个pod对象,对应了不同的变更事件,如下所示。

此时watcher监听到资源发生变化:
- pod2收到了updated事件
- pod1收到了deleted事件
- pod3收到了deleted事件
于是,三个delta入队成功后的队列图如下

pod1已有一个deleted事件,再次收到deleted后,经过dedupDeltas去重,最终只保留一个deleted。
pod3虽然有两个deleted事件,但是他们并不是连续的事件,不会被去重
queue pop操作
deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。
pop出队时,会调用传参PopProcessFunc对出队元素进行处理。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// 队列为空时阻塞if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}// 取出队首的资源对象keyid := f.queue[0]f.queue = f.queue[1:]// 获取key对应的deltas数组item, ok := f.items[id]// 执行pop处理函数,处理delta事件,如果处理失败了,资源对象会被重新加入到队列中。// 但是如果队列中存在相同的对象,资源对象会被丢弃。err := process(item, isInInitialList)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}return item, err}
}
这里一开始有个小疑问,如果资源的delta处理失败了,并且队列中又出现了同样的资源key,这部分delta数据不就丢失了吗?
但是仔细看出队入队公用一个锁,pop处理对象时不会有新的对象入队,所以理论上不会出现在addIfNotPresent时,key是persent的情况。而deltaFIFO入队的逻辑,也不会存在一个队列有两个相同的key的情况,所以不会有丢失的问题,addIfNotPresent应该只是加多一层保障。如果理解有问题,欢迎大佬们指正。
回顾一下pop的调用方processLoop,调用pop时传入PopProcessFunc(c.config.Process))。
系列第一篇介绍informer时提到过,c.config.Process最终调用的是processDeltas函数,它包含了数据同步到存储,以及调用注册的用户函数两个操作。
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {...}}
}// 数据处理函数
func processDeltas(handler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Object// 区分事件类型进行处理switch d.Type {case Sync, Replaced, Added, Updated:// 同步存储数据if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}// 回调用户函数handler.OnUpdate(old, obj)} else {// 同步存储数据if err := clientState.Add(obj); err != nil {return err}// 回调用户函数handler.OnAdd(obj, isInInitialList)}case Deleted:// 同步存储数据if err := clientState.Delete(obj); err != nil {return err}// 回调用户函数handler.OnDelete(obj)}}return nil
}
总结
还是用上一节的例子,小结回顾一下整体的流程

相关文章:
深入源码分析kubernetes informer机制(四)DeltaFIFO
[阅读指南] 这是该系列第四篇 基于kubernetes 1.27 stage版本 为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。 文章目录 client-go中的存储结构DeltaFIFOdelta索引 keyqueue push操作delta push 去重 queue pop操作 总结 client-go中的存储结构…...
UI设计师个人工作总结范文
UI设计师个人工作总结范文篇一 感受到了领导们“海纳百川”的胸襟,感受到了作为广告人“不经历风雨,怎能见彩虹”的豪气,也体会到了重庆广告从业人员作为拓荒者的艰难和坚定(就目前国内广告业而言,我认为重庆广告业尚在发展阶段并…...
explicit关键字 和 static成员
explicit关键字 和 static成员 1、explicit 关键字2、static成员(静态成员变量属于类的(只有所属这个类的对象才能修改),不同于全局变量(任何对象都能修改))2.1 定义和性质2.2 静态成员的使用场…...
安装Linux操作系统CentOS 6详细图文步骤
为满足业务对Linux操作系统部署的要求,本文档主要提供CentOS 6操作系统的最小化安装和基本配置, 安装本系统建议最少1GB内存和2GB磁盘空间。 1、 使用光盘或者挂载ISO镜像,在出现如下图形界面时选择【Install or upgrade an existing system】并按Ent…...
新增守护进程管理、支持添加MySQL远程数据库,支持PHP版本切换,1Panel开源面板v1.5.0发布
2023年8月14日,现代化、开源的Linux服务器运维管理面板1Panel正式发布v1.5.0版本。 在这个版本中,1Panel新增了守护进程管理功能;支持添加MySQL远程数据库;支持添加FTP/S和WebDAV的SFTP服务;支持PHP版本切换。此外&am…...
十、接口(1)
本章概要 抽象类和方法接口创建 默认方法多继承接口中的静态方法Instrument 作为接口 接口和抽象类提供了一种将接口与实现分离的更加结构化的方法。 这种机制在编程语言中不常见,例如 C 只对这种概念有间接的支持。而在 Java 中存在这些关键字,说明…...
percentile_approx 聚合函数
返回组内 expr 的百分位近似值。 此函数是 approx_percentile 聚合函数的同义词。 语法 percentile_approx ( [ALL | DISTINCT ] expr, percentile [, accuracy] ) [FILTER ( WHERE cond ) ] 还可以使用 OVER 子句将此函数作为窗口函数调用。 参数 expr:数值表达…...
面试热题(全排列)
给定一个不含重复数字的整数数组 nums ,返回其 所有可能的全排列 。可以 按任意顺序 返回答案。 输入:nums [1,2,3] 输出:[[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]] 先在这里说明一下排列和组合的区别? 组合:是指从一…...
一文走进时序数据库性能测试工具 TSBS
一、背景 在物联网、车联网等时序数据场景中,数据的高速写入能力至关重要,会对产品方案的可用性、可靠性和扩展性产生影响。 以物联网为例,当面临千万甚至上亿设备、平均每个设备采集几十个到几百个指标时,每秒生成的数据将达到…...
通俗讲解-动量梯度下降法原理与代码实例
本站原创文章,转载请说明来自《老饼讲解-BP神经网络》bp.bbbdata.com 目录 一.动量梯度下降法介绍 1.1 动量梯度下降法简介与思想 1.2 动量梯度下降法的算法流程 二.动量梯度下降法代码实例 2.1 动量梯度下降法实例代码 一.动量梯度下降法介绍…...
【【STM32-USART串口协议】】
STM32-USART串口协议 USART串口协议 •通信的目的:将一个设备的数据传送到另一个设备,扩展硬件系统 •通信协议:制定通信的规则,通信双方按照协议规则进行数据收发 就是我们并不能在芯片上设计完全部的一下子完成所有的设计&…...
vue3.0组件通信
1、props 没有加TS限制类型的时候 1. 数组写法 defineProps([count, changCount]) 2. 对象写法 defineProps({count: Number,changCount: Function }) 3. 配置对象 defineProps({count: {type: Number,default: 2},changCount: {type: Function,required: true} })注意: defi…...
费曼学习法
费曼学习法 费曼学习法(Feynman Technique)是一种学习和理解复杂概念的方法,以理查德费曼(Richard Feynman)这位著名的理论物理学家命名。该方法的核心思想是通过将学习内容简化并用自己的话解释给别人,来…...
Kubernetes介绍和部署,使用
1.k8s kubernetes来自希腊语舵手,google, 8是ubernete 1.管理docker容器 go写的(并发) 2.用于微服务 3.cncf云原生基金会 2.mater(管理节点)和nodes(微服务节点) 3.部署 1.minikube kind官网在线测试语句 2.kubeadm(官方)(安装比较方便 添加) 3.github下载二进制包 4.yum(老) …...
视频汇聚平台EasyCVR视频监控播放平台WebRTC流地址无法播放的问题解决方案
开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中,将分散的各类视频资源进行统一汇聚、整合、集中管理,在视频监控播放上,TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放,可同时播放多…...
node.js 基础高并发案例
什么是高并发 高并发是指系统在同一时间段内需要处理大量的并发请求或同时进行大量的操作。在计算机领域中,高并发通常指的是在短时间内有大量的用户或客户端同时访问系统或进行操作,对系统的并发处理能力提出了较高的要求。 高并发的特点包括 大量的…...
OpenCV实例(八)车牌字符识别技术(二)字符识别
车牌字符识别技术(二)字符识别 1.字符识别原理及其发展阶段2.字符识别方法3.英文、数字识别4.车牌定位实例 1.字符识别原理及其发展阶段 匹配判别是字符识别的基本思想,与其他模式识别的应用非常类似。字符识别的基本原理就是对字符图像进行…...
svn文章五:问题排查与修复 - 出了问题怎么办?SVN故障排除与修复指南
文章五:问题排查与修复 - “出了问题怎么办?SVN故障排除与修复指南” 概述:在使用SVN时,难免会遇到一些问题和错误。在这篇文章中,我们将教您如何进行故障排查和修复,保护您的SVN仓库和数据安全。 1. 引言…...
国产开源ambari之DataSophon部署
介绍 DataSophon致力于快速实现部署、管理、监控以及自动化运维大数据云原生平台,帮助您快速构建起稳定、高效、可弹性伸缩的大数据云原生平台。 主要特性有: 快速部署,可快速完成300个节点的大数据集群部署兼容复杂环境,极少的依赖使其很容易适配各种复杂环境监控指标全面丰…...
面试之快速学习STL- vector
1. vector底层实现机制刨析: 简述:使用三个迭代器表示的:  这也就解释了,为什么 vector 容器在进行扩容后,与其相关的指针、引用以及迭代器可能会失效的原因。 insert 整体向后移 erase 整体向前移…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
【深度学习新浪潮】什么是credit assignment problem?
Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...
路由基础-路由表
本篇将会向读者介绍路由的基本概念。 前言 在一个典型的数据通信网络中,往往存在多个不同的IP网段,数据在不同的IP网段之间交互是需要借助三层设备的,这些设备具备路由能力,能够实现数据的跨网段转发。 路由是数据通信网络中最基…...
Python第七周作业
Python第七周作业 文章目录 Python第七周作业 1.使用open以只读模式打开文件data.txt,并逐行打印内容 2.使用pathlib模块获取当前脚本的绝对路径,并创建logs目录(若不存在) 3.递归遍历目录data,输出所有.csv文件的路径…...
