ES数据过多,索引拆分
公司企微聊天数据存储在 ES 中,虽然按照企业分储在不同的ES 索引中,但某些常用的企微主体使用量还是很大。4年中一个索引存储数据已经达到46多亿条数据,占用存储3.1tb,

ES 配置
由于多一个副本,存储得翻倍,成本考虑,所以没有设置副本分片(不建议你们这么做)
索引拆分后,只需要修改插入时的代码逻辑,设置好别名后,查询代码是不需要改动的
进入主题,拆分索引 ,数据按年进行拆分
1.设置索引模板
PUT _template/chat_message_template
{"index_patterns": ["chat-message-*"],"settings": {"number_of_shards": 15, // 这里和集群的节点对应,需要是节点的整数倍"number_of_replicas": 0, // 分片副本,生产环境最好设置>=1"refresh_interval": "20s","codec": "best_compression","max_result_window": "100000000"},"mappings": {"properties": {"msg_id": {"type": "keyword"},"msg_time": {"type": "long"},"msg_type": {"type": "keyword"},"text": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","term_vector": "with_positions_offsets"},"image": {"type": "text"},.........}}
}
2.根据模板新建索引
PUT chat-message-2021
3.reindex 到新索引
!!重要!!
# reindex 前查看磁盘是否够用,索引切分后,占用磁盘大小比一个索引大了一些
# 尽量多留一些空间给新索引 , 扩容前磁盘占用40%左右是个不错的选择,请提前进行扩容
# 我们遇到了空间不够用的情况,后扩容,虽然说是滚动扩容,客服说任务可能会取消,但扩容后任务还在,所以尽量提前把空间扩容好
# 查看各节点的分片数量及磁盘使用
GET /_cat/nodes?v&h=name,shards,disk.used_percent
# 结果
name disk.used_percent
es-cn-**-data-g4-3 86.12
es-cn-**-data-g4-2 81.75
es-cn-**-data-g4-0 85.94
es-cn-**-data-g4-4 85.73
es-cn-**-data-g4-1 85.67
reindex命令
# 限流保护:添加?requests_per_second=1000参数避免集群过载
# 异步执行,会直接返回 taskId: wait_for_completion
# 执行完后,修改以下 msg_time 再执行,可以并行迁移每年的数据POST _reindex?requests_per_second=1000&wait_for_completion=false
{"conflicts": "proceed", // 默认情况下,当发生version conflict的时候,_reindex会被abort。解决方案设置为“proceed”:"source": {"index": "chat-message-0613","size":5000,"query": {"range": {"msg_time": {"gte": 1609430400000,"lt": 1610380800111}}}},"dest": {"index": "chat-message-2021","op_type": "create" // 把op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。}
}
4.查看进度
GET _tasks?detailed=true&actions=*reindex
# 返回结果
{"nodes" : {"z5VL_HJ2Qn****AMQ" : {"name" : "es-cn-**-data-g4-3","transport_address" : "121.**.114.80:9300","host" : "121.**.114.80","ip" : "121.**.114.80:9300","roles" : ["data","ingest","master","ml","remote_cluster_client","transform"],"attributes" : {"zone_id" : "cn-shanghai-g","ml.machine_memory" : "64887980032","ml.max_open_jobs" : "20","xpack.installed" : "true","zone" : "cn-shanghai-g","transform.node" : "true"},"tasks" : {"z5VL_HJ2Qn****YhoAMQ:6597302" : {"node" : "z5VL_HJ2Qn****YhoAMQ","id" : 6597302,"type" : "transport","action" : "indices:data/write/reindex","status" : {"total" : 484234,"updated" : 0,"created" : 0,"deleted" : 0,"batches" : 11,"version_conflicts" : 55000,"noops" : 0,"retries" : {"bulk" : 0,"search" : 0},"throttled_millis" : 99999,"requests_per_second" : 1000.0,"throttled_until_millis" : 2410},"description" : "reindex from [chat-message-0613] to [chat-message-2021][_doc]","start_time_in_millis" : 1742189556123,"running_time_in_nanos" : 109710051531,"cancellable" : true,"headers" : {"trace_id" : "ywiWopUBbHoVT9Iz8TeX"}}}}}
}
5.查看文档数量是否相同
把数据和 kibana 中索引文档数据比对即可
例:查看2021年的数据量
GET chat-message-0613/_count
{"query": {"range": {"msg_time": {"gte": 1609459200000,"lt": 1640966400000}}}
}
# 数量对的上的话,可以把老索引给删除了,释放磁盘,在 kibana 中操作即可
6.修改别名
先删除之前的别名,把老的别名放到新建的索引上
POST _aliases
{"actions" : [{"remove" : {"index" : "chat-message-0613" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2021" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2022" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2023" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2024" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2025" , "alias" : "chat-message"}}]
}
修改别名后,ES 查询部分不用修改
取消reindex命令
POST _tasks/z5VL_HJ2Qn****YhoAMQ:6597302/_cancel
插入文档数据的部分逻辑代码
func (s *ElasticService) CreateIndex(index, alias, mapping string) (err error) {// 判断索引是否存在exists, err := sys.Elastic().IndexExists(index).Do(context.Background())if err != nil {sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)return}if exists {// 加锁,说明此索引已经存在_, _ = model.Factory.Lock.AddLock(model.LockSync, "mapping:ext:"+index, 370*24*3600)return}defer func() {model.Factory.Lock.UnLocK(model.LockSync, "mapping:create:"+index)}()// 多线程创建防止出错,加锁ok, err := model.Factory.Lock.AddLock(model.LockSync, "mapping:create:"+index, 120)if err != nil {return}if !ok {err = errors.New("未抢占到锁")return}// 创建indexcreateIndex, err := sys.Elastic().CreateIndex(index).Body(mapping).Do(context.Background())if err != nil {sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)return}if !createIndex.Acknowledged {// Not acknowledgedsys.Log().Error(sys.NewProjectErr(1000401).Error() + index)return}// 创建aliasputAlias, err := sys.Elastic().Alias().Add(index, alias).Do(context.Background())if err != nil {sys.Log().Error(sys.NewProjectErr(1000402).Error() + alias)return}// 可选:检查别名操作是否被集群确认(按需添加)if !putAlias.Acknowledged {sys.Log().Error("Alias creation not acknowledged: " + alias)return}return
}// 数据按年切分后请求此方法
func (s *ElasticService) Bulk(index, alias string, chatMsg []model.ChatMessage) (bulkResponseItem []*elastic.BulkResponseItem, err error) {tryTime := 0
CreateIndex:// 分布锁ok, err := model.Factory.Lock.ExistLock(model.LockSync, "mapping:ext:"+index)if err != nil {return}if !ok {errCreate := s.CreateIndex(index, alias, ChatMessageMapping)if errCreate != nil {if tryTime < 10 {tryTime++time.Sleep(time.Second)goto CreateIndex} else {err = errCreatereturn}}}bulkRequest := sys.Elastic().Bulk().........}
最后
整个迁移非常费时间。 让grok 帮我算了下(ps. 同样的提问,国内大模型都没算出来 ,还是MASK的强啊)

相关文章:
ES数据过多,索引拆分
公司企微聊天数据存储在 ES 中,虽然按照企业分储在不同的ES 索引中,但某些常用的企微主体使用量还是很大。4年中一个索引存储数据已经达到46多亿条数据,占用存储3.1tb, ES 配置 由于多一个副本,存储得翻倍,成本考虑…...
Rust 与 FFmpeg 实现视频水印添加:技术解析与应用实践
引言 在短视频、直播、影视制作等领域,视频水印是一种常见的工具,用于保护版权、提升品牌辨识度或满足合规性要求。然而,开发者在实现水印添加时往往面临以下挑战: 手动处理效率低:使用图像编辑软件(如 P…...
Python语言的游戏物理
Python语言的游戏物理 引言 在现代游戏开发中,物理引擎是一个重要的组成部分,通过模拟真实世界的物理现象,增加了游戏的沉浸感和可玩性。Python作为一种高效、易用的编程语言,虽然在性能方面不如C等语言,但其灵活性和…...
uni-app自动升级功能
效果图 一、VUE login.vue <template><view><view class"uni-common-mt"><view class"uni-flex uni-column"><view class"flex-item flex-item-V"><view class"logo"><image src"/st…...
使用AI一步一步实现若依(26)
功能26:新增一个新员工培训页面 功能25:角色管理 功能24:菜单管理 功能23:从后端获取路由/菜单数据 功能22:用户管理 功能21:使用axios发送请求 功能20:使用分页插件 功能19:集成My…...
逻辑回归(Logistic Regression)模型的概率预测函数
以二分类问题为例,常见的损失函数有 负对数似然损失(neg log-likelihood loss),交叉熵损失(cross entropy loss),deviance loss指数损失(exponential loss)。 前三者虽然名字不同,但却具有相同的表达形式。此外,neg …...
【零基础学python】python高级语法(四)
接续上面的系列文章: 【零基础学python】python基础语法(一)-CSDN博客 【零基础学python】python基础语法(二)-CSDN博客 【零基础学python】python高级语法(三)-CSDN博客 目录 2,…...
HarmonyOS 之 @Require 装饰器自学指南
在 HarmonyOS 应用开发工作中,我频繁碰到组件初始化传参校验的难题。在复杂的组件嵌套里,要是无法确保必要参数在构造时准确传入,就极易引发运行时错误,而且排查起来费时费力。一次偶然的机会,我接触到了 Require 装饰…...
Redis Cluster 详解
Redis Cluster 详解 1. 为什么需要 Redis Cluster? Redis 作为一个高性能的内存数据库,在单机模式下可能会遇到以下问题: 单机容量受限:Redis 是基于内存存储的,单机的内存资源有限,单实例的 Redis 只能…...
基于CNN的FashionMNIST数据集识别6——ResNet模型
前言 之前我们在cnn已经搞过VGG和GoogleNet模型了,这两种较深的模型出现了一些问题: 梯度传播问题 在反向传播过程中,梯度通过链式法则逐层传递。对于包含 L 层的网络,第 l 层的梯度计算为: 其中 a(k) 表示第 k层的…...
0323-B树、B+树
多叉树---->B树(磁盘)、B树 磁盘由多个盘片组成,每个盘片分为多个磁道和扇区。数据存储在这些扇区中,扇区之间通过指针链接,形成链式结构。 内存由连续的存储单元组成,每个单元有唯一地址,数…...
深度学习3-pytorch学习
深度学习3-pytorch学习 Tensor 定义与 PyTorch 操作 1. Tensor 定义: Tensor 是 PyTorch 中的数据结构,类似于 NumPy 数组。可以通过不同方式创建 tensor 对象: import torch# 定义一个 1D Tensor x1 torch.Tensor([3, 4])# 定义一个 Fl…...
【工作记录】F12查看接口信息及postman中使用
可参考 详细教程:如何从前端查看调用接口、传参及返回结果(附带图片案例)_f12查看接口及参数-CSDN博客 1、接口信息 接口基础知识2:http通信的组成_接口请求信息包括-CSDN博客 HTTP类型接口之请求&响应详解 - 三叔测试笔记…...
正则表达式-万能表达式
1、正则 正则表达式是一组由字母和符号组成的特殊文本, 它可以用来从文本中找 出满足你想要的格式的句子. {“basketId”: 0, “count”: 1, “prodId”: #prodId#, “shopId”: 1, “skuId”: #skuId#} #prodId# re相关的文章: https://www.cnblogs.com/Simple-S…...
2024年认证杯SPSSPRO杯数学建模B题(第二阶段)神经外科手术的定位与导航全过程文档及程序
2024年认证杯SPSSPRO杯数学建模 B题 神经外科手术的定位与导航 原题再现: 人的大脑结构非常复杂,内部交织密布着神经和血管,所以在大脑内做手术具有非常高的精细和复杂程度。例如神经外科的肿瘤切除手术或血肿清除手术,通常需要…...
Android 12系统源码_系统启动(二)Zygote进程
前言 Zygote(意为“受精卵”)是 Android 系统中的一个核心进程,负责 孵化(fork)应用进程,以优化应用启动速度和内存占用。它是 Android 系统启动后第一个由 init 进程启动的 Java 进程,后续所有…...
MOSN(Modular Open Smart Network)-05-MOSN 平滑升级原理解析
前言 大家好,我是老马。 sofastack 其实出来很久了,第一次应该是在 2022 年左右开始关注,但是一直没有深入研究。 最近想学习一下 SOFA 对于生态的设计和思考。 sofaboot 系列 SOFAStack-00-sofa 技术栈概览 MOSN(Modular O…...
Flink介绍与安装
Apache Flink是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。 一、主要特点和功能 1. 实时流处理: 低延迟: Flink 能够以亚秒级的延迟处理数据流,非常…...
【gradio】从零搭建知识库问答系统-Gradio+Ollama+Qwen2.5实现全流程
从零搭建大模型问答系统-GradioOllamaQwen2.5实现全流程(一) 前言一、界面设计(计划)二、模块设计1.登录模块2.注册模块3. 主界面模块4. 历史记录模块 三、相应的接口(前后端交互)四、实现前端界面的设计co…...
PowerBI,用度量值实现表格销售统计(含合计)的简单示例
假设我们有产品表 和销售表 我们想实现下面的效果 表格显示每个产品的信息,以及单个产品的总销量 有一个切片器能筛选各个门店的产品销量 还有一个卡片图显示所筛选条件下,所有产品的总销量 实现方法: 1.我们新建一个计算表,把…...
Mac 常用命令
一、文件操作(必知必会) 1. 快速导航 cd ~/Documents # 进入文档目录 cd .. # 返回上级目录 pwd # 显示当前路径 2. 文件管理 touch new_file.txt # 创建空文件 mkdir -p project/{src,docs} # 递归创建目录 cp …...
26考研——查找_树形查找_二叉排序树(BST)(7)
408答疑 文章目录 三、树形查找二叉排序树(BST)二叉排序树中结点值之间的关系二叉树形查找二叉排序树的查找过程示例 向二叉排序树中插入结点插入过程示例 构造二叉排序树的过程构造示例 二叉排序树中删除结点的操作情况一:被删除结点是叶结点…...
美摄科技开启智能汽车车内互动及娱乐解决方案2.0
在科技飞速发展的今天,汽车已不再仅仅是简单的代步工具,而是逐渐演变为集出行、娱乐、社交于一体的智能移动空间。美摄科技,作为前沿视觉技术与人工智能应用的领航者,凭借其卓越的技术实力和创新精神,携手汽车行业&…...
【行驶证识别】批量咕嘎OCR识别行驶证照片复印件图片里的文字信息保存表格或改名字,基于QT和腾讯云api_ocr的实现方式
项目背景 在许多业务场景中,如物流管理、车辆租赁、保险理赔等,常常需要处理大量的行驶证照片复印件。手动录入行驶证上的文字信息,像车主姓名、车辆型号、车牌号码等,不仅效率低下,还容易出现人为错误。借助 OCR(光学字符识别)技术,能够自动识别行驶证图片中的文字信…...
Vue-admin-template安装教程
#今天配置后台管理模板发现官方文档的镜像网站好像早失效了,自己稍稍总结了一下方法# 该项目环境需要node17及以下,如果npm install这一步报错可能是这个原因 git clone https://github.com/PanJiaChen/vue-admin-template.git cd vue-admin-template n…...
21.Excel自动化:如何使用 xlwings 进行编程
一 将Excel用作数据查看器 使用 xlwings 中的 view 函数。 1.导包 import datetime as dt import xlwings as xw import pandas as pd import numpy as np 2.view 函数 创建一个基于伪随机数的DataFrame,它有足够多的行,使得只有首尾几行会被显示。 df …...
LabVIEW FPGA与Windows平台数据滤波处理对比
LabVIEW在FPGA和Windows平台均可实现数据滤波处理,但两者的底层架构、资源限制、实时性及应用场景差异显著。FPGA侧重硬件级并行处理,适用于高实时性场景;Windows依赖软件算法,适合复杂数据处理与可视化。本文结合具体案例&#x…...
【NLP 48、大语言模型的神秘力量 —— ICL:in context learning】
目录 一、ICL的优势 1.传统做法 2.ICL做法 二、ICL的发展 三、ICL成因的两种看法 1.meta learning 2.Bayesian Inference 四、ICL要点 ① 语言模型的规模 ② 提示词prompt中提供的examples数量和顺序 ③ 提示词prompt的形式(format) 五、fine-tune VS I…...
vue 中渲染 markdown 格式的文本
文章目录 需求分析第一步:安装依赖第二步:创建 Markdown 渲染组件第三步,使用实例扩展功能1. 代码高亮:2. 自定义渲染规则:需求 渲染 markdown 格式的文本 分析 在Vue 3中实现Markdown渲染的常见方法。通常有两种方式:使用现有的Markdown解析库,或者自己编写解析器…...
工业4G路由器赋能智慧停车场高效管理
工业4G路由器作为智慧停车场管理系统通信核心,将停车场内的各个子系统连接起来,包括车牌识别系统、道闸控制系统、车位检测系统、收费系统以及监控系统等。通过4G网络,将这些系统采集到的数据传输到云端服务器或管理中心,实现信息…...
