kafka集群搭建-使用zookeeper
1.环境准备:
使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093
2.下载地址
去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。
# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
3.软件包下载解压
在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。
cd /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka
4.修改配置
需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。
cd kafka
vim conf/server.properties
修改内容如下,
# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1# 默认为9092,云服务器开放端口问题,改为了8093
port=8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093
advertised.listeners=PLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs=/tmp/kafka-logs
5.启动kafka集群
分别在每个节点的bin路径下执行启动脚本
# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties
使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。
6.创建topic`
# 创建topic,在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3
# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list
7.模拟生产消费消息
需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。
# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
生产者客户端发送hello

此时可以看到2个消费者模拟客户端都受到了消息:hello

8.集成springboot
坐标如下:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
kafka配置类:
package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {@Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")private String kafkaServers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}
}
消费者客户端:
@KafkaListener(topics = {"topic-demo"},groupId = "test1",properties = {"auto-offset-reset:latest", "enable.auto.commit:true"})public void listen(ConsumerRecord<String, String> consumerRecord) {log.info("consumer Received: " + consumerRecord);}
生产者发送消息:
@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@PostMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");SendResult<String, String> sendResult = hello_kafka.completable().join();System.out.println(sendResult);return "send topic: " + topic + ", msg: " + msg;}
}
发送测试:

消费者可以接收到消息:
consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
9.IDEA客户端工具
可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic

相关文章:
kafka集群搭建-使用zookeeper
1.环境准备: 使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093 2.下载地址 去官网下载,或者使用如…...
【python】Numpy运行报错分析:IndexError与形状不匹配问题
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...
你有多自律就有多自由
当你失去对时间的控制权,生活也就失去了平衡。 真正对自己有要求的人,都是高度自律的人。 追求自己想要的生活,任何时候开始都不会晚,关键在于你能够坚持下去,以高度自律的精神,日复一日、年复一年的坚持下…...
Codeforces Round 959 (Div. 1 + Div. 2 ABCDEFG 题) 文字讲解+视频讲解
Problem A. Diverse Game Statement 给定 n m n\times m nm 的矩形 a a a, a a a 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互不相同。求出 n m n\times m nm 的矩形 b b b, b b b 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互…...
WSL2 Centos7 Docker服务启动失败怎么办?
wsl 安装的CentOS7镜像,安装了Docker之后,发现用systemctl start docker 无法将docker启动起来。 解决办法 1、编辑文件 vim /usr/lib/systemd/system/docker.service将13行注释掉,然后在下面新增14行的内容。然后保存退出。 2、再次验证 可以发现,我们已经可以正常通过s…...
分布式锁-redisson锁重试和WatchDog机制
抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同。 1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null 2、判断当前这把锁是否是属于当前线程,如果是&a…...
ESP8266模块(2)
实例1 查看附近的WiFi 步骤1:进入AT指令模式 使用USB转串口适配器将ESP8266模块连接到电脑。打开串口终端软件,并设置正确的串口和波特率(通常为115200)。输入以下命令并按回车确认: AT如果模块响应OK,…...
Docker安装笔记
1. Mac安装Docker 1.1 Docker安装包下载 1.1.1 阿里云 对于10.10.3以下的用户 推荐使用 对于10.10.3以上的用户 推荐使用 1.1.2 官网下载 系统和芯片选择适合自己的安装包 1.2 镜像加速 【推荐】阿里镜像 登陆后,左侧菜单选中镜像加速器就可以看到你的专属地…...
《昇思25天学习打卡营第21天|Pix2Pix实现图像转换》
Pix2Pix 是一种图像转换模型,使用条件生成对抗网络(Conditional Generative Adversarial Networks,cGANs)实现图像到图像的转换。它主要由生成器(Generator)和判别器(Discriminator)…...
Python和MATLAB网络尺度结构和幂律度大型图生成式模型算法
🎯要点 🎯算法随机图模型数学概率 | 🎯图预期度序列数学定义 | 🎯生成具有任意指数的大型幂律网络,数学计算幂律指数和平均度 | 🎯随机图分析中巨型连接分量数学理论和推论 | 🎯生成式多层网络…...
在jsPsych中使用Vue
jspsych 介绍 jsPsych是一个非常好用的心理学实验插件,可以用来构建心理学实验。具体的就不多介绍了,大家可以去看官网:https://www.jspsych.org/latest/ 但是大家在使用时就会发现,这个插件只能使用js绘制界面,或者…...
机器学习·概率论基础
概率论 概率基础 这部分太简单,直接略过 条件概率 独立性 独立事件A和B的交集如下 非独立事件 非独立事件A和B的交集如下 贝叶斯定理 先验 事件 后验 在概率论和统计学中,先验概率和后验概率是贝叶斯统计的核心概念 简单来说后验概率就是结合了先验概…...
c生万物系列(面向对象:封装)
本系列博客主要介绍c语言的一些屠龙技,里面包含了笔者本人的一些奇思妙想。 该系列博客笔者只是用作记录。如果你偶然找到了这篇博客,但是发现不知所云,请不要过多投入时间,可能笔者本人那时候也看不懂了。 笔者决定用c语言模仿…...
当当网数据采集:Scrapy框架的异步处理能力
在互联网数据采集领域,Scrapy框架以其强大的异步处理能力而著称。Scrapy利用了Python的异步网络请求库,如twisted,来实现高效的并发数据采集。本文将深入探讨Scrapy框架的异步处理能力,并展示如何在当当网数据采集项目中应用这一能…...
React——useEffect和自定义useUpdateEffect
useEffect 是React的一个内置Hook,用于在组件渲染后执行副作用(例如数据获取、订阅或手动更改DOM)。它将在第一次渲染后和每次更新后都会执行。 useEffect(() > {// 这里的代码将在组件挂载和更新时执行。 }, [dependencies]); // depend…...
Hadoop大数据处理架构中ODB、DIM、DWD、DWS
在Hadoop的大数据处理架构中,ODS、DIM、DWD和DWS分别代表了数据仓库体系中不同的层次和功能。下面解释这几个概念: ODS (Operational Data Store) 想象你有一家超市,每天营业结束后,你会把当天所有的销售记录、顾客信息、商品库…...
【刷题汇总 -- 爱丽丝的人偶、集合、最长回文子序列】
C日常刷题积累 今日刷题汇总 - day0211、爱丽丝的人偶1.1、题目1.2、思路1.3、程序实现 2、集合2.1、题目2.2、思路2.3、程序实现 -- set 3、最长回文子序列3.1、题目3.2、思路3.3、程序实现 -- dp 4、题目链接 今日刷题汇总 - day021 1、爱丽丝的人偶 1.1、题目 1.2、思路 …...
基于vue3 + vite产生的 TypeError: Failed to fetch dynamically imported module
具体参考这篇衔接: Vue3报错:Failed to fetch dynamically imported module-CSDN博客 反正挺扯淡的,错误来源于基于ry-vue-plus来进行二次开发的时候遇到的问题。 错误起因 我创建了一个广告管理页面。然后发现访问一直在加载中。报的是这样…...
批量自动添加好友,高效拓展人脉圈.
随着微信使用数量的不断增加,手动添加好友成为了一项耗时且繁琐的任务。为了帮助大家解决这个问题,下面分享一款高效的微信管理系统,它能够帮助你实现批量自动添加好友,极大提升了人脉拓展的效率。 这款微信管理系统可以同时管理多…...
Web开发:一个可拖拽的模态框(HTML、CSS、JavaScript)
目录 一、需求描述 二、实现效果 三、完整代码 四、实现过程 1、HTML 页面结构 2、CSS 元素样式 3、JavaScript动态控制 (1)获取元素 (2)显示\隐藏遮罩层与模态框 (3)实现模态框拖动效果 一、需求…...
嵌入式Linux开发:手把手教你交叉编译全套WiFi工具链(iwconfig, iw, wpa_supplicant, hostapd)
嵌入式Linux WiFi工具链深度实战:从交叉编译到系统集成 在嵌入式Linux开发中,WiFi功能实现往往是最具挑战性的环节之一。不同于桌面环境,嵌入式设备需要从底层开始构建完整的无线网络栈,这涉及到多个工具的协同工作。本文将带你深…...
C++ 算法实战:从鸡兔同笼到多元方程求解的编程思维演进
1. 从鸡兔同笼开始理解算法思维 记得第一次接触鸡兔同笼问题时,我正啃着铅笔头对着数学作业发愁。题目说笼子里有35个头和94只脚,问鸡和兔各有多少只。这个看似简单的应用题,后来竟成了我算法思维的启蒙老师。 用C解决这个问题时,…...
从强化学习视角看HDP:ADP中的Actor-Critic框架到底怎么工作的?
从强化学习视角看HDP:ADP中的Actor-Critic框架到底怎么工作的? 在控制理论与机器学习交叉领域,自适应动态规划(ADP)与强化学习(RL)的融合正催生新一代智能控制范式。当我们以RL从业者熟悉的Act…...
如何学习java?
目录 一. 初识Java 1. Java语⾔概述 1.1 Java是什么 1.2 什么是JavaSE?什么是JavaEE? JavaSE(JavaStandardEdition): JavaEE(JavaEnterprise Edition): 主要区别: 1.3 Java语⾔重要性 1.4 Java语⾔发展简史 1.5 Java语⾔特性 1.6 Java开发环境安装 1. …...
还在手动整理ai会议纪要浪费宝贵下班时间?2026年这4款真香AI工具3分钟搞定3小时会议
作为挖了快三年AI效率工具的爱好者,我上周刚被3小时项目复盘会的纪要搞到加班到九点,试了一圈新出的工具,直接给大家上结论:听脑AI是目前同类会议纪要工具里最值得用的,没有之一。 直达链接:https://iting…...
STM32CubeMX 实战指南:LL库外部中断配置与按键响应优化
1. STM32CubeMX与LL库外部中断入门 第一次接触STM32外部中断时,我被它的响应速度惊艳到了。相比轮询方式,中断能让CPU在按键按下瞬间立即响应,就像有个24小时待命的管家。STM32CubeMX这个图形化配置工具,把原本需要手动编写的底层…...
3步开启Windows实时语音转文字:TMSpeech离线语音识别完全指南
3步开启Windows实时语音转文字:TMSpeech离线语音识别完全指南 【免费下载链接】TMSpeech 腾讯会议摸鱼工具 项目地址: https://gitcode.com/gh_mirrors/tm/TMSpeech TMSpeech是一款专为Windows系统设计的开源实时语音识别工具,能够将电脑系统声音…...
Origin9.1绘图避坑指南:从数据导入到论文级.tif图保存的完整流程
Origin9.1科研绘图全流程避坑指南:从数据导入到论文级.tif输出 科研绘图是论文写作中不可或缺的一环,而Origin9.1作为经典的数据可视化工具,在学术界有着广泛的应用。然而,从原始数据到最终符合期刊要求的图表,这一过程…...
5分钟搞定!iperf3 Windows版:专业网络性能测试工具完全指南
5分钟搞定!iperf3 Windows版:专业网络性能测试工具完全指南 【免费下载链接】iperf3-win-builds iperf3 binaries for Windows. Benchmark your network limits. 项目地址: https://gitcode.com/gh_mirrors/ip/iperf3-win-builds 你是否曾经怀疑过…...
从‘监控谁’到‘如何查’:手把手教你用Prometheus标签玩转K8s监控数据筛选
从‘监控谁’到‘如何查’:手把手教你用Prometheus标签玩转K8s监控数据筛选 在Kubernetes集群监控领域,数据洪流是每个运维人员必须面对的挑战。当数百个Pod不断创建销毁时,传统静态配置的监控方式显得力不从心。这正是Prometheus标签系统大显…...
