JAVA(SpringBoot)集成Kafka实现消息发送和接收。
SpringBoot集成Kafka实现消息发送和接收。
- 一、Kafka 简介
- 二、Kafka 功能
- 三、POM依赖
- 四、配置文件
- 五、生产者
- 六、消费者
君子之学贵一,一则明,明则有功。
一、Kafka 简介
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。
二、Kafka 功能
消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。
三、POM依赖
<!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
四、配置文件
spring:# Kafka 配置kafka:# Kafka 服务器地址和端口 代理地址,可以多个bootstrap-servers: IP:9092# 生产者配置producer:# 发送失败时的重试次数retries: 3# 每次批量发送消息的数量,调整为较小值batch-size: 1# 生产者缓冲区大小buffer-memory: 33554432# 消息 key 的序列化器,将 key 序列化为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息 value 的序列化器,将消息体序列化为字节数组value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: true# 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔auto-commit-interval: 1000# 消息 key 的反序列化器,将字节数组反序列化为 keykey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息 value 的反序列化器,将字节数组反序列化为消息体value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
五、生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 生产者** @author chenlei*/
@Slf4j
@Component
public class KafkaProducer {/*** KafkaTemplate*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到指定的 Kafka 主题,并可指定分组信息** @param topic 消息要发送到的 Kafka 主题* @param message 要发送的消息内容*/public void sendMessage(String topic, String message) {// 使用 KafkaTemplate 发送消息,将消息发送到指定的主题ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功后的处理逻辑,可根据需要添加log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败后的处理逻辑,使用日志记录异常log.error("发送消息=[" + message + "] 失败", ex);}});}
}
六、消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author 消费者* chenlei*/
@Slf4j
@Component
public class KafkaConsumer {/*** 监听 Kafka 主题方法。** @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对*/@KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")public void listen(ConsumerRecord<?, ?> record) {// 打印接收到的消息的详细信息log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}
相关文章:
JAVA(SpringBoot)集成Kafka实现消息发送和接收。
SpringBoot集成Kafka实现消息发送和接收。 一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者 君子之学贵一,一则明,明则有功。 一、Kafka 简介 Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 Link…...

AI刷题-蛋糕工厂产能规划、优质章节的连续选择
挑两个简单的写写 目录 一、蛋糕工厂产能规划 问题描述 输入格式 输出格式 解题思路: 问题理解 数据结构选择 算法步骤 关键点 最终代码: 运行结果:编辑 二、优质章节的连续选择 问题描述 输入格式 输出格式 解题思路&a…...

在线可编辑Excel
1. Handsontable 特点: 提供了类似 Excel 的表格编辑体验,包括单元格样式、公式计算、数据验证等功能。 支持多种插件,如筛选、排序、合并单元格等。 轻量级且易于集成到现有项目中。 具备强大的自定义能力,可以调整外观和行为…...
什么是词嵌入?Word2Vec、GloVe 与 FastText 的区别
自然语言处理(NLP)领域的核心问题之一,是如何将人类的语言转换成计算机可以理解的数值形式,而词嵌入(Word Embedding)正是为了解决这个问题的重要技术。本文将详细讲解词嵌入的概念及其经典模型(Word2Vec、GloVe 和 FastText)的原理与区别。 1. 什么是词嵌入(Word Em…...

WPS数据分析000010
基于数据透视表的内容 一、排序 手动调动 二、筛选 三、值显示方式 四、值汇总依据 五、布局和选项 不显示分类汇总 合并居中带标签的单元格 空单元格显示 六、显示报表筛选页...

Qt中QVariant的使用
1.使用QVariant实现不同类型数据的相加 方法:通过type函数返回数值的类型,然后通过setValue来构造一个QVariant类型的返回值。 函数: QVariant mainPage::dataPlus(QVariant a, QVariant b) {QVariant ret;if ((a.type() QVariant::Int) &a…...
Avalonia UI MVVM DataTemplate里绑定Command
Avalonia 模板里面绑定ViewModel跟WPF写法有些不同。需要单独绑定Command. WPF里面可以直接按照下面的方法绑定DataContext. <Button Content"Button" Command"{Binding DataContext.ClickCommand, RelativeSource{RelativeSource AncestorType{x:Type User…...
动态规划DP 数字三角型模型 最低通行费用(题目详解+C++代码完整实现)
最低通行费用 原题链接 AcWing 1018. 最低同行费用 题目描述 一个商人穿过一个 NN的正方形的网格,去参加一个非常重要的商务活动。 他要从网格的左上角进,右下角出。每穿越中间 1个小方格,都要花费 1个单位时间。商人必须在 (2N−1)个单位…...

deepseek R1的确不错,特别是深度思考模式
deepseek R1的确不错,特别是深度思考模式,每次都能自我反省改进。比如我让 它写文案: 【赛博朋克版程序员新春密码——2025我们来破局】 亲爱的代码骑士们: 当CtrlS的肌肉记忆遇上抢票插件,当Spring Boot的…...
Linux 常用命令 - sort 【对文件内容进行排序】
简介 sort 命令源于英文单词 “sort”,表示排序。其主要功能是对文本文件中的行进行排序。它可以根据字母、数字、特定字段等不同的标准进行排序。sort 通过逐行读取文件(没有指定文件或指定文件为 - 时读取标准输入)内容,并按照…...

MyBatis最佳实践:提升数据库交互效率的秘密武器
第一章:框架的概述: MyBatis 框架的概述: MyBatis 是一个优秀的基于 Java 的持久框架,内部对 JDBC 做了封装,使开发者只需要关注 SQL 语句,而不关注 JDBC 的代码,使开发变得更加的简单MyBatis 通…...

选择困难?直接生成pynput快捷键字符串
from pynput import keyboard# 文档:https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard # 博客(pynput相关源码):https://blog.csdn.net/qq_39124701/article/details/145230331 # 虚拟键码(十六进制):https:/…...

DeepSeek-R1:强化学习驱动的推理模型
1月20日晚,DeepSeek正式发布了全新的推理模型DeepSeek-R1,引起了人工智能领域的广泛关注。该模型在数学、代码生成等高复杂度任务上表现出色,性能对标OpenAI的o1正式版。同时,DeepSeek宣布将DeepSeek-R1以及相关技术报告全面开源。…...

国内优秀的FPGA设计公司主要分布在哪些城市?
近年来,国内FPGA行业发展迅速,随着5G通信、人工智能、大数据等新兴技术的崛起,FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此,国内优秀的FPGA设计公司主要分布在哪些城市&a…...
3.日常英语笔记
screening discrepancies 筛选差异 The team found some screening discrepancies in the data. 团队在数据筛选中发现了些差异。 Don’t tug at it ,or it will fall over and crush you. tug 拉,拽,拖 He tugged the door open with all his might…...

基于RIP的MGRE实验
实验拓扑 实验要求 按照图示配置IP地址配置静态路由协议,搞通公网配置MGRE VPNNHRP的配置配置RIP路由协议来传递两端私网路由测试全网通 实验配置 1、配置IP地址 [R1]int g0/0/0 [R1-GigabitEthernet0/0/0]ip add 15.0.0.1 24 [R1]int LoopBack 0 [R1-LoopBack0]i…...

【开源免费】基于Vue和SpringBoot的美食推荐商城(附论文)
本文项目编号 T 166 ,文末自助获取源码 \color{red}{T166,文末自助获取源码} T166,文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...
Pandas DataFrame 拼接、合并和关联
拼接:使用 pd.concat(),可以沿着行或列方向拼接 DataFrame。 合并:使用 pd.merge(),可以根据一个或多个键进行不同类型的合并(左连接、右连接、全连接、内连接)。 关联:使用 join() 方法,通常在设置了索引的 DataFrame 上进行关联操作。 concat拼接 按列拼接 df1 = …...

【Redis】Redis修改连接数参数
1.重启操作背景 Redis数据库连接数上限,需要修改配置文件里maxclients参数,修改后需重启数据库 1.1、修改操作系统open files参数 1.2、修改redis连接数 2.登录操作系统 登录堡垒机 ssh {ip}3.查看当前状态 3.1、查看操作系统配置 ulimit -a3.2、…...

scratch变魔术 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析
目录 scratch变魔术 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、py…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...

在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
LeetCode - 199. 二叉树的右视图
题目 199. 二叉树的右视图 - 力扣(LeetCode) 思路 右视图是指从树的右侧看,对于每一层,只能看到该层最右边的节点。实现思路是: 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf
FTP 客服管理系统 实现kefu123登录,不允许匿名访问,kefu只能访问/data/kefu目录,不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...
MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用
文章目录 一、背景知识:什么是 B-Tree 和 BTree? B-Tree(平衡多路查找树) BTree(B-Tree 的变种) 二、结构对比:一张图看懂 三、为什么 MySQL InnoDB 选择 BTree? 1. 范围查询更快 2…...