当前位置: 首页 > news >正文

Python连接Kafka收发数据等操作

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode('utf-8'))producer.flush()if __name__ == '__main__':# 创建'test'主题topic = 'test'# 发送消息i = 1while True:message = {'num': i, 'msg': f'Hello Kafka {i}'}send_message(topic, message)i += 1time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaConsumer实例
consumer = KafkaConsumer('test',bootstrap_servers=bootstrap_servers,auto_offset_reset='latest',  # 从最新的消息开始消费# auto_offset_reset='earliest',  # 从最早的offset开始消费enable_auto_commit=True,  # 自动提交offsetgroup_id='my-group'  # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message = json.loads(message.value.decode('utf-8'))print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)# 删除主题
admin_client.delete_topics(topics=['test-topic'])

相关文章:

Python连接Kafka收发数据等操作

目录 一、Kafka 二、发送端(生产者) 三、接收端(消费者) 四、其他操作 一、Kafka Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构…...

MySql在更新操作时引入“两阶段提交”的必要性

日志模块有两个redo log和binlog,redo log 是引擎层的日志(负责存储相关的事),binlog是在Server层,主要做MySQL共嗯那个层面的事情。redo log就像一个缓冲区,可以让当更新操作的时候先放redo log中&#xf…...

充气模块方案——无刷充气泵pcba方案

在方案开发中,充气效率是无刷充气泵PCBA方案开发中的关键问题。一般通过优化电路设计和控制算法,可以实现高效的气体压缩和快速的充气效果。另外,选择合理的电机驱动器和传感器等元器件能够提高打气泵的功率和效率,减少充气时间&a…...

[sql-03] 求阅读至少两章的人数

准备数据 CREATE TABLE book_read (bookid varchar(150) NOT NULL COMMENT 书籍ID,username varchar(150) DEFAULT NULL COMMENT 用户名,seq varchar(150) comment 章节ID ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT 用户阅读表insert into book_read values(《太子日子》…...

Linux如何通过链接下载文件

在Linux系统中,你可以通过多种方式通过链接下载文件。这些方式包括使用命令行工具(如wget、curl、axel等)和图形界面程序(如浏览器或文件管理器)。以下是几种常用的命令行方法: 1. 使用wget wget是一个非交…...

seL4 IPC(五)

官网链接:link 求解 代码中的很多方法例如这一个教程里面的seL4_GetMR(0),我在官方给的手册和API中都搜不到,想问一下大家这些大家都是在哪里搜的!! IPC seL4中的IPC和一般OS中讲的IPC概念相差比较大,根…...

【Java】多线程基础操作

多线程基础操作 Thread类回顾Thread类观察线程运行线程的休眠常用方法构造方法属性获取方法 中断线程线程状态线程等待 初识synchronized问题引入初步使用初步了解可重入锁死锁 volatile问题引入初步使用volatile 与 synchronized 线程顺序控制初步了解wait()notify()防止线程饿…...

基于Hive和Hadoop的病例分析系统

本项目是一个基于大数据技术的医疗病历分析系统,旨在为用户提供全面的病历信息和深入的医疗数据分析。系统采用 Hadoop 平台进行大规模数据存储和处理,利用 MapReduce 进行数据分析和处理,通过 Sqoop 实现数据的导入导出,以 Spark…...

数据结构编程实践20讲(Python版)—03栈

本文目录 03 栈 StackS1 说明S2 示例基于列表的实现基于链表的实现 S3 问题:复杂嵌套结构的括号匹配问题求解思路Python3程序 S4 问题:基于栈的阶乘计算VS递归实现求解思路Python3程序 S5 问题:逆波兰表示法(后缀表达式)求值求解思路Python3程…...

【注册/登录安全分析报告:孔夫子旧书网】

前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...

PMP--二模--解题--141-150

文章目录 14.敏捷--创建敏捷环境--团队构成--混合项目环境,通常是自组织团队,即团队成员自己决定谁做什么,而不是项目经理决定。易混--常见场景--一个新人加入141、 [单选] 在一个混合项目的执行过程中,不得不更换一个开发人员。新…...

我的领域-关怀三次元成长的二次元虚拟陪伴 | OPENAIGC开发者大赛高校组AI创作力奖

在第二届拯救者杯OPENAIGC开发者大赛中,涌现出一批技术突出、创意卓越的作品。为了让这些优秀项目被更多人看到,我们特意开设了优秀作品报道专栏,旨在展示其独特之处和开发者的精彩故事。 无论您是技术专家还是爱好者,希望能带给…...

个人账号(学校+个人)申请专利过程中遇见的问题

一、请指定一位申请人作为代表人 因为是拿个人账号申请的专利,同时要求学校是第一申请人,所以可以再添加一个第二申请人,然后勾选第二申请人为代表人就可以提交申请了(注意:两个申请人只能减免75%,也就是要…...

在ubuntu系统中,如何让其按下物理关机键时,系统不处理,但qt程序能检测到关机键按下的事件,并处理信号

要让 Ubuntu 系统在按下物理关机键时,系统不直接处理该事件,但让你的 Qt 程序能够检测到并处理关机键的按下事件,可以参考以下步骤: 1. 禁用系统对关机键的默认处理 Ubuntu 系统默认会捕获电源键的按下事件并执行关机操作。首先你…...

先进制造aps专题二十六 基于强化学习的人工智能ai生产排程aps模型简介

基于强化学习的人工智能ai生产排程模型简介 人工智能ai能不能做生产排程? 答案是肯定的。 ai的算法分两类,一类是学习,一类是搜索。 而生产排程问题,它是一个搜索问题,本质上,它和下围棋是一样的 我们…...

各领域/行业硬件一览表

专班硬件装备制造agv小车、机械臂、PDA、服务器、大屏、扭矩传感器、温湿度检测仪、粉尘传感器、陀螺仪传感器、3D打印设备、在线质量检测仪器、新能源水表、电表、气表、汽表、服务器、大屏、温度传感器、压力传感器、光照度传感器、RTU医药化工温湿度传感器、压力传感器、流量…...

机器学习-SVM

线性感知机分类 支持向量机 线性感知机(Perceptron) 感知机是线性二值分类器。 注意:什么是线性?线性分割面就是,就是在分割面中,任意两个的连线也在分割面中,这个分割面,就是线…...

翻译器在线翻译:开启多语言交流新时代

随着国际交流、商务合作、文化交融以及互联网的飞速发展,人们对于跨越语言鸿沟的需求日益迫切。翻译工具成为了我们必备的一个工具,这篇文章我们一起来探讨一些好用的翻译器在线翻译工具吧。 1.在线福昕翻译 链接直达>>https://fanyi.pdf365.cn/…...

网络编程(10)——json序列化

十、day10 今天学习如何使用jsoncpp将json数据解析为c对象,将c对象序列化为json数据。jsoncp经常在网络通信中使用,也就是服务器和客户端的通信一般使用json(可视化好);而protobuf一般在服务器之间的通信中使用 json…...

基于FreeRTOS的STM32多功能手表设计

在智能穿戴设备迅速发展的今天,多功能手表因其便携性和实用性而受到广泛关注。本项目旨在设计一款基于FreeRTOS操作系统的STM32多功能手表,通过实时多任务处理,实现时间显示、多级菜单、万年历、模拟手电筒、温湿度显示、电子闹钟和设置等功能…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage)&#xff1a…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中,JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作(如 Promise、async/await 等),开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝(r…...

git: early EOF

macOS报错: Initialized empty Git repository in /usr/local/Homebrew/Library/Taps/homebrew/homebrew-core/.git/ remote: Enumerating objects: 2691797, done. remote: Counting objects: 100% (1760/1760), done. remote: Compressing objects: 100% (636/636…...

从零开始了解数据采集(二十八)——制造业数字孪生

近年来,我国的工业领域正经历一场前所未有的数字化变革,从“双碳目标”到工业互联网平台的推广,国家政策和市场需求共同推动了制造业的升级。在这场变革中,数字孪生技术成为备受关注的关键工具,它不仅让企业“看见”设…...

前端调试HTTP状态码

1xx(信息类状态码) 这类状态码表示临时响应,需要客户端继续处理请求。 100 Continue 服务器已收到请求的初始部分,客户端应继续发送剩余部分。 2xx(成功类状态码) 表示请求已成功被服务器接收、理解并处…...

Yii2项目自动向GitLab上报Bug

Yii2 项目自动上报Bug 原理 yii2在程序报错时, 会执行指定action, 通过重写ErrorAction, 实现Bug自动提交至GitLab的issue 步骤 配置SiteController中的actions方法 public function actions(){return [error > [class > app\helpers\web\ErrorAction,],];}重写Error…...