当前位置: 首页 > news >正文

Python连接Kafka收发数据等操作

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode('utf-8'))producer.flush()if __name__ == '__main__':# 创建'test'主题topic = 'test'# 发送消息i = 1while True:message = {'num': i, 'msg': f'Hello Kafka {i}'}send_message(topic, message)i += 1time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaConsumer实例
consumer = KafkaConsumer('test',bootstrap_servers=bootstrap_servers,auto_offset_reset='latest',  # 从最新的消息开始消费# auto_offset_reset='earliest',  # 从最早的offset开始消费enable_auto_commit=True,  # 自动提交offsetgroup_id='my-group'  # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message = json.loads(message.value.decode('utf-8'))print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)# 删除主题
admin_client.delete_topics(topics=['test-topic'])

相关文章:

Python连接Kafka收发数据等操作

目录 一、Kafka 二、发送端(生产者) 三、接收端(消费者) 四、其他操作 一、Kafka Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构…...

MySql在更新操作时引入“两阶段提交”的必要性

日志模块有两个redo log和binlog,redo log 是引擎层的日志(负责存储相关的事),binlog是在Server层,主要做MySQL共嗯那个层面的事情。redo log就像一个缓冲区,可以让当更新操作的时候先放redo log中&#xf…...

充气模块方案——无刷充气泵pcba方案

在方案开发中,充气效率是无刷充气泵PCBA方案开发中的关键问题。一般通过优化电路设计和控制算法,可以实现高效的气体压缩和快速的充气效果。另外,选择合理的电机驱动器和传感器等元器件能够提高打气泵的功率和效率,减少充气时间&a…...

[sql-03] 求阅读至少两章的人数

准备数据 CREATE TABLE book_read (bookid varchar(150) NOT NULL COMMENT 书籍ID,username varchar(150) DEFAULT NULL COMMENT 用户名,seq varchar(150) comment 章节ID ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT 用户阅读表insert into book_read values(《太子日子》…...

Linux如何通过链接下载文件

在Linux系统中,你可以通过多种方式通过链接下载文件。这些方式包括使用命令行工具(如wget、curl、axel等)和图形界面程序(如浏览器或文件管理器)。以下是几种常用的命令行方法: 1. 使用wget wget是一个非交…...

seL4 IPC(五)

官网链接:link 求解 代码中的很多方法例如这一个教程里面的seL4_GetMR(0),我在官方给的手册和API中都搜不到,想问一下大家这些大家都是在哪里搜的!! IPC seL4中的IPC和一般OS中讲的IPC概念相差比较大,根…...

【Java】多线程基础操作

多线程基础操作 Thread类回顾Thread类观察线程运行线程的休眠常用方法构造方法属性获取方法 中断线程线程状态线程等待 初识synchronized问题引入初步使用初步了解可重入锁死锁 volatile问题引入初步使用volatile 与 synchronized 线程顺序控制初步了解wait()notify()防止线程饿…...

基于Hive和Hadoop的病例分析系统

本项目是一个基于大数据技术的医疗病历分析系统,旨在为用户提供全面的病历信息和深入的医疗数据分析。系统采用 Hadoop 平台进行大规模数据存储和处理,利用 MapReduce 进行数据分析和处理,通过 Sqoop 实现数据的导入导出,以 Spark…...

数据结构编程实践20讲(Python版)—03栈

本文目录 03 栈 StackS1 说明S2 示例基于列表的实现基于链表的实现 S3 问题:复杂嵌套结构的括号匹配问题求解思路Python3程序 S4 问题:基于栈的阶乘计算VS递归实现求解思路Python3程序 S5 问题:逆波兰表示法(后缀表达式)求值求解思路Python3程…...

【注册/登录安全分析报告:孔夫子旧书网】

前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...

PMP--二模--解题--141-150

文章目录 14.敏捷--创建敏捷环境--团队构成--混合项目环境,通常是自组织团队,即团队成员自己决定谁做什么,而不是项目经理决定。易混--常见场景--一个新人加入141、 [单选] 在一个混合项目的执行过程中,不得不更换一个开发人员。新…...

我的领域-关怀三次元成长的二次元虚拟陪伴 | OPENAIGC开发者大赛高校组AI创作力奖

在第二届拯救者杯OPENAIGC开发者大赛中,涌现出一批技术突出、创意卓越的作品。为了让这些优秀项目被更多人看到,我们特意开设了优秀作品报道专栏,旨在展示其独特之处和开发者的精彩故事。 无论您是技术专家还是爱好者,希望能带给…...

个人账号(学校+个人)申请专利过程中遇见的问题

一、请指定一位申请人作为代表人 因为是拿个人账号申请的专利,同时要求学校是第一申请人,所以可以再添加一个第二申请人,然后勾选第二申请人为代表人就可以提交申请了(注意:两个申请人只能减免75%,也就是要…...

在ubuntu系统中,如何让其按下物理关机键时,系统不处理,但qt程序能检测到关机键按下的事件,并处理信号

要让 Ubuntu 系统在按下物理关机键时,系统不直接处理该事件,但让你的 Qt 程序能够检测到并处理关机键的按下事件,可以参考以下步骤: 1. 禁用系统对关机键的默认处理 Ubuntu 系统默认会捕获电源键的按下事件并执行关机操作。首先你…...

先进制造aps专题二十六 基于强化学习的人工智能ai生产排程aps模型简介

基于强化学习的人工智能ai生产排程模型简介 人工智能ai能不能做生产排程? 答案是肯定的。 ai的算法分两类,一类是学习,一类是搜索。 而生产排程问题,它是一个搜索问题,本质上,它和下围棋是一样的 我们…...

各领域/行业硬件一览表

专班硬件装备制造agv小车、机械臂、PDA、服务器、大屏、扭矩传感器、温湿度检测仪、粉尘传感器、陀螺仪传感器、3D打印设备、在线质量检测仪器、新能源水表、电表、气表、汽表、服务器、大屏、温度传感器、压力传感器、光照度传感器、RTU医药化工温湿度传感器、压力传感器、流量…...

机器学习-SVM

线性感知机分类 支持向量机 线性感知机(Perceptron) 感知机是线性二值分类器。 注意:什么是线性?线性分割面就是,就是在分割面中,任意两个的连线也在分割面中,这个分割面,就是线…...

翻译器在线翻译:开启多语言交流新时代

随着国际交流、商务合作、文化交融以及互联网的飞速发展,人们对于跨越语言鸿沟的需求日益迫切。翻译工具成为了我们必备的一个工具,这篇文章我们一起来探讨一些好用的翻译器在线翻译工具吧。 1.在线福昕翻译 链接直达>>https://fanyi.pdf365.cn/…...

网络编程(10)——json序列化

十、day10 今天学习如何使用jsoncpp将json数据解析为c对象,将c对象序列化为json数据。jsoncp经常在网络通信中使用,也就是服务器和客户端的通信一般使用json(可视化好);而protobuf一般在服务器之间的通信中使用 json…...

基于FreeRTOS的STM32多功能手表设计

在智能穿戴设备迅速发展的今天,多功能手表因其便携性和实用性而受到广泛关注。本项目旨在设计一款基于FreeRTOS操作系统的STM32多功能手表,通过实时多任务处理,实现时间显示、多级菜单、万年历、模拟手电筒、温湿度显示、电子闹钟和设置等功能…...

macOS高效录屏工具实战指南:从入门到专业的QuickRecorder应用技巧

macOS高效录屏工具实战指南:从入门到专业的QuickRecorder应用技巧 【免费下载链接】QuickRecorder A lightweight screen recorder based on ScreenCapture Kit for macOS / 基于 ScreenCapture Kit 的轻量化多功能 macOS 录屏工具 项目地址: https://gitcode.com…...

2026-3-26、可变字符串类型StringBuilder

*为什么使用StringBuiler: string是不可变字符串类型,意味着一旦修改就无法修改: string s "Hello"; s s " World"; // 看起来是修改,实际上是创建了新对象// 原来的"Hello"对象还在内存中stri…...

从国科大NLP课程笔记出发:手把手教你用Python复现CYK句法分析算法

从理论到实践:用Python实现CYK句法分析算法的完整指南 在自然语言处理领域,句法分析是理解句子结构的关键步骤。CYK算法作为一种经典的句法分析技术,因其简洁高效的特点,成为许多NLP工程师工具箱中的必备武器。本文将带你从零开始…...

国产操作系统安全实战:用银河麒麟KYSEC防护关键文件的5种典型场景

国产操作系统安全实战:银河麒麟KYSEC防护关键文件的5种典型场景 在数字化转型浪潮中,企业核心数据资产的安全防护已成为技术团队的头等大事。想象一下:财务系统的敏感账目被误删、研发代码遭恶意篡改、数据库凭证意外泄露...这些场景轻则造成…...

【开源鸿蒙Flutter跨平台开发实战复盘】从零到一:GitCode口袋工具项目构建全记录

1. 环境搭建:从零开始的跨平台开发之旅 作为一个有Android开发背景但完全没接触过Flutter的开发者,我最初面对开源鸿蒙和Flutter跨平台开发时也是一头雾水。环境搭建这个看似简单的第一步,就让我深刻体会到"万事开头难"的含义。 在…...

FINCH聚类算法实战:5分钟搞定无参数聚类(附Python代码)

FINCH聚类算法实战:5分钟搞定无参数聚类(附Python代码) 在数据科学和机器学习领域,聚类分析一直是探索性数据分析的重要工具。传统聚类方法如K-means、DBSCAN等虽然广泛应用,但都面临一个共同挑战:需要人工…...

HunyuanVideo-Foley效果展示:AI生成ASMR触发音、白噪音与专注背景音

HunyuanVideo-Foley效果展示:AI生成ASMR触发音、白噪音与专注背景音 1. 核心能力概览 HunyuanVideo-Foley是一款专为音效生成优化的AI模型,能够根据文字描述自动生成高质量的音频内容。基于RTX 4090D 24GB显存深度优化,该镜像提供了开箱即用…...

基于Vue的沧交食堂食品监管系统[vue]-计算机毕业设计源码+LW文档

摘要:本文阐述了一个基于Vue框架开发的沧交食堂食品监管系统。该系统旨在借助现代Web技术,强化对沧交食堂食品安全的监管力度,提升监管效率与质量。系统涵盖了系统用户管理、新闻数据管理、食品相关业务管理以及评论管理等多方面功能。文章详…...

5分钟搞定OpenClaw+nanobot:超轻量级AI助手一键部署指南

5分钟搞定OpenClawnanobot:超轻量级AI助手一键部署指南 1. 为什么选择OpenClawnanobot组合 上周我在整理电脑上的项目文档时,突然意识到自己每天要重复执行大量机械操作:查找文件、转换格式、汇总数据。作为独立开发者,这些琐事…...

Postiz消息队列:任务优先级与重试机制的终极指南

Postiz消息队列:任务优先级与重试机制的终极指南 【免费下载链接】clickvote Add upvotes, likes, and reviews to any context ⭐️ 项目地址: https://gitcode.com/GitHub_Trending/cl/clickvote Postiz是一款功能强大的开源项目,专注于为开发者…...