当前位置: 首页 > news >正文

JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。

  • 一、Kafka 简介
  • 二、Kafka 功能
  • 三、POM依赖
  • 四、配置文件
  • 五、生产者
  • 六、消费者

君子之学贵一,一则明,明则有功

一、Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能

消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。

三、POM依赖

    <!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>

四、配置文件

spring:# Kafka 配置kafka:# Kafka 服务器地址和端口 代理地址,可以多个bootstrap-servers: IP:9092# 生产者配置producer:# 发送失败时的重试次数retries: 3# 每次批量发送消息的数量,调整为较小值batch-size: 1# 生产者缓冲区大小buffer-memory: 33554432# 消息 key 的序列化器,将 key 序列化为字节数组key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息 value 的序列化器,将消息体序列化为字节数组value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: true# 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔auto-commit-interval: 1000# 消息 key 的反序列化器,将字节数组反序列化为 keykey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息 value 的反序列化器,将字节数组反序列化为消息体value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 生产者** @author chenlei*/
@Slf4j
@Component
public class KafkaProducer {/*** KafkaTemplate*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到指定的 Kafka 主题,并可指定分组信息** @param topic   消息要发送到的 Kafka 主题* @param message 要发送的消息内容*/public void sendMessage(String topic, String message) {// 使用 KafkaTemplate 发送消息,将消息发送到指定的主题ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {// 消息发送成功后的处理逻辑,可根据需要添加log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");}@Overridepublic void onFailure(Throwable ex) {// 消息发送失败后的处理逻辑,使用日志记录异常log.error("发送消息=[" + message + "] 失败", ex);}});}
}

六、消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author 消费者* chenlei*/
@Slf4j
@Component
public class KafkaConsumer {/*** 监听 Kafka 主题方法。** @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对*/@KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")public void listen(ConsumerRecord<?, ?> record) {// 打印接收到的消息的详细信息log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}

相关文章:

JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。 一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者 君子之学贵一&#xff0c;一则明&#xff0c;明则有功。 一、Kafka 简介 Kafka 是由 Apache 软件基金会开发的一个开源流处理平台&#xff0c;最初由 Link…...

AI刷题-蛋糕工厂产能规划、优质章节的连续选择

挑两个简单的写写 目录 一、蛋糕工厂产能规划 问题描述 输入格式 输出格式 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 关键点 最终代码&#xff1a; 运行结果&#xff1a;​编辑 二、优质章节的连续选择 问题描述 输入格式 输出格式 解题思路&a…...

在线可编辑Excel

1. Handsontable 特点&#xff1a; 提供了类似 Excel 的表格编辑体验&#xff0c;包括单元格样式、公式计算、数据验证等功能。 支持多种插件&#xff0c;如筛选、排序、合并单元格等。 轻量级且易于集成到现有项目中。 具备强大的自定义能力&#xff0c;可以调整外观和行为…...

什么是词嵌入?Word2Vec、GloVe 与 FastText 的区别

自然语言处理(NLP)领域的核心问题之一,是如何将人类的语言转换成计算机可以理解的数值形式,而词嵌入(Word Embedding)正是为了解决这个问题的重要技术。本文将详细讲解词嵌入的概念及其经典模型(Word2Vec、GloVe 和 FastText)的原理与区别。 1. 什么是词嵌入(Word Em…...

WPS数据分析000010

基于数据透视表的内容 一、排序 手动调动 二、筛选 三、值显示方式 四、值汇总依据 五、布局和选项 不显示分类汇总 合并居中带标签的单元格 空单元格显示 六、显示报表筛选页...

Qt中QVariant的使用

1.使用QVariant实现不同类型数据的相加 方法&#xff1a;通过type函数返回数值的类型&#xff0c;然后通过setValue来构造一个QVariant类型的返回值。 函数&#xff1a; QVariant mainPage::dataPlus(QVariant a, QVariant b) {QVariant ret;if ((a.type() QVariant::Int) &a…...

Avalonia UI MVVM DataTemplate里绑定Command

Avalonia 模板里面绑定ViewModel跟WPF写法有些不同。需要单独绑定Command. WPF里面可以直接按照下面的方法绑定DataContext. <Button Content"Button" Command"{Binding DataContext.ClickCommand, RelativeSource{RelativeSource AncestorType{x:Type User…...

动态规划DP 数字三角型模型 最低通行费用(题目详解+C++代码完整实现)

最低通行费用 原题链接 AcWing 1018. 最低同行费用 题目描述 一个商人穿过一个 NN的正方形的网格&#xff0c;去参加一个非常重要的商务活动。 他要从网格的左上角进&#xff0c;右下角出。每穿越中间 1个小方格&#xff0c;都要花费 1个单位时间。商人必须在 (2N−1)个单位…...

deepseek R1的确不错,特别是深度思考模式

deepseek R1的确不错&#xff0c;特别是深度思考模式&#xff0c;每次都能自我反省改进。比如我让 它写文案&#xff1a; 【赛博朋克版程序员新春密码——2025我们来破局】 亲爱的代码骑士们&#xff1a; 当CtrlS的肌肉记忆遇上抢票插件&#xff0c;当Spring Boot的…...

Linux 常用命令 - sort 【对文件内容进行排序】

简介 sort 命令源于英文单词 “sort”&#xff0c;表示排序。其主要功能是对文本文件中的行进行排序。它可以根据字母、数字、特定字段等不同的标准进行排序。sort 通过逐行读取文件&#xff08;没有指定文件或指定文件为 - 时读取标准输入&#xff09;内容&#xff0c;并按照…...

MyBatis最佳实践:提升数据库交互效率的秘密武器

第一章&#xff1a;框架的概述&#xff1a; MyBatis 框架的概述&#xff1a; MyBatis 是一个优秀的基于 Java 的持久框架&#xff0c;内部对 JDBC 做了封装&#xff0c;使开发者只需要关注 SQL 语句&#xff0c;而不关注 JDBC 的代码&#xff0c;使开发变得更加的简单MyBatis 通…...

选择困难?直接生成pynput快捷键字符串

from pynput import keyboard# 文档&#xff1a;https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard # 博客(pynput相关源码)&#xff1a;https://blog.csdn.net/qq_39124701/article/details/145230331 # 虚拟键码(十六进制)&#xff1a;https:/…...

DeepSeek-R1:强化学习驱动的推理模型

1月20日晚&#xff0c;DeepSeek正式发布了全新的推理模型DeepSeek-R1&#xff0c;引起了人工智能领域的广泛关注。该模型在数学、代码生成等高复杂度任务上表现出色&#xff0c;性能对标OpenAI的o1正式版。同时&#xff0c;DeepSeek宣布将DeepSeek-R1以及相关技术报告全面开源。…...

国内优秀的FPGA设计公司主要分布在哪些城市?

近年来&#xff0c;国内FPGA行业发展迅速&#xff0c;随着5G通信、人工智能、大数据等新兴技术的崛起&#xff0c;FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此&#xff0c;国内优秀的FPGA设计公司主要分布在哪些城市&a…...

3.日常英语笔记

screening discrepancies 筛选差异 The team found some screening discrepancies in the data. 团队在数据筛选中发现了些差异。 Don’t tug at it ,or it will fall over and crush you. tug 拉&#xff0c;拽&#xff0c;拖 He tugged the door open with all his might…...

基于RIP的MGRE实验

实验拓扑 实验要求 按照图示配置IP地址配置静态路由协议&#xff0c;搞通公网配置MGRE VPNNHRP的配置配置RIP路由协议来传递两端私网路由测试全网通 实验配置 1、配置IP地址 [R1]int g0/0/0 [R1-GigabitEthernet0/0/0]ip add 15.0.0.1 24 [R1]int LoopBack 0 [R1-LoopBack0]i…...

【开源免费】基于Vue和SpringBoot的美食推荐商城(附论文)

本文项目编号 T 166 &#xff0c;文末自助获取源码 \color{red}{T166&#xff0c;文末自助获取源码} T166&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

Pandas DataFrame 拼接、合并和关联

拼接:使用 pd.concat(),可以沿着行或列方向拼接 DataFrame。 合并:使用 pd.merge(),可以根据一个或多个键进行不同类型的合并(左连接、右连接、全连接、内连接)。 关联:使用 join() 方法,通常在设置了索引的 DataFrame 上进行关联操作。 concat拼接 按列拼接 df1 = …...

【Redis】Redis修改连接数参数

1.重启操作背景 Redis数据库连接数上限&#xff0c;需要修改配置文件里maxclients参数&#xff0c;修改后需重启数据库 1.1、修改操作系统open files参数 1.2、修改redis连接数 2.登录操作系统 登录堡垒机 ssh {ip}3.查看当前状态 3.1、查看操作系统配置 ulimit -a3.2、…...

scratch变魔术 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析

目录 scratch变魔术 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、py…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...