Kafka的安装与使用(windows下python使用等)
一、下载
可以去官网下载:https://kafka.apache.org/downloads
版本可选择,建议下载比较新的,新版本里面自带zookeeper

二、安装
创建一个目录,此处是D:\kafka,将文件放进去解压
如果文件后缀是gz,解压后没有文件夹,此时需要先将文件后缀修改为tgz,然后再解压

进入kafka目录,创建一个data和log目录,用作zookeeper和kafka的数据和日志目录

三、修改配置
1.修改zookeeper的配置,使用编译器打开 config文件夹下面的zookeeper.properties文件,然后修改
dataDir,这个值改为自己刚才创建的data目录地址,使用\\连接
clientPort=2181

修改后,确认保存
2.修改kafka配置文件,打开config文件夹下的server.properties文件,修改内容
port,9092是默认的端口号
host.name,修改为自己的IP,一般是本机或者局域网IP
listeners,修改为自己的IP和端口
log.dirs,填写为自己刚才创建的log文件夹,\\连接
zookeeper.connect=127.0.0.1:2181,这个根据自己情况写IP和端口,上面配置的



四、启动服务
1.启动zookeeper服务
先进入kafka解压后的根目录,然后在cmd里面执行如下命令:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2.启动kafka
先进入kafka解压后的根目录,然后在cmd里面执行如下命令:
bin\windows\kafka-server-start.bat config\server.properties

温馨提示:如果kafka没有正常关闭,可能下一次启动就会报错,可以删除data,log和logs目录里面的内容之后,再从启动zookeeper开始往下走
3.创建topic
创建一个test_topic
bin\windows\kafka-topics.bat --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

4.查看topic列表
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

5.启动producer生产者
如果使用代码去创建生产者和消费者此步骤不需要
假设你要向名为 test_topic 的主题发送消息,并且 Kafka 代理地址是 localhost:9092,可以运行以下命令:
.\bin\windows\kafka-console-producer.bat --topic test_topic --bootstrap-server localhost:9092

启动后进入输入模式,等待生产者输入,输入测试的123,234

6.启动customer消费者
如果使用代码去创建生产者和消费者此步骤不需要
.\bin\windows\kafka-console-consumer.bat --topic test_topic --bootstrap-server localhost:9092 --from-beginning
启动后,可以看到生产者发送的内容:

五、Python代码实现生产者和消费者
pip install kafka-python
模拟生产者代码,通过输入进行内容控制:
import uuidfrom kafka import KafkaProducer
import json
import time# 配置 Kafka 生产者
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8')
)# 要发送的主题名称
topic = 'test_topic'try:while True:content = input("内容:")message = {'id': uuid.uuid4().hex,'message': f'{content}'}# 发送消息future = producer.send(topic, value=message)# 等待消息发送结果result = future.get(timeout=10)print(f"Sent message {content} to partition {result.partition} at offset {result.offset}")except Exception as e:print(f"Error sending message: {e}")
finally:# 刷新缓冲区并关闭生产者producer.flush()producer.close()
消费者模拟:
from kafka import KafkaConsumer
import json# 配置 Kafka 消费者
consumer = KafkaConsumer('test_topic',bootstrap_servers=['127.0.0.1:9092'],value_deserializer=lambda m: json.loads(m.decode('utf-8')),auto_offset_reset='earliest'
)try:# 持续消费消息for message in consumer:print(f"Received message: {message.value} from partition {message.partition} at offset {message.offset}")
except KeyboardInterrupt:print("Consumer interrupted by user.")
finally:# 关闭消费者consumer.close()
在生产者输入 hello,yes进行测试:
在消费者代码出进行获取:

注意消费者的message本身就是一个可迭代对象,是无穷尽的。
并且auto_offset_reset 参数控制了是从第一个开始获取还是从接入的时候再算起,移除参数就代表从接入开始获取message里面的数据,如果是 earliest 就会从第一个开始获取,即使已经处理了!
那么,如果是消费者掉线,生产者在掉线期间新增了若干条数据,如何让消费者上线后从没有处理的数据开始处理呢?可以给消费者设置三个参数并且手动commit:
auto_offset_reset='earliest', # 从最新的消息位置开始消费 enable_auto_commit=False, # 开启自动提交偏移量 group_id='aaa'
参考代码:
from kafka import KafkaConsumer
import json# 配置 Kafka 消费者
consumer = KafkaConsumer('test_topic',bootstrap_servers=['127.0.0.1:9092'],value_deserializer=lambda m: json.loads(m.decode('utf-8')),auto_offset_reset='earliest', # 从最新的消息位置开始消费enable_auto_commit=False, # 关闭自动提交偏移量group_id='aaa'
)try:# 持续消费消息for message in consumer:print(f"Received message: {message.value} from partition {message.partition} at offset {message.offset}")consumer.commit() # 手动提交偏移量
except KeyboardInterrupt:print("Consumer interrupted by user.")
finally:# 关闭消费者consumer.close()
相关文章:
Kafka的安装与使用(windows下python使用等)
一、下载 可以去官网下载:https://kafka.apache.org/downloads 版本可选择,建议下载比较新的,新版本里面自带zookeeper 二、安装 创建一个目录,此处是D:\kafka,将文件放进去解压 如果文件后缀是gz,解压…...
DataPermissionInterceptor源码解读
原文首发在我的博客:https://blog.liuzijian.com/post/mybatis-plus-source-data-permission-interceptor.html 目录 一、概述二、源码解读2.1 beforeQuery2.2 beforePrepare2.3 processSelect2.4 setWhere2.5 processUpdate2.6 processDelete2.7 getUpdateOrDelete…...
大模型中的参数规模与显卡匹配
在大模型训练和推理中,显卡(GPU/TPU)的选择与模型参数量紧密相关,需综合考虑显存、计算能力和成本。以下是不同规模模型与硬件的匹配关系及优化策略: 一、参数规模与显卡匹配参考表 模型参数量训练阶段推荐显卡推理阶…...
数据结构初阶: 顺序表的增删查改
顺序表 概念 顺序表是⽤⼀段物理地址连续的存储单元依次存储数据元素的线性结构,⼀般情况下采⽤数组存储。如图1: 顺序表和数组有什么区别? 顺序表的底层是用数组实现的,是对数组的封装,实现了增删查改等接口。 分…...
Spring Boot项目中策略模式的应用与实现
前言 在Spring Boot项目中,策略模式是一种非常重要的设计模式,它能够让我们定义一系列算法,并使它们可以互相替换。 策略模式通过将算法封装到独立的类中,从而使得代码中的算法可以独立于使用它的客户端变化。 这对于某些需求频…...
【机器学习中的基本术语:特征、样本、训练集、测试集、监督/无监督学习】
机器学习基本术语详解 1. 特征(Feature) 定义:数据的属性或变量,用于描述样本的某个方面。作用:模型通过学习特征与目标之间的关系进行预测。示例: 预测房价时,特征可以是 面积、地段、房龄。…...
MySQL全链路指南
目录 前言 第一章 MySQL基础入门 1.1 MySQL简介与安装 1.2 数据库基本操作 1.3 表结构与数据类型 第二章 SQL语言深度解析 2.1 DDL(数据定义语言) 2.2 DML(数据操作语言) 2.3 DQL(数据查询语言) 2…...
System.arraycopy()
在 Java 编程中,数组是一种常用的数据结构,用于存储相同类型的元素集合。在处理数组时,经常需要进行数组复制操作,例如将一个数组的部分或全部元素复制到另一个数组中。System.arraycopy() 方法是 Java 提供的一个高效的数组复制工…...
详解AI采集框架Crawl4AI,打造智能网络爬虫
大家好,Crawl4AI作为开源Python库,专门用来简化网页爬取和数据提取的工作。它不仅功能强大、灵活,而且全异步的设计让处理速度更快,稳定性更好。无论是构建AI项目还是提升语言模型的性能,Crawl4AI都能帮您简化工作流程…...
【爬虫开发】爬虫开发从0到1全知识教程第14篇:scrapy爬虫框架,介绍【附代码文档】
本教程的知识点为:爬虫概要 爬虫基础 爬虫概述 知识点: 1. 爬虫的概念 requests模块 requests模块 知识点: 1. requests模块介绍 1.1 requests模块的作用: 数据提取概要 数据提取概述 知识点 1. 响应内容的分类 知识点:…...
MySQL索引原理:从B+树手绘到EXPLAIN
最近在学后端,学到了这里做个记录 一、为什么索引像书的目录? 类比:500页的技术书籍 vs 10页的目录缺点:全表扫描就像逐页翻找内容优点:索引将查询速度从O(n)提升到O(log n) 二、B树手绘课堂 1. 结构解剖࿰…...
SQLark:一款国产免费数据库开发和管理工具
SQLark(百灵连接)是一款面向信创应用开发者的数据库开发和管理工具,用于快速查询、创建和管理不同类型的数据库系统,目前可以支持达梦数据库、Oracle 以及 MySQL。 对象管理 SQLark 支持丰富的数据库对象管理功能,包括…...
防爆对讲机VS非防爆对讲机,如何选择?
在通信设备的广阔市场中,对讲机以其高效、便捷的特点,成为众多行业不可或缺的沟通工具。而面对防爆对讲机与非防爆对讲机,许多用户常常陷入选择困境。究竟该如何抉择,且听我为您细细道来。 防爆对讲机,专为危险作业场…...
微信小程序开发:开发实践
微信小程序开发实践研究 摘要 随着移动互联网的迅猛发展,微信小程序作为一种轻量化、无需安装的应用形式,逐渐成为开发者和用户的首选。本文以“个人名片”小程序为例,详细阐述了微信小程序的开发流程,包括需求分析、项目规划、…...
操作 Office Excel 文档类库Excelize
Excelize 是 Go 语言编写的一个用来操作 Office Excel 文档类库,基于 ECMA-376 OOXML 技术标准。可以使用它来读取、写入 XLSX 文件,相比较其他的开源类库,Excelize 支持操作带有数据透视表、切片器、图表与图片的 Excel 并支持向 Excel 中插…...
青铜与信隼的史诗——TCP与UDP的千年博弈
点击下面图片带您领略全新的嵌入式学习路线 🔥爆款热榜 88万阅读 1.6万收藏 第一章 契约之匣与自由之羽 熔岩尚未冷却的铸造台上,初代信使长欧诺弥亚将液态秘银倒入双生模具。左侧模具刻着交握的青铜手掌,右侧则是展开的隼翼纹章。当星辰…...
「青牛科技」GC5849 12V三相无感正弦波电机驱动芯片
芯片描述: • 4 ~ 20V 工作电压, 30V 最大耐压 • 驱动峰值电流 2.0A ,连续电流 800mA 以内 • 芯片内阻: 900mΩ (上桥 下桥) • eSOP-8 封装,底部 ePAD 散热,引…...
Java基础之反射的基本使用
简介 在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意属性和方法;这种动态获取信息以及动态调用对象方法的功能称为Java语言的反射机制。反射让Java成为了一门动…...
大语言模型中的嵌入模型
本教程将拆解什么是嵌入模型、为什么它们在NLP中如此重要,并提供一个简单的Python实战示例。 分词器将原始文本转换为token和ID,而嵌入模型则将这些ID映射为密集向量表示。二者合力为LLMs的语义理解提供动力。图片来源:[https://tzamtzis.gr/2024/coding/tokenization-by-an…...
【从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装
📢博客主页:https://blog.csdn.net/2301_779549673 📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! &…...
位置编码(Positional Encoding, PE)的作用
在神经网络(尤其是Transformer、RNN等序列模型)中,位置编码(Positional Encoding, PE)的作用是为模型提供序列中元素的位置信息,以弥补模型本身对顺序感知的不足。 为什么Transformer需要位置编码…...
开源的 LLM 应用开发平台Dify的安装和使用
文章目录 前提环境应用安装deocker desktop镜像源配置Dify简介Dify本地docker安装Dify安装ollama插件Dify安装硅基流动插件简单应用练习进阶应用练习数据库图像检索与展示助手echart助手可视化 前提环境 Windows环境 docker desktop魔法环境:访问Dify项目ollama电脑…...
从零构建大语言模型全栈开发指南:第五部分:行业应用与前沿探索-5.1.2行业落地挑战:算力成本与数据隐私解决方案
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 从零构建大语言模型全栈开发指南-第五部分:行业应用与前沿探索5.1.2 行业落地挑战:算力成本与数据隐私解决方案1. 算力成本挑战与优化策略1.1 算力成本的核心问题1.2 算力优化技术方案2. 数据隐私挑战…...
NodeJS--NPM介绍使用
1、使用npm install命令安装模块 1.1、本地安装 npm install express 1.2、全局安装 npm install express -g 1.3、本地安装和全局安装的区别...
DeepSeek与ChatGPT的优势对比:选择合适的工具来提升工作效率
选DeepSeek还是ChatGPT?这就像问火锅和披萨哪个香! "到底该用DeepSeek还是ChatGPT?” 这个问题最近在互联网圈吵翻天!其实这就跟选手机系统-样,安卓党iOS党都能说出一万条理由,但真正重要的是你拿它来干啥!&am…...
lib-zo,C语言另一个协程库,sleep协程化,睡眠
lib-zo,C语言另一个协程库,sleep协程化,睡眠 另一个 C 协程库 https://blog.csdn.net/eli960/article/details/146802313 重载了 sleep 函数, 使其支持协程化 另外毫秒单位睡眠函数 void zcoroutine_sleep_millisecond(int milliseconds);例子 #include "coroutine.h…...
25大唐杯赛道一本科B组知识点大纲(下)
5G/6G网络技术知识点(10%) 工程概论及通信工程项目实践(20%) 5G垂直行业应用知识点(20%) ⭐⭐⭐为重点知识,尽量要过一遍哦 大唐杯赛道一国一备赛思路 大唐杯国一省赛回忆录--有付出就会有收…...
Python+Playwright自动化测试-1-环境准备与搭建
1、Playwright 是什么? 微软在 2020 年初开源的新一代自动化测试工具,它的功能类似于 Selenium、Pyppeteer 等,都可以驱动浏览器进行各种自动化操作。它的功能也非常强大,对市面上的主流浏览器都提供了支持,API 功能简…...
生产管理系统如何破解汽车零部件行业追溯难痛点
在汽车零部件制造行业中,生产追溯一直是企业面临的核心挑战之一。随着市场竞争的加剧和客户需求的日益复杂,如何确保产品质量、快速定位问题源头、减少批次性返工,成为了每个企业亟待解决的问题。而生产管理系统,作为智能制造的重…...
【XTerminal】【树莓派】Linux系统下的函数调用编程
目录 一、XTerminal下的Linux系统调用编程 1.1理解进程和线程的概念并在Linux系统下完成相应操作 (1) 进程 (2)线程 (3) 进程 vs 线程 (4)Linux 下的实践操作 1.2Linux的“虚拟内存管理”和stm32正式物理内存(内存映射)的区别 (1)Linux虚拟内存管…...
