kafka集群搭建-使用zookeeper
1.环境准备:
使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093
2.下载地址
去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。
# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
3.软件包下载解压
在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。
cd /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka
4.修改配置
需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。
cd kafka
vim conf/server.properties
修改内容如下,
# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1# 默认为9092,云服务器开放端口问题,改为了8093
port=8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093
advertised.listeners=PLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs=/tmp/kafka-logs
5.启动kafka集群
分别在每个节点的bin路径下执行启动脚本
# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties
使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。
6.创建topic`
# 创建topic,在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3
# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list
7.模拟生产消费消息
需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。
# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
生产者客户端发送hello

此时可以看到2个消费者模拟客户端都受到了消息:hello

8.集成springboot
坐标如下:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
kafka配置类:
package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {@Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")private String kafkaServers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}
}
消费者客户端:
@KafkaListener(topics = {"topic-demo"},groupId = "test1",properties = {"auto-offset-reset:latest", "enable.auto.commit:true"})public void listen(ConsumerRecord<String, String> consumerRecord) {log.info("consumer Received: " + consumerRecord);}
生产者发送消息:
@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@PostMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");SendResult<String, String> sendResult = hello_kafka.completable().join();System.out.println(sendResult);return "send topic: " + topic + ", msg: " + msg;}
}
发送测试:

消费者可以接收到消息:
consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
9.IDEA客户端工具
可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic

相关文章:
kafka集群搭建-使用zookeeper
1.环境准备: 使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093 2.下载地址 去官网下载,或者使用如…...
【python】Numpy运行报错分析:IndexError与形状不匹配问题
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...
你有多自律就有多自由
当你失去对时间的控制权,生活也就失去了平衡。 真正对自己有要求的人,都是高度自律的人。 追求自己想要的生活,任何时候开始都不会晚,关键在于你能够坚持下去,以高度自律的精神,日复一日、年复一年的坚持下…...
Codeforces Round 959 (Div. 1 + Div. 2 ABCDEFG 题) 文字讲解+视频讲解
Problem A. Diverse Game Statement 给定 n m n\times m nm 的矩形 a a a, a a a 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互不相同。求出 n m n\times m nm 的矩形 b b b, b b b 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互…...
WSL2 Centos7 Docker服务启动失败怎么办?
wsl 安装的CentOS7镜像,安装了Docker之后,发现用systemctl start docker 无法将docker启动起来。 解决办法 1、编辑文件 vim /usr/lib/systemd/system/docker.service将13行注释掉,然后在下面新增14行的内容。然后保存退出。 2、再次验证 可以发现,我们已经可以正常通过s…...
分布式锁-redisson锁重试和WatchDog机制
抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同。 1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null 2、判断当前这把锁是否是属于当前线程,如果是&a…...
ESP8266模块(2)
实例1 查看附近的WiFi 步骤1:进入AT指令模式 使用USB转串口适配器将ESP8266模块连接到电脑。打开串口终端软件,并设置正确的串口和波特率(通常为115200)。输入以下命令并按回车确认: AT如果模块响应OK,…...
Docker安装笔记
1. Mac安装Docker 1.1 Docker安装包下载 1.1.1 阿里云 对于10.10.3以下的用户 推荐使用 对于10.10.3以上的用户 推荐使用 1.1.2 官网下载 系统和芯片选择适合自己的安装包 1.2 镜像加速 【推荐】阿里镜像 登陆后,左侧菜单选中镜像加速器就可以看到你的专属地…...
《昇思25天学习打卡营第21天|Pix2Pix实现图像转换》
Pix2Pix 是一种图像转换模型,使用条件生成对抗网络(Conditional Generative Adversarial Networks,cGANs)实现图像到图像的转换。它主要由生成器(Generator)和判别器(Discriminator)…...
Python和MATLAB网络尺度结构和幂律度大型图生成式模型算法
🎯要点 🎯算法随机图模型数学概率 | 🎯图预期度序列数学定义 | 🎯生成具有任意指数的大型幂律网络,数学计算幂律指数和平均度 | 🎯随机图分析中巨型连接分量数学理论和推论 | 🎯生成式多层网络…...
在jsPsych中使用Vue
jspsych 介绍 jsPsych是一个非常好用的心理学实验插件,可以用来构建心理学实验。具体的就不多介绍了,大家可以去看官网:https://www.jspsych.org/latest/ 但是大家在使用时就会发现,这个插件只能使用js绘制界面,或者…...
机器学习·概率论基础
概率论 概率基础 这部分太简单,直接略过 条件概率 独立性 独立事件A和B的交集如下 非独立事件 非独立事件A和B的交集如下 贝叶斯定理 先验 事件 后验 在概率论和统计学中,先验概率和后验概率是贝叶斯统计的核心概念 简单来说后验概率就是结合了先验概…...
c生万物系列(面向对象:封装)
本系列博客主要介绍c语言的一些屠龙技,里面包含了笔者本人的一些奇思妙想。 该系列博客笔者只是用作记录。如果你偶然找到了这篇博客,但是发现不知所云,请不要过多投入时间,可能笔者本人那时候也看不懂了。 笔者决定用c语言模仿…...
当当网数据采集:Scrapy框架的异步处理能力
在互联网数据采集领域,Scrapy框架以其强大的异步处理能力而著称。Scrapy利用了Python的异步网络请求库,如twisted,来实现高效的并发数据采集。本文将深入探讨Scrapy框架的异步处理能力,并展示如何在当当网数据采集项目中应用这一能…...
React——useEffect和自定义useUpdateEffect
useEffect 是React的一个内置Hook,用于在组件渲染后执行副作用(例如数据获取、订阅或手动更改DOM)。它将在第一次渲染后和每次更新后都会执行。 useEffect(() > {// 这里的代码将在组件挂载和更新时执行。 }, [dependencies]); // depend…...
Hadoop大数据处理架构中ODB、DIM、DWD、DWS
在Hadoop的大数据处理架构中,ODS、DIM、DWD和DWS分别代表了数据仓库体系中不同的层次和功能。下面解释这几个概念: ODS (Operational Data Store) 想象你有一家超市,每天营业结束后,你会把当天所有的销售记录、顾客信息、商品库…...
【刷题汇总 -- 爱丽丝的人偶、集合、最长回文子序列】
C日常刷题积累 今日刷题汇总 - day0211、爱丽丝的人偶1.1、题目1.2、思路1.3、程序实现 2、集合2.1、题目2.2、思路2.3、程序实现 -- set 3、最长回文子序列3.1、题目3.2、思路3.3、程序实现 -- dp 4、题目链接 今日刷题汇总 - day021 1、爱丽丝的人偶 1.1、题目 1.2、思路 …...
基于vue3 + vite产生的 TypeError: Failed to fetch dynamically imported module
具体参考这篇衔接: Vue3报错:Failed to fetch dynamically imported module-CSDN博客 反正挺扯淡的,错误来源于基于ry-vue-plus来进行二次开发的时候遇到的问题。 错误起因 我创建了一个广告管理页面。然后发现访问一直在加载中。报的是这样…...
批量自动添加好友,高效拓展人脉圈.
随着微信使用数量的不断增加,手动添加好友成为了一项耗时且繁琐的任务。为了帮助大家解决这个问题,下面分享一款高效的微信管理系统,它能够帮助你实现批量自动添加好友,极大提升了人脉拓展的效率。 这款微信管理系统可以同时管理多…...
Web开发:一个可拖拽的模态框(HTML、CSS、JavaScript)
目录 一、需求描述 二、实现效果 三、完整代码 四、实现过程 1、HTML 页面结构 2、CSS 元素样式 3、JavaScript动态控制 (1)获取元素 (2)显示\隐藏遮罩层与模态框 (3)实现模态框拖动效果 一、需求…...
C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...
【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
linux设备重启后时间与网络时间不同步怎么解决?
linux设备重启后时间与网络时间不同步怎么解决? 设备只要一重启,时间又错了/偏了,明明刚刚对时还是对的! 这在物联网、嵌入式开发环境特别常见,尤其是开发板、树莓派、rk3588 这类设备。 解决方法: 加硬件…...
汇编语言学习(三)——DoxBox中debug的使用
目录 一、安装DoxBox,并下载汇编工具(MASM文件) 二、debug是什么 三、debug中的命令 一、安装DoxBox,并下载汇编工具(MASM文件) 链接: https://pan.baidu.com/s/1IbyJj-JIkl_oMOJmkKiaGQ?pw…...
稻米分类和病害检测数据集(猫脸码客第237期)
稻米分类图像数据集:驱动农业智能化发展的核心资源 引言 在全球农业体系中,稻米作为最关键的粮食作物之一,其品种多样性为人类饮食提供了丰富选择。然而,传统稻米分类方法高度依赖人工经验,存在效率低、主观性强等缺…...
