当前位置: 首页 > news >正文

部署kafka并通过python操作

目录

      • 一、安装JDK1.8
        • 1、检查服务器是否已安装JDK
        • 2、若已安装JDK,进行卸载
        • 3、更新yum源
        • 4、搜索JDK1.8安装包
        • 5、安装JDK1.8
        • 6、查看是否安装成功
        • 7、配置环境变量
      • 二、安装Kafka
        • 1、下载并解压kafka部署包至/usr/local/目录
        • 2、修改server.properties
        • 3、修改/etc/profile
        • 4、执行/etc/profile
        • 5、启动kafka
        • 6、topic管理
        • 7、生产者管理
        • 8、消费者管理
        • 9、查看group-id
      • 三、python连接kafka
        • 1、安装kafka-python
        • 2、消费消息
        • 3、生产数据

Kafka的安装需要依赖于jdk和zookeeper。(kafka 2.11-1.1.0版本才与JDK1.7兼容,更高版本需要JDK1.8);
2.8之前版本的Kafka需要单独下载zookeeper,2.8及之后的Kafka已经内置了一个zookeeper环境,无需单独下载;

一、安装JDK1.8

1、检查服务器是否已安装JDK
rpm -qa |grep java
rpm -qa |grep jdk
rpm -qa |grep gcj
2、若已安装JDK,进行卸载
rpm -qa | grep java | xargs rpm -e --nodeps
3、更新yum源
yum update -y
4、搜索JDK1.8安装包
yum list java-1.8*
5、安装JDK1.8
yum install java-1.8.0-openjdk* -y
6、查看是否安装成功
java -version
7、配置环境变量
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
export PATH=${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH

二、安装Kafka

1、下载并解压kafka部署包至/usr/local/目录
tar -zxvf kafka_2.12-3.1.1.tgz -C /usr/local/
2、修改server.properties
vim /usr/local/kafka_2.12-3.1.1/config/server.properties修改以下内容
listeners=PLAINTEXT://192.168.15.128:9092
advertised.listeners=PLAINTEXT://192.168.15.128:9092
log.dirs=/data/kafka/logs
zookeeper.connect=localhost:2181(local改成192.168.15.128会报错[2024-12-03 11:17:06,427] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient))
3、修改/etc/profile
vim /etc/profile新增:
export KAFKA_HOME=/usr/local/kafka_2.12-3.1.1
export PATH=$KAFKA_HOME/bin:$PATH
4、执行/etc/profile
source /etc/profile
5、启动kafka
先启动zookeeper
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties查看是否启动
netstat -tuln | grep 2181再启动kafka
/usr/local/kafka_2.12-3.1.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.1/config/server.properties查看是否启动
netstat -tuln | grep 9092
jps #有kafka则为启动后台启动
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/server.properties

在这里插入图片描述

在这里插入图片描述

6、topic管理
1. 创建topic
# replication-factor指定副本因子。注意:指定副本因子的时候,不能大于broker实例个数,否则报错
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest  # 旧版本创建方式,新版本只有--bootstrap-server 一种创建topic的方式
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --create --bootstrap-server 192.168.15.128:9092 --replication-factor 1 --partitions 1 --topic demo2. 查询topic详情
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --describe --bootstrap-server 192.168.15.128:9092 --topic demo3. 查询所有topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --list4. 修改topic参数配置
# 注意:partition个数count,只能增加,不能减少
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --alter --topic mytest --parti-tions count5. 删除topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --delete --topic mytest5.1 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
输入如下命令查看:
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic此时你若想真正删除它,可以如下操作:
(1)登录zookeeper客户端:命令:./bin/zookeeper-client
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rm -r /brokers/topics/【topic name】即可,此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失
7、生产者管理
# 新起一个终端,进入kafka解压目录后,输入如下命令。在执行完毕后会进入的编辑器页面,此时任意编辑一个消息之后,消费者那边的终端可以看到,终端中已经打印出了我们刚才发送的消息
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.128:9092 --topic demo

在这里插入图片描述

8、消费者管理
1.创建消费者(有非必须参数,分区与consumer之间的关系:一个分区不能分给两个consumer,但是两个分区可以分给一个consumer)
# 下面的命令可以创建一个用于消费topic为mytest的消费者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --from-beginning --group testgroup2.从尾部开始取数据,必需要指定分区(指定分区)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 03.从尾部开始取数据,必需要指定分区(取指定个数)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 0 --max-messages 1 
9、查看group-id
kafka-consumer-groups.sh --bootstrap-server 192.168.15.128:9092 --list 

三、python连接kafka

1、安装kafka-python
pip3 install kafka-python-ng

在这里插入图片描述

2、消费消息
from kafka import KafkaConsumerkafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka消费者
consumer = KafkaConsumer('demo', bootstrap_servers=[kafka_broker])for message in consumer:print(message.value)

在这里插入图片描述

3、生产数据
import json
from kafka import KafkaProducer# 指定Kafka代理地址,格式为"host:port"
kafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=[kafka_broker])
# 发送10条消息
for i in range(10):# 创建一个字典,然后使用json.dumps()将其转换为JSON格式的字符串,并编码为字节串message = json.dumps({'name': 'kafka', 'index': i}).encode('utf-8')producer.send('demo', message)# 如果你需要打印消息内容,可以解码字节串并打印print(message.decode('utf-8'))# 确保所有消息都已发送
producer.flush()

在这里插入图片描述

相关文章:

部署kafka并通过python操作

目录 一、安装JDK1.81、检查服务器是否已安装JDK2、若已安装JDK,进行卸载3、更新yum源4、搜索JDK1.8安装包5、安装JDK1.86、查看是否安装成功7、配置环境变量 二、安装Kafka1、下载并解压kafka部署包至/usr/local/目录2、修改server.properties3、修改/etc/profile4…...

【JAVA】Java高级:数据库监控与调优:SQL调优与执行计划的分析

作为Java开发工程师,理解SQL调优和执行计划的分析是至关重要的。这不仅可以帮助我们提高数据库查询的效率,还能减少系统资源的消耗,提升整体应用的性能。 1. SQL调优的重要性 随着数据量的增加和用户请求的增多,数据库的性能问题…...

【单片机开发】MCU三种启动方式(Boot选择)[主Flash/系统存储器(BootLoader)/嵌入式SRAM]

目录 参考资料: 利用 Boot 选择不同的启动方式: 单片机的存储结构(主 FLASH/系统存储器/嵌入式 SRAM): 1. Cortex-M 内核芯片——启动原理: 1.1. 启动流程: 1.2. 根据单片机的存储器映射和架构图:启动…...

跨库移植 SQL

背景 应用程序可能要基于不同数据库工作,各种数据库的 SQL 语法大体一致,但仍有些差别,结果就要改造这些 SQL,而这事通常只能手工调整,工作量大还容易出错。 完全自动改造 SQL 几乎是无法做到的,毕竟各种…...

(软件测试文档大全)测试计划,测试报告,测试方案,压力测试报告,性能测试,等保测评,安全扫描测试,日常运维检查测试,功能测试等全下载

1. 引言 1.1. 编写目的 1.2. 项目背景 1.3. 读者对象 1.4. 参考资料 1.5. 术语与缩略语 2. 测试策略 2.1. 测试完成标准 2.2. 测试类型 2.2.1. 功能测试 2.2.2. 性能测试 2.2.3. 安全性与访问控制测试 2.3. 测试工具 3. 测试技术 4. 测试资源 4.1. 人员安排 4.2. 测试环境 4.2.…...

Vue前端开发-路由跳转及带参数跳转

在Vue 3中,由于没有实例化对象this,因此,无法通过this去访问 $route对象,而是通过导入一个名为 useRouter 的方法,执行这个方法后,返回一个路由对象,通过这个路由对象就可以获取到当前路由中的信…...

服务器上安装 Node.js

在服务器上安装 Node.js 的过程根据你使用的操作系统和环境可能会有所不同。以下是一些常见的 Linux 发行版(如 Ubuntu 或 CentOS)上的安装步骤。 在基于 Red Hat/CentOS 的系统上安装 Node.js 设置 EPEL 仓库 如果没有启用 EPEL (Extra Packages for E…...

在阿里云/Linux环境搭建Gitblit服务

在阿里云/Linux环境搭建Gitblit服务 1. 整体描述2. 前期准备3. 安装步骤3.1 下载gitblit3.2 上传gitblit3.3 解压文件3.4 修改文件配置3.5 启动gitblit3.6 安全组配置 4. 总结 1. 整体描述 前段时间买了一个阿里云服务器,2核2G,3M固定带宽的配置&#x…...

MicroBlaze软核开发(二):GPIO

实现功能:使用 MicroBlaze软核,配置GPIO用拨码开关控制LED灯 Vivado版本:2018.3 目录 引言 vivado部分: 一、配置GPIO 二、生成HDL文件编译 SDK部分: 一、导出硬件启动SDK 二、新建应用程序工程 三、编写程序代…...

threejs相机辅助对象cameraHelper

为指定相机创建一个辅助对象,显示这个相机的视锥。 想要在场景里面显示相机的视锥,需要创建两个相机。 举个例子,场景中有个相机A,想要显示相机A的视锥,那么需要一个相机B,把B放在A的后面,两个…...

Luma 视频生成 API 对接说明

Luma 视频生成 API 对接说明 随着 AI 的应用变广,各类 AI 程序已逐渐普及。AI 已逐渐深入到人们的工作生活方方面面。而 AI 涉及的行业也越来越多,从最初的写作,到医疗教育,再到现在的视频。 Luma 是一个专业高质量的视频生成平…...

服务器数据恢复—EVA存储硬盘磁头和盘片损坏离线的数据恢复案例

服务器存储数据恢复环境&故障: 一台HP EVA存储中有23块硬盘,挂接到一台windows server操作系统的服务器。 EVA存储上有三个硬盘指示灯亮黄灯,此刻存储还能正常使用。管理员在更换硬盘的过程中,又出现一块硬盘对应的指示灯亮黄…...

【Python】深入探索Python类型检查:掌握 `typing` 模块的高级用法

解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 随着代码复杂度的增加,类型安全性在现代开发中变得尤为重要。Python自3.5引入类型提示(type hints),为开发者提供了静态类型检查的能力,而typing模块则是这一系统的核心。本篇文章深入研究Python的类型提示及…...

Android学习15--charger

1 概述 最近正好在做关机充电这个,就详细看看吧。还是本着保密的原则,项目里的代码也不能直接用,这里就用的Github的。https://github.com/aosp-mirror 具体位置是:https://github.com/aosp-mirror/platform_system_core/tree/mai…...

顶会新宠!KAN-LSTM完美融合新方案

2024深度学习发论文&模型涨点之——KANLSTM KAN-LSTM混合预测模型是一种结合了自注意力机制(KAN, Key-attention network)和长短时记忆网络(LSTM)的深度学习模型,主要用于序列数据的预测任务,如时间序…...

JS中对象的浅拷贝,深拷贝和引用

JS中对象的浅拷贝,深拷贝和引用 浅拷贝和深拷贝的区别主要在于它们如何处理引用类型的数据(如数组和对象),而引用简而言之就是换了个变量名。 浅拷贝 引用:浅拷贝只复制对象的第一层属性,对于嵌套的对象或…...

思普企业运营平台 idsCheck Sql注入漏洞复现

0x01 产品描述: ‌思普企业运营平台‌是由贵阳思普信息技术有限公司自主研发的国内首款投融建管营云服务平台——...

FSWIND脉动风-风载时程生成器软件下载、安装及注册

1、软件下载 点击文末超链接下载 2、软件安装 以下操作,若被电脑杀毒软件提示风险,请加入白名单,软件无任何病毒和后台,请放心使用! 1)双击Fswind_setup.exe,启动安装程序 2)、点…...

spring通过RequestContextHolder获取HttpServletRequest对象

1.获取HttpServletRequest对象方法: public static HttpServletRequest getRequest() {ServletRequestAttributes attributes ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes());assert attributes ! null;return attributes.getRequest(…...

STM32编码器接口及编码器测速模板代码

编码器是什么? 编码器是一种将角位移或者角速度转换成一连串电数字脉冲的旋转式传感 器,我们可以通过编码器测量到底位移或者速度信息。编码器从输出数据类型上 分,可以分为增量式编码器和绝对式编码器。 从编码器检测原理上来分&#xff0…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制&#xff0…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行

项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

DingDing机器人群消息推送

文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...

基于PHP的连锁酒店管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...

uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)

UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...

在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7

在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤: 第一步: 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为: // 改为 v…...