Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门
参考文章
【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客
Docker搭建kafka+zookeeper
打开我们的docker的镜像源配置
vim /etc/docker/daemon.json
配置
{
"registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}
下面的那个insecure是我自己虚拟机的,不用理会
拉取镜像
然后开始拉取我们的zookeeper镜像和我们的kafka镜像
这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本
docker pull zookeeper
kafka镜像
docker pull wurstmeister/kafka
因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络
让不同容器都加入这个网络
docker network create创建我们的网络
然后那个zookeeper_network是我们自定义的网络名称
docker network create --driver bridge zookeeper_network
kafka是依赖于zookeeper的所以我们要先安装zookeeper
我们先用run来创建一个zookeeper容器
docker run -d --name zookeeper1 --network zookeeper_network -p 2181:2181 zookeeper
-d 是后台运行
--name 是我们自定义容器的名字 我定义的名字是zookeeper1
--network
是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络
-p 是指定我们的容器暴露给外部的端口 2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射
最后面的那个zookeeper 是我们的使用的镜像源的名称
一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本
查看我们创建的网络环境的地址
docker inspect zookeeper_network
那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址
我的是12.21.0.2,这个ip地址是要记住方便后面使用的
创建一个kafka容器
这段代码有点长,根子自己改吧
# 启动kafka
docker run -d --name kafka1 --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
解释
KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址
不知道本机地址可以输入 ip addr来查看本机地址
这样子就搭建完成了
SpringBoot集成kafka
首先就是springboot和kafka的版本兼容了
Spring for Apache Kafka
然后我们引入两个kafka的依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version> </dependency>
自己对着自己的版本来
自己看b站视频,9分钟就搞定了
63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili
然后开始写我们的application.yml配置文件
下面是配置文件的全部+解析
其实和普通mq差不多
也就是配置生产者和消费者和一些过期时间超时时间
重点在于那个missing-topics-fatal
主题不存在的话,我们是否还要成功启动
我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题
所以我启动的时候,报错然后失败了
spring:kafka:bootstrap-servers: 192.168.88.130:9092 #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384 #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432 #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2 #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576 #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate 手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号 server:port: 9999 #这个是java项目启动的端口
基本案例
这是常量类
指定了一个topic和group
主题和分组id
groupid是消费者组的唯一标识
这个视频9分钟看懂kafka
小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili
生产者
我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入
@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;
produces里面指定我们前端传的是json格式
我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"
消费者
@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}
我们用List<String>来接收,因为可能一个消费者接收多条消息
指定消费者监听的主题topic
以及指定消费者的唯一标识GROUP_ID
这些其实都是自己在常量类里面自己写好的
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
这个是得到我们的主题topic的名字
我用apifox调试之后,成功执行了
kafka的图形化工具
这里介绍一个免费的开源项目KafkaKing
Releases · Bronya0/Kafka-King (github.com)
里面还能指定中文
相关文章:

Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门
参考文章 【Docker安装部署KafkaZookeeper详细教程】_linux arm docker安装kafka-CSDN博客 Docker搭建kafkazookeeper 打开我们的docker的镜像源配置 vim /etc/docker/daemon.json 配置 { "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"…...
【cocos2dx】【iOS工程】如何保存用户在游戏内的绘画数据,并将数据以图像形式展示在预览界面
【cocos2dx】【iOS工程】如何保存用户在应用内的操作数据,并将数据以图像形式展示在预览界面 设备/引擎:Mac(11.6)/Mac Mini 开发工具:Xcode(15.0.1) 开发需求:如何保存用户在应用…...

拥抱应用创新,拒绝无谓的模型竞争
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

【源码+文档+调试讲解】旅游资源网站
摘 要 本论文主要论述了如何使用JAVA语言开发一个旅游资源网站 ,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S架构,面向对象编程思想进行项目开发。在引言中,作者将论述旅游资源网站的当前背景以及系统开发的目的&…...

Monaco 多行提示的实现方式
AI 代码助手最近太火爆,国内有模型厂商都有代码助手,代码助手是个比较典型的 AI 应用,主要看前端,后端的模型都差不多,国内外都有专门的代码模型。现在都是集中在 VSCode 和 Idea的插件,本文通过 Monaco 实…...

SpringMVC的架构有什么优势?——表单和数据校验(四)
#SpringMVC的架构有什么优势?——表单和数据校验(四) 前言 关键字: 机器学习 人工智能 AI chatGPT 学习 实现 使用 搭建 深度 python 事件 远程 docker mysql安全 技术 部署 技术 自动化 代码 文章目录 - - - - - 表单数据…...
Linux实战记录
踩坑实录: day2: 最坑:安装UB居然不知道创建文件夹。 1.虚拟机上不了网:多重置几次 网卡 2.Winscp链接主机: 用户名 就是 linux terminal中的 第一个用户名!...

时间、查找、打包、行过滤与指令的运行——linux指令学习(二)
前言:本节内容标题虽然为指令,但是并不只是讲指令, 更多的是和指令相关的一些原理性的东西。 如果友友只想要查一查某个指令的用法, 很抱歉, 本节不是那种带有字典性质的文章。但是如果友友是想要来学习的,…...

android CameraX构建相机拍照
Android CameraX 是一个 Jetpack 支持库,旨在简化相机应用的开发工作。它提供了一致且易用的API接口,适用于大多数Android设备,并可向后兼容至Android 5.0(API级别21)。 CameraX解决了在多种设备上实现相机功能时所遇…...

【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示 proteus仿真+程序+设计报告+讲解视频
【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 1.主要功能:讲解视频:2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接: 【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 ( proteus仿真…...

工厂水电燃气表流量计等能耗计量仪表非侵入式拍照抄表的方案
在企业园区、工厂等企事业单位,传统的手动抄表方式已逐渐不能满足现代化、信息化管理的需求。为了提高抄表工作的效率,减少人工操作的误差,同时保障数据的安全性和实时性,我们提出了拍照采集抄表方案。本方案旨在通过拍照的方式&a…...

LLM大模型应用中的安全对齐的简单理解
LLM大模型应用中的安全对齐的简单理解 随着人工智能技术的不断发展,大规模语言模型(如GPT-4)的应用越来越广泛。为了保证这些大模型在实际应用中的性能和安全性,安全对齐(Safe Alignment)成为一个重要的概…...

clickhouse-jdbc-bridge rce
clickhouse-jdbc-bridge 是什么 JDBC bridge for ClickHouse. It acts as a stateless proxy passing queries from ClickHouse to external datasources. With this extension, you can run distributed query on ClickHouse across multiple datasources in real time, whic…...
java中Comparator函数的用法实例?
在Java中,Comparator接口用于比较两个对象的顺序,常用于集合的排序。自Java 8开始,Comparator接口得到了增强,提供了许多默认方法,使得排序逻辑更加灵活和强大。下面将通过几个实例来展示Comparator的用法。 示例1&am…...
mysql实战入门-基础篇
目录 1、MySQL概述 1.1、数据库相关概念 1.2、MySQL数据库 1.2.1、版本 1.2.2、下载 1.2.3、安装 输入MySQL中root用户的密码,一定记得记住该密码 1.2.4、启动停止 1.2.5、客户端连接 1.2.6、数据模型 2、SQL 2.1、SQL通用语法 2.2、SQL分类 2.3、DDL 2.3.1、数据…...

阶段三:项目开发---民航功能模块实现:任务24:航空实时监控
任务描述 内 容:地图展示、飞机飞行轨迹、扇区控制。航空实时监控,是飞机每秒发送坐标,经过终端转换实时发送给塔台,为了飞机位置的精准度,传输位置的密度很大,在地图位置显示不明显。本次为了案例展示效…...

手机容器化 安装docker
旧手机-基于Termux容器化 1、安装app 在手机上安装Termux或ZeroTermux(Termux扩展) 1.1 切换源 注:可以将termux进行换源,最好采用国内源,例如:清华源等 更新包列表和升级包(可选࿰…...

科普文:深入理解Mybatis
概叙 (1) JDBC JDBC(Java Data Base Connection,java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成.JDBC提供了一种基准,据此可以构建更高级的工具和接口,使数据库开发人员能够编写数据库应用程序。 优点…...
称重传感器有哪些种类
有关称重传感器的知识,称重传感器是众多传感器产品中的一种,也是很常用的传感器之一,那么称重传感器有哪些种类,称重传感器的分类方式是什么样的,一起来了解下。 称重传感器的分类 主要有六种称重传感器类型…...
程序员鱼皮的保姆级写简历指南第四弹,优秀简历参考
大家好,我是程序员鱼皮。做知识分享这些年来,我看过太多简历、也帮忙修改过很多的简历,发现很多同学是完全不会写简历的、会犯很多常见的问题,不能把自己的优势充分展示出来,导致措施了很多面试机会,实在是…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...

地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...

遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)
前言: 最近在做行为检测相关的模型,用的是时空图卷积网络(STGCN),但原有kinetic-400数据集数据质量较低,需要进行细粒度的标注,同时粗略搜了下已有开源工具基本都集中于图像分割这块,…...

DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
Spring Security 认证流程——补充
一、认证流程概述 Spring Security 的认证流程基于 过滤器链(Filter Chain),核心组件包括 UsernamePasswordAuthenticationFilter、AuthenticationManager、UserDetailsService 等。整个流程可分为以下步骤: 用户提交登录请求拦…...