kafka 集群 KRaft 模式搭建
Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序
Kafka 官网:https://kafka.apache.org/
Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群
本文讲解 Kafka KRaft 模式集群搭建
笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11
目录
1、官网下载 Kafka
2、配置 Kafka
3、创建 KRaft 集群
4、启动 Kafka KRaft 集群
5、关闭 Kafka KRaft 集群
6、测试 KRaft 集群
1、官网下载 Kafka
这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录
mkdir /usr/local/kafka
在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录
tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka
进入配置目录
cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件
vi server.properties
server.properties 配置说明

node.id 是kafka的broker节点id
controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址
advertised.listeners 是节点自己的监听地址
192.168.3.232 节点配置
node.id = 1

192.168.2.90 节点配置
node.id = 2

192.168.2.11节点配置
node.id = 3

3、创建 KRaft 集群
生成集群id
在任意一个节点上执行就行,笔者使用 192.168.3.232 节点
进入bin 目录
cd /usr/local/kafka/kafka_2.13-3.6.0/bin
执行生成集群 id 命令
./kafka-storage.sh random-uuid

生成后保存生成的字符串 82vqfbdSTO2QzS_M0Su1Bw
然后分别在3台机器上执行下面命令
为方便执行命令,先回到 kafka安装目录
cd /usr/local/kafka/kafka_2.13-3.6.0
再执行命令,完成集群元数据配置
bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties
192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口
kafka 需要开放 9092 端口和 9093 端口
3台机器上分别开放 9092 和 9093 端口
查看开放端口
firewall-cmd --zone=public --list-ports
开放9092 端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
开放9093 端口
firewall-cmd --zone=public --add-port=9093/tcp --permanent
更新防火墙规则(无需断开连接,动态添加规则)
firewall-cmd --reload
4、启动 Kafka KRaft 集群
在3台机器上分别启动
下面2个命令均可启动
bin/kafka-server-start.sh -daemon config/kraft/server.properties
或
bin/kafka-server-start.sh config/kraft/server.properties
笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功
5、关闭 Kafka KRaft 集群
关闭命令
bin/kafka-server-stop.sh
在 3 个节点上分别执行关闭命令
6、测试 KRaft 集群
新建 maven 项目,添加 Kafka 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
笔者新建 maven项目 kafka-learn
kafka-learn 项目 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>
新建生产者 ProducerDemo
package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}
新建消费者 ConsumerDemo
package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}
运行测试
效果图

消息成功发送并成功消费
至此完
相关文章:
kafka 集群 KRaft 模式搭建
Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序 Kafka 官网:https://kafka.apache.org/ Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器…...
如何进行有效的移动应用测试?
1、识别关键功能: 对于移动应用测试,首先要了解应用的需求和功能规格,确定哪些功能是最关键的。 关键功能通常是用户最常用的功能,对应用的成功和用户体验至关重要。 2、设定测试目标和用例: 针对每个关键功能,设置具体的测试目…...
飞翔的鸟小游戏
第一步是创建项目 项目名自拟 第二步创建个包名 来规范class 再创建一个包 来存储照片 如下 package game; import java.awt.*; import javax.swing.*; import javax.imageio.ImageIO;public class Bird {Image image;int x,y;int width,height;int size;double g;double t;…...
吴恩达《机器学习》10-1-10-3:决定下一步做什么、评估一个假设、模型选择和交叉验证集
一、决定下一步做什么 在机器学习的学习过程中,我们已经接触了许多不同的学习算法,逐渐深入了解了先进的机器学习技术。然而,即使在了解了这些算法的情况下,仍然存在一些差距,有些人能够高效而有力地运用这些算法&…...
大数据-之LibrA数据库系统告警处理(ALM-37000 MPPDBServer数据目录或Redo目录缺失)
告警解释 当出现如下情况时,产生该告警: 数据实例数据目录被删除。数据实例Redo目录(pg_xlog)被删除。 告警属性 告警ID 告警级别 可自动清除 37000 严重 是 告警参数 参数名称 参数含义 ServiceName 产生告警的服务…...
华为eNSP使用教程(Enterprise Network Simulation Platform,企业网络仿真平台)
文章目录 华为eNSP使用教程详解引言eNSP界面快速入门启动与初始设置主界面组成创建和管理项目 构建网络拓扑添加和连接设备配置设备参数示例:配置设备接口IP 保存配置 仿真网络功能启动与测试示例:测试网络连通性 使用调试工具 疑难技术点解析路由协议配…...
19.Spring如何处理线程并发问题?
Spring如何处理线程并发问题? 在一般情况下,只有无状态的Bean才可以在多线程环境下共享,在Spring中,绝大部分Bean都可以声明为singleton作用域,因为Spring对一些Bean中非线程安全状态采用ThreadLocal进行处理,解决线程安全问题。 ThreadLocal和线程同步机制都是为了解决多…...
Python办公神器:教你如何快速分拆、删页、合并PDF文件
哈喽大家好,我是了不起,今天教你如何用Python快速分拆、删页、合并PDF文件 介绍 有时我们可能需要对PDF文件进行一些处理,例如分拆、删页、合并等。这些操作在一些专业的PDF软件中可能比较容易实现,但是如果我们想要用Python来自…...
Android aidl的简单使用
一.服务端 1.创建aidl文件,然后记得build下生成java文件 package com.example.aidlservice31;// Declare any non-default types here with import statementsinterface IMyAidlServer {// 接收一个字符串参数void setData(String value);// 返回一个字符串String …...
双十一备战与复盘
如何组织备战 重要节点 从大促启动会开始后我就开始计划我们本次备战的整体节奏。 挑战在哪 以上内容介绍了CDP平台有多么重要,那么画像系统备战的核心挑战在“如何保障在大流量高并发情况下系统稳定提供高性能服务”,主要表现在:稳定性、…...
ONNX实践系列-修改yolov5-seg的proto分支输出shape
一、目标 本文主要介绍要将原始yolov5分割的输出掩膜从[b,c,h,.w]修改为[b, h, w, c] 原来的: 目标的: 代码如下: Descripttion: version: @Company: WT-XM Author: yang jinyi Date: 2023-09-08 11:26:28 LastEditors: yang jinyi LastEditTime: 2023-09-08 11:48:01 …...
VMware与Linux安装
VM与Linux安装 1、安装VMware 这里安装Vm主要是为了安装Linux系统,除了相对云服务器,比较大众化的操作,当然更多的是熟悉Linux操作 1、Windows安装 (1) 下载链接,目前版本上下载VM15的版本即可https://www.vmware.com/p…...
服务器连接github
https://zhuanlan.zhihu.com/p/543490354 比着这个一步步做就行。 https://blog.l0v0.com/posts/94ffdbdf.html 上传文件可以看这个 注意: 密钥ssh-keygen设置好之后,以后就不用每次输入账号密码才能访问了。 otherwise,每次要输入账号密码。…...
自动驾驶中的LFM(LED 闪烁缓解)问题
自动驾驶中的LFM Reference: 自动驾驶系统如何跨越LFM这道坎? 从路灯、交通灯,到车载照明,低功耗、长寿命、高可靠的 LED 正在快速取代传统照明方式。但 LED 在道路上的普遍使用,却带来“LED闪烁”现象。“LED闪烁”是由 LED 驱…...
ArkTS-页面和自定义组件生命周期
页面生命周期:被Entry装饰的组件生命周期 onPageShow:页面每次显示时触发一次,包括路由过程、应用进入前台等场景onPageHide:页面每次隐藏时触发一次,包括路由过程、应用进入前后台等场景onBackPress:当用户…...
ELK: logstash gork filter 多个模式(pattern)匹配规则语法和多行日志匹配设置
项目里用logstash分析日志,由于有多种模式(pattern)需要匹配,网上搜了很多示例,发现这些都是老的写法,都会报错,后来查阅了官方文档,才发现,新版本只支持新语法。 错误的…...
Ubuntu20.04上编译安装TVM
本文主要讲述如何在ubuntu20.04平台上编译TVM代码并在python中import tvm成功。 源代码下载: git clone --recursive https://github.com/apache/tvm tvm 平台环境升级: 1) sudo apt-get update 2) sudo apt-get install -y pyth…...
伦敦金现图形态分析(深度好文)
对价格行为交易者来说,伦敦金价走势图表中的一些特殊形态,能够带来比较靠谱的交易信号。然而交易并不只和形态有关,也和我们能够从图表形态中阅读到什么,以及如何理解其他交易者对价格波动的推动有关。 在对伦敦金走势图的技术形态…...
慕尼黑电子展采访全程 | Samtec管理层对话电子发烧友:虎家卓越服务
【摘要/前言】 今年的慕尼黑上海电子展上,Samtec大放异彩,特装展台一亮相就获得了大家的广泛关注,展台观众络绎不绝。 作为深耕连接器行业数十年的知名厂商以及Electronica的常客,Samtec毫无疑问地获得了大量媒体朋友的关注和报…...
APP外包项目维护方案
APP项目维护是确保应用程序持续运行、安全性和性能不断优化的关键活动。以下是一个综合的APP项目维护方案,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 1.定期性能监控和优化: 使用性能…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
LOOI机器人的技术实现解析:从手势识别到边缘检测
LOOI机器人作为一款创新的AI硬件产品,通过将智能手机转变为具有情感交互能力的桌面机器人,展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家,我将全面解析LOOI的技术实现架构,特别是其手势识别、物体识别和环境…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
