当前位置: 首页 > news >正文

部署kafka并通过python操作

目录

      • 一、安装JDK1.8
        • 1、检查服务器是否已安装JDK
        • 2、若已安装JDK,进行卸载
        • 3、更新yum源
        • 4、搜索JDK1.8安装包
        • 5、安装JDK1.8
        • 6、查看是否安装成功
        • 7、配置环境变量
      • 二、安装Kafka
        • 1、下载并解压kafka部署包至/usr/local/目录
        • 2、修改server.properties
        • 3、修改/etc/profile
        • 4、执行/etc/profile
        • 5、启动kafka
        • 6、topic管理
        • 7、生产者管理
        • 8、消费者管理
        • 9、查看group-id
      • 三、python连接kafka
        • 1、安装kafka-python
        • 2、消费消息
        • 3、生产数据

Kafka的安装需要依赖于jdk和zookeeper。(kafka 2.11-1.1.0版本才与JDK1.7兼容,更高版本需要JDK1.8);
2.8之前版本的Kafka需要单独下载zookeeper,2.8及之后的Kafka已经内置了一个zookeeper环境,无需单独下载;

一、安装JDK1.8

1、检查服务器是否已安装JDK
rpm -qa |grep java
rpm -qa |grep jdk
rpm -qa |grep gcj
2、若已安装JDK,进行卸载
rpm -qa | grep java | xargs rpm -e --nodeps
3、更新yum源
yum update -y
4、搜索JDK1.8安装包
yum list java-1.8*
5、安装JDK1.8
yum install java-1.8.0-openjdk* -y
6、查看是否安装成功
java -version
7、配置环境变量
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
export PATH=${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH

二、安装Kafka

1、下载并解压kafka部署包至/usr/local/目录
tar -zxvf kafka_2.12-3.1.1.tgz -C /usr/local/
2、修改server.properties
vim /usr/local/kafka_2.12-3.1.1/config/server.properties修改以下内容
listeners=PLAINTEXT://192.168.15.128:9092
advertised.listeners=PLAINTEXT://192.168.15.128:9092
log.dirs=/data/kafka/logs
zookeeper.connect=localhost:2181(local改成192.168.15.128会报错[2024-12-03 11:17:06,427] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient))
3、修改/etc/profile
vim /etc/profile新增:
export KAFKA_HOME=/usr/local/kafka_2.12-3.1.1
export PATH=$KAFKA_HOME/bin:$PATH
4、执行/etc/profile
source /etc/profile
5、启动kafka
先启动zookeeper
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties查看是否启动
netstat -tuln | grep 2181再启动kafka
/usr/local/kafka_2.12-3.1.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.1/config/server.properties查看是否启动
netstat -tuln | grep 9092
jps #有kafka则为启动后台启动
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/server.properties

在这里插入图片描述

在这里插入图片描述

6、topic管理
1. 创建topic
# replication-factor指定副本因子。注意:指定副本因子的时候,不能大于broker实例个数,否则报错
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest  # 旧版本创建方式,新版本只有--bootstrap-server 一种创建topic的方式
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --create --bootstrap-server 192.168.15.128:9092 --replication-factor 1 --partitions 1 --topic demo2. 查询topic详情
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --describe --bootstrap-server 192.168.15.128:9092 --topic demo3. 查询所有topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --list4. 修改topic参数配置
# 注意:partition个数count,只能增加,不能减少
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --alter --topic mytest --parti-tions count5. 删除topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --delete --topic mytest5.1 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
输入如下命令查看:
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic此时你若想真正删除它,可以如下操作:
(1)登录zookeeper客户端:命令:./bin/zookeeper-client
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rm -r /brokers/topics/【topic name】即可,此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失
7、生产者管理
# 新起一个终端,进入kafka解压目录后,输入如下命令。在执行完毕后会进入的编辑器页面,此时任意编辑一个消息之后,消费者那边的终端可以看到,终端中已经打印出了我们刚才发送的消息
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.128:9092 --topic demo

在这里插入图片描述

8、消费者管理
1.创建消费者(有非必须参数,分区与consumer之间的关系:一个分区不能分给两个consumer,但是两个分区可以分给一个consumer)
# 下面的命令可以创建一个用于消费topic为mytest的消费者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --from-beginning --group testgroup2.从尾部开始取数据,必需要指定分区(指定分区)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 03.从尾部开始取数据,必需要指定分区(取指定个数)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 0 --max-messages 1 
9、查看group-id
kafka-consumer-groups.sh --bootstrap-server 192.168.15.128:9092 --list 

三、python连接kafka

1、安装kafka-python
pip3 install kafka-python-ng

在这里插入图片描述

2、消费消息
from kafka import KafkaConsumerkafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka消费者
consumer = KafkaConsumer('demo', bootstrap_servers=[kafka_broker])for message in consumer:print(message.value)

在这里插入图片描述

3、生产数据
import json
from kafka import KafkaProducer# 指定Kafka代理地址,格式为"host:port"
kafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=[kafka_broker])
# 发送10条消息
for i in range(10):# 创建一个字典,然后使用json.dumps()将其转换为JSON格式的字符串,并编码为字节串message = json.dumps({'name': 'kafka', 'index': i}).encode('utf-8')producer.send('demo', message)# 如果你需要打印消息内容,可以解码字节串并打印print(message.decode('utf-8'))# 确保所有消息都已发送
producer.flush()

在这里插入图片描述

相关文章:

部署kafka并通过python操作

目录 一、安装JDK1.81、检查服务器是否已安装JDK2、若已安装JDK,进行卸载3、更新yum源4、搜索JDK1.8安装包5、安装JDK1.86、查看是否安装成功7、配置环境变量 二、安装Kafka1、下载并解压kafka部署包至/usr/local/目录2、修改server.properties3、修改/etc/profile4…...

【JAVA】Java高级:数据库监控与调优:SQL调优与执行计划的分析

作为Java开发工程师,理解SQL调优和执行计划的分析是至关重要的。这不仅可以帮助我们提高数据库查询的效率,还能减少系统资源的消耗,提升整体应用的性能。 1. SQL调优的重要性 随着数据量的增加和用户请求的增多,数据库的性能问题…...

【单片机开发】MCU三种启动方式(Boot选择)[主Flash/系统存储器(BootLoader)/嵌入式SRAM]

目录 参考资料: 利用 Boot 选择不同的启动方式: 单片机的存储结构(主 FLASH/系统存储器/嵌入式 SRAM): 1. Cortex-M 内核芯片——启动原理: 1.1. 启动流程: 1.2. 根据单片机的存储器映射和架构图:启动…...

跨库移植 SQL

背景 应用程序可能要基于不同数据库工作,各种数据库的 SQL 语法大体一致,但仍有些差别,结果就要改造这些 SQL,而这事通常只能手工调整,工作量大还容易出错。 完全自动改造 SQL 几乎是无法做到的,毕竟各种…...

(软件测试文档大全)测试计划,测试报告,测试方案,压力测试报告,性能测试,等保测评,安全扫描测试,日常运维检查测试,功能测试等全下载

1. 引言 1.1. 编写目的 1.2. 项目背景 1.3. 读者对象 1.4. 参考资料 1.5. 术语与缩略语 2. 测试策略 2.1. 测试完成标准 2.2. 测试类型 2.2.1. 功能测试 2.2.2. 性能测试 2.2.3. 安全性与访问控制测试 2.3. 测试工具 3. 测试技术 4. 测试资源 4.1. 人员安排 4.2. 测试环境 4.2.…...

Vue前端开发-路由跳转及带参数跳转

在Vue 3中,由于没有实例化对象this,因此,无法通过this去访问 $route对象,而是通过导入一个名为 useRouter 的方法,执行这个方法后,返回一个路由对象,通过这个路由对象就可以获取到当前路由中的信…...

服务器上安装 Node.js

在服务器上安装 Node.js 的过程根据你使用的操作系统和环境可能会有所不同。以下是一些常见的 Linux 发行版(如 Ubuntu 或 CentOS)上的安装步骤。 在基于 Red Hat/CentOS 的系统上安装 Node.js 设置 EPEL 仓库 如果没有启用 EPEL (Extra Packages for E…...

在阿里云/Linux环境搭建Gitblit服务

在阿里云/Linux环境搭建Gitblit服务 1. 整体描述2. 前期准备3. 安装步骤3.1 下载gitblit3.2 上传gitblit3.3 解压文件3.4 修改文件配置3.5 启动gitblit3.6 安全组配置 4. 总结 1. 整体描述 前段时间买了一个阿里云服务器,2核2G,3M固定带宽的配置&#x…...

MicroBlaze软核开发(二):GPIO

实现功能:使用 MicroBlaze软核,配置GPIO用拨码开关控制LED灯 Vivado版本:2018.3 目录 引言 vivado部分: 一、配置GPIO 二、生成HDL文件编译 SDK部分: 一、导出硬件启动SDK 二、新建应用程序工程 三、编写程序代…...

threejs相机辅助对象cameraHelper

为指定相机创建一个辅助对象,显示这个相机的视锥。 想要在场景里面显示相机的视锥,需要创建两个相机。 举个例子,场景中有个相机A,想要显示相机A的视锥,那么需要一个相机B,把B放在A的后面,两个…...

Luma 视频生成 API 对接说明

Luma 视频生成 API 对接说明 随着 AI 的应用变广,各类 AI 程序已逐渐普及。AI 已逐渐深入到人们的工作生活方方面面。而 AI 涉及的行业也越来越多,从最初的写作,到医疗教育,再到现在的视频。 Luma 是一个专业高质量的视频生成平…...

服务器数据恢复—EVA存储硬盘磁头和盘片损坏离线的数据恢复案例

服务器存储数据恢复环境&故障: 一台HP EVA存储中有23块硬盘,挂接到一台windows server操作系统的服务器。 EVA存储上有三个硬盘指示灯亮黄灯,此刻存储还能正常使用。管理员在更换硬盘的过程中,又出现一块硬盘对应的指示灯亮黄…...

【Python】深入探索Python类型检查:掌握 `typing` 模块的高级用法

解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 随着代码复杂度的增加,类型安全性在现代开发中变得尤为重要。Python自3.5引入类型提示(type hints),为开发者提供了静态类型检查的能力,而typing模块则是这一系统的核心。本篇文章深入研究Python的类型提示及…...

Android学习15--charger

1 概述 最近正好在做关机充电这个,就详细看看吧。还是本着保密的原则,项目里的代码也不能直接用,这里就用的Github的。https://github.com/aosp-mirror 具体位置是:https://github.com/aosp-mirror/platform_system_core/tree/mai…...

顶会新宠!KAN-LSTM完美融合新方案

2024深度学习发论文&模型涨点之——KANLSTM KAN-LSTM混合预测模型是一种结合了自注意力机制(KAN, Key-attention network)和长短时记忆网络(LSTM)的深度学习模型,主要用于序列数据的预测任务,如时间序…...

JS中对象的浅拷贝,深拷贝和引用

JS中对象的浅拷贝,深拷贝和引用 浅拷贝和深拷贝的区别主要在于它们如何处理引用类型的数据(如数组和对象),而引用简而言之就是换了个变量名。 浅拷贝 引用:浅拷贝只复制对象的第一层属性,对于嵌套的对象或…...

思普企业运营平台 idsCheck Sql注入漏洞复现

0x01 产品描述: ‌思普企业运营平台‌是由贵阳思普信息技术有限公司自主研发的国内首款投融建管营云服务平台——...

FSWIND脉动风-风载时程生成器软件下载、安装及注册

1、软件下载 点击文末超链接下载 2、软件安装 以下操作,若被电脑杀毒软件提示风险,请加入白名单,软件无任何病毒和后台,请放心使用! 1)双击Fswind_setup.exe,启动安装程序 2)、点…...

spring通过RequestContextHolder获取HttpServletRequest对象

1.获取HttpServletRequest对象方法: public static HttpServletRequest getRequest() {ServletRequestAttributes attributes ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes());assert attributes ! null;return attributes.getRequest(…...

STM32编码器接口及编码器测速模板代码

编码器是什么? 编码器是一种将角位移或者角速度转换成一连串电数字脉冲的旋转式传感 器,我们可以通过编码器测量到底位移或者速度信息。编码器从输出数据类型上 分,可以分为增量式编码器和绝对式编码器。 从编码器检测原理上来分&#xff0…...

qt QNetworkAccessManager详解

1、概述 QNetworkAccessManager是QtNetwork模块中的一个核心类,它允许应用程序发送网络请求并接收响应。该类是网络通信的基石,提供了一种方便的方式来处理常见的网络协议,如HTTP、HTTPS等。QNetworkAccessManager对象持有其发送的请求的通用…...

部署 Vue 前端项目到 Linux

看看怎么把一个 Vue 3 项目部署到 Linux 服务器上。准备好你的咖啡,让我们愉快地度过这段部署时光! 前期准备 确保你已经在本地构建了 Vue 3 项目,并生成了 dist 文件夹。 npm run build构建完成后,你将看到一个新鲜出炉的 dis…...

数据分析:探索数据背后的秘密与挑战

在当今这个数据驱动的时代,数据分析已成为各行各业不可或缺的一部分。从市场营销到金融风控,从医疗健康到智能制造,数据分析为企业决策提供了强有力的支持。然而,尽管其重要性日益凸显,数据分析的过程并非一帆风顺&…...

文本域设置高度 加上文字限制并show出来:

文本域设置高度 :rows"4" 加上文字限制并show出来&#xff1a; maxlength"30" show-word-limit 效果: <el-form-item label"产品备注" prop"remark"><el-input v-model"form.remark" type"textarea"…...

深入浅出:Gin框架-简介与API开发入门

深入浅出&#xff1a;Gin框架-简介与API开发入门 引言 Gin框架是基于Go语言的HTTP Web框架&#xff0c;凭借其简单易用、性能卓越和丰富的功能&#xff0c;成为构建高性能Web应用的理想选择。本文将深入浅出地介绍Gin框架的基础知识&#xff0c;并通过一个简单的案例&#xf…...

MySQL各种锁详解

什么是锁&#xff1f; 1.1 锁的解释 计算机协调多个进程或线程并发访问某一资源的机制。 1.2 锁的重要性 在数据库中&#xff0c;除传统计算资源&#xff08;CPU、RAM、I/O等&#xff09;的争抢&#xff0c;数据也是一种供多用户共享的资源。 如何保证数据并发访问的一致性&…...

海外的bug-hunters,不一样的403bypass

一种绕过403的新技术&#xff0c;跟大家分享一下。研究HTTP协议已经有一段时间了。发现HTTP协议的1.0版本可以绕过403。于是开始对lyncdiscover.microsoft.com域做FUZZ并且发现了几个403Forbidden的文件。 &#xff08;访问fsip.svc为403&#xff09; 在经过尝试后&#xff0…...

React 组件中 State 的定义、使用及正确更新方式

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;React篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来React篇专栏内容React 组件中 State 的定义、使用及正确更新方式 前言 在 React 应用开发中&#xff0c;state …...

Jenkins 的HTTP Request 插件为什么不能配置Basic认证了

本篇遇到的问题 还是因为Jenkins需要及其所在的OS需要升级&#xff0c;升级策略是在一台新服务器上安装和配置最新版本的Jenkins&#xff0c; 当前的最新版本是&#xff1a; 2.479.2 LTS。 如果需要这个版本的话可以在官方站点下载&#xff0c;也可以到如下地址下载&#xff1…...

8 Bellman Ford算法SPFA

图论 —— 最短路 —— Bellman-Ford 算法与 SPFA_通信网理论基础 分别使用bellman-ford算法和dijkstra算法的应用-CSDN博客 图解Bellman-Ford计算过程以及正确性证明 - 知乎 (zhihu.com) 语雀版本 1 概念 **适用场景&#xff1a;**单源点&#xff0c;可以有负边&#xff0…...