当前位置: 首页 > article >正文

ES海量数据更新及导入导出备份

一、根据查询条件更新字段

from elasticsearch import Elasticsearch
import redis
import json# 替换下面的用户名、密码和Elasticsearch服务器地址
username = 'elastic'
password = 'password '
es_host = 'https://127.0.0.2:30674'# 使用Elasticsearch实例化时传递用户名和密码
es = Elasticsearch(hosts=[es_host],basic_auth=(username, password),verify_certs=False# 如果你的Elasticsearch是通过SSL加密的,还可以添加下面的参数# use_ssl=True,# verify_certs=True,# ca_certs='/path/to/ca/cert',
)# 使用Elasticsearch实例进行操作,例如搜索
# print(es.cluster.state())# response = es.search(index="remote_statistics_202409", query={"match_all": {}})
# print(response)# # 连接Elasticsearch和Redis
# #es = Elasticsearch("http://localhost:9200")
r = redis.StrictRedis(host='10.7.9.13', port=32197, db=11)# 假设你的Redis键是'my_key'
keys = r.keys()count = 0my_map = dict()
for key in keys:maps = r.hgetall(key)print(key)#list_map = dict()for key01, value in maps.items():# print(type(json.loads(key)))# print(type(value))# print(type(json.loads(value)))# print(json.loads(key),  json.loads(value))aa = json.loads(value).get("Latitude")if not aa.startswith("0."):# list_map[json.loads(key01)] = json.loads(value)my_map[key.decode('utf-8') + str(json.loads(key01))] = json.loads(value)# my_list.append(list_map)print(len(my_map))# print(type(my_map.get("21V70000110122B000139"+str(1719713910000))))# print(my_map.get("21V70000110122B000139"+str(1719713910000)))# print(my_map["21V70000110122B0001391719713910000"])# new_map={}# new_map["21V70000110122B000139"] = my_map["21V70000110122B0001391719713910000"]# for key02, value in my_map.items():#     print(key02)#     print(type(key02))#     print(type(value))#     count = len(value) + count#     print(count)# print(key)# print(maps.__len__())# count = count + maps.__len__()# print(count)## one = r.hget(name=key, key='1719563700000')# print(one)# print(type(one))## if one is not None:#     one_json = json.loads(one)#     print(type(one_json))#     print(one_json.get("Latitude"))#     print(one_json.get("Longitude"))## print("-------------------------------------------------")# 从Redis获取数据# redis_data = r.get(redis_key)# key_map = my_map["21V70000110122B000139"]# print(type(key_map))# key_json = key_map.get(1719713910000)# print(type(key_json))# print(key_json)# print(key_json.get('Latitude'))# minutes = create_at // 60000# left = create_at % 60000 // 1000## if left <= 15:#     left = 0# elif left > 45:#     left = 60# else:#     left = 30## map_key = minutes * 60000 + left * 1000# 如果存在,解析数据并更新Elasticsearch# 构建更新查询script = {# "source": "ctx._source.field_to_update = params.new_value",# "source": "ctx._source['Latitude'] = params.new_value[ctx._source.RemoteId][ctx._source.createAt].get('Latitude')",# "source": "ctx._source['Latitude'] = params.new_value['21V70000110122B0001391719713910000']['Latitude'];ctx._source['Longitude'] = params.new_value['21V70000110122B0001391719713910000']['Longitude'];ctx._source['tmp'] = params.new_value['21V70000110122B0001391719713910000']",# "source": "def aa=ctx._source.RemoteId;ctx._source['Latitude'] = params.new_value['21V70000110122B0001391719713910000']['Latitude'];ctx._source['Longitude'] = params.new_value['21V70000110122B0001391719713910000']['Longitude'];ctx._source['rrc'] = params.new_value['21V70000110122B0001391719713910000'];ctx._source.remove('tmp')","source": "def create_at=ctx._source.createAt;def minutes = Math.floor(create_at / 60000);""def left = Math.floor(create_at % 60000 / 1000);if(left<=15) {left=0} else if(left >45){ left=60} else {left=30} def form_time= minutes * 60000 + left * 1000;""ctx._source['create_lltime'] = form_time;def key = ctx._source.RemoteId + (Long)form_time ;def rru=  params.new_value[key];""if(rru !=null) {ctx._source['Latitude']=rru['Latitude'];ctx._source['Longitude']=rru['Longitude'];ctx._source['rrc'] = rru}",# "source": "def aa=ctx._source.RemoteId;ctx._source['Latitude'] = aa;ctx._source['Longitude'] = params.new_value['21V70000110122B0001391719713910000']['Longitude'];ctx._source['tmp'] = params.new_value['21V70000110122B0001391719713910000']","params": {"new_value": my_map}, "lang": "painless"}
#
# # 更新查询
ret = es.update_by_query(index="remote_statistics_202406",script=script,# body={#     "query": {#         "match": {#             "id_field": my_list['id_value'],,,,,,,#         }#     }## },slices="auto",wait_for_completion=False,conflicts="proceed")print(ret)

二、ES数据批量导出

import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
import urllib3
urllib3.disable_warnings()
# 变量
start_date = 1735689600000
end_date = 1738368000000
#index_name = 'remote_statistics_202410'
index_name = 'remote_statistics_202501'# 替换下面的用户名、密码和Elasticsearch服务器地址
username = 'elastic'
password = 'password'
es_host = 'https://127.0.0.1:32293'# 使用Elasticsearch实例化时传递用户名和密码
es = Elasticsearch(hosts=[es_host],basic_auth=(username, password),verify_certs=False# 如果你的Elasticsearch是通过SSL加密的,还可以添加下面的参数# use_ssl=True,# verify_certs=True,# ca_certs='/path/to/ca/cert',
)print("----------start---------------")def fetch_data(start_time, end_time):results = helpers.scan(es, body={"query": {"range": {"createAt": {"gte": start_time,"lt": end_time}}}}, index=index_name)return resultsif __name__ == "__main__":step = 60 * 60 * 1000for i in range(start_date, end_date+8*step, step):date = datetime.fromtimestamp(i / 1000)print("**********************************************************")print(date)print("**********************************************************")ret = fetch_data(i, i + step)count = 0with open(str(i) + '.json', 'w') as f:for doc in ret:f.write(json.dumps(doc['_source']) + '\n')count = count + 1print(count)

三、ES数据批量导入

import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os
import urllib3
urllib3.disable_warnings()
# 变量
#index_name = 'remote_statistics_202410'
index_name = 'network_statistics_202410'# 替换下面的用户名、密码和Elasticsearch服务器地址
username = 'elastic'
password = 'password '
es_host = 'https://127.0.0.1:32067'# 使用Elasticsearch实例化时传递用户名和密码
es = Elasticsearch(hosts=[es_host],basic_auth=(username, password),verify_certs=False# 如果你的Elasticsearch是通过SSL加密的,还可以添加下面的参数# use_ssl=True,# verify_certs=True,# ca_certs='/path/to/ca/cert',
)def bulk_index_file(idx_name, file_path):current_dir = os.getcwd()file_names = os.listdir(current_dir)for file_name in file_names:if file_name.endswith(".json"):with open(file_name, 'r') as file:try:print(file_name)actions = (json.loads(line) for line in file)helpers.bulk(es, actions, index=idx_name)except Exception as e:print("error-----------------------")print(e)print(file_name)# 调用函数
bulk_index_file(index_name, None)

相关文章:

ES海量数据更新及导入导出备份

一、根据查询条件更新字段 from elasticsearch import Elasticsearch import redis import json# 替换下面的用户名、密码和Elasticsearch服务器地址 username elastic password password es_host https://127.0.0.2:30674# 使用Elasticsearch实例化时传递用户名和密码 es…...

Java线程池核心原理与最佳实践

Java 线程池详解 线程池是Java并发编程的核心组件&#xff0c;它能高效管理线程生命周期&#xff0c;避免频繁创建销毁线程的开销&#xff0c;提升系统性能和资源利用率。 一、线程池核心优势 降低资源消耗&#xff1a;复用已创建的线程&#xff0c;减少线程创建销毁开销提高…...

JAVA-springboot log日志

SpringBoot从入门到精通-第8章 日志的操作 一、Spring Boot默认的日志框架 SpringBoot支持很多种日志框架&#xff0c;通常情况下&#xff0c;这些日志框架都是由一个日志抽象层和一个日志实现层搭建而成的&#xff0c;日志抽象层是为记录日志提供的一套标准且规范的框架&…...

1.springmvc基础入门(一)

1.Spring MVC概念 Spring MVC 是 Spring Framework 提供的 Web 组件&#xff0c;全称是 Spring Web MVC&#xff0c;是⽬前主流的实现 MVC 设计模式的框架&#xff0c;提供前端路由映射、视图解析等功能。 Java Web 开发者必须要掌握的技术框架。 2.Spring MVC 功能 MVC&am…...

AI 时代下语音与视频伪造的网络安全危机

引言 在人工智能技术的推动下&#xff0c;语音合成、视频生成等技术取得了突破性进展&#xff0c;Deepfake、AI 语音克隆等工具让语音和视频伪造变得愈发简单且逼真。这些技术在娱乐、影视等领域带来便利的同时&#xff0c;也被不法分子利用&#xff0c;引发了一系列网络安全问…...

模块缝合-把A模块换成B模块(没写完)

把MLP Head替换为KAN 1.在model文件下新建一个python文件 2.把 模块文件里的整个KAN代码复制到新的python文件中 3.在开头导入 from model.KAN(新建文件名&#xff09; import KAN&#xff08;新建文件中的类名&#xff09; 4.sys.path.append(r"D: Icode(Kansformer"…...

从零开始学Flink:揭开实时计算的神秘面纱

一、为什么需要Flink&#xff1f; 当你在电商平台秒杀商品时&#xff0c;1毫秒的延迟可能导致交易失败&#xff1b;当自动驾驶汽车遇到障碍物时&#xff0c;10毫秒的计算延迟可能酿成事故。这些场景揭示了一个残酷事实&#xff1a;数据的价值随时间呈指数级衰减。 传统批处理…...

一、ES6-let声明变量【解刨分析最详细】

一、块级作用域 { let Tim"Tim是靓仔&#xff01;" } console.log("Tim:",Tim) 打印结果&#xff1a;Tim未进行任何定义&#xff01; 原因&#xff1a;因为Tim定义再块级{}里面&#xff0c;它的声音Tim只服务于该块级里面。而打印结果是再块级外面&#…...

Appium如何支持ios真机测试

ios模拟器上UI自动化测试 以appiumwebdriverio为例&#xff0c;详细介绍如何在模拟器上安装和测试app。在使用ios模拟器前&#xff0c;需要安装xcode&#xff0c;创建和启动一个simulator。simulator创建好后&#xff0c;就可以使用xcrun simctl命令安装被测应用并开始测试了。…...

JDK17 Http Request 异步处理 源码刨析

为什么可以异步&#xff1f; #调用起始源码 // 3. 发送异步请求并处理响应 CompletableFuture future client.sendAsync( request, HttpResponse.BodyHandlers.ofString() // 响应体转为字符串 ).thenApply(response -> { // 状态码检查&#xff08;非200系列抛出异常&…...

【Zephyr 系列 8】构建完整 BLE 产品架构:状态机 + AT 命令 + 双通道通信实战

🧠关键词:Zephyr、BLE、状态机、双向透传、AT 命令、Buffer、主从共存、系统架构 📌适合人群:希望开发 BLE 产品(模块/标签/终端)具备可控、可测、可维护架构的开发者 🧭 引言:从“点功能”到“系统架构” 前面几篇我们已经逐步构建了 BLE 广播、连接、数据透传系统…...

【Mac 从 0 到 1 保姆级配置教程 16】- Docker 快速安装配置、常用命令以及实际项目演示

文章目录 前言1. Docker 是什么&#xff1f;2. 为什么要使用 Docker&#xff1f; 安装 Docker1. 安装 Docker Desktop2. 安装 OrbStack3. Docker Desktop VS OrbStack5. 验证安装 使用 Docker 运行项目1. 克隆项目到本地2. 进入项目目录3. 启动容器: 查看运行效果1. OrbStack 中…...

2025-05-01-决策树算法及应用

决策树算法及应用 参考资料 GitHub - zhaoyichanghong/machine_learing_algo_python: implement the machine learning algorithms by p(机器学习相关的 github 仓库)决策树实现与应用决策树 概述 机器学习算法分类 决策树算法 决策树是一种以树状结构对数据进行划分的分类…...

Redis知识体系

1. 概述 本文总结了Redis基本的核心知识体系&#xff0c;在学习Redis的过程中&#xff0c;可以将其作为学习框架&#xff0c;以此更好的从整体的角度去理解和学习Redis的内容和设计思想。同时知识框架带来的好处是可以帮助我们更好的进行记忆&#xff0c;在大脑中形成相应的知识…...

mysql-MySQL体系结构和存储引擎

1. MySQL体系结构和存储引擎 MySQL被设计成一个单进程多线程架构的数据库&#xff0c;MySQL数据库实例在系统上的表现就是一个进 程当启动实例时&#xff0c;读取配置文件&#xff0c;根据配置文件的参数来启动数据库实例&#xff1b;若没有&#xff0c;按编译时的默认 参数设…...

Pycharm 函数注释

1 Docstring format File -> Settings -> Tools -> Python Integrated Tools -> Docstrings -> Docstring format&#xff0c;选择google File -> Settings -> Editor -> General -> Smart Keys -> Insert type placeholders in the documenta…...

如何使用 Redis 快速实现布隆过滤器?

以下是使用 Redis 实现布隆过滤器的两种方案&#xff0c;结合原理说明和操作步骤&#xff1a; 方案一&#xff1a;手动实现&#xff08;基于 Redis Bitmap&#xff09; 原理 利用 Redis 的 SETBIT 和 GETBIT 操作位数组&#xff0c;结合多个哈希函数计算位置。 步骤 确定参数…...

黑马Javaweb Request和Response

一.介绍 在 Web 开发中&#xff0c;HttpServletRequest 和 HttpServletResponse 是两个非常重要的类&#xff0c;它们分别用于处理客户端的请求和服务器的响应。以下是它们的详细说明和使用方法&#xff1a; 1. HttpServletRequest HttpServletRequest 是一个接口&#xff0…...

山东大学深度学习2025年期末考试

一、名词解释&#xff08;24&#xff09; 1.反向传播 2.激活函数 3.梯度裁剪 4.数据增强 5.迁移学习 6.过拟合 7.word2Vec 8.注意力机制 二、简答题&#xff08;48&#xff09; 1.池化的概念&#xff08;作用&#xff09;以及常见的两种池化操作 2.LSTM为什么能解决…...

添加按钮跳转页面并且根据网站的用户状态判断是否显示按钮

现在我们需要的是为页面添加一个按钮&#xff0c;这个按钮是动态的&#xff0c;需要根据网站用户登录过后是否是vip来判断是否显示&#xff0c;然后按钮的效果是跳转到某个页面。 首先我们需要在页面中找到我们需要添加按钮的位置&#xff0c;找到对应的文件&#xff0c;然后比…...

Gerrit+repo管理git仓库,如果本地有新分支不能执行repo sync来同步远程所有修改,会报错

问题&#xff1a;创建一个本地分支TEST 来关联远程已有分支origin/TEST&#xff0c;直接执行repo sync可能会出现问题&#xff1a;比如&#xff0c;本地分支TES会错乱关联到origin/master&#xff0c;或者拉不下最新代码等问题。 // git checkout -b 新分支名 远程分支名字 git…...

豆瓣图书评论数据分析与可视化

【题目描述】豆瓣图书评论数据爬取。以《平凡的世界》、《都挺好》等为分析对象&#xff0c;编写程序爬取豆瓣读书上针对该图书的短评信息&#xff0c;要求&#xff1a; &#xff08;1&#xff09;对前3页短评信息进行跨页连续爬取&#xff1b; &#xff08;2&#xff09;爬取…...

Vue ④-组件通信 || 进阶语法

组件三大部分 template&#xff1a;只有能一个根元素 style&#xff1a;全局样式(默认)&#xff1a;影响所有组件。局部样式&#xff1a;scoped 下样式&#xff0c;只作用于当前组件 script&#xff1a;el 根实例独有&#xff0c;data 是一个函数&#xff0c;其他配置项一致…...

0x-2-Oracle Linux 9上安装JDK配置环境变量

一、JDK选择和使用 安装完Oracle Linux9.6&#xff0c;同时使用rpm包安装Oracle 23 ai free后&#xff0c; 将面临sqlcl程序无法使用和java无法使用&#xff0c;需要相应进行变量配置问题。 1、java 环境运行不存在&#xff0c;Oracle 23ai free安装后默认安装JDK 11 /opt/…...

深入理解卷积神经网络:从原理到应用

在人工智能领域&#xff0c;卷积神经网络&#xff08;Convolutional Neural Network, CNN&#xff09;无疑是计算机视觉领域的璀璨明珠。从 1998 年 Yann LeCun 提出 LeNet-5 实现手写数字识别&#xff0c;到 2012 年 AlexNet 在 ImageNet 大赛上创造历史性突破&#xff0c;CNN…...

从入门到实战:AI学习路线全解析——避坑指南

分享一下阿里的人工智能学习路线,为感兴趣系统学习的小伙伴们探路。 一、谁适合学这门AI课程?五类人群的精准定位 无论你是零基础小白还是职场转型者,这套系统化课程都能为你量身定制成长路径: 零基础爱好者(无编程/数学背景) 课程提供Python和数学前置学习建议,先补基…...

Spring Boot + Thymeleaf 防重复提交

在 Spring Boot 与 Thymeleaf 结合的 Web 应用中&#xff0c;防止重复提交可以采用token 机制 客户端禁用按钮的方式实现&#xff0c;在高并发场景下&#xff0c;考虑使用 Redis 存储 token 而非 Session。 第一步&#xff1a;后端实现 Controller public class FormControl…...

uniapp实现的简约美观的星级评分组件

采用 uniapp 实现的一款简约美观的星级评分模板&#xff0c;提供丝滑动画效果&#xff0c;用户可根据自身需求进行自定义修改、扩展&#xff0c;纯CSS、HTML实现&#xff0c;支持web、H5、微信小程序&#xff08;其他小程序请自行测试&#xff09; 可到插件市场下载尝试&#x…...

AWS Elastic Beanstalk + CodePipeline(Python Flask Web的国区CI/CD)

目标 需要使用AWS Elastic Beanstalk 部署一个Python的Flask Web应用&#xff0c;并且使用CodePipeline作为CI/CD工作流。 eb部署图 前提 假设你已经有一个能够正常运行的Python的Flask Web应用项目代码&#xff0c;而且需要对已有Flask工程做一些调整。由于AWS Elastic Bea…...

多线程语音识别工具

软件介绍 本文介绍一款支持大厂接口的语音转文字工具&#xff0c;具备免配置、免费使用的特点。 软件特性 该工具是一款完全免费的桌面端应用程序&#xff0c;部署于开源社区平台&#xff0c;其核心优势在于整合了多家技术供应商的接口资源。 操作方式 用户只需将音频…...