Python连接Kafka收发数据等操作
目录
一、Kafka
二、发送端(生产者)
三、接收端(消费者)
四、其他操作
一、Kafka
Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。
kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。
安装命令如下:
pip install kafka-python
二、发送端(生产者)
自动创建test主题,并每隔一秒发送一条数据,示例代码如下:
from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode('utf-8'))producer.flush()if __name__ == '__main__':# 创建'test'主题topic = 'test'# 发送消息i = 1while True:message = {'num': i, 'msg': f'Hello Kafka {i}'}send_message(topic, message)i += 1time.sleep(1)
三、接收端(消费者)
代码如下:
from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaConsumer实例
consumer = KafkaConsumer('test',bootstrap_servers=bootstrap_servers,auto_offset_reset='latest', # 从最新的消息开始消费# auto_offset_reset='earliest', # 从最早的offset开始消费enable_auto_commit=True, # 自动提交offsetgroup_id='my-group' # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message = json.loads(message.value.decode('utf-8'))print(f"Received message: {message}")

消费者参数如下:
1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。
2、enable_auto_commit
该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。
3、group_id
该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)
四、其他操作
list_topics():获取主题元数据。
create_topics():创建新主题。
delete_topics():删除主题。
from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)# 删除主题
admin_client.delete_topics(topics=['test-topic'])
相关文章:
Python连接Kafka收发数据等操作
目录 一、Kafka 二、发送端(生产者) 三、接收端(消费者) 四、其他操作 一、Kafka Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构…...
MySql在更新操作时引入“两阶段提交”的必要性
日志模块有两个redo log和binlog,redo log 是引擎层的日志(负责存储相关的事),binlog是在Server层,主要做MySQL共嗯那个层面的事情。redo log就像一个缓冲区,可以让当更新操作的时候先放redo log中…...
充气模块方案——无刷充气泵pcba方案
在方案开发中,充气效率是无刷充气泵PCBA方案开发中的关键问题。一般通过优化电路设计和控制算法,可以实现高效的气体压缩和快速的充气效果。另外,选择合理的电机驱动器和传感器等元器件能够提高打气泵的功率和效率,减少充气时间&a…...
[sql-03] 求阅读至少两章的人数
准备数据 CREATE TABLE book_read (bookid varchar(150) NOT NULL COMMENT 书籍ID,username varchar(150) DEFAULT NULL COMMENT 用户名,seq varchar(150) comment 章节ID ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT 用户阅读表insert into book_read values(《太子日子》…...
Linux如何通过链接下载文件
在Linux系统中,你可以通过多种方式通过链接下载文件。这些方式包括使用命令行工具(如wget、curl、axel等)和图形界面程序(如浏览器或文件管理器)。以下是几种常用的命令行方法: 1. 使用wget wget是一个非交…...
seL4 IPC(五)
官网链接:link 求解 代码中的很多方法例如这一个教程里面的seL4_GetMR(0),我在官方给的手册和API中都搜不到,想问一下大家这些大家都是在哪里搜的!! IPC seL4中的IPC和一般OS中讲的IPC概念相差比较大,根…...
【Java】多线程基础操作
多线程基础操作 Thread类回顾Thread类观察线程运行线程的休眠常用方法构造方法属性获取方法 中断线程线程状态线程等待 初识synchronized问题引入初步使用初步了解可重入锁死锁 volatile问题引入初步使用volatile 与 synchronized 线程顺序控制初步了解wait()notify()防止线程饿…...
基于Hive和Hadoop的病例分析系统
本项目是一个基于大数据技术的医疗病历分析系统,旨在为用户提供全面的病历信息和深入的医疗数据分析。系统采用 Hadoop 平台进行大规模数据存储和处理,利用 MapReduce 进行数据分析和处理,通过 Sqoop 实现数据的导入导出,以 Spark…...
数据结构编程实践20讲(Python版)—03栈
本文目录 03 栈 StackS1 说明S2 示例基于列表的实现基于链表的实现 S3 问题:复杂嵌套结构的括号匹配问题求解思路Python3程序 S4 问题:基于栈的阶乘计算VS递归实现求解思路Python3程序 S5 问题:逆波兰表示法(后缀表达式)求值求解思路Python3程…...
【注册/登录安全分析报告:孔夫子旧书网】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...
PMP--二模--解题--141-150
文章目录 14.敏捷--创建敏捷环境--团队构成--混合项目环境,通常是自组织团队,即团队成员自己决定谁做什么,而不是项目经理决定。易混--常见场景--一个新人加入141、 [单选] 在一个混合项目的执行过程中,不得不更换一个开发人员。新…...
我的领域-关怀三次元成长的二次元虚拟陪伴 | OPENAIGC开发者大赛高校组AI创作力奖
在第二届拯救者杯OPENAIGC开发者大赛中,涌现出一批技术突出、创意卓越的作品。为了让这些优秀项目被更多人看到,我们特意开设了优秀作品报道专栏,旨在展示其独特之处和开发者的精彩故事。 无论您是技术专家还是爱好者,希望能带给…...
个人账号(学校+个人)申请专利过程中遇见的问题
一、请指定一位申请人作为代表人 因为是拿个人账号申请的专利,同时要求学校是第一申请人,所以可以再添加一个第二申请人,然后勾选第二申请人为代表人就可以提交申请了(注意:两个申请人只能减免75%,也就是要…...
在ubuntu系统中,如何让其按下物理关机键时,系统不处理,但qt程序能检测到关机键按下的事件,并处理信号
要让 Ubuntu 系统在按下物理关机键时,系统不直接处理该事件,但让你的 Qt 程序能够检测到并处理关机键的按下事件,可以参考以下步骤: 1. 禁用系统对关机键的默认处理 Ubuntu 系统默认会捕获电源键的按下事件并执行关机操作。首先你…...
先进制造aps专题二十六 基于强化学习的人工智能ai生产排程aps模型简介
基于强化学习的人工智能ai生产排程模型简介 人工智能ai能不能做生产排程? 答案是肯定的。 ai的算法分两类,一类是学习,一类是搜索。 而生产排程问题,它是一个搜索问题,本质上,它和下围棋是一样的 我们…...
各领域/行业硬件一览表
专班硬件装备制造agv小车、机械臂、PDA、服务器、大屏、扭矩传感器、温湿度检测仪、粉尘传感器、陀螺仪传感器、3D打印设备、在线质量检测仪器、新能源水表、电表、气表、汽表、服务器、大屏、温度传感器、压力传感器、光照度传感器、RTU医药化工温湿度传感器、压力传感器、流量…...
机器学习-SVM
线性感知机分类 支持向量机 线性感知机(Perceptron) 感知机是线性二值分类器。 注意:什么是线性?线性分割面就是,就是在分割面中,任意两个的连线也在分割面中,这个分割面,就是线…...
翻译器在线翻译:开启多语言交流新时代
随着国际交流、商务合作、文化交融以及互联网的飞速发展,人们对于跨越语言鸿沟的需求日益迫切。翻译工具成为了我们必备的一个工具,这篇文章我们一起来探讨一些好用的翻译器在线翻译工具吧。 1.在线福昕翻译 链接直达>>https://fanyi.pdf365.cn/…...
网络编程(10)——json序列化
十、day10 今天学习如何使用jsoncpp将json数据解析为c对象,将c对象序列化为json数据。jsoncp经常在网络通信中使用,也就是服务器和客户端的通信一般使用json(可视化好);而protobuf一般在服务器之间的通信中使用 json…...
基于FreeRTOS的STM32多功能手表设计
在智能穿戴设备迅速发展的今天,多功能手表因其便携性和实用性而受到广泛关注。本项目旨在设计一款基于FreeRTOS操作系统的STM32多功能手表,通过实时多任务处理,实现时间显示、多级菜单、万年历、模拟手电筒、温湿度显示、电子闹钟和设置等功能…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?
uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件,用于在原生应用中加载 HTML 页面: 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...
深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙
WebGL:在浏览器中解锁3D世界的魔法钥匙 引言:网页的边界正在消失 在数字化浪潮的推动下,网页早已不再是静态信息的展示窗口。如今,我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室,甚至沉浸式的V…...
数据分析六部曲?
引言 上一章我们说到了数据分析六部曲,何谓六部曲呢? 其实啊,数据分析没那么难,只要掌握了下面这六个步骤,也就是数据分析六部曲,就算你是个啥都不懂的小白,也能慢慢上手做数据分析啦。 第一…...
计算机系统结构复习-名词解释2
1.定向:在某条指令产生计算结果之前,其他指令并不真正立即需要该计算结果,如果能够将该计算结果从其产生的地方直接送到其他指令中需要它的地方,那么就可以避免停顿。 2.多级存储层次:由若干个采用不同实现技术的存储…...
在Spring Boot中集成RabbitMQ的完整指南
前言 在现代微服务架构中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个流行的消息中间件,支持多种消息协议,具有高可靠性和可扩展性。 本博客将详细介绍如何在 Spring Boot 项目…...
软件工程教学评价
王海林老师您好。 您的《软件工程》课程成功地将宏观的理论与具体的实践相结合。上半学期的理论教学中,您通过丰富的实例,将“高内聚低耦合”、SOLID原则等抽象概念解释得十分透彻,让这些理论不再是停留在纸面的名词,而是可以指导…...
