milvus upsert流程源码分析
milvus版本:v2.3.2
整体架构:

Upsert 的数据流向:

1.客户端sdk发出Upsert API请求。
import numpy as np
from pymilvus import (connections,Collection,
)num_entities, dim = 4, 3print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [[0,1,2,4000],[10,11,12,4000],rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)
2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。
注意这里是dmQueue。DDL类型的是ddQueue。
代码路径:internal\proxy\impl.go
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx: ctx,Condition: NewTaskCondition(ctx),req: request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator: node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr: node.chMgr,chTicker: node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}
3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。
PreExecute()一般为参数校验等工作。
Execute()为真正执行逻辑。
PostExecute()执行完后的逻辑,什么都不做,返回nil。
代码路径:internal\proxy\task_upsert.go
func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs: it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}
msgPack变量:

msgPack包含了insertRequest和deleteRequest。

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

deleteRequest包含主键值。
相关文章:
milvus upsert流程源码分析
milvus版本:v2.3.2 整体架构: Upsert 的数据流向: 1.客户端sdk发出Upsert API请求。 import numpy as np from pymilvus import (connections,Collection, )num_entities, dim 4, 3print("start connecting to Milvus") connections.connect("default",…...
QT网络通信
九、网络 基础概念 1.1 TCP/UDP TCP/UDP UDP TCP 协议相同点:都存在于传输层,全双工通信 TCP:全双工通信、面向连接、可靠 TCP(即传输控制协议):是一种面向连接的传输层协议,它能提供高可靠性通…...
案例分析|山西某光伏发电站轨道巡检机器人解决方案
随着光伏发电技术的不断发展,光伏变电站配电室作为能量转换和输送的关键节点,承担着重要的电力分配和保护功能。然而,传统的人工巡检方式存在诸多问题,如巡检周期长、效率低、安全风险高等,已经无法满足光伏变电站配电…...
Apache POl
介绍 Apache POl是一个处理Miscrosoft Ofice各种文件格式的开源项目。简单来说就是,我们可以使用 POI 在 Java 程序中对Miscrosoft Office各种文件进行读写操作,一般情况下,POI都是用于操作 Excel 文件。 Apache POl 的应用场景 1.银行网银系统导出交易…...
高防服务器托管应注意什么
选择高防服务器托管主要考虑的因素:1.服务商的服务器大小。2.服务器的防御值大小。3.服务器机房的位置以及机房的资质。 具体内容如下: 1.服务器大小是按照U来定的,U是一种表示服务器外部尺寸的单位(计量单位:高度或厚…...
swagger-ui.html报错404,解决办法
swagger-ui.html报错404,解决办法!现在后端开发项目中,为了节省时间,使用swagger插件,可以方便的快捷生成接口文档。但是如果你在请求前端页面路径比如:http://127.0.0.1:7777/swagger-ui.html。找不到。那是因为你的配…...
golang 函数式编程库samber/mo使用: Future
golang 函数式编程库samber/mo使用: Future 如果您对samber/mo库不了解, 请先阅读第一篇 Option 本节讲述Future的使用,它可以帮助我们处理异步编程问题。 示例 我们先来看看下面代码的示例, 注释解释了每一步的操作。 packa…...
【Spring连载】使用Spring Data访问 MongoDB(十四)----Mongodb特有的查询方法
【Spring连载】使用Spring Data访问 MongoDB(十四)----Mongodb特有的查询方法 一、定义通用查询方法二、MongoDB特有的查询方法2.1 地理空间查询Geo-spatial Queries2.2 基于JSON的查询方法和字段限制2.3 使用SpEL表达式的基于JSON的查询2.4 全文检索查询…...
消息中间件篇之RabbitMQ-消息重复消费
一、导致重复消费的情况 1. 网络抖动。 2. 消费者挂了。 消费者消费消息后,当确认消息还没有发送到MQ时,就发生网络抖动或者消费者宕机。那当消费者恢复后,由于MQ没有收到消息,而且消费者有重试机制,消费者就会再一次消…...
常见设计模式之单例模式
单例模式 单例模式是一种常用的软件设计模式,主要目的是确保一个类在整个应用程序生命周期中只有一个实例,并提供一个全局访问点以获取该实例。 单例模式分为几种不同的实现方式,包括懒汉模式和饿汉模式。每种方式都有其特点和适用场景。例如…...
VL817-Q7 USB3.0 HUB芯片 适用于扩展坞 工控机 显示器
VL817-Q7 USB3.1 GEN1 HUB芯片 VL817-Q7 USB3.1 GEN1 HUB芯片 VIA Lab的VL817是一款现代USB 3.1 Gen 1集线器控制器,具有优化的成本结构和完全符合USB标准3.1 Gen 1规范,包括ecn和2017年1月的合规性测试更新。VL817提供双端口和双端口4端口配置&…...
【Android安全】Windows 环境下载 AOSP 源码
准备环境 安装 git 安装 Python 硬盘剩余容量最好大于 800G 打开 Git Bash,用 git 克隆源代码仓库 git clone https://android.googlesource.com/platform/manifest.git //没有梯子使用清华源 git clone https://aosp.tuna.tsinghua.edu.cn/platform/manifest.git这…...
Vue.js+SpringBoot开发快递管理系统
目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 数据中心模块2.2 快递类型模块2.3 快递区域模块2.4 快递货架模块2.5 快递档案模块 三、界面展示3.1 登录注册3.2 快递类型3.3 快递区域3.4 快递货架3.5 快递档案3.6 系统基础模块 四、免责说明 一、摘要 1.1 项目介绍 …...
Linux/Spectra
Enumeration nmap 第一次扫描发现系统对外开放了22,80和3306端口,端口详细信息如下 22端口运行着ssh,80端口还是http,不过不同的是打开了mysql的3306端口 TCP/80 进入首页,点击链接时,提示域名不能解析&…...
C 嵌入式系统设计模式 08:硬件代理模式
本书的原著为:《Design Patterns for Embedded Systems in C ——An Embedded Software Engineering Toolkit 》,讲解的是嵌入式系统设计模式,是一本不可多得的好书。 本系列描述我对书中内容的理解。本文章描述访问硬件的设计模式之一&…...
【k8s配置与存储--持久化存储(PV、PVC、存储类)】
1、PV与PVC 介绍 持久卷(PersistentVolume,PV) 是集群中的一块存储,可以由管理员事先制备, 或者使用存储类(Storage Class)来动态制备。 持久卷是集群资源,就像节点也是集群资源一样…...
【Vite】解决Vite http proxy error: Error: connect ECONNREFUSED
今天写bug,发现了这个问题 我经过我一晚上的搜索努力,在github上找到了解决办法,不得不说,交友网站还是很好用的。 参考 这一行是关键代码。 因为我连的是本地后台服务,所以最后配置成这样 server: {open: true,pro…...
FPGA领域顶级学术会议
FPGA领域顶级学术会议主要有FPGA,FCCM,FPL和FPT。 1 FPGA 会议全名是: ACM/SIGDA International Symposium on Field-Programmable Gate Arrays 网站是:https://dl.acm.org/conference/fpga FPGA常年在美国举办,每年2月,偏FPGA基础研究; 该会议的论文免费下载。这个比…...
罗技鼠标滚轮模式介绍 | 鼠标滚轮异响 - 解决方案
滚轮模式介绍 针对罗技的滚轮模式进行介绍: 普通滚轮:滚动时有明显段落感,无法快速滚动。 智能滚轮:滚动力量较弱时,与普通滚轮无异;滚动力量大时,鼠标会自动减小滚轮阻尼,从而使滚…...
Scrapy与分布式开发(2.2):正则表达式
使用Python的re模块进行正则表达式操作详细讲解 一、引言 正则表达式是一种强大的文本处理工具,它使用特定的模式来搜索、匹配和替换文本。Python的re模块(正则表达式模块)提供了正则表达式匹配操作的所有功能。下面我们将详细讲解如何使用re模块进行正则表达式的操作。 …...
AGI推理延迟压至8.3ms?揭秘2026奇点大会上3家头部厂商联合发布的异构硬件栈,性能提升417%
第一章:2026奇点智能技术大会:AGI与硬件设计 2026奇点智能技术大会(https://ml-summit.org) AGI架构对芯片微架构的倒逼演进 本届大会首次披露了基于全栈可微分计算范式的AGI参考模型——Singularity-7B,其训练阶段要求硬件具备动态稀疏张量…...
不止于找gadget:挖掘ROPgadget在Linux二进制分析中的隐藏用法与实用技巧
超越ROP利用:ROPgadget在Linux二进制分析中的高阶应用指南 在安全研究领域,我们常常陷入工具定位的思维定式——将ROPgadget仅仅视为CTF比赛中的ROP链构造工具。但当你真正深入探索这个工具的代码解析能力时,会发现它实际上是一个被严重低估的…...
StarUML插件DDL实战:5分钟搞定ER图到MySQL建表脚本(含Java代码生成)
StarUML插件DDL实战:5分钟搞定ER图到MySQL建表脚本(含Java代码生成) 在数据库设计领域,效率往往决定着项目推进的速度。想象一下这样的场景:产品经理刚刚确认完需求,开发团队需要在两小时内完成数据库设计并…...
【PolarCTF】x64
先检查下,发现是64位的程序IDA分析程序这里很明显read函数存在溢出然后可以看到后面函数Shell同时也可以找到/bin/sh字符串这里我们可以通过IDA查找攻击思路如下:填充垃圾数据pop_rdi_ret将/bin/sh传递到rdi中执行Shell函数获得shellgdb调试程序将cyclic…...
iOS快捷指令进阶玩法:自动抓取并修改网页数据,打造你的移动端‘爬虫’小工具
iOS快捷指令进阶:构建移动端数据抓取与处理工作流 每次看到同事手动从网页复制数据到Excel再整理格式,我都忍不住想分享这个秘密武器——用快捷指令打造的移动端"爬虫"工具。上周市场部的Lisa用这套方法,把原本每天半小时的数据整理…...
TPA626芯片资料(1)
一、芯片介绍1. 概述TPA626是3PEAK(思瑞浦)生产的一款双向电流和功率监测器芯片,用于精确测量电流、电压和功耗,广泛应用于电源管理、服务器和电信设备等领域。TPA626是一款电流与功率监测器,具备I2C或SMBUS兼容接口。…...
别再踩坑了!ABAP里用CL_JAVA_SCRIPT调用JS计算MD5的完整流程(含中文UTF-8处理)
ABAP中通过JavaScript引擎实现跨平台MD5校验的完整实践指南 当ABAP系统需要与Java等外部系统进行数据校验时,标准函数MD5_CALCULATE_HASH_FOR_CHAR的结果差异常常让开发者陷入困境。本文将深入解析编码差异背后的本质原因,并提供一个基于CL_JAVA_SCRIPT…...
AGI情感可信度认证体系(ISO/IEC 23894-3:2024中国落地首测实录)
第一章:AGI情感可信度认证体系的范式跃迁 2026奇点智能技术大会(https://ml-summit.org) 传统AI伦理评估框架长期依赖静态规则库与人工标注反馈,难以应对AGI在开放语境中动态生成共情表达、自我修正情绪表征及跨文化情感适配等高阶能力。情感可信度不再…...
从宏观到微观:交通流模型如何驱动现代仿真系统
1. 交通流模型的三大流派:宏观、微观与混合 第一次接触交通流模型时,我被各种术语搞得晕头转向。直到在智慧城市项目里实际调试仿真系统,才真正理解不同模型的适用场景。简单来说,交通流模型就像观察蚂蚁搬家——你可以站在高处看…...
从理论到实践:流体机械核心知识点与工程应用解析
1. 流体机械基础:从速度三角形到能量转换 第一次接触流体机械时,我被那些旋转的叶轮和复杂的流道搞得一头雾水。直到导师在黑板上画出第一个速度三角形,突然就明白了流体如何在叶轮中"跳舞"。速度三角形是理解流体机械的钥匙&#…...
