部署kafka并通过python操作
目录
- 一、安装JDK1.8
- 1、检查服务器是否已安装JDK
- 2、若已安装JDK,进行卸载
- 3、更新yum源
- 4、搜索JDK1.8安装包
- 5、安装JDK1.8
- 6、查看是否安装成功
- 7、配置环境变量
- 二、安装Kafka
- 1、下载并解压kafka部署包至/usr/local/目录
- 2、修改server.properties
- 3、修改/etc/profile
- 4、执行/etc/profile
- 5、启动kafka
- 6、topic管理
- 7、生产者管理
- 8、消费者管理
- 9、查看group-id
- 三、python连接kafka
- 1、安装kafka-python
- 2、消费消息
- 3、生产数据
Kafka的安装需要依赖于jdk和zookeeper。(kafka 2.11-1.1.0版本才与JDK1.7兼容,更高版本需要JDK1.8);
2.8之前版本的Kafka需要单独下载zookeeper,2.8及之后的Kafka已经内置了一个zookeeper环境,无需单独下载;
一、安装JDK1.8
1、检查服务器是否已安装JDK
rpm -qa |grep java
rpm -qa |grep jdk
rpm -qa |grep gcj
2、若已安装JDK,进行卸载
rpm -qa | grep java | xargs rpm -e --nodeps
3、更新yum源
yum update -y
4、搜索JDK1.8安装包
yum list java-1.8*
5、安装JDK1.8
yum install java-1.8.0-openjdk* -y
6、查看是否安装成功
java -version
7、配置环境变量
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
export PATH=${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH
二、安装Kafka
1、下载并解压kafka部署包至/usr/local/目录
tar -zxvf kafka_2.12-3.1.1.tgz -C /usr/local/
2、修改server.properties
vim /usr/local/kafka_2.12-3.1.1/config/server.properties修改以下内容
listeners=PLAINTEXT://192.168.15.128:9092
advertised.listeners=PLAINTEXT://192.168.15.128:9092
log.dirs=/data/kafka/logs
zookeeper.connect=localhost:2181(local改成192.168.15.128会报错[2024-12-03 11:17:06,427] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient))
3、修改/etc/profile
vim /etc/profile新增:
export KAFKA_HOME=/usr/local/kafka_2.12-3.1.1
export PATH=$KAFKA_HOME/bin:$PATH
4、执行/etc/profile
source /etc/profile
5、启动kafka
先启动zookeeper
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties查看是否启动
netstat -tuln | grep 2181再启动kafka
/usr/local/kafka_2.12-3.1.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.1/config/server.properties查看是否启动
netstat -tuln | grep 9092
jps #有kafka则为启动后台启动
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/server.properties
6、topic管理
1. 创建topic
# replication-factor指定副本因子。注意:指定副本因子的时候,不能大于broker实例个数,否则报错
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest # 旧版本创建方式,新版本只有--bootstrap-server 一种创建topic的方式
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --create --bootstrap-server 192.168.15.128:9092 --replication-factor 1 --partitions 1 --topic demo2. 查询topic详情
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --describe --bootstrap-server 192.168.15.128:9092 --topic demo3. 查询所有topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --list4. 修改topic参数配置
# 注意:partition个数count,只能增加,不能减少
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --alter --topic mytest --parti-tions count5. 删除topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --delete --topic mytest5.1 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
输入如下命令查看:
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic此时你若想真正删除它,可以如下操作:
(1)登录zookeeper客户端:命令:./bin/zookeeper-client
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rm -r /brokers/topics/【topic name】即可,此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失
7、生产者管理
# 新起一个终端,进入kafka解压目录后,输入如下命令。在执行完毕后会进入的编辑器页面,此时任意编辑一个消息之后,消费者那边的终端可以看到,终端中已经打印出了我们刚才发送的消息
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.128:9092 --topic demo
8、消费者管理
1.创建消费者(有非必须参数,分区与consumer之间的关系:一个分区不能分给两个consumer,但是两个分区可以分给一个consumer)
# 下面的命令可以创建一个用于消费topic为mytest的消费者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --from-beginning --group testgroup2.从尾部开始取数据,必需要指定分区(指定分区)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 03.从尾部开始取数据,必需要指定分区(取指定个数)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 0 --max-messages 1
9、查看group-id
kafka-consumer-groups.sh --bootstrap-server 192.168.15.128:9092 --list
三、python连接kafka
1、安装kafka-python
pip3 install kafka-python-ng
2、消费消息
from kafka import KafkaConsumerkafka_broker = '192.168.15.128:9092' # 替换为虚拟机的IP和端口# 创建Kafka消费者
consumer = KafkaConsumer('demo', bootstrap_servers=[kafka_broker])for message in consumer:print(message.value)
3、生产数据
import json
from kafka import KafkaProducer# 指定Kafka代理地址,格式为"host:port"
kafka_broker = '192.168.15.128:9092' # 替换为虚拟机的IP和端口# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=[kafka_broker])
# 发送10条消息
for i in range(10):# 创建一个字典,然后使用json.dumps()将其转换为JSON格式的字符串,并编码为字节串message = json.dumps({'name': 'kafka', 'index': i}).encode('utf-8')producer.send('demo', message)# 如果你需要打印消息内容,可以解码字节串并打印print(message.decode('utf-8'))# 确保所有消息都已发送
producer.flush()
相关文章:

部署kafka并通过python操作
目录 一、安装JDK1.81、检查服务器是否已安装JDK2、若已安装JDK,进行卸载3、更新yum源4、搜索JDK1.8安装包5、安装JDK1.86、查看是否安装成功7、配置环境变量 二、安装Kafka1、下载并解压kafka部署包至/usr/local/目录2、修改server.properties3、修改/etc/profile4…...
【JAVA】Java高级:数据库监控与调优:SQL调优与执行计划的分析
作为Java开发工程师,理解SQL调优和执行计划的分析是至关重要的。这不仅可以帮助我们提高数据库查询的效率,还能减少系统资源的消耗,提升整体应用的性能。 1. SQL调优的重要性 随着数据量的增加和用户请求的增多,数据库的性能问题…...

【单片机开发】MCU三种启动方式(Boot选择)[主Flash/系统存储器(BootLoader)/嵌入式SRAM]
目录 参考资料: 利用 Boot 选择不同的启动方式: 单片机的存储结构(主 FLASH/系统存储器/嵌入式 SRAM): 1. Cortex-M 内核芯片——启动原理: 1.1. 启动流程: 1.2. 根据单片机的存储器映射和架构图:启动…...

跨库移植 SQL
背景 应用程序可能要基于不同数据库工作,各种数据库的 SQL 语法大体一致,但仍有些差别,结果就要改造这些 SQL,而这事通常只能手工调整,工作量大还容易出错。 完全自动改造 SQL 几乎是无法做到的,毕竟各种…...

(软件测试文档大全)测试计划,测试报告,测试方案,压力测试报告,性能测试,等保测评,安全扫描测试,日常运维检查测试,功能测试等全下载
1. 引言 1.1. 编写目的 1.2. 项目背景 1.3. 读者对象 1.4. 参考资料 1.5. 术语与缩略语 2. 测试策略 2.1. 测试完成标准 2.2. 测试类型 2.2.1. 功能测试 2.2.2. 性能测试 2.2.3. 安全性与访问控制测试 2.3. 测试工具 3. 测试技术 4. 测试资源 4.1. 人员安排 4.2. 测试环境 4.2.…...

Vue前端开发-路由跳转及带参数跳转
在Vue 3中,由于没有实例化对象this,因此,无法通过this去访问 $route对象,而是通过导入一个名为 useRouter 的方法,执行这个方法后,返回一个路由对象,通过这个路由对象就可以获取到当前路由中的信…...
服务器上安装 Node.js
在服务器上安装 Node.js 的过程根据你使用的操作系统和环境可能会有所不同。以下是一些常见的 Linux 发行版(如 Ubuntu 或 CentOS)上的安装步骤。 在基于 Red Hat/CentOS 的系统上安装 Node.js 设置 EPEL 仓库 如果没有启用 EPEL (Extra Packages for E…...

在阿里云/Linux环境搭建Gitblit服务
在阿里云/Linux环境搭建Gitblit服务 1. 整体描述2. 前期准备3. 安装步骤3.1 下载gitblit3.2 上传gitblit3.3 解压文件3.4 修改文件配置3.5 启动gitblit3.6 安全组配置 4. 总结 1. 整体描述 前段时间买了一个阿里云服务器,2核2G,3M固定带宽的配置&#x…...

MicroBlaze软核开发(二):GPIO
实现功能:使用 MicroBlaze软核,配置GPIO用拨码开关控制LED灯 Vivado版本:2018.3 目录 引言 vivado部分: 一、配置GPIO 二、生成HDL文件编译 SDK部分: 一、导出硬件启动SDK 二、新建应用程序工程 三、编写程序代…...

threejs相机辅助对象cameraHelper
为指定相机创建一个辅助对象,显示这个相机的视锥。 想要在场景里面显示相机的视锥,需要创建两个相机。 举个例子,场景中有个相机A,想要显示相机A的视锥,那么需要一个相机B,把B放在A的后面,两个…...

Luma 视频生成 API 对接说明
Luma 视频生成 API 对接说明 随着 AI 的应用变广,各类 AI 程序已逐渐普及。AI 已逐渐深入到人们的工作生活方方面面。而 AI 涉及的行业也越来越多,从最初的写作,到医疗教育,再到现在的视频。 Luma 是一个专业高质量的视频生成平…...

服务器数据恢复—EVA存储硬盘磁头和盘片损坏离线的数据恢复案例
服务器存储数据恢复环境&故障: 一台HP EVA存储中有23块硬盘,挂接到一台windows server操作系统的服务器。 EVA存储上有三个硬盘指示灯亮黄灯,此刻存储还能正常使用。管理员在更换硬盘的过程中,又出现一块硬盘对应的指示灯亮黄…...
【Python】深入探索Python类型检查:掌握 `typing` 模块的高级用法
解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 随着代码复杂度的增加,类型安全性在现代开发中变得尤为重要。Python自3.5引入类型提示(type hints),为开发者提供了静态类型检查的能力,而typing模块则是这一系统的核心。本篇文章深入研究Python的类型提示及…...

Android学习15--charger
1 概述 最近正好在做关机充电这个,就详细看看吧。还是本着保密的原则,项目里的代码也不能直接用,这里就用的Github的。https://github.com/aosp-mirror 具体位置是:https://github.com/aosp-mirror/platform_system_core/tree/mai…...

顶会新宠!KAN-LSTM完美融合新方案
2024深度学习发论文&模型涨点之——KANLSTM KAN-LSTM混合预测模型是一种结合了自注意力机制(KAN, Key-attention network)和长短时记忆网络(LSTM)的深度学习模型,主要用于序列数据的预测任务,如时间序…...
JS中对象的浅拷贝,深拷贝和引用
JS中对象的浅拷贝,深拷贝和引用 浅拷贝和深拷贝的区别主要在于它们如何处理引用类型的数据(如数组和对象),而引用简而言之就是换了个变量名。 浅拷贝 引用:浅拷贝只复制对象的第一层属性,对于嵌套的对象或…...
思普企业运营平台 idsCheck Sql注入漏洞复现
0x01 产品描述: 思普企业运营平台是由贵阳思普信息技术有限公司自主研发的国内首款投融建管营云服务平台——...

FSWIND脉动风-风载时程生成器软件下载、安装及注册
1、软件下载 点击文末超链接下载 2、软件安装 以下操作,若被电脑杀毒软件提示风险,请加入白名单,软件无任何病毒和后台,请放心使用! 1)双击Fswind_setup.exe,启动安装程序 2)、点…...
spring通过RequestContextHolder获取HttpServletRequest对象
1.获取HttpServletRequest对象方法: public static HttpServletRequest getRequest() {ServletRequestAttributes attributes ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes());assert attributes ! null;return attributes.getRequest(…...

STM32编码器接口及编码器测速模板代码
编码器是什么? 编码器是一种将角位移或者角速度转换成一连串电数字脉冲的旋转式传感 器,我们可以通过编码器测量到底位移或者速度信息。编码器从输出数据类型上 分,可以分为增量式编码器和绝对式编码器。 从编码器检测原理上来分࿰…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...