当前位置: 首页 > news >正文

Neo4j插入数据逐级提升速度4倍又4倍

语雀版:https://www.yuque.com/xw76/back/dtukgqfkfwg1d6yo

目录

    • 背景介绍
    • 初始方案Node()创建
    • 事务批量提交
    • 记录Node是否存在
    • 生成Cypher语句执行
    • 数据库参数优化
    • 切换成85k个三元组测试
    • 建索引(很显著!!!)
    • MATCH替代MERGE
    • CREATE替代MERGE
    • 总结
    • Other可能的优化

背景介绍

待插入数据的背景:

Wikipedia的数据,一个标题title就是一个页面,一个页面下有很多个子章节chapter(我这里称之为section),每个section都是由若干个块chunk组成的(我这里每个块100个token)。

如图所示,红色是title,棕色是section,绿色是chunk(显示的chunkID)

我的三元组,(s,p,o)是三元组主谓宾,

titleOrSection表示主语s是一个标题还是章节:为了标记label

titleName表示这个主语谓语是哪一个title页面下的:为了唯一区分相同的section,比如上面图里"Career"就出现了两个,但是它们的属性是不同的,一个是"Henry->Career",一个是"Adelina->Career"。就应该分开,举个例子,论文A包含章节"Approach",“Approach"包含"Retrieval Augmented Generation”,论文B也包含章节"Approach",“Approach"包含"Bacterial Culture”,如果没有区分"Approach",那论文A的节点向下探索就会找到论文B的章节块,所以这里会有一个titleName进行区分。

初始方案Node()创建

Node()创建,用的py2neo,是逻辑层面。

def create_node_and_relation_t1(data):for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]oLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + ostart_node = Node(sLabel, name=s, unique_name=nameAndTitleNameS)end_node = Node(oLabel, name=o, unique_name=nameAndTitleNameO)# 使用 MERGE 确保节点不会重复创建graph.merge(start_node, sLabel, "unique_name")# 根据标签和 unique_name 属性进行合并graph.merge(end_node, oLabel, "unique_name")relation = Relationship(start_node, p, end_node)graph.create(relation)

用的数据包含2697个三元组,用时:32s。

事务批量提交

事务开销大

  • 每次创建节点和关系都会产生网络开销
  • 频繁的单个事务提交会显著降低性能

批量操作不充分

  • 没有利用Neo4j的批量写入能力
  • 每个节点和关系都单独处理

每次执行 graph.run() 都会发送一个请求给数据库,如果是逐行执行,会非常慢。使用事务可以将多条语句合并到一个请求中,这样能大大减少请求次数。

加事务,batch

def create_node_and_relation_t2_batch(data, batch_size=1000):tx = graph.begin()for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]oLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + ostart_node = Node(sLabel, name=s, unique_name=nameAndTitleNameS)end_node = Node(oLabel, name=o, unique_name=nameAndTitleNameO)# 使用 MERGE 确保节点不会重复创建tx.merge(start_node, sLabel, "unique_name")  # 根据描述和 unique_name 属性进行合并tx.merge(end_node, oLabel, "unique_name")relation = Relationship(start_node, p, end_node)tx.create(relation)# 每 batch_size 条记录提交一次if i % batch_size == 0:graph.commit(tx)tx = graph.begin()  # 开始新的事务# 提交剩余的数据graph.commit(tx)

用的数据包含2697个三元组,用时:20s,快了12秒,快了37.5%。

不同的batchSize不一样的结果,为10的时候最好

记录Node是否存在

如果Node存在,就不用去new了

def create_node_and_relation_batch_exist(data, batch_size=1000):tx = graph.begin()existNode = {}for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]if p != "hasChunk" and p != "hasSection":logging.error(f"关系类型错误:{p}")breakoLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + oif nameAndTitleNameS in existNode:start_node = existNode[nameAndTitleNameS]else:start_node = Node(sLabel, name=s, unique_name=nameAndTitleNameS)existNode[nameAndTitleNameS] = start_nodeif nameAndTitleNameO in existNode:end_node = existNode[nameAndTitleNameO]else:end_node = Node(oLabel, name=o, unique_name=nameAndTitleNameO)existNode[nameAndTitleNameO] = end_noderelation = Relationship(start_node, p, end_node)tx.create(relation)if i % batch_size == 0:graph.commit(tx)tx = graph.begin()  # 开始新的事务graph.commit(tx)

用的数据包含2697个三元组,用时:8s,快了12秒,快了60%。比初始快了75%。

生成Cypher语句执行

cypher语句生成,依然每一个都得tx.run()一次。

tx.run()的批量参数,可惜我这里不适用

def create_cypher(data, batch_size=1000):# 开始一个事务tx = graph.begin()for i in tqdm(range(1, len(data))):s = data[i][0]p = data[i][1]o = data[i][2]sLabel = data[i][3]oLabel = "Section" if p == "hasSection" else "Chunk"nameAndTitleNameS = data[i][4] + "->" + snameAndTitleNameO = data[i][4] + "->" + oparams = {'s': s,'o': o,'nameAndTitleNameS': nameAndTitleNameS,'nameAndTitleNameO': nameAndTitleNameO}query = f"""MERGE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})MERGE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})MERGE (s)-[:{p}]->(o)"""tx.run(query, params)# 每 batch_size 条记录提交一次if i % batch_size == 0:graph.commit(tx)tx = graph.begin()  # 开始新的事务# 提交剩余的数据graph.commit(tx)

用的数据包含2697个三元组,用时:7s,快了1秒,快了12.5%。比初始快了78%。

数据库参数优化

数据库参数调整。没什么感觉,可能数据量比较小吧

server.memory.heap.initial_size=8g
server.memory.heap.max_size=16g
server.memory.pagecache.size=10g
dbms.memory.transaction.total.max=6g

ChatGPT推荐的

# 确保 neo4j.conf 中的内存配置足够大
dbms.memory.pagecache.size
# 禁用事务日志
dbms.tx_log.rotation.retention_policy=0
dbms.tx_log.rotation.retention_policy=100Mdbms.memory.heap.initial_size=8G
dbms.memory.heap.max_size=16G
dbms.memory.pagecache.size=4G

Claude推荐的

# 堆内存和页缓存
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=16G
dbms.memory.pagecache.size=8G# 并发和事务配置
dbms.tx_log.rotation.size=512M
dbms.tx_log.rotation.retention_policy=keep_latest 3# 索引和约束优化
dbms.index.operational_sampling_enabled=false
dbms.index.background_sampling_enabled=true# 批量导入优化
dbms.import.csv.legacy_quote_escaping=false
dbms.import.csv.multi_line_fields=false

切换成85k个三元组测试

事务批量提交+Node是否存在+数据库参数优化:用的数据包含85661个三元组,

batch_size = 1000 用时:09:25

batch_size = 10 用时:04:40

事务批量提交+Cypher语句+数据库参数优化:用的数据包含85661个三元组,

batch_size = 1000 用时:1:12:57

:::color5
Node是否存在完胜

:::

建索引(很显著!!!)

如果在 MERGE 操作中某些属性(如 unique_name)只是为了保证唯一性,但不会频繁变动,可以通过优化索引和减少查询字段来提高性能。

CREATE CONSTRAINT FOR (s:Title) REQUIRE s.unique_name IS UNIQUE;
CREATE CONSTRAINT FOR (s:Section) REQUIRE s.unique_name IS UNIQUE;
CREATE CONSTRAINT FOR (o:Chunk) REQUIRE o.unique_name IS UNIQUE;

事务批量提交+Node是否存在+数据库参数优化:用的数据包含85661个三元组

batch_size = 1000 用时:09:25

batch_size = 10 用时:04:50

事务批量提交+Cypher语句+数据库参数优化:用的数据包含85661个三元组

batch_size = 1000 用时:03:14

batch_size = 10 用时:02:06

:::color5
Cypher语句完胜,因为索引对Node是否存在没用

batch_size小一点会更好

:::

MATCH替代MERGE

合并操作:当前代码是基于 MERGE 来进行节点的创建和关系的建立。每次都在执行 MERGE 操作,会影响性能。如果确定节点已经存在,可以使用 MATCH 代替 MERGE。python代码里用内存记录是否存在,不用Neo4j记录,if nameAndTitleNameS in alreadyNode:

同步可以改第一种方案不去用MERGE来做判断,而是用内存,并记录存在的Node()对象,直接字典取。理论来说这个应该最快,但是这是py2neo,不是cypher语句层面

  • 对于每个源节点和目标节点,首先检查它是否已经在 alreadyNode 中。
  • 如果节点已经存在,使用 MATCH 语句来查找它。
  • 如果节点不存在,使用 MERGE 来创建它。
if nameAndTitleNameS in alreadyNode:s_query = f"""MATCH (s:{sLabel} {{unique_name: $nameAndTitleNameS}})"""
else:s_query = f"""MERGE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})"""alreadyNode.append(nameAndTitleNameS)if nameAndTitleNameO in alreadyNode:o_query = f"""MATCH (o:{oLabel} {{unique_name: $nameAndTitleNameO}})"""
else:o_query = f"""MERGE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})"""alreadyNode.append(nameAndTitleNameO)# MERGE 和 MATCH 之间需要有 with
query = f"""
{s_query}
WITH s
{o_query}
WITH s, o
CREATE (s)-[:{p}]->(o)
"""
tx.run(query, params)

batch_size = 1000 用时:04:11

batch_size = 10 用时:03:37

CREATE替代MERGE

使用MERGE或者MATCH,会随着KG里数据的变多而变得缓慢

CREATEMERGE 的区别:

  • **CREATE**:直接创建新的节点或关系,不做任何检查。如果节点或关系已经存在,CREATE 不会做任何检查,也不会返回已存在的节点。
  • **MERGE**:执行节点或关系的查找(MATCH),如果不存在则创建。如果节点已经存在,MERGE 会进行比 CREATE 更多的检查(例如,索引匹配等),这会增加查询的开销。

因为明确知道了是不存在的Node,可以直接CREATE,不用走MERGE的检查。如果你确定不会出现重复的节点或关系,CREATE 可能更快。

if nameAndTitleNameS in alreadyNode:s_query = f"""MATCH (s:{sLabel} {{unique_name: $nameAndTitleNameS}})"""
else:# s_query = f"""MERGE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})"""s_query = f"""CREATE (s:{sLabel} {{name: $s, unique_name: $nameAndTitleNameS}})"""alreadyNode.append(nameAndTitleNameS)if nameAndTitleNameO in alreadyNode:o_query = f"""MATCH (o:{oLabel} {{unique_name: $nameAndTitleNameO}})"""
else:# o_query = f"""MERGE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})"""o_query = f"""CREATE (o:{oLabel} {{name: $o, unique_name: $nameAndTitleNameO}})"""alreadyNode.append(nameAndTitleNameO)

batch_size = 10 用时:03:12

总结

当没有建立索引的时候:记录Node是否存在,是最快的。

当建立了索引之后:纯MERGE的Cypher是最快的,各种判断更换为MATCH和CREATE只会减慢速度(与理论分析有点相反)。

事务批量提交不是越大越好,设置为10最佳

以上总结都针对于我当前的环境和数据,建议选取少部分数据进行实验,针对上述提到的几种方案进行测试,找到适用的。

85647个三元组:用时02:06,平均680条/秒

469101个三元组:用时10:40,平均733条/秒

Other可能的优化

MATCH优化?能不能也存到内存里?每次MATCH也很花时间,能不能把match省略掉,比如能不能把这个节点存到字典里,如果key存在,就直接去取,然后再放到后面的query里?

网上搜的性能对比参考:

  • py2neo: 约100-500条/秒
  • neo4j-python-driver: 约1000-5000条/秒
  • 直接Cypher批量: 可达10000+条/秒

LOAD_CSV方式:如果节点有大量重复数据,先通过 LOAD CSV 等批量导入方式将数据导入到数据库,再通过 MATCHCREATEMERGE 来建立关系。

异步执行:如果你的应用允许,你可以考虑异步提交查询,将多个 tx.run(query, params) 放在异步队列中并并行执行。

相关文章:

Neo4j插入数据逐级提升速度4倍又4倍

语雀版:https://www.yuque.com/xw76/back/dtukgqfkfwg1d6yo 目录 背景介绍初始方案Node()创建事务批量提交记录Node是否存在生成Cypher语句执行数据库参数优化切换成85k个三元组测试建索引(很显著!!!)MATCH…...

C++特殊类设计(单例模式等)

目录 引言 1.请设计一个类,不能被拷贝 2. 请设计一个类,只能在堆上创建对象 为什么设置实例的方法为静态成员呢 3. 请设计一个类,只能在栈上创建对象 4. 请设计一个类,不能被继承 5. 请设计一个类,只能创建一个对…...

J8学习打卡笔记

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 Inception v1算法实战与解析 导入数据数据预处理划分数据集搭建模型训练模型正式训练结果可视化详细网络结构图个人总结 import os, PIL, random, pathlib imp…...

前端学习-操作元素内容(二十二)

目录 前言 目标 对象.innerText 属性 对象.innerHTML属性 案例 年会抽奖 需求 方法一 方法二 总结 前言 曾经沧海难为水,除却巫山不是云。 目标 能够修改元素的文本更换内容 DOM对象都是根据标签生成的,所以操作标签,本质上就是操作DOM对象,…...

【踩坑】pip离线+在线在虚拟环境中安装指定版本cudnn攻略

pip离线在线在虚拟环境中安装指定版本cudnn攻略 在线安装离线安装Windows环境:Linux环境: 清华源官方帮助文档 https://mirrors.tuna.tsinghua.edu.cn/help/pypi/ 标题的离线的意思是先下载whl文件再安装到虚拟环境,在线的意思是直接在当前虚…...

golang操作sqlite3加速本地结构化数据查询

目录 摘要Sqlite3SQLite 命令SQLite 语法SQLite 数据类型列亲和类型——优先选择机制 SQLite 创建数据库SQLite 附加数据库SQLite 分离数据库 SQLite 创建表SQLite 删除表 SQLite Insert 语句SQLite Select 语句SQLite 运算符SQLite 算术运算符SQLite 比较运算符SQLite 逻辑运算…...

vllm加速(以Qwen2.5-7B-instruction为例)与流式响应

1. vllm介绍 什么是vllm? vLLM 是一个高性能的大型语言模型推理引擎,采用创新的内存管理和执行架构,显著提升了大模型推理的速度和效率。它支持高度并发的请求处理,能够同时服务数千名用户,并且兼容多种深度学习框架,…...

WordPress弹窗公告插件-ts小陈

使用效果 使用后网站所有页面弹出窗口 插件特色功能 设置弹窗公告样式:这款插件可展示弹窗样式公告,用户点击完之后不再弹出,不会频繁打扰用户。可设置弹窗中间的logo图:这款插件针对公告图片进行独立设置,你可以在设…...

【ELK】容器化部署Elasticsearch1.14.3集群【亲测可用】

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1. 部署1.1 单节点1.2 新节点加入集群1.3 docker-compose部署集群 1. 部署 按照官网流程进行部署 使用 Docker 安装 Elasticsearch |Elasticsearch 指南 [8.14] |…...

[SAP ABAP] ALV状态栏GUI STATUS的快速创建

使用事务码SE38进入到指定程序,点击"显示对象列表"按钮 鼠标右键,选择"GUI状态" 弹出【创建状态】窗口,填写状态以及短文本描述以后,点击按钮 点击"调整模板",复制已有程序的状态栏 填…...

【Linux】NET9运行时移植到低版本GLIBC的Linux纯内核板卡上

背景介绍 自制了一块Linux板卡(基于全志T113i) 厂家给的SDK和根文件系统能够提供的GLIBC的版本比较低 V2.25/GCC 7.3.1 这个版本是无法运行dotnet以及dotnet生成的AOT应用的 我用另一块同Cortex-A7的板子运行dotnet的报错 版本不够,运行不了 而我的板子是根本就识…...

深入浅出支持向量机(SVM)

1. 引言 支持向量机(SVM, Support Vector Machine)是一种常见的监督学习算法,广泛应用于分类、回归和异常检测等任务。自1990年代初期由Vapnik等人提出以来,SVM已成为机器学习领域的核心方法之一,尤其在模式识别、文本…...

Vue脚手架相关记录

脚手架 安装与配置 安装node node -> 16.20.2 切换淘宝镜像 npm install -g cnpm -registryhttp://registry.npm.taobao.orgnpm config set registry http://registry.npm.taobao.org/使用了第二个,下一步才有用 安装vue npm install -g vue/clivscode中不给运行vue解…...

基于Docker的Minio分布式集群实践

目录 1. 说明 2. 配置表 3. 步骤 3.1 放行服务端口 3.2 docker-compose 编排 4. 入口反向代理与负载均衡配置 4.1 api入口 4.2 管理入口 5. 用例 6. 参考 1. 说明 以多节点的Docker容器方式实现minio存储集群,并配以nginx反向代理及负载均衡作为访问入口。…...

Scala 的迭代器

迭代器定义:迭代器不是一种集合,它是一种用于访问集合的方法。 迭代器需要通过集合对应的迭代器调用迭代器的方法来访问。 支持函数式编程风格,便于链式操作。 创建一个迭代器,相关代码如下: object Test {def mai…...

vue实现文件流形式的导出下载

文章目录 Vue 项目中下载返回的文件流操作步骤一、使用 Axios 请求文件流数据二、设置响应类型为 ‘blob’三、创建下载链接并触发下载四、在 Vue 组件中集成下载功能五、解释与实例说明1、使用 Axios 请求文件流数据:设置响应类型为 blob:创建下载链接并…...

【DIY飞控板PX4移植】深入理解NuttX下PX4串口配置:ttyS设备编号与USARTUART对应关系解析

深入理解NuttX下PX4串口配置:ttyS设备编号与USART&UART对应关系解析 引言问题描述原因分析结论 引言 在嵌入式系统开发中,串口(USART/UART)的配置是一个常见但关键的任务。对于使用 NuttX 作为底层操作系统的飞控系统&#x…...

【报错解决】vsvars32.bat 不是内部或外部命令,也不是可运行的程序或批处理文件

报错信息: 背景问题:Boost提示 “cl” 不是内部或外部命令,也不是可运行的程序或批处理文件时,   按照这篇博客的方法【传送】添加了环境变量后,仍然报错: 报错原因: vsvars32.bat 的路径不正…...

CTFshow-文件上传(Web151-170)

CTFshow-文件上传(Web151-170) 参考了CTF show 文件上传篇(web151-170,看这一篇就够啦)-CSDN博客 Web151 要求png,然后上传带有一句话木马的a.png,burp抓包后改后缀为a.php,然后蚁剑连接,找fl…...

深度学习基础--将yolov5的backbone模块用于目标识别会出现怎么效果呢??

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 前言 yolov5网络结构比较复杂,上次我们简要介绍了yolov5网络模块,并且复现了C3模块,深度学习基础–yolov5网络结构简介&a…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层&#xf…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

高危文件识别的常用算法:原理、应用与企业场景

高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...