当前位置: 首页 > article >正文

SpringBoot基础Kafka示例

这里将生产者和消费者放在一个应用中

使用的Boot3.4.3

引入Kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置


spring:application:name: kafka-1#kafka连接地址kafka:bootstrap-servers: 127.0.0.1:9092#配置生产者producer:#消息发送失败重试次数retries: 0#一个批次可以使用内存的大小batch-size: 16384#一个批次消息数量buffer-memory: 33554432#键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer#值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:#是否自动提交enable-auto-commit: false#自动提交的频率auto-commit-interval: 1000#earliest	从分区的最早偏移量开始消费	需要消费所有历史消息  latest	从分区的最新偏移量开始消费,忽略历史消息	只关心新消息#none	如果没有有效的偏移量,抛出异常	严格要求偏移量必须存在#exception spring-kafka不支持auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:#用于配置消费者如何处理消息的确认  ack配置方式  这里指定由消费者手动提交偏移量#Acknowledgment.acknowledge() 方法来提交偏移量ack-mode: MANUAL_IMMEDIATEconcurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3server:port: 8099

生产者示例,一般可能是一个MQTT接收消息入口

package com.hrui.kafka1.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author hrui* @date 2025/3/10 14:56*/
@RestController
public class EventProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/sendMessage")public String sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);return "Message sent to topic '" + topic + "': " + message;}@RequestMapping("/sendMessage2")public String sendMessage2() {//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();kafkaTemplate.send(message);return "Message sent to topic";}}

消费者示例

注意:如果配置了手动提交ack,那么

主要目的不仅仅是避免重复消费,而是为了确保消息的可靠处理和偏移量(offset)的正确提交。它可以避免重复消费,但更重要的是保证消息不会丢失,并且在消息处理失败时能够重新消费。

package com.hrui.kafka1.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author hrui* @date 2025/3/10 15:57*/
@Component
public class EventConsumer {@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")public void onMessage(ConsumerRecord<String,String> message){System.out.println("接收到消息1:"+message.value());}@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")public void onMessage(String message){System.out.println("接收到消息2:"+message);}@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.GROUP_ID) String groupId) {try {System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);// 处理消息逻辑// ...} catch (Exception e) {// 处理异常,记录日志System.err.println("处理消息失败: " + e.getMessage());// 可以根据业务需求决定是否重新抛出异常}finally {// 手动提交偏移量ack.acknowledge();}}
}

生产者可选择异步或者同步发送消息

生产者发送消息有同步异步之说 那么消费者在消费消息时候 有没有同步异步之说呢???

在 Kafka 消费者中,消费消息的方式本质上是由 Kafka 的设计决定的,而不是由消费者代码显式控制的。Kafka 消费者在消费消息时,通常是以拉取(poll)的方式从 Kafka 服务器获取消息,然后处理这些消息。从这个角度来看,消费者的消费行为是同步的,因为消费者需要主动调用 poll 方法来获取消息。

然而,消费者的消息处理逻辑可以是同步异步的,具体取决于业务实现。以下是对消费者消费消息的同步和异步行为的详细分析:

 消费者的同步消费

在默认情况下,Kafka 消费者的消费行为是同步的,即:

  • 消费者通过 poll 方法从 Kafka 拉取一批消息。

  • 消费者逐条处理这些消息。

  • 每条消息处理完成后,消费者提交偏移量(offset)。

  • 消费者继续调用 poll 方法获取下一批消息。

特点:
  • 消息处理是顺序的,即一条消息处理完成后才会处理下一条消息。

  • 如果某条消息处理时间较长,会影响后续消息的处理速度。

  • 适合消息处理逻辑简单、处理时间较短的场景。

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {try {System.out.println("接收到消息:" + message.value());// 同步处理消息逻辑processMessage(message);} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

2. 消费者的异步消费

在某些场景下,消费者可能需要以异步的方式处理消息,即:

  • 消费者通过 poll 方法拉取一批消息。

  • 将每条消息提交到一个线程池或异步任务中处理。

  • 消费者继续调用 poll 方法获取下一批消息,而不等待上一条消息处理完成。

特点:
  • 消息处理是并发的,可以提高消息处理的吞吐量。

  • 需要额外的线程池或异步任务管理机制。

  • 适合消息处理逻辑复杂、处理时间较长的场景。

示例代码:

@Autowired
private ExecutorService executorService; // 注入线程池@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {if (!StringUtils.hasText(message.value())) {ack.acknowledge();return;}// 提交异步任务处理消息executorService.submit(() -> {try {System.out.println("接收到消息:" + message.value());processMessage(message); // 异步处理消息} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}});
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

同步代码示例

@RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);try {//阻塞等待拿结果SendResult<String, String> sendResult = send.get();System.out.println("说明消息发送成功,如果不成功会抛出异常");} catch (Exception e) {throw new RuntimeException(e);}return "Message sent to topic";}

异步注册回调的方式

 @RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);//非阻塞  异步 注册回调异步通知send.thenAccept(result -> {System.out.println("消息发送成功");}).exceptionally(e->{System.out.println("发送失败");e.printStackTrace();return null;});return "Message sent to topic";}

如果需要发送的不是String类型 

那么要发送的不是String类型

KafkaTemplate<String,Object> kafkaTemplate;

一般来说可以专成JSON字符串发送

在引入spring-kafka的时候     KafkaAutoConfiguration中  配置了KafkaTemplate

Kafka<Object,Object>

如果需要用KafkaTemplate发送对象的时候

默认用的String序列化   会报错   除非将对象转为JSON字符串(一般可以这么做)

如果用对象的话   改成JsonSerializer  这样自动转JSON字符串

相关文章:

SpringBoot基础Kafka示例

这里将生产者和消费者放在一个应用中 使用的Boot3.4.3 引入Kafka依赖 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>yml配置 spring:application:name: kafka-1#kafka…...

Spring 的三种注入方式?

1. 实例的注入方式 首先来看看 Spring 中的实例该如何注入&#xff0c;总结起来&#xff0c;无非三种&#xff1a; 属性注入 set 方法注入 构造方法注入 我们分别来看下。 1.1 属性注入 属性注入是大家最为常见也是使用最多的一种注入方式了&#xff0c;代码如下&#x…...

STM32第一天建立工程

新建一个工程 1&#xff1a;新建一个文件&#xff0c;添加文件 a:DOC工程说明 》doc说明文档 b&#xff1a;Libraries固件库 》cmsis内核文件 &#xff08;一般这就是stm32内核文件&#xff09; 》FWLIB外设文件 &#xff08;这种就是stm32外设文件不全&#xff09; 》start…...

记录一下返修

1.对复杂度的分析还不够&#xff1b; 2.融合两种指标的解释还不够&#xff0c;审稿人认为这两种指标存在冲突&#xff0c;不能同时优化&#xff0c;但其实我们考虑的是公平性保证整个调度周期内用户分配到了更加平均的sum-rate,而se是为了追求每个调度时刻都尽可能找到信道条件…...

搭建本地化笔记AI:用Copilot+deepseek+nomic-embed-text构建本地智能知识系统

安装Ollama https://ollama.com/ 下载模型 下载大语言模型 根据自己电脑的配置选择模型的大小 ollama run deepseek-r1:8b 下载向量处理模型 创建向量数据库时需要使用Embedding模型对文本进行向量化处理 ollama pull nomic-embed-text 查看安装的模型 ollama listNAME …...

【C语言】指针篇

目录 C 语言指针概述指针的声明和初始化声明指针初始化指针 指针的操作解引用操作指针算术运算 指针的用途动态内存分配作为函数参数 指针与数组数组名作为指针通过指针访问数组元素指针算术和数组数组作为函数参数指针数组和数组指针指针数组数组指针 函数指针函数指针的定义和…...

【蓝桥杯单片机】第十一届省赛

一、真题 二、创建工程 1.在C盘以外的盘新建文件夹&#xff0c;并在文件夹里面创建两个文件夹Driver 和Project 2.打开keil软件&#xff0c;在新建工程并选择刚刚建好的project文件夹&#xff0c;以准考证号命名 3.选择对应的芯片型号 4.选择否&#xff0c;即不创建启动文件 …...

【存储中间件】Neo4J图数据库超详细教程(一):相关介绍、特点及优势、数据模型、软件安装

文章目录 Neo4J超详细教程一、Neo4J相关介绍1.为什么需要图数据库方案1&#xff1a;Google方案2&#xff1a;Facebook 2.特点和优势3.什么是Neo4j4.Neo4j数据模型图论基础属性图模型Neo4j的构建元素 5.软件安装 个人主页&#xff1a;道友老李 欢迎加入社区&#xff1a;道友老李…...

xxl-job部署在docker-destop,实现定时发送预警信息给指定邮箱

XXL-JOB XXL-JOB是一个分布式任务调度平台&#xff08;XXL是作者徐雪里姓名拼音的首字母&#xff09;&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展。 源码仓库地址&#xff1a;https://github.com/xuxueli/xxl-job 源码结构&#xff1a; 系统架构 在xxl-j…...

【QT】QScrollBar设置样式:圆角、隐藏箭头、上边距等

目录 0.简介 1.原理 2.具体代码 0.简介 环境&#xff1a;Ubuntu22.04、qtDesigner绘制UI 项目需要&#xff0c;按照UI修改滚动条样式&#xff0c;滚动条我使用的是QScrollBar&#xff0c;默认样式和修改之后的样式如下&#xff1a; 1.原理 2.具体代码 我是用qtDesigner绘制…...

trae中文版AI搭建完整可用的项目框架

Trae 是由字节跳动推出的 AI 原生集成开发环境&#xff08;AI IDE&#xff09;&#xff0c;号称可以搭建完整项目&#xff0c;个人试用后体验确实比Cursor或cline更便捷&#xff0c;因为他多个文件关联准确率更高。 正式版的trae不支持大陆使用&#xff0c;不过目前已经推出了…...

多数元素——面试经典150题(力扣)

题目 给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例 1&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&#xff1a;3 …...

cfi网络安全 网络安全hcip

目录 RIP (路由信息协议) 算法 开销 版本 开销值的计算方式 RIPV1和RIPV2的区别 RIP的数据包 Request(请求)包 Reponse(应答)包 RIP的特征 周期更新 RIP的计时器 1&#xff0c;周期更新计时器 2&#xff0c;失效计时器 3&#xff0c;垃圾回收计时器 RIP的核心思…...

Banana Pi 与瑞萨电子携手共同推动开源创新:BPI-AI2N

2025年3月11日&#xff0c; Banana Pi 开源硬件平台很高兴宣布&#xff0c;与全球知名半导体解决方案供应商瑞萨电子&#xff08;Renesas Electronics&#xff09;正式达成技术合作关系。此次合作标志着双方将在开源技术、嵌入式系统和物联网等领域展开深度合作&#xff0c;为全…...

linux 命令 ls

ls 是 Linux 系统中用于列出目录内容的核心命令&#xff0c;几乎所有日常操作都会用到。以下是其详细用法和常见场景说明 1. 基础语法 ls [选项] [目录/文件] 不指定目录时&#xff0c;默认列出当前目录的内容。 可以指定文件或目录路径&#xff0c;支持通配符&#xff08;如…...

论数组去重之高效方法

论数组去重之高效方法 数组去重的高效方法主要有 利用 Set 数据结构、利用对象/Map哈希表、排序后遍历去重 三种核心方案。其中 Set 是ES6最简单高效的方式,时间复杂度为 O(n);若需兼容性优化或处理特殊数据类型,可结合哈希表或排序实现。 分点论述: 1. 使用 Set 数据结构…...

C#-扩展方法-Linq

密封类 sealed&#xff0c;无法被继承 var 可以定义匿名对象 static void test1() {var t 1;t "jack";//报错&#xff0c;类型已经确定好了var s new{id 1,name "tom"};Console.WriteLine(s.id s.name); } 扩展方法 对现有类型做方法的扩展&am…...

【C++ STL】 容器详解:pair 学习

在 C STL&#xff08;标准模板库&#xff09;中&#xff0c;pair 是一个 简单的键值对数据结构&#xff0c;用于存储 两个相关联的值&#xff0c;将两个值组合成一个单元&#xff0c;可以是相同或不同类型。它常用于 返回多个值、存储映射关系、排序 等场景。 1. pair 的基本特…...

Go红队开发—web网络编程

文章目录 web网络编程Req快速请求 调试DevModeDebugLogTraceInfo瓶颈分析 控制请求与响应控制请求的字段内容控制调试打印的内容分开dump请求与响应部分请求体设置 作用范围级别设置参数查询URL 路径参数表单请求设置请求头设置 判断响应状态码解析数据SetSuccessResultgjson响…...

libwebsockets实现异步websocket客户端,服务端异常断开可重连

libwebsockets websocket客户端基本流程网上都有&#xff0c;我只额外优化了重连机制。 在服务器异常断开时不触发LWS_CALLBACK_CLOSED或LWS_CALLBACK_CLIENT_CONNECTION_ERROR&#xff0c;导致无法自动重连 通过定时检查链接是否可写入判断链接是否有效 // 判断wsi是否可用if …...

轻量级模块化前端框架:快速构建强大的Web界面

轻量级模块化前端框架&#xff1a;快速构建强大的Web界面 在当今快节奏的Web开发环境中&#xff0c;选择一个高效且灵活的前端框架至关重要。UIkit 是一个轻量级的模块化前端框架&#xff0c;旨在帮助开发者快速构建功能强大且响应迅速的Web界面。 UIkit提供了丰富的组件和工…...

qt+opengl 播放yuv视频

一、实现效果 二、pro文件 Qt widgets opengl 三、主要代码 #include "glwidget.h"GLWidget::GLWidget(QWidget *parent) : QOpenGLWidget(parent) {connect(&m_timer, &QTimer::timeout, this,[&](){this->update();});m_timer.start(1000/33); }v…...

UI自动化:poium测试库

以下是关于 poium 测试库 的详细介绍&#xff0c;涵盖其核心功能、使用方法及与原生 Selenium 的对比&#xff0c;帮助快速掌握这一工具&#xff1a; 1. poium 简介 定位&#xff1a;基于 Selenium 的 Page Object 模式增强库&#xff0c;专注于简化元素定位和页面操作。 核心…...

树莓集团落子海南,如何重构数字产业生态体系​

树莓集团在海南的布局&#xff0c;是其整体商业战略中的关键一环。这背后&#xff0c;是对政策机遇、产业协同、以及区域优势的深度考量。 政策机遇 海南自贸港建设带来前所未有的政策红利&#xff0c;包括贸易、投资、资金等方面的自由便利。树莓集团紧抓这一机遇&#xff0…...

5G基本概念

作者:私语茶馆 1. 5G应用场景概述 1.1.5G应用场景 ITU域2015年定义了三大应用场景:eMBB(增强型移动宽带)、uRLLC(低时延高可靠通信)、mMTC(海量物联网通信); emBB:Enhanced Mobile Broadband ,移动互联网应用,是4G MBB(移动宽带)的升级,主要侧重于网络速率、带…...

PH热榜 | 2025-03-12

1. Fluently 标语&#xff1a;开始说英语&#xff0c;就像说你的母语一样流利。 介绍&#xff1a;想象一下&#xff0c;有一个像人类一样的英语教练&#xff0c;全天候在线、价格却便宜15倍。这就是 Fluently &#x1f680; 纠正你的错误&#xff0c;提升你的词汇量、发音和语…...

Python Web项目的服务器部署

一.部署运行 1.虚拟环境的安装&#xff1a;&#xff08;一行一行运行&#xff09; wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh bash miniconda.sh -b -p /opt/miniconda3 echo export PATH"/opt/miniconda3/bin:$PAT…...

[项目]基于FreeRTOS的STM32四轴飞行器: 八.遥控器摇杆

基于FreeRTOS的STM32四轴飞行器: 八.遥控器摇杆 一.摇杆数据的扫描二.处理摇杆数据三.微调按键处理 一.摇杆数据的扫描 下面摇杆初始化时&#xff0c;启动了ADC-DMA进行了采集&#xff0c;已经开始转换直接将数据通过DMA存入buff数组中&#xff1a; static uint16_t buff[4] …...

附下载 | 2024 OWASP Top 10 基础设施安全风险.pdf

《2024 OWASP Top 10 基础设施安全风险》报告&#xff0c;由OWASP&#xff08;开放网络应用安全项目&#xff09;发布&#xff0c;旨在提升企业和组织对基础设施安全风险、威胁与漏洞的意识&#xff0c;并提供高质量的信息和最佳实践建议。报告列出了2024年最重要的10大基础设施…...

Pytorch的一小步,昇腾芯片的一大步

Pytorch的一小步&#xff0c;昇腾芯片的一大步 相信在AI圈的人多多少少都看到了最近的信息&#xff1a;PyTorch最新2.1版本宣布支持华为昇腾芯片&#xff01; 1、 发生了什么事儿&#xff1f; 在2023年10月4日PyTorch 2.1版本的发布博客上&#xff0c;PyTorch介绍的beta版本…...