Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门
参考文章
【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客
Docker搭建kafka+zookeeper
打开我们的docker的镜像源配置
vim /etc/docker/daemon.json
配置
{
"registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}
下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像
然后开始拉取我们的zookeeper镜像和我们的kafka镜像
这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本
docker pull zookeeper
kafka镜像
docker pull wurstmeister/kafka
因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络
让不同容器都加入这个网络
docker network create创建我们的网络
然后那个zookeeper_network是我们自定义的网络名称
docker network create --driver bridge zookeeper_network
kafka是依赖于zookeeper的所以我们要先安装zookeeper
我们先用run来创建一个zookeeper容器
docker run -d --name zookeeper1 --network zookeeper_network -p 2181:2181 zookeeper
-d 是后台运行
--name 是我们自定义容器的名字 我定义的名字是zookeeper1
--network
是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络
-p 是指定我们的容器暴露给外部的端口 2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射
最后面的那个zookeeper 是我们的使用的镜像源的名称
一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本
查看我们创建的网络环境的地址
docker inspect zookeeper_network


那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址
我的是12.21.0.2,这个ip地址是要记住方便后面使用的
创建一个kafka容器
这段代码有点长,根子自己改吧
# 启动kafka
docker run -d --name kafka1 --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
解释
KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址
不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了
SpringBoot集成kafka
首先就是springboot和kafka的版本兼容了
Spring for Apache Kafka

然后我们引入两个kafka的依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version> </dependency>
自己对着自己的版本来
自己看b站视频,9分钟就搞定了
63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili
然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析
其实和普通mq差不多
也就是配置生产者和消费者和一些过期时间超时时间
重点在于那个missing-topics-fatal
主题不存在的话,我们是否还要成功启动
我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题
所以我启动的时候,报错然后失败了
spring:kafka:bootstrap-servers: 192.168.88.130:9092 #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384 #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432 #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2 #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576 #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate 手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号
server:port: 9999 #这个是java项目启动的端口
基本案例
这是常量类
指定了一个topic和group
主题和分组id
groupid是消费者组的唯一标识
这个视频9分钟看懂kafka
小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili
生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}
我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic
以及指定消费者的唯一标识GROUP_ID
这些其实都是自己在常量类里面自己写好的
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了

![]()
kafka的图形化工具
这里介绍一个免费的开源项目KafkaKing
Releases · Bronya0/Kafka-King (github.com)
里面还能指定中文

相关文章:
Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门
参考文章 【Docker安装部署KafkaZookeeper详细教程】_linux arm docker安装kafka-CSDN博客 Docker搭建kafkazookeeper 打开我们的docker的镜像源配置 vim /etc/docker/daemon.json 配置 { "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"…...
【cocos2dx】【iOS工程】如何保存用户在游戏内的绘画数据,并将数据以图像形式展示在预览界面
【cocos2dx】【iOS工程】如何保存用户在应用内的操作数据,并将数据以图像形式展示在预览界面 设备/引擎:Mac(11.6)/Mac Mini 开发工具:Xcode(15.0.1) 开发需求:如何保存用户在应用…...
拥抱应用创新,拒绝无谓的模型竞争
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
【源码+文档+调试讲解】旅游资源网站
摘 要 本论文主要论述了如何使用JAVA语言开发一个旅游资源网站 ,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S架构,面向对象编程思想进行项目开发。在引言中,作者将论述旅游资源网站的当前背景以及系统开发的目的&…...
Monaco 多行提示的实现方式
AI 代码助手最近太火爆,国内有模型厂商都有代码助手,代码助手是个比较典型的 AI 应用,主要看前端,后端的模型都差不多,国内外都有专门的代码模型。现在都是集中在 VSCode 和 Idea的插件,本文通过 Monaco 实…...
SpringMVC的架构有什么优势?——表单和数据校验(四)
#SpringMVC的架构有什么优势?——表单和数据校验(四) 前言 关键字: 机器学习 人工智能 AI chatGPT 学习 实现 使用 搭建 深度 python 事件 远程 docker mysql安全 技术 部署 技术 自动化 代码 文章目录 - - - - - 表单数据…...
Linux实战记录
踩坑实录: day2: 最坑:安装UB居然不知道创建文件夹。 1.虚拟机上不了网:多重置几次 网卡 2.Winscp链接主机: 用户名 就是 linux terminal中的 第一个用户名!...
时间、查找、打包、行过滤与指令的运行——linux指令学习(二)
前言:本节内容标题虽然为指令,但是并不只是讲指令, 更多的是和指令相关的一些原理性的东西。 如果友友只想要查一查某个指令的用法, 很抱歉, 本节不是那种带有字典性质的文章。但是如果友友是想要来学习的,…...
android CameraX构建相机拍照
Android CameraX 是一个 Jetpack 支持库,旨在简化相机应用的开发工作。它提供了一致且易用的API接口,适用于大多数Android设备,并可向后兼容至Android 5.0(API级别21)。 CameraX解决了在多种设备上实现相机功能时所遇…...
【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示 proteus仿真+程序+设计报告+讲解视频
【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 1.主要功能:讲解视频:2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接: 【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 ( proteus仿真…...
工厂水电燃气表流量计等能耗计量仪表非侵入式拍照抄表的方案
在企业园区、工厂等企事业单位,传统的手动抄表方式已逐渐不能满足现代化、信息化管理的需求。为了提高抄表工作的效率,减少人工操作的误差,同时保障数据的安全性和实时性,我们提出了拍照采集抄表方案。本方案旨在通过拍照的方式&a…...
LLM大模型应用中的安全对齐的简单理解
LLM大模型应用中的安全对齐的简单理解 随着人工智能技术的不断发展,大规模语言模型(如GPT-4)的应用越来越广泛。为了保证这些大模型在实际应用中的性能和安全性,安全对齐(Safe Alignment)成为一个重要的概…...
clickhouse-jdbc-bridge rce
clickhouse-jdbc-bridge 是什么 JDBC bridge for ClickHouse. It acts as a stateless proxy passing queries from ClickHouse to external datasources. With this extension, you can run distributed query on ClickHouse across multiple datasources in real time, whic…...
java中Comparator函数的用法实例?
在Java中,Comparator接口用于比较两个对象的顺序,常用于集合的排序。自Java 8开始,Comparator接口得到了增强,提供了许多默认方法,使得排序逻辑更加灵活和强大。下面将通过几个实例来展示Comparator的用法。 示例1&am…...
mysql实战入门-基础篇
目录 1、MySQL概述 1.1、数据库相关概念 1.2、MySQL数据库 1.2.1、版本 1.2.2、下载 1.2.3、安装 输入MySQL中root用户的密码,一定记得记住该密码 1.2.4、启动停止 1.2.5、客户端连接 1.2.6、数据模型 2、SQL 2.1、SQL通用语法 2.2、SQL分类 2.3、DDL 2.3.1、数据…...
阶段三:项目开发---民航功能模块实现:任务24:航空实时监控
任务描述 内 容:地图展示、飞机飞行轨迹、扇区控制。航空实时监控,是飞机每秒发送坐标,经过终端转换实时发送给塔台,为了飞机位置的精准度,传输位置的密度很大,在地图位置显示不明显。本次为了案例展示效…...
手机容器化 安装docker
旧手机-基于Termux容器化 1、安装app 在手机上安装Termux或ZeroTermux(Termux扩展) 1.1 切换源 注:可以将termux进行换源,最好采用国内源,例如:清华源等 更新包列表和升级包(可选࿰…...
科普文:深入理解Mybatis
概叙 (1) JDBC JDBC(Java Data Base Connection,java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成.JDBC提供了一种基准,据此可以构建更高级的工具和接口,使数据库开发人员能够编写数据库应用程序。 优点…...
称重传感器有哪些种类
有关称重传感器的知识,称重传感器是众多传感器产品中的一种,也是很常用的传感器之一,那么称重传感器有哪些种类,称重传感器的分类方式是什么样的,一起来了解下。 称重传感器的分类 主要有六种称重传感器类型…...
程序员鱼皮的保姆级写简历指南第四弹,优秀简历参考
大家好,我是程序员鱼皮。做知识分享这些年来,我看过太多简历、也帮忙修改过很多的简历,发现很多同学是完全不会写简历的、会犯很多常见的问题,不能把自己的优势充分展示出来,导致措施了很多面试机会,实在是…...
别再让收款语音卡顿!UniApp + WebSocket 实现流畅支付播报的完整避坑指南
UniApp WebSocket 支付语音播报实战:从性能优化到高并发处理 在移动支付场景中,实时语音播报不仅是用户体验的关键环节,更是商户经营效率的重要保障。想象这样的场景:高峰时段,收银台前排队等待的顾客,收银…...
出海营销决战指南:从“流量过客”到“私域常客”的全局地图
2026 全球出海营销日历:如何在关键节点实现社媒私域流量的指数级增长?2026年,出海战场规则已变。粗放投放的红利耗尽,碎片化的渠道、敏感的风控与难以逾越的文化沟壑,正让每一分营销预算的效能急剧衰减。节点依旧汹涌&…...
技术指标——格雷厄姆指数
文章目录1. 格雷厄姆指数是什么?2. 格雷厄姆指数的作用是什么?3. 举例计算例1:牛市顶部(2021年2月)例2:熊市底部(2024年2月)例3:中性水平(假设某一般时刻&…...
2026年03月GESPC++二级真题解析(含视频)
视频讲解:GESP2026年3月二级C真题讲解 一、单选题 第1题 解析: 答案B,ACD选项都是向机器人输入信息,是输入设备 第2题 解析: 答案D,判断是 “ 菱形框 ” 第3题 解析: 答案D,变…...
Antares LoRaWAN库深度解析:嵌入式LoRaWAN MAC层实现指南
1. Antares LoRaWAN 库深度技术解析:面向嵌入式工程师的 LoRaWAN MAC 层实现指南 1.1 库定位与工程价值 Antares LoRaWAN 是一个专为 Arduino 生态设计的轻量级 LoRaWAN MAC 层实现库,其核心价值不在于功能堆砌,而在于 可理解性、可调试性与…...
feishu2md:飞书文档批量下载与Markdown转换解决方案
feishu2md:飞书文档批量下载与Markdown转换解决方案 【免费下载链接】feishu2md 一键命令下载飞书文档为 Markdown 项目地址: https://gitcode.com/gh_mirrors/fe/feishu2md 在团队协作和知识管理场景中,飞书文档已成为许多组织的核心工具。然而&…...
python-flask-djangol框架的青少年编程学习平台
目录技术选型与架构设计功能模块划分开发阶段规划安全与扩展性示例代码片段(Flask路由)部署与运维教育适配项目技术支持源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作技术选型与架构设计 采用Python生态的Flask或D…...
深度解析 ConcurrentHashMap 1.8:put 与 get 核心流程全解
在 Java 并发编程中,ConcurrentHashMap 是线程安全的高频使用集合,相比线程不安全的 HashMap、效率低下的 HashTable(全锁),JDK 1.8 版本的 ConcurrentHashMap 做了底层结构重构和锁机制优化,成为高并发场景…...
高效对接Tiktok电商API:PHP开发者的一站式解决方案指南
高效对接Tiktok电商API:PHP开发者的一站式解决方案指南 【免费下载链接】tiktokshop-php Unofficial Tiktok Shop API Client in PHP. Use API version 202309 and later 项目地址: https://gitcode.com/gh_mirrors/ti/tiktokshop-php 在瞬息万变的电商生态中…...
Vivado初始化设计慢?可能是这3个隐藏设置惹的祸
Vivado初始化设计慢?可能是这3个隐藏设置惹的祸 当你在深夜赶项目进度,Vivado却卡在"Initializing Design"界面转圈超过15分钟,那种焦虑感堪比考试时笔没水。作为Xilinx FPGA开发的核心工具,Vivado的初始化速度直接影响…...
