当前位置: 首页 > news >正文

kafka 3.5.0 raft协议安装

前言

最近做项目,需要使用kafka进行通信,且只能使用kafka,笔者没有测试集群,就自己搭建了kafka集群,实际上笔者在很早之前就搭建了,因为当时还是zookeeper(简称ZK)注册元数据,现在新版kafka(3.0.0开始)已经自带了元数据能力(使用raft协议)减少了kafka对zk的依赖性。笔者在查询资料发现,说jdk至少jdk11,实测jdk8也能运行,且并不需要网上说的3+4节点,3+3即可,当然理论上broker节点越多越好,但是元数据节点建议3、5个最合适,raft的过半一致性和容错性的综合取舍。

准备

准备kafka安装包:Apache Kafka

笔者使用的kafka 3.5.0和scala 2.13,采用3台虚拟机,当然容器也不是不行,注意持久化pv pvc和配置的管理(ip换成域名,dns的切换支持),中间件建议使用虚拟机,可以降低很多容错性。

jdk使用open jdk,配置java_home和path,以Ubuntu为例

 sudo apt install openjdk-8-jdk-headless

以macOS为例,创建一个ubuntu-server 最小安装的虚拟机(vmware,毕竟个人使用不要钱),然后安装openssh 和 openjdk,然后shutdown now

网络选择桥接,相当于一台“真实在”网络上的一台物理机

这样就得到了

192.168.0.108

192.168.0.107

192.168.0.106

3台虚拟机

步骤

先看kafka集群的架构图,实际上安装的过程就是架构图的执行过程

 

从图中可以看出已经没有zk的存在了,从kafka节点自己管理元数据,通过raft协议选主的方式。

1. kafka的准备

上传kafka安装包,必须是二进制安装包,不要源码包,编译比较麻烦,然后解压

tar -zxvf  kafka_2.13-3.5.0.gz

查看配置目录会发现

明显多了kraft的配置目录,那么如果使用kafka raft元数据中心,则需要修改kraft目录,启动时指定kraft目录的配置

2. 配置修改

raft协议实际上跟zk差不多,使用raft协议的中间件就太多了,但是本质上每个节点都需要一个唯一id,zk也是如此,所以kafka kraft就相当于集成的zk。

在kraft下的有3个文件文件,其中启动相关的是server.properties中

执行配置修改


# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller# The node id associated with this instance's roles
node.id=1# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093############################# Socket Server Settings ############################## The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

每一行都有注释,重点关注

笔者设定

192.168.0.106 nodeid 1 

192.168.0.107 nodeid 2

192.168.0.108 nodeid 3

至此配置基本上完成,同理一个节点可以同时是controller和broker,也可以仅仅是controller或者broker,因为controller的负载比较轻,所以一般是和broker一起。其中有个log.dir这个的路径是下面元数据生成的路径(选主)和数据事务日志,索引日志的存储目录

3. 启动

1. 生成uuid

任意找一个节点执行:

./kafka-storage.sh random-uuid

每次执行uuid会不一样,这个uuid标识是一个集群,所以所有节点公用一个uuid,不要每个节点重新生成,会识别不了 

 

然后执行format,如下标红是我生成的,这个每次不是固定的

 ./kafka-storage.sh format -t gZzkfRm4T1y8wSAY-ZNG5Q -c ../config/kraft/server.properties  

 格式化配置文件,同步其他节点

配置文件有什么变化?在日志配置的目录下出现

关键还是meta的文件,有集群id和节点id,版本号,这个对启动至关重要。

即在上面的log.dir的目录生成,所以尽量不能使用临时目录

2. 启动

启动就很简单了,使用刚刚配置的server.properties执行启动即可

./kafka-server-start.sh -daemon ../config/kraft/server.properties

不过为了方便查看启动日志,建议执行日志的console文件输出

 先看事务日志和索引

验证

验证很简单,查看bin同级目录下的日志即可

日志带有[2025-02-08 08:34:12,286] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer) 

如果生成用途可以安装kafka的控制台,kafka-ui,不过我这里就不安装了,因为docker安装比较容易。

总结

kafka从3.0.0开始推出了raft模式的元数据中心,实际上类似zk,kafka自己命名kraft。使用这种方式搭建kafka集群将不再需要zk,同理,kafka的集群的每个节点可以同时是broker和controller(以前zk充当),也可以是单独的broker,controller(负载不重,不建议单独controller,跟zk没区别),官方说明需要jdk11及以上,实测jdk8可以运行,但是生成建议严格按照官方标定的jdk执行,jdk是向下兼容的,但是不确定是否会涉及新api或新特性的使用。

另外实际使用中,可能会涉及使用iptables做nat限制kafka的连接方,比如在kafka节点通过iptables限制发送者或者消费端的ip

iptables -t nat -A PREROUTING -p tcp -m tcp --dport 9093 -j DNAT --to-destination kafkaxxx:9093

kafkaxxx --- 指定的是 Kafka 服务所在的机器地址

如果kafka是对接方提供,则在nat打通时,需要客户端连接的服务器也执行iptables,否则可能出现连接kafka正常,但是不能消费。

iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 9093 -j SNAT --to-source natxxx

natxxx --- 指定的是配置 iptables 的本机的地址

相关文章:

kafka 3.5.0 raft协议安装

前言 最近做项目,需要使用kafka进行通信,且只能使用kafka,笔者没有测试集群,就自己搭建了kafka集群,实际上笔者在很早之前就搭建了,因为当时还是zookeeper(简称ZK)注册元数据&#…...

后台管理系统网页开发

CSS样式代码 /* 后台管理系统样式文件 */ #container{ width:100%; height:100%; /* background-color:antiquewhite;*/ display:flex;} /* 左侧导航区域:宽度300px*/ .left{ width:300px; height: 100%; background-color:#203453; display:flex; flex-direction:column; jus…...

使用一个大语言模型对另一个大语言模型进行“调教”

使用一个大语言模型对另一个大语言模型进行“调教”(通常称为微调或适配),是一种常见的技术手段,用于让目标模型更好地适应特定的任务、领域或风格。以下是基于搜索结果整理的详细步骤和方法: 1.准备工作 安装必要的…...

golang使用sqlite3,开启wal模式,并发读写

因为sqlite是基于文件的,所以默认情况下,sqlite是不支持并发读写的,即写操作会阻塞其他操作,同时sqlite也很容易就产生死锁。 但是作为一个使用广泛的离线数据库,从sqlite3.7.0版本开始(SQLite Release 3.…...

如何利用maven更优雅的打包

最近在客户现场部署项目,有两套环境,无法连接互联网,两套环境之间也是完全隔离,于是问题就来了,每次都要远程到公司电脑改完代码,打包,通过网盘(如果没有会员,上传下载慢…...

音频进阶学习十二——Z变换一(Z变换、收敛域、性质与定理)

文章目录 前言一、Z变换1.Z变换的作用2.Z变换公式3.Z的状态表示1&#xff09; r 1 r1 r12&#xff09; 0 < r < 1 0<r<1 0<r<13&#xff09; r > 1 r>1 r>1 4.关于Z的解释 二、收敛域1.收敛域的定义2.收敛域的表示方式3.ROC的分析1&#xff09;当 …...

cursor指令工具

Cursor 工具使用指南与实例 工具概览 Cursor 提供了一系列强大的工具来帮助开发者提高工作效率。本指南将通过具体实例来展示这些工具的使用方法。 1. 目录文件操作 1.1 查看目录内容 (list_dir) 使用 list_dir 命令可以查看指定目录下的文件结构: 示例: list_dir log…...

MySQL 主从读写分离实现方案(一)—MariaDB MaxScale实现mysql8读写分离

一&#xff1a;MaxScale 是干什么的&#xff1f;? MaxScale是maridb开发的一个mysql数据中间件&#xff0c;其配置简单&#xff0c;能够实现读写分离&#xff0c;并且可以根据主从状态实现写库的自动切换&#xff0c;对多个从服务器能实现负载均衡。 二&#xff1a;MaxScale …...

阿里云 | DeepSeek人工智能大模型安装部署

ModelScope是阿里云人工智能大模型开源社区 ModelScope网络链接地址 https://www.modelscope.cn DeepSeek模型库网络链接地址 https://www.modelscope.cn/organization/deepseek-ai 如上所示&#xff0c;在阿里云人工智能大模型开源社区ModelScope中&#xff0c;使用阿里云…...

LLAMA-Factory安装教程(解决报错cannot allocate memory in static TLS block的问题)

步骤一&#xff1a; 下载基础镜像 # 配置docker DNS vi /etc/docker/daemon.json # daemon.json文件中 { "insecure-registries": ["https://swr.cn-east-317.qdrgznjszx.com"], "registry-mirrors": ["https://docker.mirrors.ustc.edu.c…...

STM32 CUBE Can调试

STM32 CUBE Can调试 1、CAN配置2、时钟配置3、手动添加4、回调函数5、启动函数和发送函数6、使用方法(采用消息队列来做缓存)7、数据不多在发送函数中获取空邮箱发送&#xff0c;否则循环等待空邮箱 1、CAN配置 2、时钟配置 3、手动添加 需要注意的是STM32CUBE配置的代码需要再…...

MySQL数据存储- 索引组织表

索引组织表 前言数据存储堆表索引组织表 二级索引二级索引的性能评估&#x1f539;为什么 idx_name 的性能开销最大&#xff1f;&#x1f539; 为什么 idx_last_modify_date 更新频繁会影响性能&#xff1f;分析二级索引性能表格为什么主键应该“紧凑且顺序”&#xff1f;二级索…...

基于STM32设计的仓库环境监测与预警系统

目录 项目开发背景设计实现的功能项目硬件模块组成设计思路系统功能总结使用的模块的技术详情介绍总结 1. 项目开发背景 随着工业化和现代化的进程&#xff0c;尤其是在制造业、食品业、医药业等行业&#xff0c;仓库环境的监控和管理成为了至关重要的一环。尤其是在存储易腐…...

VSCode便捷开发

一、常用插件 Vue 3 Snippets、Vetur、Vue - Official 二、常用开发者工具 三、Vue中使用Element-UI 安装步骤&#xff1a; 1、在VSCode的终端执行如下指令&#xff1a; npm i element-ui -S 2、在main.js中全局引入&#xff1a; import Vue from vue; import ElementUI from …...

理解 Maven 的 pom.xml 文件

pom.xml 是 Maven 项目的核心文件&#xff0c;它是项目构建、依赖管理、插件配置和项目元数据的主要地方。通过 pom.xml 文件&#xff0c;Maven 知道如何构建项目、下载依赖库、执行测试等任务。每个 Maven 项目都必须包含一个 pom.xml 文件。本文将详细讲解 pom.xml 文件的结构…...

docker数据持久化的意义

Docker 数据持久化是指在 Docker 容器中保存的数据不会因为容器的停止、删除或重启而丢失。Docker 容器本身是临时性的&#xff0c;默认情况下&#xff0c;容器内的文件系统是临时的&#xff0c;容器停止或删除后&#xff0c;其中的数据也会随之丢失。为了确保重要数据&#xf…...

opentelemetry-collector 配置elasticsearch

一、修改otelcol-config.yaml receivers:otlp:protocols:grpc:endpoint: 0.0.0.0:4317http:endpoint: 0.0.0.0:4318 exporters:debug:verbosity: detailedotlp/jaeger: # Jaeger supports OTLP directlyendpoint: 192.168.31.161:4317tls:insecure: trueotlphttp/prometheus: …...

ASP.NET Core JWT Version

目录 JWT缺点 方案 实现 Program.cs IdentityHelper.cs Controller NotCheckJWTVersionAttribute.cs JWTVersionCheckkFilter.cs 优化 JWT缺点 到期前&#xff0c;令牌无法被提前撤回。什么情况下需要撤回&#xff1f;用户被删除了、禁用了&#xff1b;令牌被盗用了&…...

【ArcGIS】R语言空间分析、模拟预测与可视化技术

R语言在空间数据挖掘中具有广泛的应用&#xff0c;以下是一些关键内容和常用包的介绍&#xff1a; R语言空间数据挖掘的关键技术 空间数据类型 矢量数据&#xff1a;包括点&#xff08;Point&#xff09;、线&#xff08;Line&#xff09;、面&#xff08;Polygon&#xff09;等…...

日常知识点之面试后反思遗留问题汇总

梳理一下最近接触到的几个知识点&#xff1a; 1&#xff1a;突然问到端口复用 &#xff08;SO_REUSEADDR&#xff09; 端口复用一般用在服务端重启时&#xff0c;套接字处于time_wait状态时&#xff0c;无法绑定该端口&#xff0c;导致无法启动问题。 设置端口复用&#xff…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

uniapp中使用aixos 报错

问题&#xff1a; 在uniapp中使用aixos&#xff0c;运行后报如下错误&#xff1a; AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

命令行关闭Windows防火墙

命令行关闭Windows防火墙 引言一、防火墙:被低估的"智能安检员"二、优先尝试!90%问题无需关闭防火墙方案1:程序白名单(解决软件误拦截)方案2:开放特定端口(解决网游/开发端口不通)三、命令行极速关闭方案方法一:PowerShell(推荐Win10/11)​方法二:CMD命令…...