当前位置: 首页 > news >正文

Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

在这里插入图片描述

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**

  • 清理ES历史数据定时任务
    */
    @Component
    public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**

    • 根据索引名称删除当前日期两个月前的那一天的历史文档数据
    • @param jobContext
      */
      @Scheduled
      public void cleanESHistoryData(JobContext jobContext) {
      // jobContext为定时任务中回传数据
      String indexName = jobContext.getData();
      if (StringUtils.isBlank(indexName)) {
      LOGGER.warn(“ES索引名称不能为空!”);
      return;
      }
      long startTimeMillis = System.currentTimeMillis();
      String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
      try {
      String startTimeStr = twoMonthsAgoDate + " 00:00:00";
      // 初始化时间,形如2023-08-06 00:00:00
      Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
      // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
      for (int i = 0; i < 24; i++) {
      Date startTime = initialStartTime;
      startTime = DateUtils.addHours(startTime, i);
      Date endTime = DateUtils.addHours(startTime, 1);
      LOGGER.info(“正在清理索引:[{}],时间:{} 至 {}的历史文档数据…”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
      long currentStartTimeMillis = System.currentTimeMillis();
      // 指定操作的索引库
      SearchRequest searchRequest = new SearchRequest(indexName);
      // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery(“createTimeStr.keyword”)
      .from(DateTool.format(startTime, DF_FULL))
      .to(DateTool.format(endTime, DF_FULL)))
      .size(1000);
      // 设置滚动查询结果在内存中的过期时间为1min
      Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
      // 将滚动以及构造的查询条件放入查询请求
      searchRequest.scroll(scroll).source(searchSourceBuilder);
      SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      // 记录要滚动的ID
      String scrollId = searchResponse.getScrollId();
      SearchHit[] hits = searchResponse.getHits().getHits();
      while (hits != null && hits.length > 0) {
      // 创建批量处理请求对象
      BulkRequest bulkRequest = new BulkRequest();
      for (SearchHit hit : hits) {
      DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
      bulkRequest.add(deleteRequest);
      }
      // 执行批量删除请求操作
      restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      // 构造滚动查询条件,继续滚动查询
      SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
      scrollRequest.scroll(scroll);
      searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
      scrollId = searchResponse.getScrollId();
      hits = searchResponse.getHits().getHits();
      }
      // 当前滚动查询结束,清除滚动,释放服务器内存资源
      ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
      clearScrollRequest.addScrollId(scrollId);
      restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
      LOGGER.info(“清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
      }
      LOGGER.info(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
      } catch (Exception e) {
      LOGGER.error(String.format(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
      }
      }
      }

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

在这里插入图片描述

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。

相关文章:

Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

&#x1f389;工作中遇到这样一个需求场景&#xff1a;由于ES数据库中历史数据过多&#xff0c;占用太多的磁盘空间&#xff0c;需要定期地进行清理&#xff0c;在一定程度上可以释放磁盘空间&#xff0c;减轻磁盘空间压力。 &#x1f388;在经过调研之后发现&#xff0c;某服务…...

C++最易读手撸神经网络两隐藏层(任意Nodes每层)梯度下降230821a

// c神经网络手撸20梯度下降22_230820a.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 #include<iostream> #include<vector> #include<iomanip> // setprecision #include<sstream> // getline stof() #include<fstream…...

Leetcode 2235.两整数相加

一、两整数相加 给你两个整数 num1 和 num2&#xff0c;返回这两个整数的和。 示例 1&#xff1a; 输入&#xff1a;num1 12, num2 5 输出&#xff1a;17 解释&#xff1a;num1 是 12&#xff0c;num2 是 5 &#xff0c;它们的和是 12 5 17 &#xff0c;因此返回 17 。示例…...

Postman —— postman实现参数化

什么时候会用到参数化 比如&#xff1a;一个模块要用多组不同数据进行测试 验证业务的正确性 Login模块&#xff1a;正确的用户名&#xff0c;密码 成功&#xff1b;错误的用户名&#xff0c;正确的密码 失败 postman实现参数化 在实际的接口测试中&#xff0c;部分参数每…...

LeetCode--HOT100题(41)

目录 题目描述&#xff1a;102. 二叉树的层序遍历&#xff08;中等&#xff09;题目接口解题思路代码 PS: 题目描述&#xff1a;102. 二叉树的层序遍历&#xff08;中等&#xff09; 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&am…...

微信小程序教学系列(6)

第六章&#xff1a;小程序商业化 第一节&#xff1a;小程序的商业模式 在这一节中&#xff0c;我们将探讨微信小程序的商业模式&#xff0c;让你了解如何将你的小程序变成一个赚钱的机器&#xff01; 1. 广告收入 小程序的商业模式之一是通过广告收入赚钱。你可以在小程序中…...

小程序中的全局配置以及常用的配置项(window,tabBar)

全局配置文件和常用的配置项 app.json: pages:是一个数组&#xff0c;用于记录当前小程序所有页面的存放路径&#xff0c;可以通过它来创建页面 window:全局设置小程序窗口的外观(导航栏&#xff0c;背景&#xff0c;页面的主体) tabBar:设置小程序底部的 tabBar效果 style:是否…...

数据工厂调研及结果展示

数据工厂 一、背景 在开发自测、测试迭代测试、产品验收的过程中&#xff0c;都需要各种各样的前置数据&#xff0c;大致分为如下几类&#xff1a; 账号&#xff08;实名、权益等级、注册等&#xff09; 货源&#xff08;优货、急走、相似、一手、普通货源等&#xff09; …...

抓包相关,抓包学习

检查网络流量 - 提琴手经典 (telerik.com) Headers Reference - Fiddler Classic (telerik.com) 以上是fiddler官方文档 F12要勾选保留日志 不勾选的话跳转到新页面之前页面的日志不会在下方显示 会保留所有抓到的包 如果重定向到别的页面 F12抓包可能看不到响应信息,但是…...

云原生之使用Docker部署SSCMS内容管理系统

云原生之使用Docker部署SSCMS内容管理系统 一、SSCMS介绍二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本 四、下载SSCMS镜像五、部署SSCMS内容管理系统5.1 创建SSCMS容器5.2 检查SSC…...

uniapp -- 在组件中拿到pages.json下pages设置navigationBarTitleText这个值?

1:在 pages.json 文件中设置 navigationBarTitleText,例如: {"pages": [{"path": "pages/home/index","style": {"navigationBarTitleText": "首页",&...

Java获取环境变量和运行时环境信息和自定义配置信息

System.getenv() 获取系统环境变量 public static void main1() {Map<String, String> envMap System.getenv();envMap.entrySet().forEach(x-> System.out.println(x.getKey() "" x.getValue())); } System.getenv() 获取的是操作系统环境变量列表&…...

React入门 组件学习笔记

项目页面以组件形式层层搭起来&#xff0c;组件提高复用性&#xff0c;可维护性 目录 一、函数组件 二、类组件 三、 组件的事件绑定 四、获取事件对象 五、事件绑定传递额外参数 六、组件状态 初始化状态 读取状态 修改状态 七、组件-状态修改counter案例 八、this问…...

Windows商店引入SUSE Linux Enterprise Server和openSUSE Leap

在上个月的Build 2017开发者大会上&#xff0c;微软宣布将SUSE&#xff0c;Ubuntu和Fedora引入Windows 商店&#xff0c;反应出微软对开放源码社区的更多承诺。 该公司去年以铂金会员身份加入Linux基金会。现在&#xff0c;微软针对内测者的Windows商店已经开始提供 部分Linux发…...

[NLP]深入理解 Megatron-LM

一. 导读 NVIDIA Megatron-LM 是一个基于 PyTorch 的分布式训练框架&#xff0c;用来训练基于Transformer的大型语言模型。Megatron-LM 综合应用了数据并行&#xff08;Data Parallelism&#xff09;&#xff0c;张量并行&#xff08;Tensor Parallelism&#xff09;和流水线并…...

软考高级系统架构设计师系列论文七十八:论软件产品线技术

软考高级系统架构设计师系列论文七十八:论软件产品线技术 一、摘要二、正文三、总结一、摘要 本人作为某软件公司负责人之一,通过对位于几个省的国家甲级、乙级、丙级设计院的考查和了解,我决定采用软件产品线方式开发系列《设计院信息管理平台》产品。该产品线开发主要有如…...

yolov5中添加ShuffleAttention注意力机制

ShuffleAttention注意力机制简介 关于ShuffleAttention注意力机制的原理这里不再详细解释.论文参考如下链接here   yolov5中添加注意力机制 注意力机制分为接收通道数和不接受通道数两种。这次属于接受通道数注意力机制,这种注意力机制由于有通道数要求,所示我们添加的时候…...

Effective C++条款17——以独立语句将newed 对象置入智能指针(资源管理)

假设我们有个函数用来揭示处理程序的优先权&#xff0c;另一个函数用来在某动态分配所得的widget上进行某些带有优先权的处理: void priority(); void processWidget(std::tr1::shared_ptr<Widget>pw, int priority);由于谨记“以对象管理资源”&#xff08;条款13&…...

奇迹MU服务器如何选择配置?奇迹MU服务器租用

不同的服务器&#xff0c;根据其特点与性能适用于不同的应用场景&#xff0c;为了让你们更好的理解&#xff0c;我们对服务器进行了分类归纳&#xff0c;结合了服务器不同的特点以及价位进行一个区分&#xff0c;帮助我们更好的选择合适的服务器配置。 VPS服务器 VPS服务器又…...

如何远程管理服务器详解

文章目录 前言一、远程管理类型二、远程桌面三、telnet 命令行远程四、查看本地开放端口 前言 很多公司是有自己的机房的&#xff0c;机房里面会有若干个服务器为员工和用户提供服务。大家可以想想&#xff1a;假设这家公司有上百台服务器&#xff0c;我们作为网络工程师&…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...

Caliper 负载(Workload)详细解析

Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...