当前位置: 首页 > news >正文

快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang)

Elasticsearch8.17.0在mac上的安装

Kibana8.17.0在mac上的安装

Elasticsearch检索方案之一:使用from+size实现分页

1、滚动查询的使用场景

滚动查询区别于上一篇文章介绍的使用from、size分页检索,最大的特点是,它能够检索超过10000条外的所有文档,可以理解为是一种全量检索的技术方案,也正是因为这种特性,使得滚动查询的代价非常高昂,检索过程消耗大量的内存,所以对于实时检索的场景,滚动查询是不适用的。

那滚动查询使用在什么场景呢?主要是应用在离线、检索全量数据,对于实时性要求不高的场景,比如一个数据平台,前台页面展示的数据用来预览,可以使用from+size分页查询,以提升检索效率以及平台的用户体验,如果还需要检索全量数据用于二次使用,那么后台离线检索全量就需要使用滚动查询以获取到全量数据,这将是一个耗费大量资源和时间的过程。

2、使用Kibana直观体验滚动查询

初始化滚动查询:

GET /new_tag_202411/_search?scroll=1m
{"size": 10,"sort":[{"doc_id":{"order": "asc"}}]
}

检索条件设置返回2条数据,按【doc_id】字段升序排列,doc_id分别为1-10的文档。

scroll=1m,表示Elasticsearch允许等待的最长时间是1分钟,如果在一分钟之内,接下来的 scroll 请求没有到达的话,那么当前请求的上下文将会失效:

 从上图返回可以看出,有一个【_scroll_id】字段,这个字段非常重要,接下来的滚动查询需要使用这个字段:

第一次滚动,返回doc_id从11开始的数据,第二次滚动时,需要使用第一次滚动返回的【_scroll_id】替换滚动请求,数据从doc_id为21的数据开始返回,之后循环这个过程,直到检索到全部数据。

注意一点,在测试过程中,我创建了多次滚动查询,发现scrool_id特别像,大家别误以为scrool_id没变,比如以下三个scrool_id,每个id只有3个字符不一样:

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAAWbhZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAActhZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAAjDxZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

3、代码实现滚动查询(golang)

首先是初始化一个滚动查询:

res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*1),
)

这行代码:

client.Search.WithScroll(time.Minute*1)

就是在设置滚动查询上下文的有效时间,其他几行很容易理解。

这几行代码执行完成后,除了能拿到检索数据,还能拿到scroll_id。之后就可以进行滚动查询:

for {docs = Documents{}res, err = client.Scroll(client.Scroll.WithScrollID(scrollId),client.Scroll.WithScroll(time.Minute),)if err != nil {fmt.Println("scroll err:", err.Error())return}err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("json decode err:", err)return}if len(docs.Hits.Hits) == 0 {break}fmt.Println("search count:", len(docs.Hits.Hits))scrollId = docs.ScrollID
}

这里要注意的一点是,循环滚动时,每个轮次,必须更新scrool_id为上一次滚动返回的值,如上面最后一行代码。

L17-L19行的代码,表示已经查出所有数据,本次没有数据了,同时循环结束。

4、一个必须要考虑的问题

对于滚动查询,前面也说过,会创建一个上下文,当es中存在的上下文数量超过一定限制后,将无法再次创建滚动查询,从而无法检索数据,这个【限制】es默认是500个,我们可以通过es的api查看当前系统中已经创建的上下文数量:

GET /_nodes/stats/indices/search

默认情况下,只要【open_contexts】值小于500,都能正常进行滚动查询,如果已经创建了500个上下文,就会出现问题,下面测试一下,利用代码,创建500个上下文:

 如上图,上下文已经创建500个,运行代码,再次执行滚动查询的动作:

无法查出任何数据,但是以下代码也无任何的报错:

res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*100),
)
if err != nil {fmt.Println("search err:", err.Error())return
}

没有走到err分支,经过调试发现,res的结构中的http状态码变了,我们加一行打印:

res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*100),)if err != nil {fmt.Println("search err:", err.Error())return}fmt.Println("resp code:", res.StatusCode)err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("decode err:", err.Error())return}

 运行结果如下:

状态码由正常值0变成了429,所以,在执行滚动查询时,我们需要加上对状态码的判断,以捕获到上下文超限的情况,否则没有检索到数据,还以为系统出bug了呢。

这个问题就是滚动查询的一个短板,系统用户量大了,发起滚动查询一旦超过500,就会影响用户检索数据,当然了,es还是有其他解决方案来进行全量的数据检索,还是那句话,下一篇文章再写。

5、所有代码

github:GitHub - liupengh3c/career 

代码位于以下文件:

https://github.com/liupengh3c/career/blob/main/elastic/scrool/main.go

代码也粘过来吧:

package mainimport ("fmt""os""strings""time""github.com/elastic/go-elasticsearch/v8"jsoniter "github.com/json-iterator/go""github.com/liupengh3c/esbuilder"
)// 最外层数据结构
type Documents struct {ScrollID string      `json:"_scroll_id"`Shards   Shards      `json:"_shards"`Hits     HitOutLayer `json:"hits"`TimedOut bool        `json:"timed_out"`Took     int         `json:"took"`
}
type Shards struct {Failed     int `json:"failed"`Skipped    int `json:"skipped"`Successful int `json:"successful"`Total      int `json:"total"`
}
type HitOutLayer struct {Hits     []Hits  `json:"hits"`MaxScore float64 `json:"max_score"`Total    Total   `json:"total"`
}
type Hits struct {ID     string         `json:"_id"`Index  string         `json:"_index"`Score  float64        `json:"_score"`Source map[string]any `json:"_source"`Type   string         `json:"_type"`
}
type Total struct {Relation string `json:"relation"`Value    int    `json:"value"`
}func main() {client, err := NewEsClient()if err != nil {fmt.Println("create client err:", err.Error())return}fmt.Println("connect success")for i := 0; i < 510; i++ {ScrollSearch(client)}
}
func NewEsClient() (*elasticsearch.Client, error) {cert, _ := os.ReadFile("/Users/liupeng/Documents/study/elasticsearch-8.17.0/config/certs/http_ca.crt")client, err := elasticsearch.NewClient(elasticsearch.Config{Username:  "elastic",Password:  "XBS=adqa799j_Aoz=A+h",Addresses: []string{"https://127.0.0.1:9200"},CACert:    cert,})if err != nil {// fmt.Println("create client err:", err.Error())return client, err}return client, nil
}func ScrollSearch(client *elasticsearch.Client) {var json = jsoniter.ConfigCompatibleWithStandardLibrarydocs := Documents{}dslQuery := esbuilder.NewDsl()boolQuery := esbuilder.NewBoolQuery()dslQuery.SetOrder(esbuilder.NewSortQuery("doc_id", "asc"))dslQuery.SetQuery(boolQuery)dslQuery.SetSize(10000)res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*20),)if err != nil {fmt.Println("search err:", err.Error())return}err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("decode err:", err.Error())return}fmt.Println("search count:", len(docs.Hits.Hits))scrollId := docs.ScrollIDfor {docs = Documents{}res, err = client.Scroll(client.Scroll.WithScrollID(scrollId),client.Scroll.WithScroll(time.Minute),)if err != nil {fmt.Println("scroll err:", err.Error())return}err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("decode err:", err.Error())return}defer res.Body.Close()if res.StatusCode == 429 {fmt.Println("scroll contexts is more than 500")return}if len(docs.Hits.Hits) == 0 {break}fmt.Println("search count:", len(docs.Hits.Hits))scrollId = docs.ScrollID}client.ClearScroll(client.ClearScroll.WithScrollID(scrollId),)
}

相关文章:

快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang)

Elasticsearch8.17.0在mac上的安装 Kibana8.17.0在mac上的安装 Elasticsearch检索方案之一&#xff1a;使用fromsize实现分页 1、滚动查询的使用场景 滚动查询区别于上一篇文章介绍的使用from、size分页检索&#xff0c;最大的特点是&#xff0c;它能够检索超过10000条外的…...

C++设计模式:状态模式(自动售货机)

什么是状态模式&#xff1f; 状态模式是一种行为型设计模式&#xff0c;它允许一个对象在其内部状态发生改变时&#xff0c;动态改变其行为。通过将状态相关的逻辑封装到独立的类中&#xff0c;状态模式能够将状态管理与行为解耦&#xff0c;从而让系统更加灵活和可维护。 通…...

【网络安全实验室】脚本关实战详情

难道向上攀爬的那条路&#xff0c;不是比站在顶峰更让人热血澎湃吗 1.key又又找不到了 点击链接&#xff0c;burp抓包&#xff0c;发送到重放模块&#xff0c;点击go 得到key 2.快速口算 python3脚本 得到key 3.这个题目是空的 试了一圈最后发现是 4.怎么就是不弹出key呢…...

ts总结一下

ts基础应用 /*** 泛型工具类型*/ interface IProps {id: string;title: string;children: number[]; } type omita Omit<IProps, id | title>; const omitaA: omita {children: [1] }; type picka Pick<IProps, id | title>; const pickaA: picka {id: ,title…...

MySQL数据库笔记——主从复制

大家好&#xff0c;这里是Good Note&#xff0c;关注 公主号&#xff1a;Goodnote&#xff0c;本文详细介绍 MySQL的主从复制&#xff0c;从原理到配置再到同步过程。 文章目录 简介核心组件主从复制的原理作用主从复制的线程模型主从复制的模式形式复制的方式设计复制机制主从…...

OpenAI发布o3:圣诞前夜的AI惊喜,颠覆性突破还是技术焦虑?

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…...

欧拉-伯努利梁自由波动的频散关系

梁和杆都是一维结构,但是梁的弯曲波比杆的纵波要复杂多。例如即使最简单的欧拉-伯努利(Euler-Bernoulli)梁的弯曲波也具有频散特征,且当梁的特征尺寸和弯曲波波长满足某个比值时,欧拉-伯努利梁不再适用,需要引入铁摩辛克(Timoshenko)梁模型。 考察某一欧拉-伯努利梁,长度…...

Cursor小试1.生成一个网页的接口请求工具

一般开发过程中,会涉及到接口的调试,往往有时候开发的电脑不是我们自己的,没有安装一些类似postman 的接口调用工具,所以发现问题或者要测试某些接口是否正常的时候会很麻烦,而且现在网上也没有找到很好的免费的网页端接口请求的网址,所以我们使用Cursor来编写这样一个小工具, …...

Xilinx DCI技术

Xilinx DCI技术 DCI技术概述Xilinx DCI技术实际使用某些Bank特殊DCI要求 DCI级联技术DCI端接方式阻抗控制驱动器&#xff08;源端接&#xff09;半阻抗控制阻抗驱动器&#xff08;源端接&#xff09;分体式DCI&#xff08;戴维宁等效端接到VCCO/2&#xff09;DCI和三态DCI&…...

Kubernetes Pod 优雅关闭:如何让容器平稳“退休”?

Kubernetes Pod 优雅关闭&#xff1a;如何让容器平稳“退休”&#xff1f; 在 Kubernetes 中&#xff0c;Pod 是应用的基本单元。你可能会遇到需要停止某个 Pod 或容器的情况&#xff0c;可能是因为要更新、调整或故障恢复。在这种情况下&#xff0c;Pod 的优雅关闭&#xff0…...

鸿蒙应用开发(1)

可能以为通过 鸿蒙应用开发启航计划&#xff08;点我去看上一节&#xff09; 的内容&#xff0c;就足够了&#xff0c;其实还没有。 可是我还是要告诉你&#xff0c;你还需要学习新的语言 -- ArkTS。 &#xff0c;ArkTS是HUAWEI开发的程序语言。你需要学习这门语言。这会花费你…...

SimForge HSF 案例分享|复杂仿真应用定制——UAVSim无人机仿真APP(技术篇)

导读 「神工坊」核心技术——「SimForge HSF高性能数值模拟引擎」支持工程计算应用的快速开发、自动并行&#xff0c;以及多域耦合、AI求解加速&#xff0c;目前已实现航发整机数值模拟等多个系统级高保真数值模拟应用落地&#xff0c;支持10亿阶、100w核心量级的高效求解。其低…...

使用 Adaptive Mesh Refinement 加速 CFD 仿真:最佳实践

CFD 仿真中的网格划分挑战 技术的进步正在增强设计探索&#xff0c;数值仿真在优化工程设计方面发挥着至关重要的作用。通常&#xff0c;计算流体动力学 &#xff08;CFD&#xff09; 仿真从定制的手工网格开始&#xff0c;具有精细和粗糙的区域&#xff0c;以平衡分辨率和单元…...

前端-动画库Lottie 3分钟学会使用

目录 1. Lottie地址 2. 使用html实操 3. 也可以选择其他的语言 1. Lottie地址 LottieFiles: Download Free lightweight animations for website & apps.Effortlessly bring the smallest, free, ready-to-use motion graphics for the web, app, social, and designs.…...

智能工厂的设计软件 应用场景的一个例子:为AI聊天工具添加一个知识系统 之5

本文要点 前端 问题描述语言 本文继续完善 “描述” ---现在我们应该可以将它称为 “问题problem描述语言 ”。 它 通过对话框的question 引发 表征的issue 的“涌现” 最终 厘清应用程序的“problem”。即它合并了 ISO七层模型中的上面三层&#xff0c;通过将三层 分别形成…...

java web

流程 1.浏览器发送http协议的格式数据和url给服务器软件tomcat 2.浏览器解析http格式数据并创建request和response对象,把数据封装到request对象里。 3.tomcat解析url确定访问路径&#xff0c;如果是静态资源html等&#xff0c;直接将html数据作为http格式响应体返回&#x…...

【嵌入式软件开发】嵌入式软件计时逻辑的两种实现:累加与递减的深入对比

本文主要从四个方面详细阐述了嵌入式软件编程中计时逻辑的两种实现方式:累加和递减。让我为您详细解析各个部分: 1. 基本概念对比 累加方式 从0开始向上计数每个周期增加固定值(通常为1)类似于我们日常生活中的秒表计时方式递减方式 从预设值开始向下计数每个周期减少固定…...

如何将vCenter6.7升级7.0?

vCenter是什么&#xff1f; vCenter是一种虚拟化管理软件&#xff0c;由VMware公司开发和发布。它是VMware vSphere虚拟化平台的核心组件之一&#xff0c;主要用于集中管理和监控虚拟化环境中的虚拟机、虚拟存储和网络资源。vCenter可以实现对多个ESXi主机的集中管理&#xff…...

服务器网卡绑定mode和交换机的对应关系

互联网各领域资料分享专区(不定期更新)&#xff1a; Sheet 模式类别 网卡绑定mode共有七种(0~6): bond0、bond1、bond2、bond3、bond4、bond5、bond6 mode详解 mode0 &#xff0c;即:(balance-rr) Round-robin policy(平衡轮循环策略&#xff0c;需要配置交换机静态聚合) mode…...

Maven (day04)

什么是maven? Maven 是 Apache 旗下的一个开源项目&#xff0c;是一款用于管理和构建 java 项目的工具。 官网&#xff1a;Welcome to Apache Maven – Maven https://maven.apache.org/ Maven的作用 依赖管理&#xff08;方便快捷的管理项目依赖的资源(jar包)&#xff…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

JavaSec-RCE

简介 RCE(Remote Code Execution)&#xff0c;可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景&#xff1a;Groovy代码注入 Groovy是一种基于JVM的动态语言&#xff0c;语法简洁&#xff0c;支持闭包、动态类型和Java互操作性&#xff0c…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序

一、开发准备 ​​环境搭建​​&#xff1a; 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 ​​项目创建​​&#xff1a; File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...

什么是EULA和DPA

文章目录 EULA&#xff08;End User License Agreement&#xff09;DPA&#xff08;Data Protection Agreement&#xff09;一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA&#xff08;End User License Agreement&#xff09; 定义&#xff1a; EULA即…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...