当前位置: 首页 > news >正文

milvus Delete API流程源码分析

Delete API执行流程源码解析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Delete 的数据流向:

在这里插入图片描述

1.客户端sdk发出Delete API请求。

from pymilvus import (connections,Collection,
)print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start delete entities")
## expr = "book_id in [0,1]" 主键
expr = "pk in [447868867306324066,447868867306324067]" ## 非主键
delete_result = hello_milvus.delete(expr)
print(delete_result)

2.服务端接受API请求,将request封装为deleteTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Delete delete records from collection, then these records cannot be searched.
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {......// request封装为deleteTaskdt := &deleteTask{ctx:         ctx,Condition:   NewTaskCondition(ctx),req:         request,idAllocator: node.rowIDAllocator,chMgr:       node.chMgr,chTicker:    node.chTicker,lb:          node.lbPolicy,}......// 将task压入dmQueue队列// MsgID will be set by Enqueue()if err := node.sched.dmQueue.Enqueue(dt); err != nil {......}......// 等待任务执行完if err := dt.WaitToFinish(); err != nil {......}......
}

DeleteRequest数据结构:

type DeleteRequest struct {Base                 *commonpb.MsgBaseDbName               stringCollectionName       stringPartitionName        stringExpr                 stringHashKeys             []uint32XXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

3.执行deleteTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_delete.go

func (dt *deleteTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")defer sp.End()log := log.Ctx(ctx)if len(dt.req.GetExpr()) == 0 {return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")}dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := dt.chMgr.getOrCreateDmlStream(dt.collectionID)if err != nil {return err}plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)if err != nil {return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())}// 判断走simpleDelete还是complexDeleteisSimple, termExp := getExpr(plan)if isSimple {// if could get delete.primaryKeys from delete exprerr := dt.simpleDelete(ctx, termExp, stream)if err != nil {return err}} else {// if get complex delete expr// need query from querynode before deleteerr = dt.complexDelete(ctx, plan, stream)if err != nil {log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))return err}}return nil
}

expr如果是主键表达式则走simpleDelete,否则走complexDelete。

4.simpleDelete

func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)if err != nil {log.Info("Failed to get primary keys from expr", zap.Error(err))return err}log.Debug("get primary keys from expr",zap.Int64("len of primary keys", numRow),zap.Int64("collectionID", dt.collectionID),zap.Int64("partationID", dt.partitionID))err = dt.produce(ctx, stream, primaryKeys)if err != nil {return err}return nil
}

函数getPrimaryKeysFromExpr()的返回schemapb.IDs。

type IDs struct {// Types that are valid to be assigned to IdField:////	*IDs_IntId//	*IDs_StrIdIdField              isIDs_IdFieldXXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

isIDs_IdField是一个接口类型。

type isIDs_IdField interface {isIDs_IdField()
}

isIDs_IdField有2个实现:

  • IDs_IntId
  • IDs_StrId
type IDs_IntId struct {IntId *LongArray
}type LongArray struct {Data                 []int64XXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}type IDs_StrId struct {StrId *StringArray
}type StringArray struct {Data                 []stringXXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

在这里插入图片描述

从expr提取主键存储到变量primaryKeys。

5.dt.produce()

func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {// 根据vchannels计算hashhashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)// repack delete msg by dmChannelresult := make(map[uint32]msgstream.TsMsg)numRows := int64(0)for index, key := range hashValues {vchannel := dt.vChannels[key]_, ok := result[key]if !ok {// 创建deleteMsgdeleteMsg, err := dt.newDeleteMsg(ctx)if err != nil {return err}deleteMsg.ShardName = vchannelresult[key] = deleteMsg}curMsg := result[key].(*msgstream.DeleteMsg)curMsg.HashValues = append(curMsg.HashValues, hashValues[index])curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)curMsg.NumRows++numRows++}// send delete request to log brokermsgPack := &msgstream.MsgPack{BeginTs: dt.BeginTs(),EndTs:   dt.EndTs(),}// 将deleteMsg包装进msgPackfor _, msg := range result {if msg != nil {msgPack.Msgs = append(msgPack.Msgs, msg)}}log.Debug("send delete request to virtual channels",zap.String("collectionName", dt.req.GetCollectionName()),zap.Int64("collectionID", dt.collectionID),zap.Strings("virtual_channels", dt.vChannels),zap.Int64("taskID", dt.ID()),zap.Duration("prepare duration", dt.tr.RecordSpan()))// 发送给mqerr := stream.Produce(msgPack)if err != nil {return err}dt.result.DeleteCnt += numRowsreturn nil
}

msgstream.TsMsg是一个接口类型。

有如下实现:

  • createCollectionMsg
  • CreateDatabaseMsg
  • CreateIndexMsg
  • createPartitionMsg
  • DataNodeTtMsg
  • DeleteMsg
  • DropCollectionMsg
  • DropDatabaseMsg
  • DropIndexMsg
  • DropPartitionMsg
  • FlushMsg
  • InsertMsg
  • LoadCollectionMsg
  • ReleaseCollectionMsg
  • TimeTickMsg

6.complexDelete

func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {err := dt.lb.Execute(ctx, CollectionWorkLoad{db:             dt.req.GetDbName(),collectionName: dt.req.GetCollectionName(),collectionID:   dt.collectionID,nq:             1,exec:           dt.getStreamingQueryAndDelteFunc(stream, plan),})if err != nil {log.Warn("fail to get or create dml stream", zap.Error(err))return err}return nil
}

最终会执行dt.getStreamingQueryAndDelteFunc。

这个函数会调用:

dt.produce(ctx, stream, result.GetIds())

simpleDelete也是调用这个函数。

complexDelete会根据expr查询出主键,然后根据主键进行删除数据。

7.总结

  • delete api根据expr走simpleDelete还是complexDelete。
  • complexDelete最终也会转化为simpleDelete。

相关文章:

milvus Delete API流程源码分析

Delete API执行流程源码解析 milvus版本:v2.3.2 整体架构: Delete 的数据流向: 1.客户端sdk发出Delete API请求。 from pymilvus import (connections,Collection, )print("start connecting to Milvus") connections.connect("default", host"192…...

CentOS使用Docker搭建Halo网站并实现无公网ip远程访问

🔥博客主页: 小羊失眠啦. 🎥系列专栏:《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞👍收藏⭐评论✍️ 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默&…...

【JVM】垃圾回收算法

📝个人主页:五敷有你 🔥系列专栏:JVM ⛺️稳中求进,晒太阳 垃圾回收算法 Java是如何实现垃圾回收的呢?简单来说,垃圾回收就做两件事 找到内存中存活的对象释放不在存活对象的内存&…...

如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值

如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值 如何和将原始request的Header中的值传递给openfeign请求的Header参考 [https://www.jb51.net/article/282522.htm](https://www.jb51.net/article/28252…...

Flink 侧输出流(SideOutput)

🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。 🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也…...

C语言中关于#include的一些小知识

写代码的过程中,因为手误,重复包含了头文件 可以看到没有报错 如果是你自己编写的头文件,那么如果没加唯一包含标识的话,那么编译器会编译报错的。如果是系统自带的头文件,由于其每个头文件都加了特殊标识&#xff0c…...

DSP芯片 机器码下载方法 【主要 “扯” 用Uniflash下载的方法】

还是以德州仪器的TMS320F28335芯片为 “栗子”,说说这事儿。 编制好的程序经过开发环境可以编译成扩展名为 .out 文件,这个文件就是DSP可以运行机器码,我们把这个文件下载到 DSP芯片中的程序区, 下载好了,这个芯片原…...

速盾网络:CDN用几天关了可以吗?安全吗?

在使用CDN(内容分发网络)时,有时候会考虑暂时关闭或暂停使用CDN服务的情况。但是,对于关闭CDN服务的时间长短以及安全性问题,很多人可能存在疑问。在本文中,我们将讨论CDN使用中关闭几天是否安全以及相关注…...

MR混合现实情景实训教学系统在高空作业课堂中的应用

高空作业是一项高风险的工作,对于从业者来说,不仅需要具备专业的技能,还需要有丰富的实践经验。然而,传统的课堂教学往往只能通过理论讲解和模拟训练来传授知识,无法提供真实的实践环境。而MR混合现实情景实训教学系统…...

Windows系统中定时执行python脚本

背景:本地Windows系统指定目录下会有文件的修改新增,这些变化的文件需要定时的被上传到git仓库中,这样不需要每次变更手动上传了。 首先编写一个检测文件夹下文件变化并且上传git仓库的python脚本(确保你已经在E:\edc_workspace\data_edc_et…...

HashMap 源码学习-jdk1.8

1、一些常量的定义 这里针对MIN_TREEIFY_CAPACITY 这个值进行解释一下。 java8里面,HashMap 的数据结构是数组 (链表或者红黑树),每个数组节点下可能会存在链表和红黑树之间的转换,当同一个索引下面的节点超过8个时…...

WebStorm 2023:让您更接近理想的开发环境 mac/win版

JetBrains WebStorm 2023激活版下载是一款强大而智能的Web开发工具,专为提高开发人员的生产力而设计。这款编辑器提供了许多先进的代码编辑功能,以及一系列实用的工具和插件,可帮助您更快地编写、调试和测试代码。 WebStorm 2023软件获取 We…...

java面试题:数字与字母的映射表

前言 好记性不如烂笔头。 问题: 现在有一个数字与字母的映射表,且有以下规则: 映射表: 数字 字母 3 A 7 B 9 C 规则: 1.碰到当前数字时,使用字母替换,例如,3-> A 2.碰到当前数…...

Jmeter教程-JMeter 环境安装及配置

Jmeter教程 JMeter 环境安装及配置 在使用 JMeter 之前,需要配置相应的环境,包括安装 JDK 和获取 JMeter ZIP 包。 安装JDK 1.JDK下载 示例环境为Windows11环境,读者应根据实际环境下载JDK的安装包。 JDK下载地址: Java21 下载 …...

十大基础排序算法

排序算法分类 排序:将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为: 内部排序:数据量不大,全部存在内存中;外部排序:数据量很大,无法一次性全部存在内存中,…...

IP协议及相关技术协议

一、IP基本认识 1. IP的作用 IP在TCP/IP模型中处于网络层,网络层的主要作用是实现主机与主机之间的通信,而IP的作用是在复杂的网络环境中将数据包发送给最终目的主机。 2. IP与MAC的关系 简单而言,MAC的作用是实现“直连”的两个设备之通信…...

小红书x-s算法及补环境 单旋转验证码

前言 大家好呀!新的一年,先祝大家新年快乐咯.祝大家逆向,风控都一把过咯. 新年第一篇文章,后续会持续更新哦! 春晚见证了中国经济的新风口,今年春晚互联网企业赞助商就两家,小红书和京东.小红书类似国外的ins,有预感未来小红书会大火,所以写了这篇文章,有需要的加我,联系方式…...

代码检测规范和git提交规范

摘要:之前开发的项目,代码检测和提交规范都是已经配置好的,最近自己新建的项目就记录下相关配置过程。 1. ESlint配置 2013年6月创建开源项目,提供一个插件化的JavaScript代码检测工具,创建项目是生成的eslintrc.js文…...

Elasticsearch:什么是搜索引擎?

搜索引擎定义 搜索引擎是一种软件程序或系统,旨在帮助用户查找存储在互联网或特定数据库中的信息。 搜索引擎的工作原理是对各种来源的内容进行索引和编目,然后根据用户的搜索查询向用户提供相关结果列表。 搜索引擎对于希望快速有效地查找特定信息的用…...

人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora

近30年,人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora 深蓝: 1997年,深蓝击败卡斯帕罗夫的比赛是通过一系列复杂的算法和策略实现的。深蓝的开发团队使用了一种名为“暴力搜索”的技术&…...

超级AI数字员工源码系统,支持贴牌OEM,独立部署交付

温馨提示:文末有资源获取方式最近“龙虾AI”概念很火,到处都在讨论。但说实话,这类技术对普通用户而言存在明显门槛,部署要代码、配置要工程师、日常运行的Token成本也不低——轻度使用每月100-200元,重度甚至单日上千…...

FanControl深度应用指南:从噪音溯源到智能散热系统搭建

FanControl深度应用指南:从噪音溯源到智能散热系统搭建 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending/f…...

Linux下RTL8188无线网卡变身AP热点:从驱动安装到自动分配IP全流程(附避坑指南)

Linux下RTL8188无线网卡配置AP热点全攻略:从驱动到自动IP分配的实战指南 在嵌入式开发和物联网应用中,将无线网卡配置为接入点(AP)是常见需求。RTL8188系列USB无线网卡因其高性价比和广泛兼容性,成为开发者的热门选择。…...

65R125-ASEMI超结MOS管TO-220封装

编辑:LL65R125-ASEMI超结MOS管TO-220封装型号:65R125品牌:ASEMI沟道:NPN封装:TO-220漏源电流:31A漏源电压:650VRDS(on):125mΩ批号:最新引脚数量:3封装尺寸:如…...

Three.js 3D地图实战:从GeoJSON数据到交互式可视化(附完整代码)

Three.js 3D地图实战:从GeoJSON数据到交互式可视化 当我们需要在网页上展示一个具有真实地理特征的3D地图时,Three.js无疑是最强大的工具之一。它不仅能让地图以立体的形式呈现,还能添加各种交互效果,让数据可视化变得更加生动。本…...

3步打造极速安全系统:AtlasOS开源优化方案全解析

3步打造极速安全系统:AtlasOS开源优化方案全解析 【免费下载链接】Atlas 🚀 An open and lightweight modification to Windows, designed to optimize performance, privacy and security. 项目地址: https://gitcode.com/GitHub_Trending/atlas1/Atl…...

用了Qoder写代码飞快,联调时却总因字段不一致返工,问题出在哪?

发版前夜,前端字段对不上后端接口,联调卡了整晚。这种场景在 AI Coding 普及后并不罕见,不少团队用了 Qoder 觉得生成快、跑通快,可一旦要改需求,系统就僵住了。看似工具背锅,其实根子往往不在速度&#xf…...

刚刚,英伟达革了自己的命:智能体自主进化7天,干掉所有算子工程师、GPU专家

这应该是今天刚刚出炉的、最炸裂的文章。在很多算子开发的微信群组,已经掀起了轩然大波。「这或许是超人类智能在软件领域的真正首次展露。」英伟达许冰刚刚在 X 上发出了如此断言。他所评论的,正是他与 Terry Chen 和 Zhifan Ye 为共同一作的一项英伟达…...

vLLM-v0.17.1实操手册:Prometheus监控指标接入与告警配置

vLLM-v0.17.1实操手册:Prometheus监控指标接入与告警配置 1. vLLM框架简介 vLLM是一个专为大型语言模型(LLM)设计的高性能推理和服务库,由加州大学伯克利分校的天空计算实验室(Sky Computing Lab)开发,现已发展为社区驱动的开源项目。这个框…...

保姆级教程:CLIP-GmP-ViT-L-14图文匹配工具一键部署,小白也能玩转AI识图

保姆级教程:CLIP-GmP-ViT-L-14图文匹配工具一键部署,小白也能玩转AI识图 你是不是经常好奇,AI到底是怎么看懂图片的?给它一张照片和几个文字描述,它怎么知道哪个描述最贴切?今天,我就带你亲手搭…...