SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理
SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理
- 前言
- 添加依赖
- 配置文件
- 编写监听器
- 创建SimpleRabbitListenerContainerFactory
- 发送消息
前言
RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一些特殊的需求,比如批量获取数据。

在本文中,我们将使用Spring Boot来整合RabbitMQ,并自定义一个监听器容器,实现批量获取数据的功能。
前置条件:
在开始之前,您需要具备以下条件:
- 已经安装好RabbitMQ服务器并启动。
- 已经创建好要使用的队列。
- 已经熟悉了Spring Boot和RabbitMQ的基本知识。
环境准备:
在开始之前,我们需要准备好以下环境:
- JDK 1.8或以上版本
- Spring Boot 2.5.0或以上版本
- RabbitMQ 3.8.0或以上版本
添加依赖
首先,在pom.xml文件中添加以下依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
接下来,在application.properties文件中添加以下配置:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/# 队列名称
spring.rabbitmq.listener.simple.queue-name=myQueue# 最大并发消费者数量
spring.rabbitmq.listener.simple.concurrency=5# 最小数量
spring.rabbitmq.listener.simple.min-concurrency=1# 最大数量
spring.rabbitmq.listener.simple.max-concurrency=10# 批量处理消息的大小
spring.rabbitmq.listener.simple.batch-size=50
或
spring:rabbitmq:host: localhostlistener:simple:batch-size: 50concurrency: 5max-concurrency: 10min-concurrency: 1queue-name: myQueuepassword: guestport: 5672username: guestvirtual-host: /
编写监听器
然后,我们需要创建一个监听器类,以便处理从队列中接收到的消息。以下是一个简单的示例:
@Component
public class MyListener {@RabbitListener(queues = "myQueue", containerFactory = "myFactory")public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)throws IOException {try {// 处理消息System.out.println("Received " + messages.size() + " messages");for (Message message : messages) {// 处理消息System.out.println("Received message: " + new String(message.getBody()));}channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true);} finally {// 手动确认消息channel.basicAck(deliveryTag, true);}}
}
在上面的代码中,我们使用了@RabbitListener注解来指定要监听的队列名称,同时也指定了使用myFactory工厂来创建监听容器。在这个监听器中,我们简单地打印了接收到的消息。
创建SimpleRabbitListenerContainerFactory
接下来,我们需要创建一个SimpleRabbitListenerContainerFactory工厂,以便能够自定义监听容器的行为。以下是一个简单的示例:
@Configuration
public class RabbitMQConfig {// @Bean
// public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// factory.setConnectionFactory(connectionFactory);
// factory.setConcurrentConsumers(1);
// factory.setMaxConcurrentConsumers(10);
// factory.setBatchListener(true);
// factory.setBatchSize(50);
// return factory;
// }@Beanpublic SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager,MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 并发消费者数,默认为 1factory.setConcurrentConsumers(5);// 最大并发消费者数,默认为 1factory.setMaxConcurrentConsumers(10);// 拒绝未确认的消息并重新将它们放回队列,默认为 truefactory.setDefaultRequeueRejected(false);// 容器启动时是否自动启动,默认为 truefactory.setAutoStartup(true);// 消息确认模式,默认为 AUTOfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 每个消费者在一次请求中预获取的消息数,默认为 1factory.setPrefetchCount(5);// 从队列中接收消息的超时时间,默认为 0,表示没有超时限制factory.setReceiveTimeout(1000);// 与容器一起使用的事务管理器。默认情况下,容器不会使用事务factory.setTransactionManager(transactionManager);// 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息factory.setMessageConverter(messageConverter);// 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutorfactory.setTaskExecutor(new SimpleAsyncTaskExecutor());// 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒factory.setShutdownTimeout(10000);// 重试失败的消息之前等待的时间,默认为 5000 毫秒factory.setRecoveryInterval(5000);// 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 truefactory.setMissingQueuesFatal(false);// 监听器容器连接工厂factory.setConnectionFactory(connectionFactory);return factory;}
}
这些属性中的大多数都是可选的,可以根据需要进行设置。根据应用程序的需求,我们可以自由地调整这些属性,以提高应用程序的性能和可靠性。
发送消息
最后,我们可以编写一个简单的发送消息的代码来向队列中发送一些消息。以下是一个简单的示例:
@Component
public class MySender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend("myQueue", "message:" + i);}}
}
总之, SimpleRabbitListenerContainerFactory 提供了一个灵活而强大的机制,用于自定义和配置 RabbitMQ 消息监听容器。
相关文章:
SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理
SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理前言添加依赖配置文件编写监听器创建SimpleRabbitListenerContainerFactory发送消息前言 RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收…...
jquery基础之操作节点对象
jquery操作节点(元素)对象 捕获-DOM操作,获取内容,值 获取内容:1.text()获取元素的文本内容 2.html()获取元素的文档内容 …...
对于Java的深入理解及其特点--面试
前言 计算机语言千千万,每一种语言都有其自己的特点、擅长的领域。在学习了Java之后才对Java有了进一步的理解。 面试问一: 你是如何理解Java这门语言的? 这里我们应该从下面几个点去总结 1、Java语言具有的属性 2、他的特点在哪 Java语…...
Linux GPSD的使用
目录 1: GPSD 运行状态查看 2:停止GPSD 服务 3: GPSD运行输出(协议的识别) 4:开启的服务...
ArrayList无参构造添加元素源码解读
一、ArrayList无参构造add方法源码阅读 Test//无参构造源码阅读 public void testArrayListNoConstructorAdd(){ArrayList<Integer> arrayList new ArrayList<>();ArrayList<Integer> list new ArrayList<>();arrayList.add(1);arrayList.add(12);a…...
手写简易 Spring(二)
文章目录手写简易 Spring(二)1. 扩展 BeanFactory 接口2. 实现资源加载器,从 Spring.xml 解析和注册 Bean 对象1. 核心实现类 XmlBeanDefinitionReader3. 实现应用上下文,自动识别、资源加载、扩展机制1. 应用上下文2. 核心实现类…...
排列问题DFS入门
1、题目描述(全排列) 输入一个正整数n,输出1~n的全排列。 输入格式 一个正整数n。 输出格式 所有1~n的全排列,每个排列占一行。 样例输入 3 样例输出 1 2 3 1 3 2 2 1 3 2 3 1 3 1 2 3 2 1 算法思路 题目要求输出n的全…...
【每日一题Day159】LC1638统计只差一个字符的子串数目 | 枚举
统计只差一个字符的子串数目【LC1638】 给你两个字符串 s 和 t ,请你找出 s 中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t 串的子串。换言之,请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串对的数目。 比…...
【07 Metadata and VendorTag】
1. Metadata结构及分类 一个 metadata 通过tag,value及 type 来描述。不同的 metadata 分成三类 controls,dynamic 及 static 2. MTK Metadata IMetadata Mtk metadata containerIMetadataConverter Provide mutual conversion for Android camera_metadata and MTK Imetada…...
Golang中Model的使用
导语我们都知道在Golang中我们一般都是设置GOPATH目录,这个目录主要存放我们的第三方包,这个方式一直不是很方便,今天给大家介绍Go 1.11版本中推出的GoModul使用方法,学过java的同学,可能对maven包有所了解,…...
交友项目【基础环境搭建】
目录 1:交友项目架构介绍 1.1:前后端分离的概述 1.2:YAPI介绍(虚拟机中已经配好) 基本信息 使用 安装跨域拓展(浏览器上安装跨域处理插件) 2:虚拟机工具项目搭建 2.1࿱…...
入职时,公司要求自己带电脑,每月给100元补贴,如果不接受就不能入职!
为了节约成本,公司能做出什么事?一位网友遇到了这样的事:入职时,公司要求自己带电脑,每个月给100元补贴,如果不接受就得放弃入职,这样的公司有没有坑?有人问:连基本的公司…...
20道经典Redis面试题
20道经典Redis面试题 前言 整理了20道经典Redis面试题,希望对大家有帮助。 1. 什么是Redis?它主要用来什么的? Redis,英文全称是Remote Dictionary Server(远程字典服务),是一个开源的使用A…...
十分钟带你看懂接口测试,2023最全超大型接口测试攻略
一、什么是接口测试? 所谓接口,是指同一个系统中模块与模块间的数据传递接口、前后端交互、跨系统跨平台跨数据库的对接。而接口测试,则是通过接口的不同情况下的输入,去对比输出,看看是否满足接口规范所规定的功能、…...
【设计模式】创建型-单例模式
文章目录一、单例模式二、单例模式的八种实现方式2.1、饿汉式(静态常量)2.2、饿汉式(静态代码块)2.3、懒汉式(线程不安全)2.4、懒汉式(线程安全,同步方法)2.5、双重检查2…...
Python 练习 六
1、(最大数的出现)编写程序读取整数,找出它们中的最大值,然后计算它的出现次数。假设输入以数字0结束。假设你输入的是“352555 0";程序找出的最大数是5,而5的出现次数是4。(提示:维护两个变量max和 count。变量max存储的是当前最大数,而…...
「SQL面试题库」 No_22 员工奖金
🍅 1、专栏介绍 「SQL面试题库」是由 不是西红柿 发起,全员免费参与的SQL学习活动。我每天发布1道SQL面试真题,从简单到困难,涵盖所有SQL知识点,我敢保证只要做完这100道题,不仅能轻松搞定面试࿰…...
瞒不住了,Prefetch 就是一个大谎言
本文正在参加「金石计划」 Prefetch 是一个谎言 我们知道,现在的应用程序已经发展到可以拆分为多个 JavaScript包了,为了获得更好的用户体验,这些 bundle 包通常需要预获取,即 prefetch! 但是现在的prefetch 效果有多糟糕我想你…...
这个时候了,你还不会不知道JavaMail API吧
一、概述 1.1 简述 JavaMail API 顾名思义,提供给开发者处理电子邮件相关的编程接口,它是Sun发布的用来处理email的API,其提供独立于平台且与协议无关的框架来构建邮件和消息传递应用。JavaMail API 提供了一组抽象类,用于定义组…...
JavaScript var let区别
文章目录JavaScript var & let区别变量作用域变量提升变量重复声明全局对象属性for循环中的作用域JavaScript var & let区别 var和let都是用来声明变量的关键字。 变量作用域 var声明的变量作用域是函数作用域或全局作用域,而let声明的变量作用域是块级作…...
从myplaces.shp到专题地图:手把手教你用QGIS C++ API实现点要素分级渲染
从myplaces.shp到专题地图:QGIS C API实现点要素分级渲染实战指南 当我们需要在桌面GIS应用中直观展示气象站降雨量、城市人口密度或商业网点销售额等连续型空间数据时,分级色彩渲染是最有效的可视化手段之一。本文将深入探讨如何利用QGIS强大的C API&am…...
AI智能体生态的包管理器:agenticmarket-cli 设计与实践
1. 项目概述:一个面向AI智能体生态的命令行工具如果你和我一样,长期在AI智能体(Agent)这个领域里折腾,那你肯定经历过这样的场景:为了测试一个最新的开源智能体框架,你需要先找到它的GitHub仓库…...
低温预警!固化慢、易开裂……密封胶冬季施工手册
低温预警!固化慢、易开裂……密封胶冬季施工手册 硅酮耐候密封胶主要作用是保障幕墙的气密性、水密性。其出现问题,可能会导致耐候密封失效,从而造成幕墙漏水漏气,影响幕墙的正常使用。耐候密封胶由于考虑到现场施工,几乎都是单组分硅酮密封胶产品。进入冬季,气候变化明…...
Windows Cleaner终极指南:三步告别C盘爆红,让电脑运行如飞!
Windows Cleaner终极指南:三步告别C盘爆红,让电脑运行如飞! 【免费下载链接】WindowsCleaner Windows Cleaner——专治C盘爆红及各种不服! 项目地址: https://gitcode.com/gh_mirrors/wi/WindowsCleaner 还在为Windows系统…...
AI驱动命令行工具:用自然语言自动化开发任务
1. 项目概述:一个为开发者“下厨”的AI助手如果你是一名开发者,每天在终端里敲打命令,构建、部署、调试,那么你肯定对重复性的命令行操作感到厌倦。比如,每次启动一个新项目,都要手动创建目录结构、初始化G…...
防火墙和手动启动都试了?ArcGIS License Server无响应,可能是这两个核心文件在捣鬼
ArcGIS许可服务故障深度解析:当核心文件成为隐形杀手 当你面对ArcGIS License Server无响应的红色报错框,已经尝试了关闭防火墙、调整服务配置、甚至重启服务器等一系列标准操作后,那个令人沮丧的"cannot connect to license server sys…...
开源AI图像生成工具Dream-Creator:本地部署与Stable Diffusion实战指南
1. 项目概述:一个开源的AI图像生成与创作工具 最近在GitHub上闲逛,发现了一个挺有意思的项目叫“Dream-Creator”。光看名字,你可能会联想到一些AI绘画或者创意生成工具。没错,这确实是一个围绕AI图像生成的开源项目。作为一个在…...
数据分析师能力展示:从项目构建到报告呈现的完整指南
1. 项目概述:一个数据分析师的能力展示平台最近在GitHub上看到一个挺有意思的项目,叫“dataanalyst-showcase”。光看名字,你可能会觉得这又是一个数据科学项目合集,但点进去仔细研究后,我发现它的定位非常精准——它不…...
从单一AI到智能体集群:构建模块化AI协作系统的核心原理与实践
1. 项目概述:当AI学会“开会”,一个开源智能体集群的诞生最近在GitHub上看到一个挺有意思的项目,叫daveshap/OpenAI_Agent_Swarm。光看名字,你可能会觉得这又是一个调用OpenAI API的简单封装库。但如果你点进去,花上十…...
基于Rust与Candle的AI推理引擎cria:简化大模型本地部署与优化
1. 项目概述:从“左移”到“创造”的AI推理引擎 最近在折腾AI模型本地部署和推理优化的朋友,可能都绕不开一个名字: cria 。这个由 leftmove 开源的项目,全称是“Cria: The AI Inference Engine”,直译过来就是“创…...
