当前位置: 首页 > news >正文

kafka 集群 KRaft 模式搭建

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:https://kafka.apache.org/

Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群

本文讲解 Kafka KRaft 模式集群搭建

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、创建 KRaft 集群

4、启动 Kafka KRaft 集群

5、关闭 Kafka KRaft 集群

6、测试 KRaft 集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件

vi server.properties

server.properties 配置说明

node.id 是kafka的broker节点id

controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址

advertised.listeners 是节点自己的监听地址

192.168.3.232 节点配置

node.id = 1

192.168.2.90 节点配置

node.id = 2

192.168.2.11节点配置

node.id = 3

3、创建 KRaft 集群

生成集群id

在任意一个节点上执行就行,笔者使用 192.168.3.232 节点

进入bin 目录

cd /usr/local/kafka/kafka_2.13-3.6.0/bin

执行生成集群 id 命令

./kafka-storage.sh random-uuid

生成后保存生成的字符串    82vqfbdSTO2QzS_M0Su1Bw

然后分别在3台机器上执行下面命令

为方便执行命令,先回到 kafka安装目录

cd /usr/local/kafka/kafka_2.13-3.6.0

再执行命令,完成集群元数据配置

bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties

192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口

kafka 需要开放 9092 端口和 9093 端口

3台机器上分别开放 9092 和 9093 端口

查看开放端口

firewall-cmd --zone=public --list-ports

 开放9092 端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

  开放9093 端口

firewall-cmd --zone=public --add-port=9093/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

4、启动 Kafka KRaft 集群

在3台机器上分别启动

下面2个命令均可启动

bin/kafka-server-start.sh -daemon config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功

5、关闭 Kafka KRaft 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

6、测试 KRaft 集群

新建 maven 项目,添加 Kafka 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}

运行测试

效果图

消息成功发送并成功消费

至此完

相关文章:

kafka 集群 KRaft 模式搭建

Apache Kafka是一个开源分布式事件流平台&#xff0c;被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序 Kafka 官网&#xff1a;https://kafka.apache.org/ Kafka 在2.8版本之后&#xff0c;移除了对Zookeeper的依赖&#xff0c;将依赖于ZooKeeper的控制器…...

如何进行有效的移动应用测试?

1、识别关键功能: 对于移动应用测试&#xff0c;首先要了解应用的需求和功能规格&#xff0c;确定哪些功能是最关键的。 关键功能通常是用户最常用的功能&#xff0c;对应用的成功和用户体验至关重要。 2、设定测试目标和用例: 针对每个关键功能&#xff0c;设置具体的测试目…...

飞翔的鸟小游戏

第一步是创建项目 项目名自拟 第二步创建个包名 来规范class 再创建一个包 来存储照片 如下 package game; import java.awt.*; import javax.swing.*; import javax.imageio.ImageIO;public class Bird {Image image;int x,y;int width,height;int size;double g;double t;…...

吴恩达《机器学习》10-1-10-3:决定下一步做什么、评估一个假设、模型选择和交叉验证集

一、决定下一步做什么 在机器学习的学习过程中&#xff0c;我们已经接触了许多不同的学习算法&#xff0c;逐渐深入了解了先进的机器学习技术。然而&#xff0c;即使在了解了这些算法的情况下&#xff0c;仍然存在一些差距&#xff0c;有些人能够高效而有力地运用这些算法&…...

大数据-之LibrA数据库系统告警处理(ALM-37000 MPPDBServer数据目录或Redo目录缺失)

告警解释 当出现如下情况时&#xff0c;产生该告警&#xff1a; 数据实例数据目录被删除。数据实例Redo目录&#xff08;pg_xlog&#xff09;被删除。 告警属性 告警ID 告警级别 可自动清除 37000 严重 是 告警参数 参数名称 参数含义 ServiceName 产生告警的服务…...

华为eNSP使用教程(Enterprise Network Simulation Platform,企业网络仿真平台)

文章目录 华为eNSP使用教程详解引言eNSP界面快速入门启动与初始设置主界面组成创建和管理项目 构建网络拓扑添加和连接设备配置设备参数示例&#xff1a;配置设备接口IP 保存配置 仿真网络功能启动与测试示例&#xff1a;测试网络连通性 使用调试工具 疑难技术点解析路由协议配…...

19.Spring如何处理线程并发问题?

Spring如何处理线程并发问题? 在一般情况下,只有无状态的Bean才可以在多线程环境下共享,在Spring中,绝大部分Bean都可以声明为singleton作用域,因为Spring对一些Bean中非线程安全状态采用ThreadLocal进行处理,解决线程安全问题。 ThreadLocal和线程同步机制都是为了解决多…...

Python办公神器:教你如何快速分拆、删页、合并PDF文件

哈喽大家好&#xff0c;我是了不起&#xff0c;今天教你如何用Python快速分拆、删页、合并PDF文件 介绍 有时我们可能需要对PDF文件进行一些处理&#xff0c;例如分拆、删页、合并等。这些操作在一些专业的PDF软件中可能比较容易实现&#xff0c;但是如果我们想要用Python来自…...

Android aidl的简单使用

一.服务端 1.创建aidl文件&#xff0c;然后记得build下生成java文件 package com.example.aidlservice31;// Declare any non-default types here with import statementsinterface IMyAidlServer {// 接收一个字符串参数void setData(String value);// 返回一个字符串String …...

双十一备战与复盘

如何组织备战 重要节点 从大促启动会开始后我就开始计划我们本次备战的整体节奏。 挑战在哪 以上内容介绍了CDP平台有多么重要&#xff0c;那么画像系统备战的核心挑战在“如何保障在大流量高并发情况下系统稳定提供高性能服务”&#xff0c;主要表现在&#xff1a;稳定性、…...

ONNX实践系列-修改yolov5-seg的proto分支输出shape

一、目标 本文主要介绍要将原始yolov5分割的输出掩膜从[b,c,h,.w]修改为[b, h, w, c] 原来的: 目标的: 代码如下: Descripttion: version: @Company: WT-XM Author: yang jinyi Date: 2023-09-08 11:26:28 LastEditors: yang jinyi LastEditTime: 2023-09-08 11:48:01 …...

VMware与Linux安装

VM与Linux安装 1、安装VMware ​ 这里安装Vm主要是为了安装Linux系统&#xff0c;除了相对云服务器&#xff0c;比较大众化的操作&#xff0c;当然更多的是熟悉Linux操作 1、Windows安装 ​ (1) 下载链接&#xff0c;目前版本上下载VM15的版本即可https://www.vmware.com/p…...

服务器连接github

https://zhuanlan.zhihu.com/p/543490354 比着这个一步步做就行。 https://blog.l0v0.com/posts/94ffdbdf.html 上传文件可以看这个 注意&#xff1a; 密钥ssh-keygen设置好之后&#xff0c;以后就不用每次输入账号密码才能访问了。 otherwise&#xff0c;每次要输入账号密码。…...

自动驾驶中的LFM(LED 闪烁缓解)问题

自动驾驶中的LFM Reference: 自动驾驶系统如何跨越LFM这道坎&#xff1f; 从路灯、交通灯&#xff0c;到车载照明&#xff0c;低功耗、长寿命、高可靠的 LED 正在快速取代传统照明方式。但 LED 在道路上的普遍使用&#xff0c;却带来“LED闪烁”现象。“LED闪烁”是由 LED 驱…...

ArkTS-页面和自定义组件生命周期

页面生命周期&#xff1a;被Entry装饰的组件生命周期 onPageShow&#xff1a;页面每次显示时触发一次&#xff0c;包括路由过程、应用进入前台等场景onPageHide&#xff1a;页面每次隐藏时触发一次&#xff0c;包括路由过程、应用进入前后台等场景onBackPress&#xff1a;当用户…...

ELK: logstash gork filter 多个模式(pattern)匹配规则语法和多行日志匹配设置

项目里用logstash分析日志&#xff0c;由于有多种模式&#xff08;pattern&#xff09;需要匹配&#xff0c;网上搜了很多示例&#xff0c;发现这些都是老的写法&#xff0c;都会报错&#xff0c;后来查阅了官方文档&#xff0c;才发现&#xff0c;新版本只支持新语法。 错误的…...

Ubuntu20.04上编译安装TVM

本文主要讲述如何在ubuntu20.04平台上编译TVM代码并在python中import tvm成功。 源代码下载&#xff1a; git clone --recursive https://github.com/apache/tvm tvm 平台环境升级&#xff1a; 1&#xff09; sudo apt-get update 2&#xff09; sudo apt-get install -y pyth…...

伦敦金现图形态分析(深度好文)

对价格行为交易者来说&#xff0c;伦敦金价走势图表中的一些特殊形态&#xff0c;能够带来比较靠谱的交易信号。然而交易并不只和形态有关&#xff0c;也和我们能够从图表形态中阅读到什么&#xff0c;以及如何理解其他交易者对价格波动的推动有关。 在对伦敦金走势图的技术形态…...

慕尼黑电子展采访全程 | Samtec管理层对话电子发烧友:虎家卓越服务

【摘要/前言】 今年的慕尼黑上海电子展上&#xff0c;Samtec大放异彩&#xff0c;特装展台一亮相就获得了大家的广泛关注&#xff0c;展台观众络绎不绝。 作为深耕连接器行业数十年的知名厂商以及Electronica的常客&#xff0c;Samtec毫无疑问地获得了大量媒体朋友的关注和报…...

APP外包项目维护方案

APP项目维护是确保应用程序持续运行、安全性和性能不断优化的关键活动。以下是一个综合的APP项目维护方案&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.定期性能监控和优化&#xff1a; 使用性能…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...