kafka集群搭建-使用zookeeper
1.环境准备:
使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093
2.下载地址
去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。
# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
3.软件包下载解压
在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。
cd /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka
4.修改配置
需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。
cd kafka
vim conf/server.properties
修改内容如下,
# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1# 默认为9092,云服务器开放端口问题,改为了8093
port=8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093
advertised.listeners=PLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs=/tmp/kafka-logs
5.启动kafka集群
分别在每个节点的bin路径下执行启动脚本
# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties
使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。
6.创建topic`
# 创建topic,在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3
# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list
7.模拟生产消费消息
需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。
# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
生产者客户端发送hello

此时可以看到2个消费者模拟客户端都受到了消息:hello

8.集成springboot
坐标如下:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
kafka配置类:
package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {@Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")private String kafkaServers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}
}
消费者客户端:
@KafkaListener(topics = {"topic-demo"},groupId = "test1",properties = {"auto-offset-reset:latest", "enable.auto.commit:true"})public void listen(ConsumerRecord<String, String> consumerRecord) {log.info("consumer Received: " + consumerRecord);}
生产者发送消息:
@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@PostMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");SendResult<String, String> sendResult = hello_kafka.completable().join();System.out.println(sendResult);return "send topic: " + topic + ", msg: " + msg;}
}
发送测试:

消费者可以接收到消息:
consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
9.IDEA客户端工具
可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic

相关文章:
kafka集群搭建-使用zookeeper
1.环境准备: 使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093 2.下载地址 去官网下载,或者使用如…...
【python】Numpy运行报错分析:IndexError与形状不匹配问题
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...
你有多自律就有多自由
当你失去对时间的控制权,生活也就失去了平衡。 真正对自己有要求的人,都是高度自律的人。 追求自己想要的生活,任何时候开始都不会晚,关键在于你能够坚持下去,以高度自律的精神,日复一日、年复一年的坚持下…...
Codeforces Round 959 (Div. 1 + Div. 2 ABCDEFG 题) 文字讲解+视频讲解
Problem A. Diverse Game Statement 给定 n m n\times m nm 的矩形 a a a, a a a 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互不相同。求出 n m n\times m nm 的矩形 b b b, b b b 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互…...
WSL2 Centos7 Docker服务启动失败怎么办?
wsl 安装的CentOS7镜像,安装了Docker之后,发现用systemctl start docker 无法将docker启动起来。 解决办法 1、编辑文件 vim /usr/lib/systemd/system/docker.service将13行注释掉,然后在下面新增14行的内容。然后保存退出。 2、再次验证 可以发现,我们已经可以正常通过s…...
分布式锁-redisson锁重试和WatchDog机制
抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同。 1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null 2、判断当前这把锁是否是属于当前线程,如果是&a…...
ESP8266模块(2)
实例1 查看附近的WiFi 步骤1:进入AT指令模式 使用USB转串口适配器将ESP8266模块连接到电脑。打开串口终端软件,并设置正确的串口和波特率(通常为115200)。输入以下命令并按回车确认: AT如果模块响应OK,…...
Docker安装笔记
1. Mac安装Docker 1.1 Docker安装包下载 1.1.1 阿里云 对于10.10.3以下的用户 推荐使用 对于10.10.3以上的用户 推荐使用 1.1.2 官网下载 系统和芯片选择适合自己的安装包 1.2 镜像加速 【推荐】阿里镜像 登陆后,左侧菜单选中镜像加速器就可以看到你的专属地…...
《昇思25天学习打卡营第21天|Pix2Pix实现图像转换》
Pix2Pix 是一种图像转换模型,使用条件生成对抗网络(Conditional Generative Adversarial Networks,cGANs)实现图像到图像的转换。它主要由生成器(Generator)和判别器(Discriminator)…...
Python和MATLAB网络尺度结构和幂律度大型图生成式模型算法
🎯要点 🎯算法随机图模型数学概率 | 🎯图预期度序列数学定义 | 🎯生成具有任意指数的大型幂律网络,数学计算幂律指数和平均度 | 🎯随机图分析中巨型连接分量数学理论和推论 | 🎯生成式多层网络…...
在jsPsych中使用Vue
jspsych 介绍 jsPsych是一个非常好用的心理学实验插件,可以用来构建心理学实验。具体的就不多介绍了,大家可以去看官网:https://www.jspsych.org/latest/ 但是大家在使用时就会发现,这个插件只能使用js绘制界面,或者…...
机器学习·概率论基础
概率论 概率基础 这部分太简单,直接略过 条件概率 独立性 独立事件A和B的交集如下 非独立事件 非独立事件A和B的交集如下 贝叶斯定理 先验 事件 后验 在概率论和统计学中,先验概率和后验概率是贝叶斯统计的核心概念 简单来说后验概率就是结合了先验概…...
c生万物系列(面向对象:封装)
本系列博客主要介绍c语言的一些屠龙技,里面包含了笔者本人的一些奇思妙想。 该系列博客笔者只是用作记录。如果你偶然找到了这篇博客,但是发现不知所云,请不要过多投入时间,可能笔者本人那时候也看不懂了。 笔者决定用c语言模仿…...
当当网数据采集:Scrapy框架的异步处理能力
在互联网数据采集领域,Scrapy框架以其强大的异步处理能力而著称。Scrapy利用了Python的异步网络请求库,如twisted,来实现高效的并发数据采集。本文将深入探讨Scrapy框架的异步处理能力,并展示如何在当当网数据采集项目中应用这一能…...
React——useEffect和自定义useUpdateEffect
useEffect 是React的一个内置Hook,用于在组件渲染后执行副作用(例如数据获取、订阅或手动更改DOM)。它将在第一次渲染后和每次更新后都会执行。 useEffect(() > {// 这里的代码将在组件挂载和更新时执行。 }, [dependencies]); // depend…...
Hadoop大数据处理架构中ODB、DIM、DWD、DWS
在Hadoop的大数据处理架构中,ODS、DIM、DWD和DWS分别代表了数据仓库体系中不同的层次和功能。下面解释这几个概念: ODS (Operational Data Store) 想象你有一家超市,每天营业结束后,你会把当天所有的销售记录、顾客信息、商品库…...
【刷题汇总 -- 爱丽丝的人偶、集合、最长回文子序列】
C日常刷题积累 今日刷题汇总 - day0211、爱丽丝的人偶1.1、题目1.2、思路1.3、程序实现 2、集合2.1、题目2.2、思路2.3、程序实现 -- set 3、最长回文子序列3.1、题目3.2、思路3.3、程序实现 -- dp 4、题目链接 今日刷题汇总 - day021 1、爱丽丝的人偶 1.1、题目 1.2、思路 …...
基于vue3 + vite产生的 TypeError: Failed to fetch dynamically imported module
具体参考这篇衔接: Vue3报错:Failed to fetch dynamically imported module-CSDN博客 反正挺扯淡的,错误来源于基于ry-vue-plus来进行二次开发的时候遇到的问题。 错误起因 我创建了一个广告管理页面。然后发现访问一直在加载中。报的是这样…...
批量自动添加好友,高效拓展人脉圈.
随着微信使用数量的不断增加,手动添加好友成为了一项耗时且繁琐的任务。为了帮助大家解决这个问题,下面分享一款高效的微信管理系统,它能够帮助你实现批量自动添加好友,极大提升了人脉拓展的效率。 这款微信管理系统可以同时管理多…...
Web开发:一个可拖拽的模态框(HTML、CSS、JavaScript)
目录 一、需求描述 二、实现效果 三、完整代码 四、实现过程 1、HTML 页面结构 2、CSS 元素样式 3、JavaScript动态控制 (1)获取元素 (2)显示\隐藏遮罩层与模态框 (3)实现模态框拖动效果 一、需求…...
接口测试,接口间数据传递,数组和字符串类型
一、接口传递说明接口1:输出如下接口2:输入如下:接口2的入参employeeId和userName需要从接口1的出参中获取二、解决方案ApiFox脚本:1、接口1后置操作:设置环境变量如下:var employeeList pm.response.json().data[0].employeeLis…...
告别盲目下载:用STM32CubeIDE仿真功能在电脑上预演你的硬件行为
告别盲目下载:用STM32CubeIDE仿真功能在电脑上预演你的硬件行为 在嵌入式开发领域,每一次将程序烧录到硬件的过程都像是一次小小的冒险——你永远无法百分百确定代码在真实硬件上会如何表现。对于使用STM32系列芯片的开发者来说,这种不确定性…...
嵌入式C语言核心技术与经典书籍推荐
C语言学习必读经典书籍推荐与核心知识点解析1. C语言在嵌入式开发中的核心地位C语言作为嵌入式系统开发的基石语言,具有直接操作硬件、执行效率高、可移植性强等显著优势。在资源受限的嵌入式环境中,熟练掌握C语言是开发高效可靠嵌入式系统的必备技能。1…...
OpenClaw新手入门:Qwen3.5-9B镜像一键部署与基础配置
OpenClaw新手入门:Qwen3.5-9B镜像一键部署与基础配置 1. 为什么选择Qwen3.5-9B作为OpenClaw的"大脑"? 去年冬天,当我第一次尝试用OpenClaw自动化处理周报时,发现默认的小模型经常把"会议纪要"理解成"会…...
MDK分散加载文件(.sct)解析与嵌入式内存管理
MDK分散加载文件(.sct)剖析及应用1. 项目概述1.1 分散加载概念分散加载(Scatter Loading)是一种允许开发者精确控制代码和数据在存储器中布局的技术。通过分散加载文件,我们可以指定程序的特定部分(如代码段、数据段)在存储器的特定地址空间运…...
Folo信息整理神器:如何告别碎片化阅读,轻松构建专属知识库?
Folo信息整理神器:如何告别碎片化阅读,轻松构建专属知识库? 【免费下载链接】follow [WIP] Next generation information browser 项目地址: https://gitcode.com/GitHub_Trending/fol/follow 每天被数十个APP推送轰炸,有价…...
Dify + Weaviate/Qdrant混合重排架构实践(支持动态权重调度、Fallback降级与A/B测试埋点)
第一章:Dify重排序架构的核心设计哲学Dify 的重排序(Reranking)模块并非简单叠加于检索之后的后处理步骤,而是在整个 LLM 应用生命周期中承担语义对齐、意图强化与可信度校准三重使命的设计原语。其核心哲学可凝练为:*…...
Java 新纪元 — JDK 25 + Spring Boot 4 全栈实战(十九):微服务实战——Boot 4 + Spring Cloud 2026.x,构建高并发电商分布式系统
系列导航 | ← 上一篇:D18 云原生部署:Docker + K8s + GraalVM | 下一篇:D20 Spring Security 7.x + JDK 25加密升级 → 适用读者:正在做微服务架构设计或升级的中高级开发者,有一定Spring Cloud经验。 前置知识:了解Spring Boot基础、Docker/K8s基础(D17-D18)、分布式…...
SDMatte镜像国产化适配:昇腾/海光平台移植可行性评估
SDMatte镜像国产化适配:昇腾/海光平台移植可行性评估 1. 项目背景与技术特点 SDMatte是一款专注于高质量图像抠图的AI模型,特别擅长处理复杂边缘和半透明物体的提取任务。该模型在电商、设计、内容创作等领域具有广泛应用价值,能够高效完成…...
高德API+ECharts实战:5分钟搞定最新行政区划地图可视化(附乡镇级GeoJSON下载)
高德API与ECharts融合实战:行政区划地图极速可视化指南 当我们面对需要展示行政区划变动的需求时,往往会遇到数据过时、格式不兼容等问题。本文将手把手教你如何利用高德API和ECharts,在5分钟内构建一个支持乡镇级数据展示的动态地图可视化方…...
