Python连接Kafka收发数据等操作
目录
一、Kafka
二、发送端(生产者)
三、接收端(消费者)
四、其他操作
一、Kafka
Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。
kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。
安装命令如下:
pip install kafka-python
二、发送端(生产者)
自动创建test主题,并每隔一秒发送一条数据,示例代码如下:
from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode('utf-8'))producer.flush()if __name__ == '__main__':# 创建'test'主题topic = 'test'# 发送消息i = 1while True:message = {'num': i, 'msg': f'Hello Kafka {i}'}send_message(topic, message)i += 1time.sleep(1)
三、接收端(消费者)
代码如下:
from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaConsumer实例
consumer = KafkaConsumer('test',bootstrap_servers=bootstrap_servers,auto_offset_reset='latest', # 从最新的消息开始消费# auto_offset_reset='earliest', # 从最早的offset开始消费enable_auto_commit=True, # 自动提交offsetgroup_id='my-group' # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message = json.loads(message.value.decode('utf-8'))print(f"Received message: {message}")
消费者参数如下:
1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。
2、enable_auto_commit
该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。
3、group_id
该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)
四、其他操作
list_topics():获取主题元数据。
create_topics():创建新主题。
delete_topics():删除主题。
from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)# 删除主题
admin_client.delete_topics(topics=['test-topic'])
相关文章:

Python连接Kafka收发数据等操作
目录 一、Kafka 二、发送端(生产者) 三、接收端(消费者) 四、其他操作 一、Kafka Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构…...

MySql在更新操作时引入“两阶段提交”的必要性
日志模块有两个redo log和binlog,redo log 是引擎层的日志(负责存储相关的事),binlog是在Server层,主要做MySQL共嗯那个层面的事情。redo log就像一个缓冲区,可以让当更新操作的时候先放redo log中…...
充气模块方案——无刷充气泵pcba方案
在方案开发中,充气效率是无刷充气泵PCBA方案开发中的关键问题。一般通过优化电路设计和控制算法,可以实现高效的气体压缩和快速的充气效果。另外,选择合理的电机驱动器和传感器等元器件能够提高打气泵的功率和效率,减少充气时间&a…...

[sql-03] 求阅读至少两章的人数
准备数据 CREATE TABLE book_read (bookid varchar(150) NOT NULL COMMENT 书籍ID,username varchar(150) DEFAULT NULL COMMENT 用户名,seq varchar(150) comment 章节ID ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT 用户阅读表insert into book_read values(《太子日子》…...
Linux如何通过链接下载文件
在Linux系统中,你可以通过多种方式通过链接下载文件。这些方式包括使用命令行工具(如wget、curl、axel等)和图形界面程序(如浏览器或文件管理器)。以下是几种常用的命令行方法: 1. 使用wget wget是一个非交…...

seL4 IPC(五)
官网链接:link 求解 代码中的很多方法例如这一个教程里面的seL4_GetMR(0),我在官方给的手册和API中都搜不到,想问一下大家这些大家都是在哪里搜的!! IPC seL4中的IPC和一般OS中讲的IPC概念相差比较大,根…...

【Java】多线程基础操作
多线程基础操作 Thread类回顾Thread类观察线程运行线程的休眠常用方法构造方法属性获取方法 中断线程线程状态线程等待 初识synchronized问题引入初步使用初步了解可重入锁死锁 volatile问题引入初步使用volatile 与 synchronized 线程顺序控制初步了解wait()notify()防止线程饿…...

基于Hive和Hadoop的病例分析系统
本项目是一个基于大数据技术的医疗病历分析系统,旨在为用户提供全面的病历信息和深入的医疗数据分析。系统采用 Hadoop 平台进行大规模数据存储和处理,利用 MapReduce 进行数据分析和处理,通过 Sqoop 实现数据的导入导出,以 Spark…...

数据结构编程实践20讲(Python版)—03栈
本文目录 03 栈 StackS1 说明S2 示例基于列表的实现基于链表的实现 S3 问题:复杂嵌套结构的括号匹配问题求解思路Python3程序 S4 问题:基于栈的阶乘计算VS递归实现求解思路Python3程序 S5 问题:逆波兰表示法(后缀表达式)求值求解思路Python3程…...

【注册/登录安全分析报告:孔夫子旧书网】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...

PMP--二模--解题--141-150
文章目录 14.敏捷--创建敏捷环境--团队构成--混合项目环境,通常是自组织团队,即团队成员自己决定谁做什么,而不是项目经理决定。易混--常见场景--一个新人加入141、 [单选] 在一个混合项目的执行过程中,不得不更换一个开发人员。新…...

我的领域-关怀三次元成长的二次元虚拟陪伴 | OPENAIGC开发者大赛高校组AI创作力奖
在第二届拯救者杯OPENAIGC开发者大赛中,涌现出一批技术突出、创意卓越的作品。为了让这些优秀项目被更多人看到,我们特意开设了优秀作品报道专栏,旨在展示其独特之处和开发者的精彩故事。 无论您是技术专家还是爱好者,希望能带给…...

个人账号(学校+个人)申请专利过程中遇见的问题
一、请指定一位申请人作为代表人 因为是拿个人账号申请的专利,同时要求学校是第一申请人,所以可以再添加一个第二申请人,然后勾选第二申请人为代表人就可以提交申请了(注意:两个申请人只能减免75%,也就是要…...
在ubuntu系统中,如何让其按下物理关机键时,系统不处理,但qt程序能检测到关机键按下的事件,并处理信号
要让 Ubuntu 系统在按下物理关机键时,系统不直接处理该事件,但让你的 Qt 程序能够检测到并处理关机键的按下事件,可以参考以下步骤: 1. 禁用系统对关机键的默认处理 Ubuntu 系统默认会捕获电源键的按下事件并执行关机操作。首先你…...
先进制造aps专题二十六 基于强化学习的人工智能ai生产排程aps模型简介
基于强化学习的人工智能ai生产排程模型简介 人工智能ai能不能做生产排程? 答案是肯定的。 ai的算法分两类,一类是学习,一类是搜索。 而生产排程问题,它是一个搜索问题,本质上,它和下围棋是一样的 我们…...
各领域/行业硬件一览表
专班硬件装备制造agv小车、机械臂、PDA、服务器、大屏、扭矩传感器、温湿度检测仪、粉尘传感器、陀螺仪传感器、3D打印设备、在线质量检测仪器、新能源水表、电表、气表、汽表、服务器、大屏、温度传感器、压力传感器、光照度传感器、RTU医药化工温湿度传感器、压力传感器、流量…...

机器学习-SVM
线性感知机分类 支持向量机 线性感知机(Perceptron) 感知机是线性二值分类器。 注意:什么是线性?线性分割面就是,就是在分割面中,任意两个的连线也在分割面中,这个分割面,就是线…...

翻译器在线翻译:开启多语言交流新时代
随着国际交流、商务合作、文化交融以及互联网的飞速发展,人们对于跨越语言鸿沟的需求日益迫切。翻译工具成为了我们必备的一个工具,这篇文章我们一起来探讨一些好用的翻译器在线翻译工具吧。 1.在线福昕翻译 链接直达>>https://fanyi.pdf365.cn/…...

网络编程(10)——json序列化
十、day10 今天学习如何使用jsoncpp将json数据解析为c对象,将c对象序列化为json数据。jsoncp经常在网络通信中使用,也就是服务器和客户端的通信一般使用json(可视化好);而protobuf一般在服务器之间的通信中使用 json…...
基于FreeRTOS的STM32多功能手表设计
在智能穿戴设备迅速发展的今天,多功能手表因其便携性和实用性而受到广泛关注。本项目旨在设计一款基于FreeRTOS操作系统的STM32多功能手表,通过实时多任务处理,实现时间显示、多级菜单、万年历、模拟手电筒、温湿度显示、电子闹钟和设置等功能…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...

Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...

微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...

vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...