OpenEuler 22.03 不依赖zookeeper安装 kafka 3.3.2集群
零:规划
本次计划安装三台OpenEuler 22.03 版本操作系统的服务器,用于搭建 kafka和flink 集群。因为从kafka 2.8 版本以后开始不依赖 zookeeper ,同时考虑到需要找一个发布时间早于 flink 1.17 的kafka 版本且应尽量稳定,综合考虑下选择了 kafka 3.3.2。
| 服务器名 | IP地址 | 作用 | 其他应用 |
|---|---|---|---|
| flink01 | 192.168.159.133 | kafka | jdk11、flink-1.17.2 |
| flink02 | 192.168.159.134 | kafka | jdk11、flink-1.17.2 |
| flink03 | 192.168.159.135 | kafka | jdk11、flink-1.17.2 |
一、准备工作
关于服务器安装与免密、防火墙设置等操作可以参考上一篇 flink集群的搭建 。kafka可以从 中文官网 下载到kafka 3.3.2 版本,需要注意的是,每个版本的 kafka 都会用 scala 2.12和2.13,如果后续的使用不涉及 scala 的部分,那么下载哪个scala版本的 kafka都无所谓。
另外,kafka 的常用端口包括9092和9093 两个,前者用于接收集群producer和consumer等的通信,后者用于集群内部的管理和监控。
二、安装
2.1、部署
登录到 192.168.159.133 服务器,执行 如下命令下载安装包并解压到 /usr/local/路径下
## 进入 /usr/local/
[root@flink01 local]# cd /usr/local/## 下载
[root@flink01 local]# wget https://archive.apache.org/dist/kafka/3.3.2/kafka_2.13-3.3.2.tgz## 解压
[root@flink01 local]# tar -vxf kafka_2.13-3.3.2.tgz## 进入目录、备份配置文件并修改
[root@flink01 local]# cd /usr/local/kafka_2.12-3.3.2/config/kraft[root@flink01 local]# cp -rf server.properties server.properties_20241222bak[root@flink01 local]# vim server.properties
server.properties 文件需要修改的配置项如下面所示。
## 需要关注的配置项主要包括以下几个
## 为不同kafka 服务分配不同的 id值,133服务器作为第一个节点,其他服务id随ip的递增而增加
node.id=1## 需要确定当前 kafka 服务的通信端口和监控管理端口,这里使用的是kafka默认的 9092和9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093## 接入集群的kafka 服务的ip 和 编号。端口号使用监控和管理端口 9093
controller.quorum.voters=1@192.168.159.133:9093,2@192.168.159.134:9093,3@192.168.159.135:9093## 设定当前服务的接收生产者与消费者的ip和端口。
advertised.listeners=PLAINTEXT://192.168.159.133:9092
2.2、设置
修改 133 kafka 服务器的配置文件后,将kafka 程序目录发送到另外两个服务器
## 将kafka文件夹分发到其他服务器
[root@flink01 local]# scp kafka_2.12-3.3.2 192.168.159.134:/usr/local/## 其他服务器上的kafka 仅需修改 server.properties 文件中的以下配置项即可,其他项保持不变## id序号逐渐递增即可
node.id=1## 这一项修改成当前服务器的 ip即可,端口不必改变
advertised.listeners=PLAINTEXT://192.168.159.134:9092
2.3、启动
创建 kafka集群 UUID,并启动集群中所有 kafka 服务
## 登入kafka 的bin目录
[root@flink03 bin]# cd /usr/local/kafka_2.12-3.3.2/bin## 生成 集群 uuid
[root@flink01 bin]# ./kafka-storage.sh random-uuid## 使用生成的 uuid 8xXSdiD0RdKhq-wvOpApcg 格式化集群中所有kafka服务
## 注意,要格式化集群中所有服务!
[root@flink01 bin]# ./kafka-storage.sh format -t 8xXSdiD0RdKhq-wvOpApcg -c ../config/kraft/server.properties## 分别启动集群中三台服务器上的 kafka 服务
[root@flink01 bin]# ./kafka-server-start.sh -daemon ../config/kraft/server.properties
2.4、创建topic
事实上,在集群中任何一个 ip 的kafka下创建 topic后,其他kafka服务都能用任意一个ip 访问该名称的 topic,请自行尝试,在此不做演示。
[root@flink03 bin]# ./kafka-topics.sh --bootstrap-server 192.168.159.135:9092 --create --topic zgyKraft --partitions 1 --replication-factor 1
2.5、发送消息
启动 consumer 和 producer,并发送消息
## 这里在第二台服务器上,启动第一台服务器的 consumer
[root@flink02 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.159.133:9092 --topic zgyKraft## 这里在第一台服务器上,启动第三台服务器的 producer
[root@flink01 bin]# ./kafka-console-producer.sh --broker-list 192.168.159.135:9092 --topic zgyKraft## 然后在producer 上发送消息后,所有同名 topic 下的消费者都可以收到了。
三、问题与心得
3.1、启动问题
当我的虚拟机重启后,简单执行 2.3 环节的最后一步启动服务,发现报错,内容如下
[2024-12-24 07:39:19,194] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2024-12-24 07:39:19,831] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2024-12-24 07:39:20,010] WARN No meta.properties file under dir /tmp/kraft-combined-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2024-12-24 07:39:20,012] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
org.apache.kafka.common.KafkaException: No `meta.properties` found in /tmp/kraft-combined-logs (have you run `kafka-storage.sh` to format the directory?)at kafka.server.BrokerMetadataCheckpoint$.$anonfun$getBrokerMetadataAndOfflineDirs$2(BrokerMetadataCheckpoint.scala:172)at scala.collection.Iterator.foreach(Iterator.scala:943)at scala.collection.Iterator.foreach$(Iterator.scala:943)at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)at scala.collection.IterableLike.foreach(IterableLike.scala:74)at scala.collection.IterableLike.foreach$(IterableLike.scala:73)at scala.collection.AbstractIterable.foreach(Iterable.scala:56)at kafka.server.BrokerMetadataCheckpoint$.getBrokerMetadataAndOfflineDirs(BrokerMetadataCheckpoint.scala:161)at kafka.server.KafkaRaftServer$.initializeLogDirs(KafkaRaftServer.scala:184)at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:61)at kafka.Kafka$.buildServer(Kafka.scala:79)at kafka.Kafka$.main(Kafka.scala:87)at kafka.Kafka.main(Kafka.scala)
经过分析,感觉需要format 操作,回顾之前发现只在 2.3 环节使用 uuid时执行过 format命令。
因此问题的解决方案,就是生成新的集群UUID 并在三台服务器上重新执行 2.3 环节,问题解决。但因为服务器重启,所有 topic 都已不复存在,虽然可以在打开生产者/消费者的时候以默认方式创建 符合名称的 topic,但还是推荐手动创建!
四、附录
附上一些常用的 kafka命令
## 查看主题
./kafka-topics.sh --bootstrap-server 192.168.159.133:9092 --list## 查看主题明细
./kafka-topics.sh --bootstrap-server 192.168.159.133:9092 --describe <topic-id>## 创建主题,分区 partition 为5,副本 replication-factor 为2,broker 数应 大于等于 副本数。(broker 不必在创建 topic 时显示指定)
./kafka-topics.sh --bootstrap-server 192.168.159.133:9092 --create --topic zgyTopic --partitions 5 --replication-factor 2## 删除主题
./kafka-topics.sh --bootstrap-server 192.168.159.133:9092 --delete --topic <topic-id>## 查看消费者列表--list
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --list## 查看指定消费组详情
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --describe --group <group-id>## 删除特定group
kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --delete --group <group-id>## 打开一个生产者
./kafka-console-producer.sh --bootstrap-server 192.168.159.133:9092 --topic zgyTopic## 打开一个消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.159.133:9092 --topic zgyTopic --from-beginning ## 查看所有消费组详情--all-groups
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --describe --all-groups查询消费者成员信息--members## 所有消费组成员信息
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --describe --all-groups --members## 指定消费组成员信息
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --describe --members --group zgyConsumerGroup## 修改到最新offset
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --group zgyConsumerGroup --reset-offsets --topic mytopic --to-latest --execute## 重设位移位置
./kafka-consumer-groups.sh --bootstrap-server 192.168.159.133:9092 --group zgyConsumerGroup --reset-offsets --topic mytopic --to-offset 100 --execute
相关文章:
OpenEuler 22.03 不依赖zookeeper安装 kafka 3.3.2集群
零:规划 本次计划安装三台OpenEuler 22.03 版本操作系统的服务器,用于搭建 kafka和flink 集群。因为从kafka 2.8 版本以后开始不依赖 zookeeper ,同时考虑到需要找一个发布时间早于 flink 1.17 的kafka 版本且应尽量稳定,综合考虑…...
ubuntu 将python3.8 升级为python3.10并进行版本切换
ubuntu 将python3.8 升级为python3.10并进行版本切换 前言将python3.8 升级为3.10安装pippython版本切换 前言 有一个功能包编译环境需要为python3.10 ,但是当前环境为python3.8 ,所以需要进行版本升级,编译完还需要把环境切换回来。 将pyt…...
3. Kafka入门—安装与基本命令
Kafka基础操作 一. 章节简介二. kafka简介三. Kafka安装1. 准备工作2. Zookeeper安装2.1 配置文件2.2 启动相关命令3. Kafka安装3.1 配置文件3.2 启动相关命令-------------------------------------------------------------------------------------------------------------…...
如何使用 python创建图片格式转换器
在本篇博客中,我们将通过一个简单的实例来展示如何使用 wxPython 创建一个图形用户界面(GUI)应用程序,用于将图片从一种格式转换为另一种格式。我们将通过以下几个步骤实现这一目标: C:\pythoncode\new\imageconvertty…...
命令行之巅:Linux Shell编程的至高艺术(上)
文章一览 前言一、shell概述1.1 shell的特点和类型1.1.1 **shell的特点:**1.1.2 常用shell类型 1.2 shell脚本的建立和执行1.2.1 建立shell脚本1.2.2 执行shell脚本的方式1.2.3 shell程序实例 二、shell变量与算数运算2.1 简单shell变量2.1.1 简单变量定义和赋值2.1…...
【gulp】gulp 的基本使用
gulp 是一个基于node的自动化打包构建工具,前端开发者可以使用它来处理常见任务: 创建项目 进入项目 npm init -ynpm i gulp -g (使用命令 gulp)npm i gulp -D # 开发依赖(前端工具都是开发依赖 本地安装 代…...
Linux 下处理 ^M 字符的最佳实践
Linux 下处理 ^M 字符的最佳实践 一、快速解决方案 按照优先级排序的三种解决方案: 1. 使用 dos2unix(推荐) # 安装 sudo apt-get install dos2unix # Ubuntu/Debian sudo yum install dos2unix # CentOS# 使用 dos2unix 文件名2. 使用 sed sed...
【优选算法】—复写零(双指针算法)
云边有个稻草人-CSDN博客 每天至少一道算法题,接着干,以额现在的实力想完成那个目标确实难。算法题确实烧脑,挺煎熬的,但脑子烧多了是不是就该好些了?。。。 记得那句话,必须有为成功付出代价的决心&#x…...
2024国赛A问题三和四
问题三 最小螺距单目标优化模型的建立 问题二考虑了在螺距固定的条件下计算舞龙队盘入的终止时间,问题三在第二问的基础提出了改变螺距的要求,即求解在螺距最小为多少时,龙头前把手能够沿着相应的螺线盘入到调头空间的边界。故可将其转换为…...
asp.net 高校学生勤工俭学系统设计与实现
博主介绍:专注于Java(springboot ssm 等开发框架) vue .net php python(flask Django) 小程序 等诸多技术领域和毕业项目实战、企业信息化系统建设,从业十五余年开发设计教学工作 ☆☆☆ 精彩专栏推荐订阅☆☆☆☆☆不然下次找…...
《计算机组成及汇编语言原理》阅读笔记:p116-p120
《计算机组成及汇编语言原理》学习第 7 天,p116-p120 总结,总计 5 页。 一、技术总结 1.CPU优化 (1)increase overall performance number 例如:16位电脑提升到32位电脑。 (2)multiprocessing One way to make computers more useful i…...
C# OpenCvSharp DNN 卡证检测矫正
目录 说明 效果 模型 项目 代码 下载 参考 说明 源码地址:https://modelscope.cn/models/iic/cv_resnet_carddetection_scrfd34gkps 在实人认证、文档电子化等场景中需要自动化提取卡证的信息,以便进一步做录入处理。这类场景通常存在两类问题&…...
Spring Boot 中 Map 的最佳实践
在Spring Boot中使用Map时,请遵循以下最佳实践: 1.避免在Controller中 直接使用Map。应该使用RequestBody 接收-个DTO对象或者 RequestParam接收参数,然后在Service中处 理Map。 2.避免在Service中 直接使用原始的Map。应该使用Autowired 注入-个专门…...
J-LangChain - 智能链构建
介绍 j-langchain是一个Java版的LangChain开发框架,旨在简化和加速各类大模型应用在Java平台的落地开发。它提供了一组实用的工具和类,使得开发人员能够更轻松地构建类似于LangChain的Java应用程序。 依赖 Maven <dependency><groupId>i…...
开源低代码平台-Microi吾码 打印引擎使用
引言 在开发中,会遇到很多记录的表单数据需要下载打印下来使用到线下各种应用场景中。在传统的方法中可能是需要先导出数据,然后将数据填入word表格中在打印下来。 但Microi吾码提供了一项新功能,便是打印引擎。打印引擎即可在线设计…...
【MySQL】索引 面试题
文章目录 适合创建索引的情况创建索引的注意事项MySQL中不适合创建索引的情况索引失效的常见情况 索引定义与作用 索引是帮助MySQL高效获取数据的有序数据结构,通过维护特定查找算法的数据结构(如B树),以某种方式引用数据…...
【高阶数据结构】AVL树
AVL树 1.AVL的概念2.AVL树的实现1.AVL树的结构2.AVL树的插入1.更新平衡因子2.旋转1.右单旋2.左单旋3.左右双旋4.右左双旋 3.AVL树的查找4.AVL树的平衡检测5.AVL树的性能分析6.AVL树的删除 3.总代码1.AVLTree.h2.Test.cpp 1.AVL的概念 AVL树是最先发明的自平衡⼆叉查找树&#…...
【Spring】基于XML的Spring容器配置——<bean>标签与属性解析
Spring框架是一个非常流行的应用程序框架,它通过控制反转(IoC)和依赖注入(DI)来简化企业级应用的开发。Spring容器是其核心部分,负责管理对象的创建、配置和生命周期。在Spring中,XML配置是一种…...
docker mysql5.7安装
一.更改 /etc/docker/daemon.json sudo mkdir -p /etc/dockersudo tee /etc/docker/daemon.json <<-EOF {"registry-mirrors": ["https://do.nark.eu.org","https://dc.j8.work","https://docker.m.daocloud.io","https:/…...
HDR视频技术之十一:HEVCH.265 的 HDR 编码方案
前文我们对 HEVC 的 HDR 编码优化技术做了介绍,侧重编码性能的提升。 本章主要阐述 HEVC 中 HDR/WCG 相关的整体编码方案, 包括不同应用场景下的 HEVC 扩展编码技术。 1 背景 HDR 信号一般意味着使用更多比特,一般的 HDR 信号倾向于使用 10…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
