01_kafka_环境搭建安装_topic管理
文章目录
- 安装jdk
- 配置主机名
- Zookeeper 下载与安装
- Kafka 下载与安装
- 测试
- 集群版安装
- 测试
- 输出
安装jdk
- 略
配置主机名
-
hostnamectl set-hostname kafka_1
-
/etc/sysconfig/network
HOSTNAME=kafka_1
- /etc/hosts
ip kafka_1
- ping kafka_1 测试
Zookeeper 下载与安装
- 由于 集群中的 Leader 的监控 和 Topic 元数据 存储在 Zookeeper 中 ,所以需要安装 zk;
- https://zookeeper.apache.org/releases.html 中选择需要的版本;
- conf 目录 复制zoo_sample.cfg 为 zoo.cfg, 修改其中的配置 dataDir 指定自己的目录;
- 启动zk:使用
zkServer.sh start zoo.cfg
cd /opt/pkg/
wget --no-check-certificate https://dlcdn.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
tar zxf apache-zookeeper-3.8.2-bin.tar.gz
mv apache-zookeeper-3.8.2-bin ../app/
cd /opt/app/apache-zookeeper-3.8.2-bin/
cp zoo_sample.cfg zoo.cfg
./bin/zkServer.sh start zoo.cfg---
[root@loaclhost bin]# ./zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/app/apache-zookeeper-3.8.2-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
Kafka 下载与安装
- https://kafka.apache.org/downloads
cd /opt/pkg/
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar zxf kafka_2.13-3.5.1.tgz
mv kafka_2.13-3.5.1 ../app/
cd /opt/app
cd kafka_2.13-3.5.1/
cd config/
cp server.properties server.properties.bak
vim server.properties
---
listeners=PLAINTEXT://kafka_1:9092
og.dirs=./kafka-logs
zookeeper.connect=kafka_1:2181
---cd ../
./bin/kafka-server-start.sh -daemon config/server.properties
- 配置: config/server.properties
broker.id
listeners=PLAINTEXT://主机名:9092
log.dirs= # 日志目录
zookeeper.connect=主机名:2181
- 启动:
./bin/kafka-server-start.sh -daemon config/server.properties
- 停止:./bin/kafka-server-stop.sh
测试
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092 --create --topic topic_01 --partitions 3 --replication-factor 1 # 单机版副本因子为1 多了没有意义
./bin/kafka-console-consumer.sh --bootstrap-server kafka_1:9092 --topic topic_01 --group g1 # 消费者 查看输出
./bin/kafka-console-producer.sh --broker-list kafka_1:9092 --topic topic_01 # 开始输入
root@loaclhost kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092 --create --topic topic_0
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should c
WARNING: Due to limitations in metric names, topics with a period (‘.’) or underscore ('') could collide. To avo
Created topic topic_01.
[root@loaclhost kafka_2.13-3.5.1]# ./bin/kafka-console-consumer.sh --bootstrap-server kafka_1:9092 --topic topic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should c
root@loaclhost kafka_2.13-3.5.1]# ./bin/kafka-console-consumer.sh --bootstrap-server kafka_1:9092 --topic topic_01 --group g1
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
集群版安装
- 使用3台主机进行搭建,三台机器上都需要安装 jdk,Zookeeper,Kafka
- 其中 /etc/hosts 都需要配置好三台主机的ip 主机名映射;
- 需要额外注意的是,三台主机需要做一次时间同步
yum -y install ntpdate
timedatectl set-timezone Asia/Shanghai
ntpdate ntp1.aliyun.com # aliyun 有ntp1~7, 或者 cn.pool.ntp.org
- 复制文件到 另外两台主机
[root@kafka_1 pkg]# pwd
/opt/pkg
scp kafka_2.13-3.5.1.tgz root@kafka_2:/opt/pkg
scp kafka_2.13-3.5.1.tgz root@kafka_3:/opt/pkg
scp apache-zookeeper-3.8.2-bin.tar.gz root@kafka_3:/opt/pkg
scp apache-zookeeper-3.8.2-bin.tar.gz root@kafka_2:/opt/pkg
- 配置 zoo.cfg:
# 数据目录 每个机器中,此目录下 需要创建 myid文件内容 和server.id 对应 , 可以指定一个绝对目录
dataDir=/opt/app/cluster/apache-zookeeper-3.8.2/dataDir server.1=kafka_1:2888:3888
server.2=kafka_2:2888:3888
server.3=kafka_3:2888:3888
4lw.commands.whitelist=*
-
myid生成: echo 1 > /opt/app/cluster/apache-zookeeper-3.8.2/dataDir/myid # 1,2,3
-
配置: config/server.properties
broker.id=0 # 需要修改 1,2
listeners=PLAINTEXT://kafka_1:9092 # 主机名和当前机器对应
log.dirs=/opt/app/cluster/kafka_2.13-3.5.1/log_dirs # 日志目录
zookeeper.connect=kafka_1:2181,kafka_2:2181,kafka_3:2181
- 启动zk 和 kafka:
cd /opt/app/cluster/apache-zookeeper-3.8.2/
./bin/zkServer.sh start zoo.cfg
cd /opt/app/cluster/kafka_2.13-3.5.1
./bin/kafka-server-start.sh -daemon config/server.properties
- 报错:
Using config: /opt/app/cluster/apache-zookeeper-3.8.2/bin/…/conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
- 原因是防火墙没关:
systemctl status firewalld
#● firewalld.service - firewalld - dynamic firewall daemon
# Loaded: loaded (/usr/lib/systemd/system/firewalld.service; enabled; vendor preset: enabled)
# Active: active (running) since Sun 2023-08-27 19:48:04 CST; 2h 32min ago
# Docs: man:firewalld(1)
# Main PID: 649 (firewalld)
# CGroup: /system.slice/firewalld.service
# └─649 /usr/bin/python -Es /usr/sbin/firewalld --nofork --nopid
#
systemctl disable firewalld
测试
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --create --topic topic01 --partitions 3 --replication-factor 3 # 创建 topic
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --create --topic topic02 --partitions 3 --replication-factor 3 # 创建 topic
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --list # 查看
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --describe --topic topic02 # 查看详细描述
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --alter --topic topic02 --partitions 4 # 修改分区数量,只能增不能减,设置小于3则报错
./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --delete --topic topic02 # 删除topic---
# 消息的生产与消费/订阅
./bin/kafka-console-producer.sh --broker-list kafka_1:9092,kafka_2:9092,kafka_3:9092 --topic topic01 # 生产
./bin/kafka-console-consumer.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --topic topic01 --group g1 --property print.key=true --property print.value=true --property key.separator=, # 消费
---
# 查看消费组信息
./bin/kafka-consumer-groups.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --list g1 # 查看
./bin/kafka-consumer-groups.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --describe --group g1 # 查看详情
输出
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --create --topic topic01 --partitions 3 --replication-factor 3
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic topic01.
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --create --topic topic02 --partitions 3 --replication-factor 3
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic topic02.
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
topic01
topic02[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --describe --topic topic02
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic: topic02 TopicId: _s44mBQqTLaSmVWcZA2GTw PartitionCount: 3 ReplicationFactor: 3 Configs:Topic: topic02 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1Topic: topic02 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2Topic: topic02 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --alter --topic topic02 --partitions 4[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --describe --topic topic02
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic: topic02 TopicId: _s44mBQqTLaSmVWcZA2GTw PartitionCount: 4 ReplicationFactor: 3 Configs:Topic: topic02 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1Topic: topic02 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2Topic: topic02 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: topic02 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
topic01
topic02
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --delete --topic topic02
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-topics.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
topic01---
# 生产者
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-console-producer.sh --broker-list kafka_1:9092,kafka_2:9092,kafka_3:9092 --topic topic01
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>123
>123
>key=123
># 消费者
[root@kafka_1 kafka_2.13-3.5.1]# ./bin/kafka-console-consumer.sh --bootstrap-server kafka_1:9092,kafka_2:9092,kafka_3:9092 --topic topic01 --group g1 --property print.key=true --property print.value=true --property key.separator=,
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=Nnull,123
null,123
null,key=123
相关文章:

01_kafka_环境搭建安装_topic管理
文章目录 安装jdk配置主机名Zookeeper 下载与安装Kafka 下载与安装测试集群版安装测试输出 安装jdk 略 配置主机名 hostnamectl set-hostname kafka_1 /etc/sysconfig/network HOSTNAMEkafka_1/etc/hosts ip kafka_1ping kafka_1 测试 Zookeeper 下载与安装 由于 集群…...

Python+Requests+Excel接口测试实战
1、EXCEL文件接口保存方式,如图。 2、然后就是读取EXCEL文件中的数据方法,如下: 1 import xlrd2 3 4 class readExcel(object):5 def __init__(self, path):6 self.path path7 8 property9 def getSheet(self): 10 …...

10:STM32------I2C通信
目录 一:I2C通信协议 1:I2C简历 2:硬件电路 3:I2C时序基本单元 A : 开/ 终条件 2:发送一个字节 3:接收一个字节 4:应答机制 4:I2C时序 1:指定地址写 2:当前地址读 3: 指定地址读 二:MPU6050 1:简历 2:参数 3:硬件电路 4:框图 5:寄存器地址 …...

Git多人开发解决冲突案例
准备工作: 1.创建一个gitee远程仓库https://gitee.com/xxxxxxx.git 2.初始化两个本地git仓库用户,目的是模拟多人协作开发时提交代码发生冲突的场景 3.解决冲突并提交。 进入正题: lisi 通过vim指令修改readme.md文件内容,推送到…...

医疗机构如何维护电力系统?来看看这个小技巧
在现代医疗领域,电力是不可或缺的。从手术室里的手术灯到病房中的呼吸机,医院的各种医疗设备和系统都依赖于稳定的电源供应。 然而,电力中断和紧急情况不可避免,而这些情况下的电力可靠性可能会直接影响病人的生命和健康。为了确保…...

时序预测 | MATLAB实现ELM极限学习机时间序列预测未来
时序预测 | MATLAB实现ELM极限学习机时间序列预测未来 目录 时序预测 | MATLAB实现ELM极限学习机时间序列预测未来预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现ELM极限学习机时间序列预测未来; 2.运行环境Matlab2018及以上,data为数…...

【数据分享】1901-2022年我国省市县镇四级的逐年平均气温数据(免费获取/Shp/Excel格式)
气象数据在日常研究中非常常用,之前我们分享过来自国家青藏高原科学数据中心提供的1901-2022年1km分辨率逐月平均气温栅格数据,2001-2022年我国省市县镇四级的逐月平均气温数据,以及基于该栅格数据处理得到的1901-2022年1km分辨率的逐年平均气…...

【Axure高保真原型】日历日期原型模板
今天和大家分享日历日期的原型模板,包括月计划、周计划、日计划的原型案例,以及日期、时间、月份、区间选择器……具体效果可以点击下方视频观看 【原型预览及下载地址】 Axure 原型 备用地址:Untitled Document 【原型效果】 【原型效果…...

深入了解接口测试:Postman 接口测试指南
在现代软件开发生命周期中,接口测试是一个至关重要的部分。使用 Postman 这一工具,可以轻松地进行 接口测试。以下是一份简单的使用教程,帮助你快速上手。 安装 Postman 首先,你需要在电脑上安装 Postman。你可以从官网上下载并…...

【ROS】Ubuntu20.04+ROS Noetic 配置PX4-v1.12.2和Gazebo11联合仿真环境【教程】
【ROS】Ubuntu20.04ROS Noetic 配置PX4-v-v1.12.2和Gazebo11联合仿真环境【教程】 文章目录 【ROS】Ubuntu20.04ROS Noetic 配置PX4-v-v1.12.2和Gazebo11联合仿真环境【教程】0. 安装UbuntuROS1. 安装依赖2. 安装QGC地面站3. 配置PX4-v1.12.23.1 安装PX43.2 测试PX4是否成功安装…...

Java 代理模式之静态代理与动态代理
1,代理模式 代理模式给某一个对象提供一个代理对象,并由代理对象控制对原对象的引用。通俗的来讲代理模式就是我们生活中常见的中介。 代理模式的目的: (1)通过引入代理对象的方式来间接访问目标对象,防…...

打造基于终端命令行的IDE,Termux配置Vim C++开发环境
Termux配置Vim C开发环境,打造基于终端命令行的IDE 主要利用VimCoc插件,配置C的代码提示等功能。 Termux换源 打开termux,输入termux-change-repo 找到mirrors.tuna.tsinghua.edu.cn,清华源,空格选中,回…...

【初阶C语言】操作符2---表达式求值
前言:本节重点介绍操作符的使用,如,优先级高低、类型转换等 一、逻辑操作符 前言:逻辑操作符包括逻辑与(&&)和逻辑或(||),操作对象:两个 1.逻辑与&…...
代码随想录day50|123. 买卖股票的最佳时机 III188. 买卖股票的最佳时机 IV
123. 买卖股票的最佳时机 III class Solution:def maxProfit(self, prices: List[int]) -> int:dp[[0]*5 for _ in range(len(prices))]dp[0][0]0dp[0][1]-prices[0]dp[0][2]0dp[0][3]-prices[0]dp[0][4]0for i in range(1,len(prices)):dp[i][0] dp[i-1][0]dp[i][1] max…...

Word 表格单元格无法垂直居中
Word使用 由于平时也需要用到word编写一些文档,但是咱们就是用的少,很多操作或者技巧不太清楚,很多小问题处理起来反而需要消耗很多时间,所以在这里记录平时遇到的一些问题。 表格无法垂直居中 类似于上图的情况,总之…...
python实现Flask POST Demo
数据处理逻辑 from flask import Flask, requestapp Flask(__name__)app.route(/, methods[POST]) def index():username request.form[username]password request.form[password]if username "Jhon" and password "1":return f"<html>&l…...
3-Pytorch张量的运算、形状改变、自动微分
3-Pytorch张量的运算、形状改变、自动微分 1 导入必备库2 张量的运算3 张量的算数运算4 一个元素的张量可以使用tensor.item()方法转成标量5 torch.from_numpy()和tensor.numpy()6 张量的变形7 张量的自动微分8 使用with torch.no_grad():包含上下文中使其不再跟踪计算9 使用te…...

用户权限数据转换为用户组列表(3/3) - Excel PY公式
最近Excel圈里的大事情就是微软把PY塞进了Excel单元格,可以作为公式使用,轻松用PY做数据分析。系好安全带,老司机带你玩一把。 实例需求:如下是AD用户的列表,每个用户拥有该应用程序的只读或读写权限,现在需要创建新的…...

VS2022+CMAKE+OPENCV+QT+PCL安装及环境搭建
VS2022安装: Visual Studio 2022安装教程(千字图文详解),手把手带你安装运行VS2022以及背景图设置_vs安装教程_我不是大叔丶的博客-CSDN博客 CMAKE配置: win11下配置vscodecmake_心儿痒痒的博客-CSDN博客 OPENCV配…...

JavaScript的内置类
一、认识包装类型 1.原始类型的包装类 JavaScript的原始类型并非对象类型,所以从理论上来说,它们是没有办法获取属性或者调用方法的。 但是,在开发中会看到,我们会经常这样操作: var message "hello world&q…...

ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...
Vue 模板语句的数据来源
🧩 Vue 模板语句的数据来源:全方位解析 Vue 模板(<template> 部分)中的表达式、指令绑定(如 v-bind, v-on)和插值({{ }})都在一个特定的作用域内求值。这个作用域由当前 组件…...

Vue3 PC端 UI组件库我更推荐Naive UI
一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用,前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率,还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库(Naive UI、Element …...