使用Python构建Kafka示例项目
新建项目
mkdir python-kafka-test
cd python-kafka-test
安装依赖
pip install confluent_kafka
创建配置文件
# Kafka配置文件# Kafka服务器配置
KAFKA_CONFIG = {'bootstrap.servers': 'localhost:9092',# 生产者特定配置'producer': {'client.id': 'python-kafka-producer','acks': 'all', # 确保消息被所有副本确认'retries': 3, # 重试次数'retry.backoff.ms': 1000, # 重试间隔'batch.size': 16384, # 批处理大小'linger.ms': 5, # 等待时间以允许更多消息加入批次'compression.type': 'snappy', # 压缩类型},# 消费者特定配置'consumer': {'group.id': 'notification-group','auto.offset.reset': 'earliest','enable.auto.commit': True,'auto.commit.interval.ms': 5000,'session.timeout.ms': 30000,'max.poll.interval.ms': 300000,'heartbeat.interval.ms': 10000,}
}# 主题配置
TOPICS = {'email': 'email-topic','sms': 'sms-topic'
}
创建Kafka生产者
import json
import logging
import signal
import sys
from confluent_kafka import Producer
from config import KAFKA_CONFIG, TOPICS# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('kafka-producer')# 合并配置
producer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('producer', {})}
# 移除嵌套的producer配置,避免冲突
if 'producer' in producer_config:del producer_config['producer']
if 'consumer' in producer_config:del producer_config['consumer']# 创建Producer实例
p = Producer(producer_config)# 标记是否正在关闭
shutting_down = Falsedef signal_handler(sig, frame):"""处理终止信号,确保优雅关闭"""global shutting_downif shutting_down:returnshutting_down = Truelogger.info("接收到终止信号,正在优雅关闭...")# 确保所有消息都被发送p.flush(10) # 等待最多10秒logger.info("生产者已关闭")sys.exit(0)# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)def delivery_report(err, msg):"""消息发送回调函数"""if err is not None:logger.error(f'消息发送失败: {err}')else:logger.info(f'消息已发送到 {msg.topic()} [分区 {msg.partition()}]')def send_notification(topic_key, payload, key=None):"""发送通知消息到指定主题Args:topic_key: 主题键名(在TOPICS字典中定义)payload: 消息内容(字典或JSON字符串)key: 可选的消息键Returns:bool: 是否成功将消息加入发送队列"""try:# 获取实际主题名topic = TOPICS.get(topic_key, topic_key)# 如果payload是字典,转换为JSON字符串if isinstance(payload, dict):payload = json.dumps(payload)# 发送消息p.produce(topic, payload.encode('utf-8'), key=key.encode('utf-8') if key else None,callback=delivery_report)# 轮询一次以触发回调p.poll(0)logger.info(f'消息已加入发送队列: {topic}')return Trueexcept Exception as e:logger.error(f'发送消息时出错: {e}')return False# 使用示例
if __name__ == "__main__":try:# 发送邮件通知email_payload = {"to": "receiver@example.com", "from": "sender@example.com", "subject": "Sample Email", "body": "This is a sample email notification"}send_notification('email', email_payload)# 发送短信通知sms_payload = {"phoneNumber": "1234567890", "message": "This is a sample SMS notification"}send_notification('sms', sms_payload)# 确保所有消息都被发送remaining = p.flush(timeout=5)if remaining > 0:logger.warning(f'仍有 {remaining} 条消息未发送完成')else:logger.info('所有消息已成功发送')except KeyboardInterrupt:logger.info("程序被用户中断")except Exception as e:logger.error(f"发生错误: {e}")finally:# 确保所有消息都被发送p.flush(timeout=5)
创建Kafka消费者
import json
import logging
import signal
import sys
from confluent_kafka import Consumer, KafkaError
from config import KAFKA_CONFIG, TOPICS# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('kafka-consumer')# 合并配置
consumer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('consumer', {})}
# 移除嵌套的配置,避免冲突
if 'producer' in consumer_config:del consumer_config['producer']
if 'consumer' in consumer_config:del consumer_config['consumer']# 创建Consumer实例
c = Consumer(consumer_config)# 标记是否正在关闭
shutting_down = Falsedef signal_handler(sig, frame):"""处理终止信号,确保优雅关闭"""global shutting_downif shutting_down:returnshutting_down = Truelogger.info("接收到终止信号,正在优雅关闭...")sys.exit(0)# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)def process_message(msg):"""处理接收到的消息Args:msg: Kafka消息对象"""try:topic = msg.topic()value = msg.value().decode("utf-8")key = msg.key().decode("utf-8") if msg.key() else None# 尝试解析JSONtry:payload = json.loads(value)logger.info(f'接收到消息 [主题: {topic}, 键: {key}]')logger.debug(f'消息内容: {payload}')except json.JSONDecodeError:logger.info(f'接收到非JSON消息 [主题: {topic}, 键: {key}]: {value}')# 根据主题类型处理不同的消息if topic == TOPICS['email']:handle_email_notification(payload if 'payload' in locals() else value)elif topic == TOPICS['sms']:handle_sms_notification(payload if 'payload' in locals() else value)else:logger.warning(f'收到未知主题的消息: {topic}')except Exception as e:logger.error(f'处理消息时出错: {e}')def handle_email_notification(payload):"""处理邮件通知"""# 这里实现实际的邮件发送逻辑logger.info(f'处理邮件通知: {payload}')def handle_sms_notification(payload):"""处理短信通知"""# 这里实现实际的短信发送逻辑logger.info(f'处理短信通知: {payload}')def main():try:# 订阅主题topics_to_subscribe = list(TOPICS.values())logger.info(f'订阅主题: {topics_to_subscribe}')c.subscribe(topics_to_subscribe)logger.info('开始消费消息...')while not shutting_down:msg = c.poll(1.0) # 超时时间1秒if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# 到达分区末尾,不是错误logger.debug(f'到达分区末尾: {msg.topic()} [{msg.partition()}]')continueelse:# 其他错误logger.error(f'Kafka错误: {msg.error()}')break# 处理消息process_message(msg)except KeyboardInterrupt:logger.info("程序被用户中断")except Exception as e:logger.error(f"发生错误: {e}")finally:# 关闭消费者logger.info("关闭消费者...")c.close()logger.info("消费者已关闭")if __name__ == "__main__":main()
运行项目
打开终端运行命令
python producer.py
python consumer.py


可以看到终端输出正常
详细代码:https://github.com/wan88888/python-kafka-test
相关文章:
使用Python构建Kafka示例项目
新建项目 mkdir python-kafka-test cd python-kafka-test 安装依赖 pip install confluent_kafka 创建配置文件 # Kafka配置文件# Kafka服务器配置 KAFKA_CONFIG {bootstrap.servers: localhost:9092,# 生产者特定配置producer: {client.id: python-kafka-producer,acks:…...
本地化部署DeepSeek-R1蒸馏大模型:基于飞桨PaddleNLP 3.0的实战指南
目录 一、飞桨框架3.0:大模型推理新范式的开启1.1 自动并行机制革新:解放多卡推理1.2 推理-训练统一设计:一套代码全流程复用 二、本地部署DeepSeek-R1-Distill-Llama-8B的实战流程2.1 机器环境说明2.2 模型与推理脚本准备2.3 启动 Docker 容…...
VBA 64位API声明语句第008讲
跟我学VBA,我这里专注VBA, 授人以渔。我98年开始,从源码接触VBA已经20余年了,随着年龄的增长,越来越觉得有必要把这项技能传递给需要这项技术的职场人员。希望职场和数据打交道的朋友,都来学习VBA,利用VBA,起码可以提高…...
Linux信号——信号的保存(2)
关于core和term两种终止方式 core是什么? 将进程在内存中的核心数据(与调试有关)转存到磁盘中形成core,core.pid的文件。 core dump:核心转储。 core与term的区别: term只是普通的终止,而core终止方式还要…...
PyQt6实例_A股日数据维护工具_权息数据增量更新线程
目录 前置: 代码: 1 工作类 2 数据库交互 3 主界面启用子线程 视频: 前置: 1 本系列将以 “PyQt6实例_A股日数据维护工具” 开头放置在“PyQt6实例”专栏 专栏地址 https://blog.csdn.net/m0_37967652/category_12929760.h…...
【蓝桥杯嵌入式——学习笔记一】2016年第七届省赛真题重难点解析记录,闭坑指南(文末附完整代码)
在读题过程中发现本次使用的是串口2,需要配置串口2。 但在查看产品手册时发现PA14同时也是SWCLK。 所以在使用串口2时需要拔下跳线帽去连接CH340。 可能是用到串口2的缘故,在烧录时发现报了一个错误。这时我们要想烧录得按着复位键去点击烧录,…...
基础常问 (概念、代码)
读源码 代码题 Void方法 ,也可以提前rerun;结束 RandomAccessFile类(随机访问文件) 在 Java 中,可以使用RandomAccessFile类来实现文件指针操作。RandomAccessFile提供了对文件内容的随机访问功能,它的文件指针可以通…...
大学生机器人比赛实战(一)综述篇
大学生机器人比赛实战 参加机器人比赛是大学生提升工程实践能力的绝佳机会。本指南将全面介绍如何从零开始准备华北五省机器人大赛、ROBOCAN、RoboMaster等主流机器人赛事,涵盖硬件设计、软件开发、算法实现和团队协作等关键知识。 一、比赛选择与准备策略 1.1 主…...
什么是宽带拨号?
宽带拨号(PPPoE拨号)是一种通过账号密码认证接入互联网的方式,常见于家庭宽带、企业专线等场景。用户需要通过路由器或电脑进行拨号连接,运营商验证身份后分配IP地址,才能正常上网。 1. 宽带拨号的工作原理 PPPoE协议&…...
J1 ResNet-50算法实战与解析
🍨 本文為🔗365天深度學習訓練營 中的學習紀錄博客🍖 原作者:K同学啊 | 接輔導、項目定制 一、理论知识储备 1. 残差网络的由来 ResNet主要解决了CNN在深度加深时的退化问题(梯度消失与梯度爆炸)。 虽然B…...
[MySQL初阶]MySQL(8)索引机制:下
标题:[MySQL初阶]MySQL(8)索引机制:下 水墨不写bug 文章目录 四、从问题到底层,从现象到本质1.为什么插入的数据默认排好序2.MySQL的Page(1)为什么选择用Page?(2&#x…...
Muduo网络库实现 [九] - EventLoopThread模块
目录 设计思路 类的设计 模块的实现 私有接口 公有接口 设计思路 我们说过一个EventLoop要绑定一个线程,未来该EventLoop所管理的所有的连接的操作都需要在这个EventLoop绑定的线程中进行,所以我们该如何实现将EventLoop和线程绑定呢?…...
Vim操作指令全解析
Vim是我们在Linux日常工作中不可或缺的文本编辑器。它强大的功能和高效的编辑方式可以极大提升工作效率。本文将全面解析Vim的各种操作指令,从基础操作到高级技巧。 一、Vim模式解析 Vim是一个模式化编辑器,理解不同模式是掌握Vim的关键: …...
《K230 从熟悉到...》识别机器码(AprilTag)
《K230 从熟悉到...》识别机器码(aprirltag) tag id 《庐山派 K230 从熟悉到...》 识别机器码(AprilTag) AprilTag是一种基于二维码的视觉标记系统,最早是由麻省理工学院(MIT)在2008年开发的。A…...
VMware ESXi:企业级虚拟化平台详解
VMware ESXi:企业级虚拟化平台详解 目录 什么是VMware ESXi? ESXi的发展历史 ESXi的核心特性 3.1 裸机架构(Type-1 Hypervisor) 3.2 轻量化与高性能 3.3 集中管理(vCenter集成) ESXi的架构与工作原理…...
使用 PyTorch 的 `optim.lr_scheduler.CosineAnnealingLR` 学习率调度器
使用 PyTorch 的 optim.lr_scheduler.CosineAnnealingLR 学习率调度器 在深度学习中,学习率(Learning Rate, LR)是影响模型训练效果的一个关键超参数。一个合适的学习率调度策略可以帮助模型更快地收敛,同时避免陷入局部最优或振荡。PyTorch 提供了多种学习率调度器,其中…...
栈和队列的概念
1.栈的概念 只允许在固定的一端进行插入和删除,进行数据的插入和数据的删除操作的一端数栈顶,另一端称为栈底。 栈中数据元素遵循后进先出LIFO (Last In First Out) 压栈:栈的插入。 出栈:栈的删除。出入数据在栈顶。 那么下面…...
常用的元素操作API
click 触发当前元素的点击事件 clear() 清空内容 sendKeys(...) 往文本框一类元素中写入内容 getTagName() 获取元素的的标签名 getAttribute(属性名) 根据属性名获取元素属性值 getText() 获取当前元素的文本值 isDisplayed() 查看元素是否显示 get(String url) 访…...
红日靶场一实操笔记
一,网络拓扑图 二,信息搜集 1.kali机地址:192.168.50.129 2.探测靶机 注:需要win7开启c盘里面的phpstudy的服务。 nmap -sV -Pn 192.168.50.128 或者扫 nmap -PO 192.168.50.0/24 可以看出来win7(ip为192.168.50.128)的靶机开…...
SpringBoot集成Redis 灵活使用 TypedTuple 和 DefaultTypedTuple 实现 Redis ZSet 的复杂操作
以下是 Spring Boot 集成 Redis 中 TypedTuple 和 DefaultTypedTuple 的详细使用说明,包含代码示例和场景说明: 1. 什么是 TypedTuple 和 DefaultTypedTuple? TypedTuple<T> 接口: 定义了 Redis 中有序集合(ZSet…...
7-4 BCD解密
BCD数是用一个字节来表达两位十进制的数,每四个比特表示一位。所以如果一个BCD数的十六进制是0x12,它表达的就是十进制的12。但是小明没学过BCD,把所有的BCD数都当作二进制数转换成十进制输出了。于是BCD的0x12被输出成了十进制的18了&#x…...
Golang改进后的任务调度系统分析
以下是整合了所有改进点的完整代码实现: package mainimport ("bytes""context""fmt""io""log""net/http""sync""time""github.com/go-redis/redis/v8""github.com/robfig/…...
【目标检测】【深度学习】【Pytorch版本】YOLOV2模型算法详解
【目标检测】【深度学习】【Pytorch版本】YOLOV2模型算法详解 文章目录 【目标检测】【深度学习】【Pytorch版本】YOLOV2模型算法详解前言YOLOV2的模型结构YOLOV2模型的基本执行流程YOLOV2模型的网络参数YOLOV2模型的训练方式 YOLOV2的核心思想前向传播阶段反向传播阶段 总结 前…...
NineData云原生智能数据管理平台新功能发布|2025年3月版
本月发布 15 项更新,其中重点发布 3 项、功能优化 11 项、性能优化 1 项。 重点发布 基础服务 - MFA 多因子认证 新增 MFA 多因子认证,提升账号安全性。系统管理员开启后,所有组织成员需绑定认证器,登录时需输入动态验证码。 数…...
破局与赋能:信息系统战略规划方法论
信息系统战略规划是将组织的战略目标和发展规划转化为信息系统的战略目标和发展规划的过程,常见的方法有以下几种: 一、企业系统规划法(BSP) 1.基本概念:通过全面调查,分析企业信息需求,确定信…...
GLSL(OpenGL 着色器语言)基础语法
GLSL(OpenGL 着色器语言)基础语法 GLSL(OpenGL Shading Language)是 OpenGL 计算着色器的语言,语法类似于 C 语言,但提供了针对 GPU 的特殊功能,如向量运算和矩阵运算。 着色器的开头总是要声明…...
Redis基础知识-3
RedisTemplate对多种数据结构的操作 1. String类型 示例代码: // 保存数据 redisTemplate.opsForValue().set("user:1001", "John Doe"); // 设置键值对,无过期时间 redisTemplate.opsForValue().set("user:1002", &qu…...
Git Rebase 操作中丢失提交的恢复方法
背景介绍 在团队协作中,使用 Git 进行版本控制是常见实践。然而,有时在执行 git rebase 或者其他操作后,我们可能会发现自己的提交记录"消失"了,这往往让开发者感到恐慌。本文将介绍几种在 rebase 后恢复丢失提交的方法。 问题描述 当我们执行以下操作时,可能…...
【diffusers 进阶(十五)】dataset 工具,Parquet和Arrow 数据文件格式,load dataset 方法
系列文章目录 【diffusers 极速入门(一)】pipeline 实际调用的是什么? call 方法!【diffusers 极速入门(二)】如何得到扩散去噪的中间结果?Pipeline callbacks 管道回调函数【diffusers极速入门࿰…...
unity各个面板说明
游戏开发,unity各个面板说明 提示:帮帮志会陆续更新非常多的IT技术知识,希望分享的内容对您有用。本章分享的是Python基础语法。前后每一小节的内容是存在的有:学习and理解的关联性,希望对您有用~ unity简介-unity基础…...
