Python 消费Kafka手动提交 批量存入Elasticsearch
一、第三方包选择
pip install kafka,对比了kafka和pykafka,还是选择kafka,消费速度更快
pip install elasticsearch==7.12.0(ES版本)
二、创建es连接对象
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulkclass Create_ES(object):_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance is None:cls._instance = super().__new__(cls)return cls._instancedef __init__(self, hosts):try:self.es = Elasticsearch([{'host':host, 'port':9200}])except Exception as e:print('Connect ES Fail db:{} error:{}'.format(hosts, str(e)))def get_conn(self):return self.esdef set_multi_data(self, datas):'''批量插入数据'''success = bulk(self.es, datas, raise_on_error=True)return success
三、消费kafka数据
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from . import Create_ESclass AppKfkConsumer(object):def __init__(self):self.server = 'localhost:9092'self.topic = KAFKA_TOPICself.consumer = Noneself.tp = Noneself.consumer_timeout_ms = 5000 # 设置消费超时时间,self.type = 'members'self.group_id = 'test1' # 设置消费group_id,避免重复消费self.es_index = 'index' # es的indexdef get_connect(self):self.consumer = KafkaConsumer(group_id=self.group_id,auto_offset_reset='earliest', # 从最早的数据开始消费bootstrap_servers=self.server,enable_auto_commit=False, # 关闭自动提交consumer_timeout_ms=self.consumer_timeout_ms)self.tp = TopicPartition(topic=self.topic, partition=0) # 设置我们要消费的分区self.consumer.assign([self.tp]) # 由consumer对象分配分区def beginConsumer(self):now_offset = 0 # 当前偏移量es_conn = Create_ES()Actions = []while True:for message in self.consumer:now_offset = message.offset # 获取当前偏移量data = eval(message.value.decode()) # 解析数据action = {"_index": self.es_index,"_type": self.type,"_source": data}Actions.append(action)if len(Actions) >= 50000:result = es_conn.set_multi_data(Actions) # 批量插入数据Actions = []# 提交偏移量,now_offset+1的原因是因为我发现如果不加1,下次消费会从上次消费最后一条数据开始,重复消费self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})if len(Actions) > 0:result = es_conn.set_multi_data(Actions)Actions = []self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})def delconnect(self):self.consumer.close()# 执行任务
ks = AppKfkConsumer()
ks.get_connect()
ks.beginConsumer()
相关文章:
Python 消费Kafka手动提交 批量存入Elasticsearch
一、第三方包选择 pip install kafka,对比了kafka和pykafka,还是选择kafka,消费速度更快pip install elasticsearch7.12.0(ES版本) 二、创建es连接对象 from elasticsearch import Elasticsearch from elasticsearch.helpers import bulkc…...
oracle 基础知识表的主键
一、表的约束条件 •约束条件是施加在表的字段上的一组限制条件,它使得只有符合限制条件要求的数据才能输入表。 •保证了表中的数据的正确性 i.约束条件包括了:非空和唯一和核对,即not null 和unique 和check null的含义:不确定 3个人去捡苹…...
opencascade AIS_MouseGesture AIS_MultipleConnectedInteractive源码学习
AIS_MouseGesture //! 鼠标手势 - 同一时刻只能激活一个。 enum AIS_MouseGesture { AIS_MouseGesture_NONE, //!< 无激活手势 // AIS_MouseGesture_SelectRectangle, //!< 矩形选择; //! 按下按钮开始,移动鼠标定义矩形&…...
Unity Apple Vision Pro 开发:如何把 PolySpatial 和 Play To Device 的版本从 1.2.3 升级为 1.3.1
XR 开发社区: SpatialXR社区:完整课程、项目下载、项目孵化宣发、答疑、投融资、专属圈子 📕教程说明 本教程将介绍如何把 Unity 的 PolySpatial 和 Play To Device 版本从 1.2.3 升级为 1.3.1。 📕Play To Device 软件升级 ht…...
大数据时代,区块链是如何助力数据开放共享的?
在大数据时代,区块链技术以其独特的优势,为数据开放共享提供了强有力的支持。以下是区块链助力数据开放共享的几个主要方面: 1. 增强数据安全性与隐私保护 加密安全:区块链技术采用先进的加密算法,如国密非对称加密技…...
睿抗2024省赛----RC-u4 章鱼图的判断
题目 对于无向图 G(V,E),我们将有且只有一个环的、大于 2 个顶点的无向连通图称之为章鱼图,因为其形状像是一个环(身体)带着若干个树(触手),故得名。 给定一个无向图,请你判断是不…...
py2exe,一个神奇的 Python 库
在众多Python打包工具中,py2exe无疑是一款出色的选择。它能够将Python脚本转换成可在Windows平台上独立运行的可执行文件,极大地方便了程序的分发与部署。本文将深入探讨py2exe的特性和使用方法,让你在创建桌面应用程序时更加游刃有余。 安装…...
博途PLC网络连接不上
博途PLC网络连接不上其中的一个原因就是网线接触不好,各种原因都试了,任然连接不上,大家可以把网线拔下,重新插拔或者直接更换一根网线。 1、无线网络网段和PLC连接网段冲突 。。。。...
哪个邮箱最安全最好用啊
企业邮箱安全至关重要,需保护隐私、防财务损失、维护通信安全、避免纠纷,并维持业务连续性。哪个企业邮箱最安全好用呢?Zoho企业邮箱,采用加密技术、反垃圾邮件和病毒保护,支持多因素认证,确保数据安全合规…...
企业微信开发智能升级:AIGC技术赋能,打造高效沟通平台
文章目录 一、AIGC在企业微信开发中的核心价值1. 智能化客服体验2. 自动化工作流程3. 个性化内容推荐4. 深度数据分析与洞察 二、使用AIGC进行企业微信开发的实践路径1. 需求分析与场景定义2. 技术选型与平台搭建3. 模型训练与调优4. 接口对接与功能集成5. 测试与优化 《企业微…...
Apache Doris + Paimon 快速搭建指南|Lakehouse 使用手册(二)
湖仓一体(Data Lakehouse)融合了数据仓库的高性能、实时性以及数据湖的低成本、灵活性等优势,帮助用户更加便捷地满足各种数据处理分析的需求。在过去多个版本中,Apache Doris 持续加深与数据湖的融合,已演进出一套成熟…...
Inno setup pascal编码下如何美化安装界面支持带边框,圆角,透明阴影窗口
inno setup自带的安装界面太老套了,如何实现类似网易,微信那种带界面的安装?一般有两种思路:提供一个单独的下载器,然后通过下载器将你用innosetup 打包后的软件下载下来,然后,静默安装这个包&a…...
SQL语句(以MySQL为例)——单表、多表查询
笛卡尔积(或交叉连接): 笛卡尔乘积是一个数学运算。假设我有两个集合 X 和 Y,那么 X 和 Y 的笛卡尔积就是 X 和 Y 的所有可能组合,也就是第一个对象来自于 X,第二个对象来自于 Y 的所有可能。组合的个数即为两个集合中…...
C++第二十八弹---进一步理解模板:特化和分离编译
✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】【C详解】 目录 1. 非类型模板参数 2. 模板的特化 2.1 概念 2.2 函数模板特化 2.3 类模板特化 2.3.1 全特化 2.3.2 偏特化 2.3.3 类模板特化应用示例 3. …...
正则表达式的独占模式,懒惰模式等有那些区别
正则表达式的独占模式、懒惰模式(也称为非贪婪模式)和贪婪模式(默认模式)在匹配行为上存在显著的区别。以下是这三种模式的详细解释和区别: 1、贪婪模式(Greedy): 默认情况下&…...
【INTEL(ALTERA)】Quartus® Prime Pro Edition 软件 v24.2 中,哪些 Agilex™ 5 IP 功能的硬件验证有限?
目录 说明 解决方法 说明 如下表所示,Quartus Prime 专业版软件 24.2 版为 Agilex™ 5 IP 或功能提供有限的硬件支持。此外,设备的设备型号、比特流和固件尚未最终确定。 影响 Agilex™ 5 特定功能的已知问题可参阅 Agilex 5 知识库文章搜索。 解决…...
Lua编程
文章目录 概述lua数据类型元表注意 闭包表现 实现 lua/c 接口编程skynet中调用层次虚拟栈C闭包注册表userdatalightuserdata 小结 概述 这次是skynet,需要一些lua/c相关的。写一篇博客,记录下。希望有所收获。 lua数据类型 boolean , number , string…...
2019数字经济公测大赛-VMware逃逸
文章目录 环境搭建漏洞点exp 环境搭建 ubuntu :18.04.01vmware: VMware-Workstation-Full-15.5.0-14665864.x86_64.bundle 这里环境搭不成功。。patch过后就报错,不知道咋搞 发现可能是IDA加载后的patch似乎不行对原来的patch可能有影响,重新下了patch&…...
如何改桥接模式
桥接模式主要是解决 路由功能的 因为NAT多层 主要是网络连接数太多时 然后路由器要好 不然光猫 比差路由要强的 光猫 请注意,对光猫的任何设置进行修改前,请一定要备份光猫的配置文件,并在每次修改前都截图保存原始设置信息!不要…...
江科大/江协科技 STM32学习笔记P13
文章目录 TIM定时中断1、TIM简介计数器PSC预分频器ARR自动重装寄存器 2、定时器类型基本定时器主模式触发DAC 通用定时器高级定时器 3、定时器原理定时中断基本结构预分频器时序计数器时序RCC时钟树 TIM定时中断 1、TIM简介 定时器的基准时钟一般都是主频72MHz,如果…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...
TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?
在工业自动化持续演进的今天,通信网络的角色正变得愈发关键。 2025年6月6日,为期三天的华南国际工业博览会在深圳国际会展中心(宝安)圆满落幕。作为国内工业通信领域的技术型企业,光路科技(Fiberroad&…...
在 Spring Boot 中使用 JSP
jsp? 好多年没用了。重新整一下 还费了点时间,记录一下。 项目结构: pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...
