当前位置: 首页 > news >正文

JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。

  • 一、Kafka 简介
  • 二、Kafka 功能
  • 三、POM依赖
  • 四、配置文件
  • 五、生产者
  • 六、消费者

君子之学贵一,一则明,明则有功

一、Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能

消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。

三、POM依赖

    <!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>

四、配置文件

spring:# Kafka 配置kafka:# Kafka 服务器地址和端口 代理地址,可以多个bootstrap-servers: IP:9092# 生产者配置producer:# 发送失败时的重试次数retries: 3# 每次批量发送消息的数量,调整为较小值batch-size: 1# 生产者缓冲区大小buffer-memory: 33554432# 消息 key 的序列化器,将 key 序列化为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息 value 的序列化器,将消息体序列化为字节数组value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: true# 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔auto-commit-interval: 1000# 消息 key 的反序列化器,将字节数组反序列化为 keykey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息 value 的反序列化器,将字节数组反序列化为消息体value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 生产者** @author chenlei*/
@Slf4j
@Component
public class KafkaProducer {/*** KafkaTemplate*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到指定的 Kafka 主题,并可指定分组信息** @param topic   消息要发送到的 Kafka 主题* @param message 要发送的消息内容*/public void sendMessage(String topic, String message) {// 使用 KafkaTemplate 发送消息,将消息发送到指定的主题ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功后的处理逻辑,可根据需要添加log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败后的处理逻辑,使用日志记录异常log.error("发送消息=[" + message + "] 失败", ex);}});}
}

六、消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author 消费者* chenlei*/
@Slf4j
@Component
public class KafkaConsumer {/*** 监听 Kafka 主题方法。** @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对*/@KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")public void listen(ConsumerRecord<?, ?> record) {// 打印接收到的消息的详细信息log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}

相关文章:

JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。 一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者 君子之学贵一&#xff0c;一则明&#xff0c;明则有功。 一、Kafka 简介 Kafka 是由 Apache 软件基金会开发的一个开源流处理平台&#xff0c;最初由 Link…...

AI刷题-蛋糕工厂产能规划、优质章节的连续选择

挑两个简单的写写 目录 一、蛋糕工厂产能规划 问题描述 输入格式 输出格式 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 关键点 最终代码&#xff1a; 运行结果&#xff1a;​编辑 二、优质章节的连续选择 问题描述 输入格式 输出格式 解题思路&a…...

在线可编辑Excel

1. Handsontable 特点&#xff1a; 提供了类似 Excel 的表格编辑体验&#xff0c;包括单元格样式、公式计算、数据验证等功能。 支持多种插件&#xff0c;如筛选、排序、合并单元格等。 轻量级且易于集成到现有项目中。 具备强大的自定义能力&#xff0c;可以调整外观和行为…...

什么是词嵌入?Word2Vec、GloVe 与 FastText 的区别

自然语言处理(NLP)领域的核心问题之一,是如何将人类的语言转换成计算机可以理解的数值形式,而词嵌入(Word Embedding)正是为了解决这个问题的重要技术。本文将详细讲解词嵌入的概念及其经典模型(Word2Vec、GloVe 和 FastText)的原理与区别。 1. 什么是词嵌入(Word Em…...

WPS数据分析000010

基于数据透视表的内容 一、排序 手动调动 二、筛选 三、值显示方式 四、值汇总依据 五、布局和选项 不显示分类汇总 合并居中带标签的单元格 空单元格显示 六、显示报表筛选页...

Qt中QVariant的使用

1.使用QVariant实现不同类型数据的相加 方法&#xff1a;通过type函数返回数值的类型&#xff0c;然后通过setValue来构造一个QVariant类型的返回值。 函数&#xff1a; QVariant mainPage::dataPlus(QVariant a, QVariant b) {QVariant ret;if ((a.type() QVariant::Int) &a…...

Avalonia UI MVVM DataTemplate里绑定Command

Avalonia 模板里面绑定ViewModel跟WPF写法有些不同。需要单独绑定Command. WPF里面可以直接按照下面的方法绑定DataContext. <Button Content"Button" Command"{Binding DataContext.ClickCommand, RelativeSource{RelativeSource AncestorType{x:Type User…...

动态规划DP 数字三角型模型 最低通行费用(题目详解+C++代码完整实现)

最低通行费用 原题链接 AcWing 1018. 最低同行费用 题目描述 一个商人穿过一个 NN的正方形的网格&#xff0c;去参加一个非常重要的商务活动。 他要从网格的左上角进&#xff0c;右下角出。每穿越中间 1个小方格&#xff0c;都要花费 1个单位时间。商人必须在 (2N−1)个单位…...

deepseek R1的确不错,特别是深度思考模式

deepseek R1的确不错&#xff0c;特别是深度思考模式&#xff0c;每次都能自我反省改进。比如我让 它写文案&#xff1a; 【赛博朋克版程序员新春密码——2025我们来破局】 亲爱的代码骑士们&#xff1a; 当CtrlS的肌肉记忆遇上抢票插件&#xff0c;当Spring Boot的…...

Linux 常用命令 - sort 【对文件内容进行排序】

简介 sort 命令源于英文单词 “sort”&#xff0c;表示排序。其主要功能是对文本文件中的行进行排序。它可以根据字母、数字、特定字段等不同的标准进行排序。sort 通过逐行读取文件&#xff08;没有指定文件或指定文件为 - 时读取标准输入&#xff09;内容&#xff0c;并按照…...

MyBatis最佳实践:提升数据库交互效率的秘密武器

第一章&#xff1a;框架的概述&#xff1a; MyBatis 框架的概述&#xff1a; MyBatis 是一个优秀的基于 Java 的持久框架&#xff0c;内部对 JDBC 做了封装&#xff0c;使开发者只需要关注 SQL 语句&#xff0c;而不关注 JDBC 的代码&#xff0c;使开发变得更加的简单MyBatis 通…...

选择困难?直接生成pynput快捷键字符串

from pynput import keyboard# 文档&#xff1a;https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard # 博客(pynput相关源码)&#xff1a;https://blog.csdn.net/qq_39124701/article/details/145230331 # 虚拟键码(十六进制)&#xff1a;https:/…...

DeepSeek-R1:强化学习驱动的推理模型

1月20日晚&#xff0c;DeepSeek正式发布了全新的推理模型DeepSeek-R1&#xff0c;引起了人工智能领域的广泛关注。该模型在数学、代码生成等高复杂度任务上表现出色&#xff0c;性能对标OpenAI的o1正式版。同时&#xff0c;DeepSeek宣布将DeepSeek-R1以及相关技术报告全面开源。…...

国内优秀的FPGA设计公司主要分布在哪些城市?

近年来&#xff0c;国内FPGA行业发展迅速&#xff0c;随着5G通信、人工智能、大数据等新兴技术的崛起&#xff0c;FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此&#xff0c;国内优秀的FPGA设计公司主要分布在哪些城市&a…...

3.日常英语笔记

screening discrepancies 筛选差异 The team found some screening discrepancies in the data. 团队在数据筛选中发现了些差异。 Don’t tug at it ,or it will fall over and crush you. tug 拉&#xff0c;拽&#xff0c;拖 He tugged the door open with all his might…...

基于RIP的MGRE实验

实验拓扑 实验要求 按照图示配置IP地址配置静态路由协议&#xff0c;搞通公网配置MGRE VPNNHRP的配置配置RIP路由协议来传递两端私网路由测试全网通 实验配置 1、配置IP地址 [R1]int g0/0/0 [R1-GigabitEthernet0/0/0]ip add 15.0.0.1 24 [R1]int LoopBack 0 [R1-LoopBack0]i…...

【开源免费】基于Vue和SpringBoot的美食推荐商城(附论文)

本文项目编号 T 166 &#xff0c;文末自助获取源码 \color{red}{T166&#xff0c;文末自助获取源码} T166&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

Pandas DataFrame 拼接、合并和关联

拼接:使用 pd.concat(),可以沿着行或列方向拼接 DataFrame。 合并:使用 pd.merge(),可以根据一个或多个键进行不同类型的合并(左连接、右连接、全连接、内连接)。 关联:使用 join() 方法,通常在设置了索引的 DataFrame 上进行关联操作。 concat拼接 按列拼接 df1 = …...

【Redis】Redis修改连接数参数

1.重启操作背景 Redis数据库连接数上限&#xff0c;需要修改配置文件里maxclients参数&#xff0c;修改后需重启数据库 1.1、修改操作系统open files参数 1.2、修改redis连接数 2.登录操作系统 登录堡垒机 ssh {ip}3.查看当前状态 3.1、查看操作系统配置 ulimit -a3.2、…...

scratch变魔术 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析

目录 scratch变魔术 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、py…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验

系列回顾&#xff1a; 在上一篇中&#xff0c;我们成功地为应用集成了数据库&#xff0c;并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了&#xff01;但是&#xff0c;如果你仔细审视那些 API&#xff0c;会发现它们还很“粗糙”&#xff1a;有…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础

第三周 Day 3 &#x1f3af; 今日目标 理解类&#xff08;class&#xff09;和对象&#xff08;object&#xff09;的关系学会定义类的属性、方法和构造函数&#xff08;init&#xff09;掌握对象的创建与使用初识封装、继承和多态的基本概念&#xff08;预告&#xff09; &a…...

Copilot for Xcode (iOS的 AI辅助编程)

Copilot for Xcode 简介Copilot下载与安装 体验环境要求下载最新的安装包安装登录系统权限设置 AI辅助编程生成注释代码补全简单需求代码生成辅助编程行间代码生成注释联想 代码生成 总结 简介 尝试使用了Copilot&#xff0c;它能根据上下文补全代码&#xff0c;快速生成常用…...