如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费
文章目录
- 准备工作
- 项目依赖
- 配置 RocketMQ
- 生产批量消息
- 消费批量消息
- 测试批量消息发送和消费
- 总结
- 推荐阅读文章
RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能。
准备工作
在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。
项目依赖
首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version>
</dependency>
这个依赖包包含了与 RocketMQ 集成所需的所有内容。
配置 RocketMQ
在 application.yml 文件中添加 RocketMQ 的相关配置:
rocketmq:name-server: 127.0.0.1:9876consumer:group: batchConsumerGroupproducer:group: batchProducerGroup
name-server:RocketMQ 服务的地址consumer.group:消息消费的分组producer.group:消息生产的分组
确保 name-server 地址是正确的,指向你的 RocketMQ 服务。
生产批量消息
创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class BatchProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendBatchMessages() {List<Message<String>> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();messages.add(message);}rocketMQTemplate.syncSend("BatchTopic", messages, 10000);System.out.println("批量消息发送成功!");}
}
- 这里,我们创建了 10 条消息并将它们添加到列表
messages中。 - 调用
rocketMQTemplate.syncSend方法将消息批量发送到主题BatchTopic。
消费批量消息
接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.util.List;@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> messages) {System.out.println("批量接收到消息:");messages.forEach(message -> System.out.println("消息内容:" + message));}
}
在这段代码中:
@RocketMQMessageListener注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题BatchTopic和消费分组batchConsumerGroup。consumeMessageBatchMaxSize = 10表示每次批量消费最多 10 条消息。onMessage方法会处理接收到的消息列表,并逐条打印出消息内容。
测试批量消息发送和消费
创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate BatchProducer batchProducer;@GetMapping("/sendBatchMessages")public String sendBatchMessages() {batchProducer.sendBatchMessages();return "批量消息已发送";}
}
通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。
- 调用这个接口会将批量消息发送到 RocketMQ 主题
BatchTopic。 BatchConsumer会自动接收并批量处理这些消息。
总结
我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:
- 使用
BatchProducer类批量发送消息。 - 使用
BatchConsumer类批量消费消息,并设置最大批量大小。 - 通过简单的 REST API 控制消息发送,确保一切顺利。
批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。
推荐阅读文章
-
由 Spring 静态注入引发的一个线上T0级别事故(真的以后得避坑)
-
如何理解 HTTP 是无状态的,以及它与 Cookie 和 Session 之间的联系
-
HTTP、HTTPS、Cookie 和 Session 之间的关系
-
什么是 Cookie?简单介绍与使用方法
-
什么是 Session?如何应用?
-
使用 Spring 框架构建 MVC 应用程序:初学者教程
-
有缺陷的 Java 代码:Java 开发人员最常犯的 10 大错误
-
如何理解应用 Java 多线程与并发编程?
-
把握Java泛型的艺术:协变、逆变与不可变性一网打尽
-
Java Spring 中常用的 @PostConstruct 注解使用总结
-
如何理解线程安全这个概念?
-
理解 Java 桥接方法
-
Spring 整合嵌入式 Tomcat 容器
-
Tomcat 如何加载 SpringMVC 组件
-
“在什么情况下类需要实现 Serializable,什么情况下又不需要(一)?”
-
“避免序列化灾难:掌握实现 Serializable 的真相!(二)”
-
如何自定义一个自己的 Spring Boot Starter 组件(从入门到实践)
-
解密 Redis:如何通过 IO 多路复用征服高并发挑战!
-
线程 vs 虚拟线程:深入理解及区别
-
深度解读 JDK 8、JDK 11、JDK 17 和 JDK 21 的区别
-
10大程序员提升代码优雅度的必杀技,瞬间让你成为团队宠儿!
-
“打破重复代码的魔咒:使用 Function 接口在 Java 8 中实现优雅重构!”
-
Java 中消除 If-else 技巧总结
-
线程池的核心参数配置(仅供参考)
-
【人工智能】聊聊Transformer,深度学习的一股清流(13)
-
Java 枚举的几个常用技巧,你可以试着用用
相关文章:
如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费
文章目录 准备工作项目依赖配置 RocketMQ生产批量消息消费批量消息测试批量消息发送和消费总结推荐阅读文章 RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这…...
推荐一个Star超过2K的.Net轻量级的CMS开源项目
推荐一个具有模块化和可扩展的架构的CMS开源项目。 01 项目简介 Piranha CMS是一个轻量级且跨平台的CMS库,专为.NET 8设计。 该项目提供多种模板,具备CMS基本功能,也有空模板方便从头开始构建新网站,甚至可以作为移动应用的后端…...
基于驾驶员面部特征的疲劳检测系统
大家好,本文是对基于驾驶员面部特征的疲劳检测系统源码的介绍与说明。 项目下载:基于驾驶员面部特征的疲劳检测系统 1.关于项目 疲劳驾驶检测系统通过监测驾驶人的眼睛状态,头部状态,嘴部状态等指标,识别出疲劳迹象…...
前端知识点---字符串的8种拼接方法(Javascript)
文章目录 01使用 运算符(改变了原始字符串)02使用 运算符(改变了原本的字符串)03 使用 concat() 方法(不改变原本的字符串)04使用模板字面量(不改变原本的字符串)05使用 join() 方法(不改变原本的字符串)①指定分隔符 ②没有指定…...
用 Python 从零开始创建神经网络(一):编码我们的第一个神经元
编码我们的第一个神经元 引言1. A Single Neuron:Example 1Example 2 2. A Layer of Neurons:Example 1 引言 本教程专为那些对神经网络已有基础了解、但尚未动手实践过的读者而设计。尽管网上充斥着各种教程,但很多内容要么过于简略&#x…...
低代码开发
低代码(Low Code)是一种软件开发方法,它通过可视化界面和少量的编码来快速构建应用程序。低代码平台的核心理念是通过抽象和最小化手工编码的方式,加速软件开发和部署的过程。 定义 低代码是一种软件开发方法,它允许…...
sql server 文件和文件组介绍
sql server 文件和文件组介绍 数据库文件和文件组 - SQL Server | Microsoft Learn...
caozha-CEPCS(新冠肺炎疫情防控系统)
caozha-CEPCS,是一个基于PHP开发的新冠肺炎疫情防控系统,CEPCS(全称:COVID-19 Epidemic Prevention and Control System),可以应用于单位、企业、学校、工业园区、村落等等。小小系统,希望能为大…...
1Panel修改PostgreSQL时区
需求 1Panel安装的PostgreSQL默认是UTC时区,需要将它修改为上海时间 步骤 进入PostgreSQL的安装目录 /opt/1panel/apps/postgresql/postgresql/data打开postgresql.conf文件 修改: log_timezone Asia/Shanghai timezone Asia/Shanghai保存后重启…...
开发一个CRM系统难吗?CRM系统的实现步骤
越来越多企业意识到了,客户关系管理(CRM)系统已成为企业提升客户体验、推动销售增长的必备工具。一个高效的CRM系统不仅能够帮助企业优化客户数据管理,还能提升客户满意度,增强客户忠诚度,从而推动业务的持…...
kafka常见面试题总结
Kafka 核心知识解析 一、Kafka 消息发送流程 Kafka 发送消息涉及两个线程:main 线程和 sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator,main 线程负责将消息发送给 RecordAccumulator,而 sender 线程则从 R…...
前端知识点---Javascript中检测数据类型函数总结
文章目录 01 typeof 运算符02 instanceof 运算符03 Array.isArray()04 Object.prototype.toString.call()05 constructor 属性06 isNaN() 和 Number.isNaN() (常用)07 isFinite() 和 Number.isFinite()08 typeof null 是 "object" 的问题 01 typeof 运算符 返回值是…...
aspose如何获取PPT放映页“切换”的“持续时间”值
aspose如何获取PPT放映页“切换”的“持续时间”值 项目场景问题描述问题1:从官方文档和资料查阅发现并没有对切换的持续时间进行处理的方法问题2:aspose的依赖包中,所有的关键对象都进行了混淆处理 解决方案1、找到ppt切换的持续时间对应的混…...
【MQTT】代理服务比较RabbitMQ、Mosquitto 和 EMQX
前言 目前要处理大量设备同时频繁发送数据的情况,MQTT协议确实是一个更优的选择,因为它特别适合需要低带宽和高效能的物联网应用,下面是对目前主流协议的对比 数据截止日期:2024年11月10日 基础设施 后端: springclo…...
【C#/C++】C++/CL中String^的含义和举例,C++层需要调用C#层对象时...
示例: String^ IDataServer::GetParam(String^ aParamName){ /// }在 C/CLI 中,String^ 和 IDataServer::GetParam(String^ aParamName) 这种写法是一种混合了 C 和 .NET 的语法,用于在 C 中操作 .NET 对象。C/CLI 是微软扩展的 C 语言&…...
Python学习从0到1 day26 第三阶段 Spark ② 数据计算Ⅰ
人总是会执着于失去的,而又不珍惜现在所拥有的 —— 24.11.9 一、map方法 PySpark的数据计算,都是基于RDD对象来进行的,采用依赖进行,RDD对象内置丰富的成员方法(算子) map算子 功能:map算子…...
【详细】如何优雅地删除 Docker 容器与镜像
内容预览 ≧∀≦ゞ 镜像与容器的区别删除容器和镜像的具体步骤1. 删除容器步骤 1:查看当前运行的容器步骤 2:停止容器步骤 3:删除容器 2. 删除镜像步骤 1:查看镜像列表步骤 2:删除镜像 3. 删除所有容器和镜像 使用 1Pa…...
Spring Spring Boot 常用注解总结
在 Java 开发中,Spring 和 Spring Boot 框架广泛应用于企业级应用开发。这两个框架提供了丰富的注解,使得开发更加高效和便捷。本文将对 Spring 和 Spring Boot 中常用的注解进行总结。 一、Spring 常用注解 1. Component 作用:用于将普通的…...
Flink独立集群+Flink整合yarn
Flink独立集群的搭建: 1、上传解压配置环境变量 # 1、解压 tar -xvf flink-1.15.4-bin-scala_2.12.tgz # 2、修改环境变量 export FLINK_HOME/usr/local/soft/flink-1.15.4 export PATH$PATH:$FLINK_HOME/bin 2、修改配置文件 cd /usr/local/soft/flink-1.15.4/…...
动态规划 之 简单多状态 dp 问题 算法专题
一. 按摩师 按摩师 状态表示 根据经验 题目要求 dp[i] 表示: 选择到i位置时, 此时的最长预约时长 但是根据题目又分成两种情况: f[i] : 选择到 i 位置的时候, nums[i] 必选, 此时的最长预约时长 g[i] : 选择到 i 位置的时候, nums[i] 不选, 此时的最长预约时长状态转移方程 …...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error
在前端开发中,JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作(如 Promise、async/await 等),开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝(r…...
tauri项目,如何在rust端读取电脑环境变量
如果想在前端通过调用来获取环境变量的值,可以通过标准的依赖: std::env::var(name).ok() 想在前端通过调用来获取,可以写一个command函数: #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...
通过MicroSip配置自己的freeswitch服务器进行调试记录
之前用docker安装的freeswitch的,启动是正常的, 但用下面的Microsip连接不上 主要原因有可能一下几个 1、通过下面命令可以看 [rootlocalhost default]# docker exec -it freeswitch fs_cli -x "sofia status profile internal"Name …...
深入理解 React 样式方案
React 的样式方案较多,在应用开发初期,开发者需要根据项目业务具体情况选择对应样式方案。React 样式方案主要有: 1. 内联样式 2. module css 3. css in js 4. tailwind css 这些方案中,均有各自的优势和缺点。 1. 方案优劣势 1. 内联样式: 简单直观,适合动态样式和…...
VSCode 没有添加Windows右键菜单
关键字:VSCode;Windows右键菜单;注册表。 文章目录 前言一、工程环境二、配置流程1.右键文件打开2.右键文件夹打开3.右键空白处打开文件夹 三、测试总结 前言 安装 VSCode 时没有注意,实际使用的时候发现 VSCode 在 Windows 菜单栏…...
