当前位置: 首页 > news >正文

如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费

文章目录

      • 准备工作
      • 项目依赖
      • 配置 RocketMQ
      • 生产批量消息
      • 消费批量消息
      • 测试批量消息发送和消费
      • 总结
      • 推荐阅读文章

RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能。

准备工作

在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。

项目依赖

首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version>
</dependency>

这个依赖包包含了与 RocketMQ 集成所需的所有内容。

配置 RocketMQ

application.yml 文件中添加 RocketMQ 的相关配置:

rocketmq:name-server: 127.0.0.1:9876consumer:group: batchConsumerGroupproducer:group: batchProducerGroup
  • name-server:RocketMQ 服务的地址
  • consumer.group:消息消费的分组
  • producer.group:消息生产的分组

确保 name-server 地址是正确的,指向你的 RocketMQ 服务。

生产批量消息

创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class BatchProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendBatchMessages() {List<Message<String>> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();messages.add(message);}rocketMQTemplate.syncSend("BatchTopic", messages, 10000);System.out.println("批量消息发送成功!");}
}
  • 这里,我们创建了 10 条消息并将它们添加到列表 messages 中。
  • 调用 rocketMQTemplate.syncSend 方法将消息批量发送到主题 BatchTopic

消费批量消息

接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.util.List;@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> messages) {System.out.println("批量接收到消息:");messages.forEach(message -> System.out.println("消息内容:" + message));}
}

在这段代码中:

  • @RocketMQMessageListener 注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题 BatchTopic 和消费分组 batchConsumerGroup
  • consumeMessageBatchMaxSize = 10 表示每次批量消费最多 10 条消息。
  • onMessage 方法会处理接收到的消息列表,并逐条打印出消息内容。

测试批量消息发送和消费

创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate BatchProducer batchProducer;@GetMapping("/sendBatchMessages")public String sendBatchMessages() {batchProducer.sendBatchMessages();return "批量消息已发送";}
}

通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。

  • 调用这个接口会将批量消息发送到 RocketMQ 主题 BatchTopic
  • BatchConsumer 会自动接收并批量处理这些消息。

总结

我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:

  1. 使用 BatchProducer 类批量发送消息。
  2. 使用 BatchConsumer 类批量消费消息,并设置最大批量大小。
  3. 通过简单的 REST API 控制消息发送,确保一切顺利。

批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。

推荐阅读文章

  • 由 Spring 静态注入引发的一个线上T0级别事故(真的以后得避坑)

  • 如何理解 HTTP 是无状态的,以及它与 Cookie 和 Session 之间的联系

  • HTTP、HTTPS、Cookie 和 Session 之间的关系

  • 什么是 Cookie?简单介绍与使用方法

  • 什么是 Session?如何应用?

  • 使用 Spring 框架构建 MVC 应用程序:初学者教程

  • 有缺陷的 Java 代码:Java 开发人员最常犯的 10 大错误

  • 如何理解应用 Java 多线程与并发编程?

  • 把握Java泛型的艺术:协变、逆变与不可变性一网打尽

  • Java Spring 中常用的 @PostConstruct 注解使用总结

  • 如何理解线程安全这个概念?

  • 理解 Java 桥接方法

  • Spring 整合嵌入式 Tomcat 容器

  • Tomcat 如何加载 SpringMVC 组件

  • “在什么情况下类需要实现 Serializable,什么情况下又不需要(一)?”

  • “避免序列化灾难:掌握实现 Serializable 的真相!(二)”

  • 如何自定义一个自己的 Spring Boot Starter 组件(从入门到实践)

  • 解密 Redis:如何通过 IO 多路复用征服高并发挑战!

  • 线程 vs 虚拟线程:深入理解及区别

  • 深度解读 JDK 8、JDK 11、JDK 17 和 JDK 21 的区别

  • 10大程序员提升代码优雅度的必杀技,瞬间让你成为团队宠儿!

  • “打破重复代码的魔咒:使用 Function 接口在 Java 8 中实现优雅重构!”

  • Java 中消除 If-else 技巧总结

  • 线程池的核心参数配置(仅供参考)

  • 【人工智能】聊聊Transformer,深度学习的一股清流(13)

  • Java 枚举的几个常用技巧,你可以试着用用

相关文章:

如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费

文章目录 准备工作项目依赖配置 RocketMQ生产批量消息消费批量消息测试批量消息发送和消费总结推荐阅读文章 RocketMQ 是一款分布式消息队列&#xff0c;支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景&#xff0c;RocketMQ 提供了批量消费的机制&#xff0c;这…...

推荐一个Star超过2K的.Net轻量级的CMS开源项目

推荐一个具有模块化和可扩展的架构的CMS开源项目。 01 项目简介 Piranha CMS是一个轻量级且跨平台的CMS库&#xff0c;专为.NET 8设计。 该项目提供多种模板&#xff0c;具备CMS基本功能&#xff0c;也有空模板方便从头开始构建新网站&#xff0c;甚至可以作为移动应用的后端…...

基于驾驶员面部特征的疲劳检测系统

大家好&#xff0c;本文是对基于驾驶员面部特征的疲劳检测系统源码的介绍与说明。 项目下载&#xff1a;基于驾驶员面部特征的疲劳检测系统 1.关于项目 疲劳驾驶检测系统通过监测驾驶人的眼睛状态&#xff0c;头部状态&#xff0c;嘴部状态等指标&#xff0c;识别出疲劳迹象…...

前端知识点---字符串的8种拼接方法(Javascript)

文章目录 01使用 运算符(改变了原始字符串)02使用 运算符(改变了原本的字符串)03 使用 concat() 方法(不改变原本的字符串)04使用模板字面量&#xff08;不改变原本的字符串&#xff09;05使用 join() 方法&#xff08;不改变原本的字符串&#xff09;①指定分隔符 ②没有指定…...

用 Python 从零开始创建神经网络(一):编码我们的第一个神经元

编码我们的第一个神经元 引言1. A Single Neuron&#xff1a;Example 1Example 2 2. A Layer of Neurons&#xff1a;Example 1 引言 本教程专为那些对神经网络已有基础了解、但尚未动手实践过的读者而设计。尽管网上充斥着各种教程&#xff0c;但很多内容要么过于简略&#x…...

低代码开发

低代码&#xff08;Low Code&#xff09;是一种软件开发方法&#xff0c;它通过可视化界面和少量的编码来快速构建应用程序。低代码平台的核心理念是通过抽象和最小化手工编码的方式&#xff0c;加速软件开发和部署的过程。 定义 低代码是一种软件开发方法&#xff0c;它允许…...

sql server 文件和文件组介绍

sql server 文件和文件组介绍 数据库文件和文件组 - SQL Server | Microsoft Learn...

caozha-CEPCS(新冠肺炎疫情防控系统)

caozha-CEPCS&#xff0c;是一个基于PHP开发的新冠肺炎疫情防控系统&#xff0c;CEPCS&#xff08;全称&#xff1a;COVID-19 Epidemic Prevention and Control System&#xff09;&#xff0c;可以应用于单位、企业、学校、工业园区、村落等等。小小系统&#xff0c;希望能为大…...

1Panel修改PostgreSQL时区

需求 1Panel安装的PostgreSQL默认是UTC时区&#xff0c;需要将它修改为上海时间 步骤 进入PostgreSQL的安装目录 /opt/1panel/apps/postgresql/postgresql/data打开postgresql.conf文件 修改&#xff1a; log_timezone Asia/Shanghai timezone Asia/Shanghai保存后重启…...

开发一个CRM系统难吗?CRM系统的实现步骤

越来越多企业意识到了&#xff0c;客户关系管理&#xff08;CRM&#xff09;系统已成为企业提升客户体验、推动销售增长的必备工具。一个高效的CRM系统不仅能够帮助企业优化客户数据管理&#xff0c;还能提升客户满意度&#xff0c;增强客户忠诚度&#xff0c;从而推动业务的持…...

kafka常见面试题总结

Kafka 核心知识解析 一、Kafka 消息发送流程 Kafka 发送消息涉及两个线程&#xff1a;main 线程和 sender 线程。在 main 线程中&#xff0c;会创建一个双端队列 RecordAccumulator&#xff0c;main 线程负责将消息发送给 RecordAccumulator&#xff0c;而 sender 线程则从 R…...

前端知识点---Javascript中检测数据类型函数总结

文章目录 01 typeof 运算符02 instanceof 运算符03 Array.isArray()04 Object.prototype.toString.call()05 constructor 属性06 isNaN() 和 Number.isNaN() (常用)07 isFinite() 和 Number.isFinite()08 typeof null 是 "object" 的问题 01 typeof 运算符 返回值是…...

aspose如何获取PPT放映页“切换”的“持续时间”值

aspose如何获取PPT放映页“切换”的“持续时间”值 项目场景问题描述问题1&#xff1a;从官方文档和资料查阅发现并没有对切换的持续时间进行处理的方法问题2&#xff1a;aspose的依赖包中&#xff0c;所有的关键对象都进行了混淆处理 解决方案1、找到ppt切换的持续时间对应的混…...

【MQTT】代理服务比较RabbitMQ、Mosquitto 和 EMQX

前言 目前要处理大量设备同时频繁发送数据的情况&#xff0c;MQTT协议确实是一个更优的选择&#xff0c;因为它特别适合需要低带宽和高效能的物联网应用&#xff0c;下面是对目前主流协议的对比 数据截止日期&#xff1a;2024年11月10日 基础设施 后端&#xff1a; springclo…...

【C#/C++】C++/CL中String^的含义和举例,C++层需要调用C#层对象时...

示例&#xff1a; String^ IDataServer::GetParam(String^ aParamName){ /// }在 C/CLI 中&#xff0c;String^ 和 IDataServer::GetParam(String^ aParamName) 这种写法是一种混合了 C 和 .NET 的语法&#xff0c;用于在 C 中操作 .NET 对象。C/CLI 是微软扩展的 C 语言&…...

Python学习从0到1 day26 第三阶段 Spark ② 数据计算Ⅰ

人总是会执着于失去的&#xff0c;而又不珍惜现在所拥有的 —— 24.11.9 一、map方法 PySpark的数据计算&#xff0c;都是基于RDD对象来进行的&#xff0c;采用依赖进行&#xff0c;RDD对象内置丰富的成员方法&#xff08;算子&#xff09; map算子 功能&#xff1a;map算子…...

【详细】如何优雅地删除 Docker 容器与镜像

内容预览 ≧∀≦ゞ 镜像与容器的区别删除容器和镜像的具体步骤1. 删除容器步骤 1&#xff1a;查看当前运行的容器步骤 2&#xff1a;停止容器步骤 3&#xff1a;删除容器 2. 删除镜像步骤 1&#xff1a;查看镜像列表步骤 2&#xff1a;删除镜像 3. 删除所有容器和镜像 使用 1Pa…...

Spring Spring Boot 常用注解总结

在 Java 开发中&#xff0c;Spring 和 Spring Boot 框架广泛应用于企业级应用开发。这两个框架提供了丰富的注解&#xff0c;使得开发更加高效和便捷。本文将对 Spring 和 Spring Boot 中常用的注解进行总结。 一、Spring 常用注解 1. Component 作用&#xff1a;用于将普通的…...

Flink独立集群+Flink整合yarn

Flink独立集群的搭建&#xff1a; 1、上传解压配置环境变量 # 1、解压 tar -xvf flink-1.15.4-bin-scala_2.12.tgz # 2、修改环境变量 export FLINK_HOME/usr/local/soft/flink-1.15.4 export PATH$PATH:$FLINK_HOME/bin 2、修改配置文件 cd /usr/local/soft/flink-1.15.4/…...

动态规划 之 简单多状态 dp 问题 算法专题

一. 按摩师 按摩师 状态表示 根据经验 题目要求 dp[i] 表示: 选择到i位置时, 此时的最长预约时长 但是根据题目又分成两种情况: f[i] : 选择到 i 位置的时候, nums[i] 必选, 此时的最长预约时长 g[i] : 选择到 i 位置的时候, nums[i] 不选, 此时的最长预约时长状态转移方程 …...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP

编辑-虚拟网络编辑器-更改设置 选择桥接模式&#xff0c;然后找到相应的网卡&#xff08;可以查看自己本机的网络连接&#xff09; windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置&#xff0c;选择刚才配置的桥接模式 静态ip设置&#xff1a; 我用的ubuntu24桌…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...