深入源码分析kubernetes informer机制(四)DeltaFIFO
[阅读指南]
这是该系列第四篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。
文章目录
- client-go中的存储结构
- DeltaFIFO
- delta
- 索引 key
- queue push操作
- delta push 去重
- queue pop操作
- 总结
client-go中的存储结构
如下图,clinet-go中定义了存储类型接口store,用来提供存储对象的基本能力。

queue继承了store接口,并提供了队列的能力,队列中可以保存需要增删改的存储对象的key,它会取出队头元素,调用PopProcessFunc处理。
queue的实现有两个:FIFO和deltaFIFO。
deltaFIFO的不同点在于,deltaFIFO队列中,key对应的不是对象本身,而是对象的delta。
另外deltaFIFO除了通过add、update、delete添加元素,还有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。
DeltaFIFO
下面是deltaFIFO数据结构的定义
type DeltaFIFO struct {// 并发读写锁lock sync.RWMutexcond sync.Cond// `items` maps a key to a Deltas.// 资源对象的key与对应的delta数组,每个数组至少都会有一个deltaitems map[string]Deltas// 按照FIFO队列顺序存储key,用来给pop()消费。// 该数组不会有重复值,并且所有元素都一定在items中queue []string// 生成key值的函数,默认是 MetaNamespaceKeyFunckeyFunc KeyFunc// 本地缓存中已知的所有资源对象的keyknownObjects KeyListerGetter......
}
delta
如前面所说,deltaFIFO中key映射的不是对象本身,是delta数组。
根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。
type Delta struct {Type DeltaTypeObject interface{}
}type DeltaType string
const (Added DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced"Sync DeltaType = "Sync"
)
举个栗子,本地已经有了一个pod对象,
&Pod{Name: "mypod",Namespace: "default",Labels: map[string]string{"app": "web", "version": "0.0.1"},
}
此时mypod的 lable从web变成了app-server,reflector就会创建一个这样的delta对象放入FIFO队列中。
&Delta{Type: "Updated",Object: &Pod{Name: "mypod",Namespace: "default",Labels: map[string]string{"app": "app-server"},},
}
索引 key
deltaFIFO队列中,存储的是delta的key值,通过key值可以在items map中获取到对应的delta对象。
这个key值在初始化FIFO时通过KeyFunction进行定义,使用者没有指定时,都会使用自带的命名函数 MetaNamespaceKeyFunc 进行命名,命名规则是
- namespace不为空,key为/
- namespace为空,key为
这里的name是在yaml资源配置中的matadata.name,比如上面的mypod。在同一个资源下,name在所有api version都一定是唯一的。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil}return meta.GetName(), nil
}
queue push操作
watcher监控的资源变更时,会调用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法,最终它们都会通过queueActionLocked 方法往deltaFIFO队列中加入对应类型的delta对象。
queueActionLocked 也就是deltaFIFO的入队操作。
和一般的入队不同的是,新加入的delta不是直接加入到队尾,队列queue数组中保存的是delta的key。所以入队的操作是这样的
- 获取delta对应的key值(还记得keyfunc吗,又是它)
- 如果delta所属的资源key已经在队列中,直接将delta添加到key对应到deltas数组末尾。更新已存在的资源delta并不会影响他的key在队列中的位置。
- 如果delta所属的资源key不在队列中,就将key添加到队列末尾,并在items中关联key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)// 自定义的转换函数。可以在delta事件被处理之前完成一些预处理// 常见的用法是用来过滤一些处理程序不关注的资源对象、以及处理数据格式等if f.transformer != nil {obj, err = f.transformer(obj)}// 将新的delta放入资源key对应的delta数组末尾// 如果原本的key不存在,就是创建了一个新的数组,并将新的delta放入其中oldDeltas := f.items[id]newDeltas := append(oldDeltas, Delta{actionType, obj})// 对delta数组中的delta去重newDeltas = dedupDeltas(newDeltas)// 判断key是否已经在队列中,并且更新key对应的delta数组if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()}return nil
}
delta push 去重
上一节提到,delta进行push操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个。
func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {deltas[n-2] = *outreturn deltas[:n-1]}return deltas
}// 判断a、b两个delta是否重复
// 目前暂时只有两个delete类型的delta会被判定为重复。
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}return nil
}// 判定两个delta是否都是deleted类型
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {return nil}if _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}
举个小小的例子来回顾一下delta push操作。假设queue中有3个pod对象,对应了不同的变更事件,如下所示。

此时watcher监听到资源发生变化:
- pod2收到了updated事件
- pod1收到了deleted事件
- pod3收到了deleted事件
于是,三个delta入队成功后的队列图如下

pod1已有一个deleted事件,再次收到deleted后,经过dedupDeltas去重,最终只保留一个deleted。
pod3虽然有两个deleted事件,但是他们并不是连续的事件,不会被去重
queue pop操作
deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。
pop出队时,会调用传参PopProcessFunc对出队元素进行处理。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// 队列为空时阻塞if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}// 取出队首的资源对象keyid := f.queue[0]f.queue = f.queue[1:]// 获取key对应的deltas数组item, ok := f.items[id]// 执行pop处理函数,处理delta事件,如果处理失败了,资源对象会被重新加入到队列中。// 但是如果队列中存在相同的对象,资源对象会被丢弃。err := process(item, isInInitialList)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}return item, err}
}
这里一开始有个小疑问,如果资源的delta处理失败了,并且队列中又出现了同样的资源key,这部分delta数据不就丢失了吗?
但是仔细看出队入队公用一个锁,pop处理对象时不会有新的对象入队,所以理论上不会出现在addIfNotPresent时,key是persent的情况。而deltaFIFO入队的逻辑,也不会存在一个队列有两个相同的key的情况,所以不会有丢失的问题,addIfNotPresent应该只是加多一层保障。如果理解有问题,欢迎大佬们指正。
回顾一下pop的调用方processLoop,调用pop时传入PopProcessFunc(c.config.Process))。
系列第一篇介绍informer时提到过,c.config.Process最终调用的是processDeltas函数,它包含了数据同步到存储,以及调用注册的用户函数两个操作。
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {...}}
}// 数据处理函数
func processDeltas(handler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Object// 区分事件类型进行处理switch d.Type {case Sync, Replaced, Added, Updated:// 同步存储数据if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}// 回调用户函数handler.OnUpdate(old, obj)} else {// 同步存储数据if err := clientState.Add(obj); err != nil {return err}// 回调用户函数handler.OnAdd(obj, isInInitialList)}case Deleted:// 同步存储数据if err := clientState.Delete(obj); err != nil {return err}// 回调用户函数handler.OnDelete(obj)}}return nil
}
总结
还是用上一节的例子,小结回顾一下整体的流程

相关文章:
深入源码分析kubernetes informer机制(四)DeltaFIFO
[阅读指南] 这是该系列第四篇 基于kubernetes 1.27 stage版本 为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。 文章目录 client-go中的存储结构DeltaFIFOdelta索引 keyqueue push操作delta push 去重 queue pop操作 总结 client-go中的存储结构…...
UI设计师个人工作总结范文
UI设计师个人工作总结范文篇一 感受到了领导们“海纳百川”的胸襟,感受到了作为广告人“不经历风雨,怎能见彩虹”的豪气,也体会到了重庆广告从业人员作为拓荒者的艰难和坚定(就目前国内广告业而言,我认为重庆广告业尚在发展阶段并…...
explicit关键字 和 static成员
explicit关键字 和 static成员 1、explicit 关键字2、static成员(静态成员变量属于类的(只有所属这个类的对象才能修改),不同于全局变量(任何对象都能修改))2.1 定义和性质2.2 静态成员的使用场…...
安装Linux操作系统CentOS 6详细图文步骤
为满足业务对Linux操作系统部署的要求,本文档主要提供CentOS 6操作系统的最小化安装和基本配置, 安装本系统建议最少1GB内存和2GB磁盘空间。 1、 使用光盘或者挂载ISO镜像,在出现如下图形界面时选择【Install or upgrade an existing system】并按Ent…...
新增守护进程管理、支持添加MySQL远程数据库,支持PHP版本切换,1Panel开源面板v1.5.0发布
2023年8月14日,现代化、开源的Linux服务器运维管理面板1Panel正式发布v1.5.0版本。 在这个版本中,1Panel新增了守护进程管理功能;支持添加MySQL远程数据库;支持添加FTP/S和WebDAV的SFTP服务;支持PHP版本切换。此外&am…...
十、接口(1)
本章概要 抽象类和方法接口创建 默认方法多继承接口中的静态方法Instrument 作为接口 接口和抽象类提供了一种将接口与实现分离的更加结构化的方法。 这种机制在编程语言中不常见,例如 C 只对这种概念有间接的支持。而在 Java 中存在这些关键字,说明…...
percentile_approx 聚合函数
返回组内 expr 的百分位近似值。 此函数是 approx_percentile 聚合函数的同义词。 语法 percentile_approx ( [ALL | DISTINCT ] expr, percentile [, accuracy] ) [FILTER ( WHERE cond ) ] 还可以使用 OVER 子句将此函数作为窗口函数调用。 参数 expr:数值表达…...
面试热题(全排列)
给定一个不含重复数字的整数数组 nums ,返回其 所有可能的全排列 。可以 按任意顺序 返回答案。 输入:nums [1,2,3] 输出:[[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]] 先在这里说明一下排列和组合的区别? 组合:是指从一…...
一文走进时序数据库性能测试工具 TSBS
一、背景 在物联网、车联网等时序数据场景中,数据的高速写入能力至关重要,会对产品方案的可用性、可靠性和扩展性产生影响。 以物联网为例,当面临千万甚至上亿设备、平均每个设备采集几十个到几百个指标时,每秒生成的数据将达到…...
通俗讲解-动量梯度下降法原理与代码实例
本站原创文章,转载请说明来自《老饼讲解-BP神经网络》bp.bbbdata.com 目录 一.动量梯度下降法介绍 1.1 动量梯度下降法简介与思想 1.2 动量梯度下降法的算法流程 二.动量梯度下降法代码实例 2.1 动量梯度下降法实例代码 一.动量梯度下降法介绍…...
【【STM32-USART串口协议】】
STM32-USART串口协议 USART串口协议 •通信的目的:将一个设备的数据传送到另一个设备,扩展硬件系统 •通信协议:制定通信的规则,通信双方按照协议规则进行数据收发 就是我们并不能在芯片上设计完全部的一下子完成所有的设计&…...
vue3.0组件通信
1、props 没有加TS限制类型的时候 1. 数组写法 defineProps([count, changCount]) 2. 对象写法 defineProps({count: Number,changCount: Function }) 3. 配置对象 defineProps({count: {type: Number,default: 2},changCount: {type: Function,required: true} })注意: defi…...
费曼学习法
费曼学习法 费曼学习法(Feynman Technique)是一种学习和理解复杂概念的方法,以理查德费曼(Richard Feynman)这位著名的理论物理学家命名。该方法的核心思想是通过将学习内容简化并用自己的话解释给别人,来…...
Kubernetes介绍和部署,使用
1.k8s kubernetes来自希腊语舵手,google, 8是ubernete 1.管理docker容器 go写的(并发) 2.用于微服务 3.cncf云原生基金会 2.mater(管理节点)和nodes(微服务节点) 3.部署 1.minikube kind官网在线测试语句 2.kubeadm(官方)(安装比较方便 添加) 3.github下载二进制包 4.yum(老) …...
视频汇聚平台EasyCVR视频监控播放平台WebRTC流地址无法播放的问题解决方案
开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中,将分散的各类视频资源进行统一汇聚、整合、集中管理,在视频监控播放上,TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放,可同时播放多…...
node.js 基础高并发案例
什么是高并发 高并发是指系统在同一时间段内需要处理大量的并发请求或同时进行大量的操作。在计算机领域中,高并发通常指的是在短时间内有大量的用户或客户端同时访问系统或进行操作,对系统的并发处理能力提出了较高的要求。 高并发的特点包括 大量的…...
OpenCV实例(八)车牌字符识别技术(二)字符识别
车牌字符识别技术(二)字符识别 1.字符识别原理及其发展阶段2.字符识别方法3.英文、数字识别4.车牌定位实例 1.字符识别原理及其发展阶段 匹配判别是字符识别的基本思想,与其他模式识别的应用非常类似。字符识别的基本原理就是对字符图像进行…...
svn文章五:问题排查与修复 - 出了问题怎么办?SVN故障排除与修复指南
文章五:问题排查与修复 - “出了问题怎么办?SVN故障排除与修复指南” 概述:在使用SVN时,难免会遇到一些问题和错误。在这篇文章中,我们将教您如何进行故障排查和修复,保护您的SVN仓库和数据安全。 1. 引言…...
国产开源ambari之DataSophon部署
介绍 DataSophon致力于快速实现部署、管理、监控以及自动化运维大数据云原生平台,帮助您快速构建起稳定、高效、可弹性伸缩的大数据云原生平台。 主要特性有: 快速部署,可快速完成300个节点的大数据集群部署兼容复杂环境,极少的依赖使其很容易适配各种复杂环境监控指标全面丰…...
面试之快速学习STL- vector
1. vector底层实现机制刨析: 简述:使用三个迭代器表示的:  这也就解释了,为什么 vector 容器在进行扩容后,与其相关的指针、引用以及迭代器可能会失效的原因。 insert 整体向后移 erase 整体向前移…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...
Java 与 MySQL 性能优化:MySQL 慢 SQL 诊断与分析方法详解
文章目录 一、开启慢查询日志,定位耗时SQL1.1 查看慢查询日志是否开启1.2 临时开启慢查询日志1.3 永久开启慢查询日志1.4 分析慢查询日志 二、使用EXPLAIN分析SQL执行计划2.1 EXPLAIN的基本使用2.2 EXPLAIN分析案例2.3 根据EXPLAIN结果优化SQL 三、使用SHOW PROFILE…...
