当前位置: 首页 > news >正文

milvus upsert流程源码分析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Upsert 的数据流向:

在这里插入图片描述

1.客户端sdk发出Upsert API请求。

import numpy as np
from pymilvus import (connections,Collection,
)num_entities, dim = 4, 3print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [[0,1,2,4000],[10,11,12,4000],rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx:       ctx,Condition: NewTaskCondition(ctx),req:       request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator:   node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr:         node.chMgr,chTicker:      node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

msgPack变量:

在这里插入图片描述

msgPack包含了insertRequest和deleteRequest。

在这里插入图片描述

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

在这里插入图片描述

deleteRequest包含主键值。

相关文章:

milvus upsert流程源码分析

milvus版本:v2.3.2 整体架构: Upsert 的数据流向: 1.客户端sdk发出Upsert API请求。 import numpy as np from pymilvus import (connections,Collection, )num_entities, dim 4, 3print("start connecting to Milvus") connections.connect("default",…...

QT网络通信

九、网络 基础概念 1.1 TCP/UDP TCP/UDP UDP TCP 协议相同点:都存在于传输层,全双工通信 TCP:全双工通信、面向连接、可靠 TCP(即传输控制协议):是一种面向连接的传输层协议,它能提供高可靠性通…...

案例分析|山西某光伏发电站轨道巡检机器人解决方案

随着光伏发电技术的不断发展,光伏变电站配电室作为能量转换和输送的关键节点,承担着重要的电力分配和保护功能。然而,传统的人工巡检方式存在诸多问题,如巡检周期长、效率低、安全风险高等,已经无法满足光伏变电站配电…...

Apache POl

介绍 Apache POl是一个处理Miscrosoft Ofice各种文件格式的开源项目。简单来说就是,我们可以使用 POI 在 Java 程序中对Miscrosoft Office各种文件进行读写操作,一般情况下,POI都是用于操作 Excel 文件。 Apache POl 的应用场景 1.银行网银系统导出交易…...

高防服务器托管应注意什么

选择高防服务器托管主要考虑的因素:1.服务商的服务器大小。2.服务器的防御值大小。3.服务器机房的位置以及机房的资质。 具体内容如下: 1.服务器大小是按照U来定的,U是一种表示服务器外部尺寸的单位(计量单位:高度或厚…...

swagger-ui.html报错404,解决办法

swagger-ui.html报错404,解决办法!现在后端开发项目中,为了节省时间,使用swagger插件,可以方便的快捷生成接口文档。但是如果你在请求前端页面路径比如:http://127.0.0.1:7777/swagger-ui.html。找不到。那是因为你的配…...

golang 函数式编程库samber/mo使用: Future

golang 函数式编程库samber/mo使用: Future 如果您对samber/mo库不了解, 请先阅读第一篇 Option 本节讲述Future的使用,它可以帮助我们处理异步编程问题。 示例 我们先来看看下面代码的示例, 注释解释了每一步的操作。 packa…...

【Spring连载】使用Spring Data访问 MongoDB(十四)----Mongodb特有的查询方法

【Spring连载】使用Spring Data访问 MongoDB(十四)----Mongodb特有的查询方法 一、定义通用查询方法二、MongoDB特有的查询方法2.1 地理空间查询Geo-spatial Queries2.2 基于JSON的查询方法和字段限制2.3 使用SpEL表达式的基于JSON的查询2.4 全文检索查询…...

消息中间件篇之RabbitMQ-消息重复消费

一、导致重复消费的情况 1. 网络抖动。 2. 消费者挂了。 消费者消费消息后,当确认消息还没有发送到MQ时,就发生网络抖动或者消费者宕机。那当消费者恢复后,由于MQ没有收到消息,而且消费者有重试机制,消费者就会再一次消…...

常见设计模式之单例模式

单例模式 单例模式是一种常用的软件设计模式,主要目的是确保一个类在整个应用程序生命周期中只有一个实例,并提供一个全局访问点以获取该实例。 单例模式分为几种不同的实现方式,包括懒汉模式和饿汉模式。每种方式都有其特点和适用场景。例如…...

VL817-Q7 USB3.0 HUB芯片 适用于扩展坞 工控机 显示器

VL817-Q7 USB3.1 GEN1 HUB芯片 VL817-Q7 USB3.1 GEN1 HUB芯片 VIA Lab的VL817是一款现代USB 3.1 Gen 1集线器控制器,具有优化的成本结构和完全符合USB标准3.1 Gen 1规范,包括ecn和2017年1月的合规性测试更新。VL817提供双端口和双端口4端口配置&…...

【Android安全】Windows 环境下载 AOSP 源码

准备环境 安装 git 安装 Python 硬盘剩余容量最好大于 800G 打开 Git Bash,用 git 克隆源代码仓库 git clone https://android.googlesource.com/platform/manifest.git //没有梯子使用清华源 git clone https://aosp.tuna.tsinghua.edu.cn/platform/manifest.git这…...

Vue.js+SpringBoot开发快递管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 数据中心模块2.2 快递类型模块2.3 快递区域模块2.4 快递货架模块2.5 快递档案模块 三、界面展示3.1 登录注册3.2 快递类型3.3 快递区域3.4 快递货架3.5 快递档案3.6 系统基础模块 四、免责说明 一、摘要 1.1 项目介绍 …...

Linux/Spectra

Enumeration nmap 第一次扫描发现系统对外开放了22,80和3306端口,端口详细信息如下 22端口运行着ssh,80端口还是http,不过不同的是打开了mysql的3306端口 TCP/80 进入首页,点击链接时,提示域名不能解析&…...

C 嵌入式系统设计模式 08:硬件代理模式

本书的原著为:《Design Patterns for Embedded Systems in C ——An Embedded Software Engineering Toolkit 》,讲解的是嵌入式系统设计模式,是一本不可多得的好书。 本系列描述我对书中内容的理解。本文章描述访问硬件的设计模式之一&…...

【k8s配置与存储--持久化存储(PV、PVC、存储类)】

1、PV与PVC 介绍 持久卷(PersistentVolume,PV) 是集群中的一块存储,可以由管理员事先制备, 或者使用存储类(Storage Class)来动态制备。 持久卷是集群资源,就像节点也是集群资源一样…...

【Vite】解决Vite http proxy error: Error: connect ECONNREFUSED

今天写bug,发现了这个问题 我经过我一晚上的搜索努力,在github上找到了解决办法,不得不说,交友网站还是很好用的。 参考 这一行是关键代码。 因为我连的是本地后台服务,所以最后配置成这样 server: {open: true,pro…...

FPGA领域顶级学术会议

FPGA领域顶级学术会议主要有FPGA,FCCM,FPL和FPT。 1 FPGA 会议全名是: ACM/SIGDA International Symposium on Field-Programmable Gate Arrays 网站是:https://dl.acm.org/conference/fpga FPGA常年在美国举办,每年2月,偏FPGA基础研究; 该会议的论文免费下载。这个比…...

罗技鼠标滚轮模式介绍 | 鼠标滚轮异响 - 解决方案

滚轮模式介绍 针对罗技的滚轮模式进行介绍: 普通滚轮:滚动时有明显段落感,无法快速滚动。 智能滚轮:滚动力量较弱时,与普通滚轮无异;滚动力量大时,鼠标会自动减小滚轮阻尼,从而使滚…...

Scrapy与分布式开发(2.2):正则表达式

使用Python的re模块进行正则表达式操作详细讲解 一、引言 正则表达式是一种强大的文本处理工具,它使用特定的模式来搜索、匹配和替换文本。Python的re模块(正则表达式模块)提供了正则表达式匹配操作的所有功能。下面我们将详细讲解如何使用re模块进行正则表达式的操作。 …...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

R语言速释制剂QBD解决方案之三

本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...