当前位置: 首页 > news >正文

kafka集群搭建-使用zookeeper

1.环境准备:

使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093

2.下载地址

去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。

# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

3.软件包下载解压

在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。

cd  /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka

4.修改配置

需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。

cd kafka
vim conf/server.properties

修改内容如下,

# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1# 默认为9092,云服务器开放端口问题,改为了8093
port=8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093                                                                                                 
advertised.listeners=PLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs=/tmp/kafka-logs

5.启动kafka集群

分别在每个节点的bin路径下执行启动脚本

# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties

使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。

6.创建topic`

# 创建topic,在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3
# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list

7.模拟生产消费消息

需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。

# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo

生产者客户端发送hello
在这里插入图片描述
此时可以看到2个消费者模拟客户端都受到了消息:hello
在这里插入图片描述

8.集成springboot

坐标如下:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

kafka配置类:

package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {@Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")private String kafkaServers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}
}

消费者客户端:

    @KafkaListener(topics = {"topic-demo"},groupId = "test1",properties = {"auto-offset-reset:latest", "enable.auto.commit:true"})public void listen(ConsumerRecord<String, String> consumerRecord) {log.info("consumer Received: " + consumerRecord);}

生产者发送消息:

@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@PostMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");SendResult<String, String> sendResult = hello_kafka.completable().join();System.out.println(sendResult);return "send topic: " + topic + ", msg: " + msg;}
}

发送测试:
在这里插入图片描述

消费者可以接收到消息:

consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)

9.IDEA客户端工具

可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic

在这里插入图片描述

相关文章:

kafka集群搭建-使用zookeeper

1.环境准备&#xff1a; 使用如下3台主机搭建zookeeper集群&#xff0c;由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内&#xff0c;故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093 2.下载地址 去官网下载&#xff0c;或者使用如…...

【python】Numpy运行报错分析:IndexError与形状不匹配问题

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…...

你有多自律就有多自由

当你失去对时间的控制权&#xff0c;生活也就失去了平衡。 真正对自己有要求的人&#xff0c;都是高度自律的人。 追求自己想要的生活&#xff0c;任何时候开始都不会晚&#xff0c;关键在于你能够坚持下去&#xff0c;以高度自律的精神&#xff0c;日复一日、年复一年的坚持下…...

Codeforces Round 959 (Div. 1 + Div. 2 ABCDEFG 题) 文字讲解+视频讲解

Problem A. Diverse Game Statement 给定 n m n\times m nm 的矩形 a a a&#xff0c; a a a 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互不相同。求出 n m n\times m nm 的矩形 b b b&#xff0c; b b b 中的每一个数均在 1 ∼ n m 1\sim nm 1∼nm 之间且互…...

WSL2 Centos7 Docker服务启动失败怎么办?

wsl 安装的CentOS7镜像,安装了Docker之后,发现用systemctl start docker 无法将docker启动起来。 解决办法 1、编辑文件 vim /usr/lib/systemd/system/docker.service将13行注释掉,然后在下面新增14行的内容。然后保存退出。 2、再次验证 可以发现,我们已经可以正常通过s…...

分布式锁-redisson锁重试和WatchDog机制

抢锁过程中&#xff0c;获得当前线程&#xff0c;通过tryAcquire进行抢锁&#xff0c;该抢锁逻辑和之前逻辑相同。 1、先判断当前这把锁是否存在&#xff0c;如果不存在&#xff0c;插入一把锁&#xff0c;返回null 2、判断当前这把锁是否是属于当前线程&#xff0c;如果是&a…...

ESP8266模块(2)

实例1 查看附近的WiFi 步骤1&#xff1a;进入AT指令模式 使用USB转串口适配器将ESP8266模块连接到电脑。打开串口终端软件&#xff0c;并设置正确的串口和波特率&#xff08;通常为115200&#xff09;。输入以下命令并按回车确认&#xff1a; AT如果模块响应OK&#xff0c;…...

Docker安装笔记

1. Mac安装Docker 1.1 Docker安装包下载 1.1.1 阿里云 对于10.10.3以下的用户 推荐使用 对于10.10.3以上的用户 推荐使用 1.1.2 官网下载 系统和芯片选择适合自己的安装包 1.2 镜像加速 【推荐】阿里镜像 登陆后&#xff0c;左侧菜单选中镜像加速器就可以看到你的专属地…...

《昇思25天学习打卡营第21天|Pix2Pix实现图像转换》

Pix2Pix 是一种图像转换模型&#xff0c;使用条件生成对抗网络&#xff08;Conditional Generative Adversarial Networks&#xff0c;cGANs&#xff09;实现图像到图像的转换。它主要由生成器&#xff08;Generator&#xff09;和判别器&#xff08;Discriminator&#xff09;…...

Python和MATLAB网络尺度结构和幂律度大型图生成式模型算法

&#x1f3af;要点 &#x1f3af;算法随机图模型数学概率 | &#x1f3af;图预期度序列数学定义 | &#x1f3af;生成具有任意指数的大型幂律网络&#xff0c;数学计算幂律指数和平均度 | &#x1f3af;随机图分析中巨型连接分量数学理论和推论 | &#x1f3af;生成式多层网络…...

在jsPsych中使用Vue

jspsych 介绍 jsPsych是一个非常好用的心理学实验插件&#xff0c;可以用来构建心理学实验。具体的就不多介绍了&#xff0c;大家可以去看官网&#xff1a;https://www.jspsych.org/latest/ 但是大家在使用时就会发现&#xff0c;这个插件只能使用js绘制界面&#xff0c;或者…...

机器学习·概率论基础

概率论 概率基础 这部分太简单&#xff0c;直接略过 条件概率 独立性 独立事件A和B的交集如下 非独立事件 非独立事件A和B的交集如下 贝叶斯定理 先验 事件 后验 在概率论和统计学中&#xff0c;先验概率和后验概率是贝叶斯统计的核心概念 简单来说后验概率就是结合了先验概…...

c生万物系列(面向对象:封装)

本系列博客主要介绍c语言的一些屠龙技&#xff0c;里面包含了笔者本人的一些奇思妙想。 该系列博客笔者只是用作记录。如果你偶然找到了这篇博客&#xff0c;但是发现不知所云&#xff0c;请不要过多投入时间&#xff0c;可能笔者本人那时候也看不懂了。 笔者决定用c语言模仿…...

当当网数据采集:Scrapy框架的异步处理能力

在互联网数据采集领域&#xff0c;Scrapy框架以其强大的异步处理能力而著称。Scrapy利用了Python的异步网络请求库&#xff0c;如twisted&#xff0c;来实现高效的并发数据采集。本文将深入探讨Scrapy框架的异步处理能力&#xff0c;并展示如何在当当网数据采集项目中应用这一能…...

React——useEffect和自定义useUpdateEffect

useEffect 是React的一个内置Hook&#xff0c;用于在组件渲染后执行副作用&#xff08;例如数据获取、订阅或手动更改DOM&#xff09;。它将在第一次渲染后和每次更新后都会执行。 useEffect(() > {// 这里的代码将在组件挂载和更新时执行。 }, [dependencies]); // depend…...

Hadoop大数据处理架构中ODB、DIM、DWD、DWS

在Hadoop的大数据处理架构中&#xff0c;ODS、DIM、DWD和DWS分别代表了数据仓库体系中不同的层次和功能。下面解释这几个概念&#xff1a; ODS (Operational Data Store) 想象你有一家超市&#xff0c;每天营业结束后&#xff0c;你会把当天所有的销售记录、顾客信息、商品库…...

【刷题汇总 -- 爱丽丝的人偶、集合、最长回文子序列】

C日常刷题积累 今日刷题汇总 - day0211、爱丽丝的人偶1.1、题目1.2、思路1.3、程序实现 2、集合2.1、题目2.2、思路2.3、程序实现 -- set 3、最长回文子序列3.1、题目3.2、思路3.3、程序实现 -- dp 4、题目链接 今日刷题汇总 - day021 1、爱丽丝的人偶 1.1、题目 1.2、思路 …...

基于vue3 + vite产生的 TypeError: Failed to fetch dynamically imported module

具体参考这篇衔接&#xff1a; Vue3报错&#xff1a;Failed to fetch dynamically imported module-CSDN博客 反正挺扯淡的&#xff0c;错误来源于基于ry-vue-plus来进行二次开发的时候遇到的问题。 错误起因 我创建了一个广告管理页面。然后发现访问一直在加载中。报的是这样…...

批量自动添加好友,高效拓展人脉圈.

随着微信使用数量的不断增加&#xff0c;手动添加好友成为了一项耗时且繁琐的任务。为了帮助大家解决这个问题&#xff0c;下面分享一款高效的微信管理系统&#xff0c;它能够帮助你实现批量自动添加好友&#xff0c;极大提升了人脉拓展的效率。 这款微信管理系统可以同时管理多…...

Web开发:一个可拖拽的模态框(HTML、CSS、JavaScript)

目录 一、需求描述 二、实现效果 三、完整代码 四、实现过程 1、HTML 页面结构 2、CSS 元素样式 3、JavaScript动态控制 &#xff08;1&#xff09;获取元素 &#xff08;2&#xff09;显示\隐藏遮罩层与模态框 &#xff08;3&#xff09;实现模态框拖动效果 一、需求…...

嵌入式Linux开发:手把手教你交叉编译全套WiFi工具链(iwconfig, iw, wpa_supplicant, hostapd)

嵌入式Linux WiFi工具链深度实战&#xff1a;从交叉编译到系统集成 在嵌入式Linux开发中&#xff0c;WiFi功能实现往往是最具挑战性的环节之一。不同于桌面环境&#xff0c;嵌入式设备需要从底层开始构建完整的无线网络栈&#xff0c;这涉及到多个工具的协同工作。本文将带你深…...

C++ 算法实战:从鸡兔同笼到多元方程求解的编程思维演进

1. 从鸡兔同笼开始理解算法思维 记得第一次接触鸡兔同笼问题时&#xff0c;我正啃着铅笔头对着数学作业发愁。题目说笼子里有35个头和94只脚&#xff0c;问鸡和兔各有多少只。这个看似简单的应用题&#xff0c;后来竟成了我算法思维的启蒙老师。 用C解决这个问题时&#xff0c;…...

从强化学习视角看HDP:ADP中的Actor-Critic框架到底怎么工作的?

从强化学习视角看HDP&#xff1a;ADP中的Actor-Critic框架到底怎么工作的&#xff1f; 在控制理论与机器学习交叉领域&#xff0c;自适应动态规划&#xff08;ADP&#xff09;与强化学习&#xff08;RL&#xff09;的融合正催生新一代智能控制范式。当我们以RL从业者熟悉的Act…...

如何学习java?

目录 一. 初识Java 1. Java语⾔概述 1.1 Java是什么 1.2 什么是JavaSE&#xff1f;什么是JavaEE? JavaSE(JavaStandardEdition): JavaEE(JavaEnterprise Edition): 主要区别: 1.3 Java语⾔重要性 1.4 Java语⾔发展简史 1.5 Java语⾔特性 1.6 Java开发环境安装 1. …...

还在手动整理ai会议纪要浪费宝贵下班时间?2026年这4款真香AI工具3分钟搞定3小时会议

作为挖了快三年AI效率工具的爱好者&#xff0c;我上周刚被3小时项目复盘会的纪要搞到加班到九点&#xff0c;试了一圈新出的工具&#xff0c;直接给大家上结论&#xff1a;听脑AI是目前同类会议纪要工具里最值得用的&#xff0c;没有之一。 直达链接&#xff1a;https://iting…...

STM32CubeMX 实战指南:LL库外部中断配置与按键响应优化

1. STM32CubeMX与LL库外部中断入门 第一次接触STM32外部中断时&#xff0c;我被它的响应速度惊艳到了。相比轮询方式&#xff0c;中断能让CPU在按键按下瞬间立即响应&#xff0c;就像有个24小时待命的管家。STM32CubeMX这个图形化配置工具&#xff0c;把原本需要手动编写的底层…...

3步开启Windows实时语音转文字:TMSpeech离线语音识别完全指南

3步开启Windows实时语音转文字&#xff1a;TMSpeech离线语音识别完全指南 【免费下载链接】TMSpeech 腾讯会议摸鱼工具 项目地址: https://gitcode.com/gh_mirrors/tm/TMSpeech TMSpeech是一款专为Windows系统设计的开源实时语音识别工具&#xff0c;能够将电脑系统声音…...

Origin9.1绘图避坑指南:从数据导入到论文级.tif图保存的完整流程

Origin9.1科研绘图全流程避坑指南&#xff1a;从数据导入到论文级.tif输出 科研绘图是论文写作中不可或缺的一环&#xff0c;而Origin9.1作为经典的数据可视化工具&#xff0c;在学术界有着广泛的应用。然而&#xff0c;从原始数据到最终符合期刊要求的图表&#xff0c;这一过程…...

5分钟搞定!iperf3 Windows版:专业网络性能测试工具完全指南

5分钟搞定&#xff01;iperf3 Windows版&#xff1a;专业网络性能测试工具完全指南 【免费下载链接】iperf3-win-builds iperf3 binaries for Windows. Benchmark your network limits. 项目地址: https://gitcode.com/gh_mirrors/ip/iperf3-win-builds 你是否曾经怀疑过…...

从‘监控谁’到‘如何查’:手把手教你用Prometheus标签玩转K8s监控数据筛选

从‘监控谁’到‘如何查’&#xff1a;手把手教你用Prometheus标签玩转K8s监控数据筛选 在Kubernetes集群监控领域&#xff0c;数据洪流是每个运维人员必须面对的挑战。当数百个Pod不断创建销毁时&#xff0c;传统静态配置的监控方式显得力不从心。这正是Prometheus标签系统大显…...