JAVA(SpringBoot)集成Kafka实现消息发送和接收。
SpringBoot集成Kafka实现消息发送和接收。
- 一、Kafka 简介
- 二、Kafka 功能
- 三、POM依赖
- 四、配置文件
- 五、生产者
- 六、消费者
君子之学贵一,一则明,明则有功。
一、Kafka 简介
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。
二、Kafka 功能
消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。
三、POM依赖
<!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
四、配置文件
spring:# Kafka 配置kafka:# Kafka 服务器地址和端口 代理地址,可以多个bootstrap-servers: IP:9092# 生产者配置producer:# 发送失败时的重试次数retries: 3# 每次批量发送消息的数量,调整为较小值batch-size: 1# 生产者缓冲区大小buffer-memory: 33554432# 消息 key 的序列化器,将 key 序列化为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息 value 的序列化器,将消息体序列化为字节数组value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: true# 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔auto-commit-interval: 1000# 消息 key 的反序列化器,将字节数组反序列化为 keykey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息 value 的反序列化器,将字节数组反序列化为消息体value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
五、生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 生产者** @author chenlei*/
@Slf4j
@Component
public class KafkaProducer {/*** KafkaTemplate*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到指定的 Kafka 主题,并可指定分组信息** @param topic 消息要发送到的 Kafka 主题* @param message 要发送的消息内容*/public void sendMessage(String topic, String message) {// 使用 KafkaTemplate 发送消息,将消息发送到指定的主题ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功后的处理逻辑,可根据需要添加log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败后的处理逻辑,使用日志记录异常log.error("发送消息=[" + message + "] 失败", ex);}});}
}
六、消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author 消费者* chenlei*/
@Slf4j
@Component
public class KafkaConsumer {/*** 监听 Kafka 主题方法。** @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对*/@KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")public void listen(ConsumerRecord<?, ?> record) {// 打印接收到的消息的详细信息log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}
相关文章:
JAVA(SpringBoot)集成Kafka实现消息发送和接收。
SpringBoot集成Kafka实现消息发送和接收。 一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者 君子之学贵一,一则明,明则有功。 一、Kafka 简介 Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 Link…...
AI刷题-蛋糕工厂产能规划、优质章节的连续选择
挑两个简单的写写 目录 一、蛋糕工厂产能规划 问题描述 输入格式 输出格式 解题思路: 问题理解 数据结构选择 算法步骤 关键点 最终代码: 运行结果:编辑 二、优质章节的连续选择 问题描述 输入格式 输出格式 解题思路&a…...
在线可编辑Excel
1. Handsontable 特点: 提供了类似 Excel 的表格编辑体验,包括单元格样式、公式计算、数据验证等功能。 支持多种插件,如筛选、排序、合并单元格等。 轻量级且易于集成到现有项目中。 具备强大的自定义能力,可以调整外观和行为…...
什么是词嵌入?Word2Vec、GloVe 与 FastText 的区别
自然语言处理(NLP)领域的核心问题之一,是如何将人类的语言转换成计算机可以理解的数值形式,而词嵌入(Word Embedding)正是为了解决这个问题的重要技术。本文将详细讲解词嵌入的概念及其经典模型(Word2Vec、GloVe 和 FastText)的原理与区别。 1. 什么是词嵌入(Word Em…...
WPS数据分析000010
基于数据透视表的内容 一、排序 手动调动 二、筛选 三、值显示方式 四、值汇总依据 五、布局和选项 不显示分类汇总 合并居中带标签的单元格 空单元格显示 六、显示报表筛选页...
Qt中QVariant的使用
1.使用QVariant实现不同类型数据的相加 方法:通过type函数返回数值的类型,然后通过setValue来构造一个QVariant类型的返回值。 函数: QVariant mainPage::dataPlus(QVariant a, QVariant b) {QVariant ret;if ((a.type() QVariant::Int) &a…...
Avalonia UI MVVM DataTemplate里绑定Command
Avalonia 模板里面绑定ViewModel跟WPF写法有些不同。需要单独绑定Command. WPF里面可以直接按照下面的方法绑定DataContext. <Button Content"Button" Command"{Binding DataContext.ClickCommand, RelativeSource{RelativeSource AncestorType{x:Type User…...
动态规划DP 数字三角型模型 最低通行费用(题目详解+C++代码完整实现)
最低通行费用 原题链接 AcWing 1018. 最低同行费用 题目描述 一个商人穿过一个 NN的正方形的网格,去参加一个非常重要的商务活动。 他要从网格的左上角进,右下角出。每穿越中间 1个小方格,都要花费 1个单位时间。商人必须在 (2N−1)个单位…...
deepseek R1的确不错,特别是深度思考模式
deepseek R1的确不错,特别是深度思考模式,每次都能自我反省改进。比如我让 它写文案: 【赛博朋克版程序员新春密码——2025我们来破局】 亲爱的代码骑士们: 当CtrlS的肌肉记忆遇上抢票插件,当Spring Boot的…...
Linux 常用命令 - sort 【对文件内容进行排序】
简介 sort 命令源于英文单词 “sort”,表示排序。其主要功能是对文本文件中的行进行排序。它可以根据字母、数字、特定字段等不同的标准进行排序。sort 通过逐行读取文件(没有指定文件或指定文件为 - 时读取标准输入)内容,并按照…...
MyBatis最佳实践:提升数据库交互效率的秘密武器
第一章:框架的概述: MyBatis 框架的概述: MyBatis 是一个优秀的基于 Java 的持久框架,内部对 JDBC 做了封装,使开发者只需要关注 SQL 语句,而不关注 JDBC 的代码,使开发变得更加的简单MyBatis 通…...
选择困难?直接生成pynput快捷键字符串
from pynput import keyboard# 文档:https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard # 博客(pynput相关源码):https://blog.csdn.net/qq_39124701/article/details/145230331 # 虚拟键码(十六进制):https:/…...
DeepSeek-R1:强化学习驱动的推理模型
1月20日晚,DeepSeek正式发布了全新的推理模型DeepSeek-R1,引起了人工智能领域的广泛关注。该模型在数学、代码生成等高复杂度任务上表现出色,性能对标OpenAI的o1正式版。同时,DeepSeek宣布将DeepSeek-R1以及相关技术报告全面开源。…...
国内优秀的FPGA设计公司主要分布在哪些城市?
近年来,国内FPGA行业发展迅速,随着5G通信、人工智能、大数据等新兴技术的崛起,FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此,国内优秀的FPGA设计公司主要分布在哪些城市&a…...
3.日常英语笔记
screening discrepancies 筛选差异 The team found some screening discrepancies in the data. 团队在数据筛选中发现了些差异。 Don’t tug at it ,or it will fall over and crush you. tug 拉,拽,拖 He tugged the door open with all his might…...
基于RIP的MGRE实验
实验拓扑 实验要求 按照图示配置IP地址配置静态路由协议,搞通公网配置MGRE VPNNHRP的配置配置RIP路由协议来传递两端私网路由测试全网通 实验配置 1、配置IP地址 [R1]int g0/0/0 [R1-GigabitEthernet0/0/0]ip add 15.0.0.1 24 [R1]int LoopBack 0 [R1-LoopBack0]i…...
【开源免费】基于Vue和SpringBoot的美食推荐商城(附论文)
本文项目编号 T 166 ,文末自助获取源码 \color{red}{T166,文末自助获取源码} T166,文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...
Pandas DataFrame 拼接、合并和关联
拼接:使用 pd.concat(),可以沿着行或列方向拼接 DataFrame。 合并:使用 pd.merge(),可以根据一个或多个键进行不同类型的合并(左连接、右连接、全连接、内连接)。 关联:使用 join() 方法,通常在设置了索引的 DataFrame 上进行关联操作。 concat拼接 按列拼接 df1 = …...
【Redis】Redis修改连接数参数
1.重启操作背景 Redis数据库连接数上限,需要修改配置文件里maxclients参数,修改后需重启数据库 1.1、修改操作系统open files参数 1.2、修改redis连接数 2.登录操作系统 登录堡垒机 ssh {ip}3.查看当前状态 3.1、查看操作系统配置 ulimit -a3.2、…...
scratch变魔术 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析
目录 scratch变魔术 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、py…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
华硕a豆14 Air香氛版,美学与科技的馨香融合
在快节奏的现代生活中,我们渴望一个能激发创想、愉悦感官的工作与生活伙伴,它不仅是冰冷的科技工具,更能触动我们内心深处的细腻情感。正是在这样的期许下,华硕a豆14 Air香氛版翩然而至,它以一种前所未有的方式&#x…...
