kafka 集群 KRaft 模式搭建
Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序
Kafka 官网:https://kafka.apache.org/
Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群
本文讲解 Kafka KRaft 模式集群搭建
笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11
目录
1、官网下载 Kafka
2、配置 Kafka
3、创建 KRaft 集群
4、启动 Kafka KRaft 集群
5、关闭 Kafka KRaft 集群
6、测试 KRaft 集群
1、官网下载 Kafka
这里笔者下载最新版3.6.0
下载完成
将kafka分别上传到3台linux
在3台服务器上分别创建 kafka 安装目录
mkdir /usr/local/kafka
在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录
tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka
2、配置 Kafka
进入配置目录
cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft
编辑配置文件
vi server.properties
server.properties 配置说明
node.id 是kafka的broker节点id
controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址
advertised.listeners 是节点自己的监听地址
192.168.3.232 节点配置
node.id = 1
192.168.2.90 节点配置
node.id = 2
192.168.2.11节点配置
node.id = 3
3、创建 KRaft 集群
生成集群id
在任意一个节点上执行就行,笔者使用 192.168.3.232 节点
进入bin 目录
cd /usr/local/kafka/kafka_2.13-3.6.0/bin
执行生成集群 id 命令
./kafka-storage.sh random-uuid
生成后保存生成的字符串 82vqfbdSTO2QzS_M0Su1Bw
然后分别在3台机器上执行下面命令
为方便执行命令,先回到 kafka安装目录
cd /usr/local/kafka/kafka_2.13-3.6.0
再执行命令,完成集群元数据配置
bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties
192.168.3.232 节点
192.168.2.90 节点
192.168.2.11节点
上面命令执行完成后,开放防火墙端口
kafka 需要开放 9092 端口和 9093 端口
3台机器上分别开放 9092 和 9093 端口
查看开放端口
firewall-cmd --zone=public --list-ports
开放9092 端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
开放9093 端口
firewall-cmd --zone=public --add-port=9093/tcp --permanent
更新防火墙规则(无需断开连接,动态添加规则)
firewall-cmd --reload
4、启动 Kafka KRaft 集群
在3台机器上分别启动
下面2个命令均可启动
bin/kafka-server-start.sh -daemon config/kraft/server.properties
或
bin/kafka-server-start.sh config/kraft/server.properties
笔者使用第二个启动命令 启动,效果看下图
当 3 个节点都出现 Kafka Server started,集群启动成功
5、关闭 Kafka KRaft 集群
关闭命令
bin/kafka-server-stop.sh
在 3 个节点上分别执行关闭命令
6、测试 KRaft 集群
新建 maven 项目,添加 Kafka 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
笔者新建 maven项目 kafka-learn
kafka-learn 项目 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>
新建生产者 ProducerDemo
package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}
新建消费者 ConsumerDemo
package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}
运行测试
效果图
消息成功发送并成功消费
至此完
相关文章:

kafka 集群 KRaft 模式搭建
Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序 Kafka 官网:https://kafka.apache.org/ Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器…...

如何进行有效的移动应用测试?
1、识别关键功能: 对于移动应用测试,首先要了解应用的需求和功能规格,确定哪些功能是最关键的。 关键功能通常是用户最常用的功能,对应用的成功和用户体验至关重要。 2、设定测试目标和用例: 针对每个关键功能,设置具体的测试目…...

飞翔的鸟小游戏
第一步是创建项目 项目名自拟 第二步创建个包名 来规范class 再创建一个包 来存储照片 如下 package game; import java.awt.*; import javax.swing.*; import javax.imageio.ImageIO;public class Bird {Image image;int x,y;int width,height;int size;double g;double t;…...

吴恩达《机器学习》10-1-10-3:决定下一步做什么、评估一个假设、模型选择和交叉验证集
一、决定下一步做什么 在机器学习的学习过程中,我们已经接触了许多不同的学习算法,逐渐深入了解了先进的机器学习技术。然而,即使在了解了这些算法的情况下,仍然存在一些差距,有些人能够高效而有力地运用这些算法&…...
大数据-之LibrA数据库系统告警处理(ALM-37000 MPPDBServer数据目录或Redo目录缺失)
告警解释 当出现如下情况时,产生该告警: 数据实例数据目录被删除。数据实例Redo目录(pg_xlog)被删除。 告警属性 告警ID 告警级别 可自动清除 37000 严重 是 告警参数 参数名称 参数含义 ServiceName 产生告警的服务…...
华为eNSP使用教程(Enterprise Network Simulation Platform,企业网络仿真平台)
文章目录 华为eNSP使用教程详解引言eNSP界面快速入门启动与初始设置主界面组成创建和管理项目 构建网络拓扑添加和连接设备配置设备参数示例:配置设备接口IP 保存配置 仿真网络功能启动与测试示例:测试网络连通性 使用调试工具 疑难技术点解析路由协议配…...
19.Spring如何处理线程并发问题?
Spring如何处理线程并发问题? 在一般情况下,只有无状态的Bean才可以在多线程环境下共享,在Spring中,绝大部分Bean都可以声明为singleton作用域,因为Spring对一些Bean中非线程安全状态采用ThreadLocal进行处理,解决线程安全问题。 ThreadLocal和线程同步机制都是为了解决多…...

Python办公神器:教你如何快速分拆、删页、合并PDF文件
哈喽大家好,我是了不起,今天教你如何用Python快速分拆、删页、合并PDF文件 介绍 有时我们可能需要对PDF文件进行一些处理,例如分拆、删页、合并等。这些操作在一些专业的PDF软件中可能比较容易实现,但是如果我们想要用Python来自…...

Android aidl的简单使用
一.服务端 1.创建aidl文件,然后记得build下生成java文件 package com.example.aidlservice31;// Declare any non-default types here with import statementsinterface IMyAidlServer {// 接收一个字符串参数void setData(String value);// 返回一个字符串String …...

双十一备战与复盘
如何组织备战 重要节点 从大促启动会开始后我就开始计划我们本次备战的整体节奏。 挑战在哪 以上内容介绍了CDP平台有多么重要,那么画像系统备战的核心挑战在“如何保障在大流量高并发情况下系统稳定提供高性能服务”,主要表现在:稳定性、…...

ONNX实践系列-修改yolov5-seg的proto分支输出shape
一、目标 本文主要介绍要将原始yolov5分割的输出掩膜从[b,c,h,.w]修改为[b, h, w, c] 原来的: 目标的: 代码如下: Descripttion: version: @Company: WT-XM Author: yang jinyi Date: 2023-09-08 11:26:28 LastEditors: yang jinyi LastEditTime: 2023-09-08 11:48:01 …...

VMware与Linux安装
VM与Linux安装 1、安装VMware 这里安装Vm主要是为了安装Linux系统,除了相对云服务器,比较大众化的操作,当然更多的是熟悉Linux操作 1、Windows安装 (1) 下载链接,目前版本上下载VM15的版本即可https://www.vmware.com/p…...

服务器连接github
https://zhuanlan.zhihu.com/p/543490354 比着这个一步步做就行。 https://blog.l0v0.com/posts/94ffdbdf.html 上传文件可以看这个 注意: 密钥ssh-keygen设置好之后,以后就不用每次输入账号密码才能访问了。 otherwise,每次要输入账号密码。…...

自动驾驶中的LFM(LED 闪烁缓解)问题
自动驾驶中的LFM Reference: 自动驾驶系统如何跨越LFM这道坎? 从路灯、交通灯,到车载照明,低功耗、长寿命、高可靠的 LED 正在快速取代传统照明方式。但 LED 在道路上的普遍使用,却带来“LED闪烁”现象。“LED闪烁”是由 LED 驱…...
ArkTS-页面和自定义组件生命周期
页面生命周期:被Entry装饰的组件生命周期 onPageShow:页面每次显示时触发一次,包括路由过程、应用进入前台等场景onPageHide:页面每次隐藏时触发一次,包括路由过程、应用进入前后台等场景onBackPress:当用户…...
ELK: logstash gork filter 多个模式(pattern)匹配规则语法和多行日志匹配设置
项目里用logstash分析日志,由于有多种模式(pattern)需要匹配,网上搜了很多示例,发现这些都是老的写法,都会报错,后来查阅了官方文档,才发现,新版本只支持新语法。 错误的…...

Ubuntu20.04上编译安装TVM
本文主要讲述如何在ubuntu20.04平台上编译TVM代码并在python中import tvm成功。 源代码下载: git clone --recursive https://github.com/apache/tvm tvm 平台环境升级: 1) sudo apt-get update 2) sudo apt-get install -y pyth…...

伦敦金现图形态分析(深度好文)
对价格行为交易者来说,伦敦金价走势图表中的一些特殊形态,能够带来比较靠谱的交易信号。然而交易并不只和形态有关,也和我们能够从图表形态中阅读到什么,以及如何理解其他交易者对价格波动的推动有关。 在对伦敦金走势图的技术形态…...

慕尼黑电子展采访全程 | Samtec管理层对话电子发烧友:虎家卓越服务
【摘要/前言】 今年的慕尼黑上海电子展上,Samtec大放异彩,特装展台一亮相就获得了大家的广泛关注,展台观众络绎不绝。 作为深耕连接器行业数十年的知名厂商以及Electronica的常客,Samtec毫无疑问地获得了大量媒体朋友的关注和报…...

APP外包项目维护方案
APP项目维护是确保应用程序持续运行、安全性和性能不断优化的关键活动。以下是一个综合的APP项目维护方案,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 1.定期性能监控和优化: 使用性能…...

Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...

el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...