当前位置: 首页 > news >正文

Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

参考文章

【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客

Docker搭建kafka+zookeeper

打开我们的docker的镜像源配置

vim /etc/docker/daemon.json

配置

 {
  "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

 下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像

然后开始拉取我们的zookeeper镜像和我们的kafka镜像

这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本

docker pull zookeeper

kafka镜像 

docker pull wurstmeister/kafka

因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络

让不同容器都加入这个网络

docker network create创建我们的网络

然后那个zookeeper_network是我们自定义的网络名称

docker network create --driver bridge zookeeper_network

kafka是依赖于zookeeper的所以我们要先安装zookeeper

我们先用run来创建一个zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后台运行

--name 是我们自定义容器的名字  我定义的名字是zookeeper1

--network 

是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络

-p 是指定我们的容器暴露给外部的端口  2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射

最后面的那个zookeeper 是我们的使用的镜像源的名称

一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本

查看我们创建的网络环境的地址

docker inspect zookeeper_network

那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址

我的是12.21.0.2,这个ip地址是要记住方便后面使用的
 

创建一个kafka容器

这段代码有点长,根子自己改吧

 # 启动kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解释 

KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址

不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我们引入两个kafka的依赖 

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version>
</dependency>

自己对着自己的版本来

自己看b站视频,9分钟就搞定了

63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili

然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析

其实和普通mq差不多

也就是配置生产者和消费者和一些过期时间超时时间

重点在于那个missing-topics-fatal

主题不存在的话,我们是否还要成功启动

我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题

所以我启动的时候,报错然后失败了 

spring:kafka:bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384  #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432  #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest  #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2  #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576  #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate   手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号
server:port: 9999 #这个是java项目启动的端口

基本案例

这是常量类

指定了一个topic和group

主题和分组id

groupid是消费者组的唯一标识

这个视频9分钟看懂kafka

小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili

生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

 我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}

我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic

以及指定消费者的唯一标识GROUP_ID

这些其实都是自己在常量类里面自己写好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

 这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了


kafka的图形化工具

这里介绍一个免费的开源项目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面还能指定中文

相关文章:

Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

参考文章 【Docker安装部署KafkaZookeeper详细教程】_linux arm docker安装kafka-CSDN博客 Docker搭建kafkazookeeper 打开我们的docker的镜像源配置 vim /etc/docker/daemon.json 配置 { "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"…...

【cocos2dx】【iOS工程】如何保存用户在游戏内的绘画数据,并将数据以图像形式展示在预览界面

【cocos2dx】【iOS工程】如何保存用户在应用内的操作数据&#xff0c;并将数据以图像形式展示在预览界面 设备/引擎&#xff1a;Mac&#xff08;11.6&#xff09;/Mac Mini 开发工具&#xff1a;Xcode&#xff08;15.0.1&#xff09; 开发需求&#xff1a;如何保存用户在应用…...

拥抱应用创新,拒绝无谓的模型竞争

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

【源码+文档+调试讲解】旅游资源网站

摘 要 本论文主要论述了如何使用JAVA语言开发一个旅游资源网站 &#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述旅游资源网站的当前背景以及系统开发的目的&…...

Monaco 多行提示的实现方式

AI 代码助手最近太火爆&#xff0c;国内有模型厂商都有代码助手&#xff0c;代码助手是个比较典型的 AI 应用&#xff0c;主要看前端&#xff0c;后端的模型都差不多&#xff0c;国内外都有专门的代码模型。现在都是集中在 VSCode 和 Idea的插件&#xff0c;本文通过 Monaco 实…...

SpringMVC的架构有什么优势?——表单和数据校验(四)

#SpringMVC的架构有什么优势&#xff1f;——表单和数据校验&#xff08;四&#xff09; 前言 关键字&#xff1a; 机器学习 人工智能 AI chatGPT 学习 实现 使用 搭建 深度 python 事件 远程 docker mysql安全 技术 部署 技术 自动化 代码 文章目录 - - - - - 表单数据…...

Linux实战记录

踩坑实录&#xff1a; day2: 最坑&#xff1a;安装UB居然不知道创建文件夹。 1.虚拟机上不了网&#xff1a;多重置几次 网卡 2.Winscp链接主机&#xff1a; 用户名 就是 linux terminal中的 第一个用户名&#xff01;...

时间、查找、打包、行过滤与指令的运行——linux指令学习(二)

前言&#xff1a;本节内容标题虽然为指令&#xff0c;但是并不只是讲指令&#xff0c; 更多的是和指令相关的一些原理性的东西。 如果友友只想要查一查某个指令的用法&#xff0c; 很抱歉&#xff0c; 本节不是那种带有字典性质的文章。但是如果友友是想要来学习的&#xff0c;…...

android CameraX构建相机拍照

Android CameraX 是一个 Jetpack 支持库&#xff0c;旨在简化相机应用的开发工作。它提供了一致且易用的API接口&#xff0c;适用于大多数Android设备&#xff0c;并可向后兼容至Android 5.0&#xff08;API级别21&#xff09;。 CameraX解决了在多种设备上实现相机功能时所遇…...

【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示 proteus仿真+程序+设计报告+讲解视频

【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 1.主要功能&#xff1a;讲解视频&#xff1a;2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接&#xff1a; 【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 ( proteus仿真…...

工厂水电燃气表流量计等能耗计量仪表非侵入式拍照抄表的方案

在企业园区、工厂等企事业单位&#xff0c;传统的手动抄表方式已逐渐不能满足现代化、信息化管理的需求。为了提高抄表工作的效率&#xff0c;减少人工操作的误差&#xff0c;同时保障数据的安全性和实时性&#xff0c;我们提出了拍照采集抄表方案。本方案旨在通过拍照的方式&a…...

LLM大模型应用中的安全对齐的简单理解

LLM大模型应用中的安全对齐的简单理解 随着人工智能技术的不断发展&#xff0c;大规模语言模型&#xff08;如GPT-4&#xff09;的应用越来越广泛。为了保证这些大模型在实际应用中的性能和安全性&#xff0c;安全对齐&#xff08;Safe Alignment&#xff09;成为一个重要的概…...

clickhouse-jdbc-bridge rce

clickhouse-jdbc-bridge 是什么 JDBC bridge for ClickHouse. It acts as a stateless proxy passing queries from ClickHouse to external datasources. With this extension, you can run distributed query on ClickHouse across multiple datasources in real time, whic…...

java中Comparator函数的用法实例?

在Java中&#xff0c;Comparator接口用于比较两个对象的顺序&#xff0c;常用于集合的排序。自Java 8开始&#xff0c;Comparator接口得到了增强&#xff0c;提供了许多默认方法&#xff0c;使得排序逻辑更加灵活和强大。下面将通过几个实例来展示Comparator的用法。 示例1&am…...

mysql实战入门-基础篇

目录 1、MySQL概述 1.1、数据库相关概念 1.2、MySQL数据库 1.2.1、版本 1.2.2、下载 1.2.3、安装 输入MySQL中root用户的密码,一定记得记住该密码 1.2.4、启动停止 1.2.5、客户端连接 1.2.6、数据模型 2、SQL 2.1、SQL通用语法 2.2、SQL分类 2.3、DDL 2.3.1、数据…...

阶段三:项目开发---民航功能模块实现:任务24:航空实时监控

任务描述 内 容&#xff1a;地图展示、飞机飞行轨迹、扇区控制。航空实时监控&#xff0c;是飞机每秒发送坐标&#xff0c;经过终端转换实时发送给塔台&#xff0c;为了飞机位置的精准度&#xff0c;传输位置的密度很大&#xff0c;在地图位置显示不明显。本次为了案例展示效…...

手机容器化 安装docker

旧手机-基于Termux容器化 1、安装app 在手机上安装Termux或ZeroTermux&#xff08;Termux扩展&#xff09; 1.1 切换源 注&#xff1a;可以将termux进行换源&#xff0c;最好采用国内源&#xff0c;例如&#xff1a;清华源等 更新包列表和升级包&#xff08;可选&#xff0…...

科普文:深入理解Mybatis

概叙 (1) JDBC JDBC(Java Data Base Connection,java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成.JDBC提供了一种基准,据此可以构建更高级的工具和接口,使数据库开发人员能够编写数据库应用程序。 优点…...

称重传感器有哪些种类

有关称重传感器的知识&#xff0c;称重传感器是众多传感器产品中的一种&#xff0c;也是很常用的传感器之一&#xff0c;那么称重传感器有哪些种类&#xff0c;称重传感器的分类方式是什么样的&#xff0c;一起来了解下。 称重传感器的分类 主要有六种称重传感器类型&#xf…...

程序员鱼皮的保姆级写简历指南第四弹,优秀简历参考

大家好&#xff0c;我是程序员鱼皮。做知识分享这些年来&#xff0c;我看过太多简历、也帮忙修改过很多的简历&#xff0c;发现很多同学是完全不会写简历的、会犯很多常见的问题&#xff0c;不能把自己的优势充分展示出来&#xff0c;导致措施了很多面试机会&#xff0c;实在是…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

Linux nano命令的基本使用

参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时&#xff0c;显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...

R 语言科研绘图第 55 期 --- 网络图-聚类

在发表科研论文的过程中&#xff0c;科研绘图是必不可少的&#xff0c;一张好看的图形会是文章很大的加分项。 为了便于使用&#xff0c;本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中&#xff0c;获取方式&#xff1a; R 语言科研绘图模板 --- sciRplothttps://mp.…...