milvus Delete API流程源码分析
Delete API执行流程源码解析
milvus版本:v2.3.2
整体架构:
Delete 的数据流向:
1.客户端sdk发出Delete API请求。
from pymilvus import (connections,Collection,
)print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start delete entities")
## expr = "book_id in [0,1]" 主键
expr = "pk in [447868867306324066,447868867306324067]" ## 非主键
delete_result = hello_milvus.delete(expr)
print(delete_result)
2.服务端接受API请求,将request封装为deleteTask,并压入dmQueue队列。
注意这里是dmQueue。DDL类型的是ddQueue。
代码路径:internal\proxy\impl.go
// Delete delete records from collection, then these records cannot be searched.
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {......// request封装为deleteTaskdt := &deleteTask{ctx: ctx,Condition: NewTaskCondition(ctx),req: request,idAllocator: node.rowIDAllocator,chMgr: node.chMgr,chTicker: node.chTicker,lb: node.lbPolicy,}......// 将task压入dmQueue队列// MsgID will be set by Enqueue()if err := node.sched.dmQueue.Enqueue(dt); err != nil {......}......// 等待任务执行完if err := dt.WaitToFinish(); err != nil {......}......
}
DeleteRequest数据结构:
type DeleteRequest struct {Base *commonpb.MsgBaseDbName stringCollectionName stringPartitionName stringExpr stringHashKeys []uint32XXX_NoUnkeyedLiteral struct{}XXX_unrecognized []byteXXX_sizecache int32
}
3.执行deleteTask的3个方法PreExecute、Execute、PostExecute。
PreExecute()一般为参数校验等工作。
Execute()为真正执行逻辑。
PostExecute()执行完后的逻辑,什么都不做,返回nil。
代码路径:internal\proxy\task_delete.go
func (dt *deleteTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")defer sp.End()log := log.Ctx(ctx)if len(dt.req.GetExpr()) == 0 {return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")}dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := dt.chMgr.getOrCreateDmlStream(dt.collectionID)if err != nil {return err}plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)if err != nil {return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())}// 判断走simpleDelete还是complexDeleteisSimple, termExp := getExpr(plan)if isSimple {// if could get delete.primaryKeys from delete exprerr := dt.simpleDelete(ctx, termExp, stream)if err != nil {return err}} else {// if get complex delete expr// need query from querynode before deleteerr = dt.complexDelete(ctx, plan, stream)if err != nil {log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))return err}}return nil
}
expr如果是主键表达式则走simpleDelete,否则走complexDelete。
4.simpleDelete
func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)if err != nil {log.Info("Failed to get primary keys from expr", zap.Error(err))return err}log.Debug("get primary keys from expr",zap.Int64("len of primary keys", numRow),zap.Int64("collectionID", dt.collectionID),zap.Int64("partationID", dt.partitionID))err = dt.produce(ctx, stream, primaryKeys)if err != nil {return err}return nil
}
函数getPrimaryKeysFromExpr()的返回schemapb.IDs。
type IDs struct {// Types that are valid to be assigned to IdField://// *IDs_IntId// *IDs_StrIdIdField isIDs_IdFieldXXX_NoUnkeyedLiteral struct{}XXX_unrecognized []byteXXX_sizecache int32
}
isIDs_IdField是一个接口类型。
type isIDs_IdField interface {isIDs_IdField()
}
isIDs_IdField有2个实现:
- IDs_IntId
- IDs_StrId
type IDs_IntId struct {IntId *LongArray
}type LongArray struct {Data []int64XXX_NoUnkeyedLiteral struct{}XXX_unrecognized []byteXXX_sizecache int32
}type IDs_StrId struct {StrId *StringArray
}type StringArray struct {Data []stringXXX_NoUnkeyedLiteral struct{}XXX_unrecognized []byteXXX_sizecache int32
}
从expr提取主键存储到变量primaryKeys。
5.dt.produce()
func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {// 根据vchannels计算hashhashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)// repack delete msg by dmChannelresult := make(map[uint32]msgstream.TsMsg)numRows := int64(0)for index, key := range hashValues {vchannel := dt.vChannels[key]_, ok := result[key]if !ok {// 创建deleteMsgdeleteMsg, err := dt.newDeleteMsg(ctx)if err != nil {return err}deleteMsg.ShardName = vchannelresult[key] = deleteMsg}curMsg := result[key].(*msgstream.DeleteMsg)curMsg.HashValues = append(curMsg.HashValues, hashValues[index])curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)curMsg.NumRows++numRows++}// send delete request to log brokermsgPack := &msgstream.MsgPack{BeginTs: dt.BeginTs(),EndTs: dt.EndTs(),}// 将deleteMsg包装进msgPackfor _, msg := range result {if msg != nil {msgPack.Msgs = append(msgPack.Msgs, msg)}}log.Debug("send delete request to virtual channels",zap.String("collectionName", dt.req.GetCollectionName()),zap.Int64("collectionID", dt.collectionID),zap.Strings("virtual_channels", dt.vChannels),zap.Int64("taskID", dt.ID()),zap.Duration("prepare duration", dt.tr.RecordSpan()))// 发送给mqerr := stream.Produce(msgPack)if err != nil {return err}dt.result.DeleteCnt += numRowsreturn nil
}
msgstream.TsMsg是一个接口类型。
有如下实现:
- createCollectionMsg
- CreateDatabaseMsg
- CreateIndexMsg
- createPartitionMsg
- DataNodeTtMsg
- DeleteMsg
- DropCollectionMsg
- DropDatabaseMsg
- DropIndexMsg
- DropPartitionMsg
- FlushMsg
- InsertMsg
- LoadCollectionMsg
- ReleaseCollectionMsg
- TimeTickMsg
6.complexDelete
func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {err := dt.lb.Execute(ctx, CollectionWorkLoad{db: dt.req.GetDbName(),collectionName: dt.req.GetCollectionName(),collectionID: dt.collectionID,nq: 1,exec: dt.getStreamingQueryAndDelteFunc(stream, plan),})if err != nil {log.Warn("fail to get or create dml stream", zap.Error(err))return err}return nil
}
最终会执行dt.getStreamingQueryAndDelteFunc。
这个函数会调用:
dt.produce(ctx, stream, result.GetIds())
simpleDelete也是调用这个函数。
complexDelete会根据expr查询出主键,然后根据主键进行删除数据。
7.总结
- delete api根据expr走simpleDelete还是complexDelete。
- complexDelete最终也会转化为simpleDelete。
相关文章:

milvus Delete API流程源码分析
Delete API执行流程源码解析 milvus版本:v2.3.2 整体架构: Delete 的数据流向: 1.客户端sdk发出Delete API请求。 from pymilvus import (connections,Collection, )print("start connecting to Milvus") connections.connect("default", host"192…...

CentOS使用Docker搭建Halo网站并实现无公网ip远程访问
🔥博客主页: 小羊失眠啦. 🎥系列专栏:《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞👍收藏⭐评论✍️ 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默&…...

【JVM】垃圾回收算法
📝个人主页:五敷有你 🔥系列专栏:JVM ⛺️稳中求进,晒太阳 垃圾回收算法 Java是如何实现垃圾回收的呢?简单来说,垃圾回收就做两件事 找到内存中存活的对象释放不在存活对象的内存&…...
如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值
如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值 如何和将原始request的Header中的值传递给openfeign请求的Header参考 [https://www.jb51.net/article/282522.htm](https://www.jb51.net/article/28252…...
Flink 侧输出流(SideOutput)
🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。 🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也…...

C语言中关于#include的一些小知识
写代码的过程中,因为手误,重复包含了头文件 可以看到没有报错 如果是你自己编写的头文件,那么如果没加唯一包含标识的话,那么编译器会编译报错的。如果是系统自带的头文件,由于其每个头文件都加了特殊标识,…...
DSP芯片 机器码下载方法 【主要 “扯” 用Uniflash下载的方法】
还是以德州仪器的TMS320F28335芯片为 “栗子”,说说这事儿。 编制好的程序经过开发环境可以编译成扩展名为 .out 文件,这个文件就是DSP可以运行机器码,我们把这个文件下载到 DSP芯片中的程序区, 下载好了,这个芯片原…...
速盾网络:CDN用几天关了可以吗?安全吗?
在使用CDN(内容分发网络)时,有时候会考虑暂时关闭或暂停使用CDN服务的情况。但是,对于关闭CDN服务的时间长短以及安全性问题,很多人可能存在疑问。在本文中,我们将讨论CDN使用中关闭几天是否安全以及相关注…...
MR混合现实情景实训教学系统在高空作业课堂中的应用
高空作业是一项高风险的工作,对于从业者来说,不仅需要具备专业的技能,还需要有丰富的实践经验。然而,传统的课堂教学往往只能通过理论讲解和模拟训练来传授知识,无法提供真实的实践环境。而MR混合现实情景实训教学系统…...

Windows系统中定时执行python脚本
背景:本地Windows系统指定目录下会有文件的修改新增,这些变化的文件需要定时的被上传到git仓库中,这样不需要每次变更手动上传了。 首先编写一个检测文件夹下文件变化并且上传git仓库的python脚本(确保你已经在E:\edc_workspace\data_edc_et…...

HashMap 源码学习-jdk1.8
1、一些常量的定义 这里针对MIN_TREEIFY_CAPACITY 这个值进行解释一下。 java8里面,HashMap 的数据结构是数组 (链表或者红黑树),每个数组节点下可能会存在链表和红黑树之间的转换,当同一个索引下面的节点超过8个时…...

WebStorm 2023:让您更接近理想的开发环境 mac/win版
JetBrains WebStorm 2023激活版下载是一款强大而智能的Web开发工具,专为提高开发人员的生产力而设计。这款编辑器提供了许多先进的代码编辑功能,以及一系列实用的工具和插件,可帮助您更快地编写、调试和测试代码。 WebStorm 2023软件获取 We…...
java面试题:数字与字母的映射表
前言 好记性不如烂笔头。 问题: 现在有一个数字与字母的映射表,且有以下规则: 映射表: 数字 字母 3 A 7 B 9 C 规则: 1.碰到当前数字时,使用字母替换,例如,3-> A 2.碰到当前数…...

Jmeter教程-JMeter 环境安装及配置
Jmeter教程 JMeter 环境安装及配置 在使用 JMeter 之前,需要配置相应的环境,包括安装 JDK 和获取 JMeter ZIP 包。 安装JDK 1.JDK下载 示例环境为Windows11环境,读者应根据实际环境下载JDK的安装包。 JDK下载地址: Java21 下载 …...

十大基础排序算法
排序算法分类 排序:将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为: 内部排序:数据量不大,全部存在内存中;外部排序:数据量很大,无法一次性全部存在内存中,…...

IP协议及相关技术协议
一、IP基本认识 1. IP的作用 IP在TCP/IP模型中处于网络层,网络层的主要作用是实现主机与主机之间的通信,而IP的作用是在复杂的网络环境中将数据包发送给最终目的主机。 2. IP与MAC的关系 简单而言,MAC的作用是实现“直连”的两个设备之通信…...

小红书x-s算法及补环境 单旋转验证码
前言 大家好呀!新的一年,先祝大家新年快乐咯.祝大家逆向,风控都一把过咯. 新年第一篇文章,后续会持续更新哦! 春晚见证了中国经济的新风口,今年春晚互联网企业赞助商就两家,小红书和京东.小红书类似国外的ins,有预感未来小红书会大火,所以写了这篇文章,有需要的加我,联系方式…...

代码检测规范和git提交规范
摘要:之前开发的项目,代码检测和提交规范都是已经配置好的,最近自己新建的项目就记录下相关配置过程。 1. ESlint配置 2013年6月创建开源项目,提供一个插件化的JavaScript代码检测工具,创建项目是生成的eslintrc.js文…...

Elasticsearch:什么是搜索引擎?
搜索引擎定义 搜索引擎是一种软件程序或系统,旨在帮助用户查找存储在互联网或特定数据库中的信息。 搜索引擎的工作原理是对各种来源的内容进行索引和编目,然后根据用户的搜索查询向用户提供相关结果列表。 搜索引擎对于希望快速有效地查找特定信息的用…...
人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora
近30年,人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora 深蓝: 1997年,深蓝击败卡斯帕罗夫的比赛是通过一系列复杂的算法和策略实现的。深蓝的开发团队使用了一种名为“暴力搜索”的技术&…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...

376. Wiggle Subsequence
376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
Java求职者面试指南:计算机基础与源码原理深度解析
Java求职者面试指南:计算机基础与源码原理深度解析 第一轮提问:基础概念问题 1. 请解释什么是进程和线程的区别? 面试官:进程是程序的一次执行过程,是系统进行资源分配和调度的基本单位;而线程是进程中的…...
flow_controllers
关键点: 流控制器类型: 同步(Sync):发布操作会阻塞,直到数据被确认发送。异步(Async):发布操作非阻塞,数据发送由后台线程处理。纯同步(PureSync…...