当前位置: 首页 > news >正文

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

  • 前言
  • 添加依赖
  • 配置文件
  • 编写监听器
  • 创建SimpleRabbitListenerContainerFactory
  • 发送消息

前言

RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一些特殊的需求,比如批量获取数据。
在这里插入图片描述

在本文中,我们将使用Spring Boot来整合RabbitMQ,并自定义一个监听器容器,实现批量获取数据的功能。
前置条件:
在开始之前,您需要具备以下条件:

  • 已经安装好RabbitMQ服务器并启动。
  • 已经创建好要使用的队列。
  • 已经熟悉了Spring Boot和RabbitMQ的基本知识。

环境准备:
在开始之前,我们需要准备好以下环境:

  • JDK 1.8或以上版本
  • Spring Boot 2.5.0或以上版本
  • RabbitMQ 3.8.0或以上版本

添加依赖

首先,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

接下来,在application.properties文件中添加以下配置:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/# 队列名称
spring.rabbitmq.listener.simple.queue-name=myQueue# 最大并发消费者数量
spring.rabbitmq.listener.simple.concurrency=5# 最小数量
spring.rabbitmq.listener.simple.min-concurrency=1# 最大数量
spring.rabbitmq.listener.simple.max-concurrency=10# 批量处理消息的大小
spring.rabbitmq.listener.simple.batch-size=50

spring:rabbitmq:host: localhostlistener:simple:batch-size: 50concurrency: 5max-concurrency: 10min-concurrency: 1queue-name: myQueuepassword: guestport: 5672username: guestvirtual-host: /

编写监听器

然后,我们需要创建一个监听器类,以便处理从队列中接收到的消息。以下是一个简单的示例:

@Component
public class MyListener {@RabbitListener(queues = "myQueue", containerFactory = "myFactory")public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)throws IOException {try {// 处理消息System.out.println("Received " + messages.size() + " messages");for (Message message : messages) {// 处理消息System.out.println("Received message: " + new String(message.getBody()));}channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true);} finally {// 手动确认消息channel.basicAck(deliveryTag, true);}}
}

在上面的代码中,我们使用了@RabbitListener注解来指定要监听的队列名称,同时也指定了使用myFactory工厂来创建监听容器。在这个监听器中,我们简单地打印了接收到的消息。

创建SimpleRabbitListenerContainerFactory

接下来,我们需要创建一个SimpleRabbitListenerContainerFactory工厂,以便能够自定义监听容器的行为。以下是一个简单的示例:

@Configuration
public class RabbitMQConfig {//    @Bean
//    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setConcurrentConsumers(1);
//        factory.setMaxConcurrentConsumers(10);
//        factory.setBatchListener(true);
//        factory.setBatchSize(50);
//        return factory;
//    }@Beanpublic SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager,MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 并发消费者数,默认为 1factory.setConcurrentConsumers(5);// 最大并发消费者数,默认为 1factory.setMaxConcurrentConsumers(10);// 拒绝未确认的消息并重新将它们放回队列,默认为 truefactory.setDefaultRequeueRejected(false);// 容器启动时是否自动启动,默认为 truefactory.setAutoStartup(true);// 消息确认模式,默认为 AUTOfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 每个消费者在一次请求中预获取的消息数,默认为 1factory.setPrefetchCount(5);// 从队列中接收消息的超时时间,默认为 0,表示没有超时限制factory.setReceiveTimeout(1000);// 与容器一起使用的事务管理器。默认情况下,容器不会使用事务factory.setTransactionManager(transactionManager);// 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息factory.setMessageConverter(messageConverter);// 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutorfactory.setTaskExecutor(new SimpleAsyncTaskExecutor());// 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒factory.setShutdownTimeout(10000);// 重试失败的消息之前等待的时间,默认为 5000 毫秒factory.setRecoveryInterval(5000);// 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 truefactory.setMissingQueuesFatal(false);// 监听器容器连接工厂factory.setConnectionFactory(connectionFactory);return factory;}
}

这些属性中的大多数都是可选的,可以根据需要进行设置。根据应用程序的需求,我们可以自由地调整这些属性,以提高应用程序的性能和可靠性。

发送消息

最后,我们可以编写一个简单的发送消息的代码来向队列中发送一些消息。以下是一个简单的示例:

@Component
public class MySender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend("myQueue", "message:" + i);}}
}

总之, SimpleRabbitListenerContainerFactory 提供了一个灵活而强大的机制,用于自定义和配置 RabbitMQ 消息监听容器。

相关文章:

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理前言添加依赖配置文件编写监听器创建SimpleRabbitListenerContainerFactory发送消息前言 RabbitMQ是一种常用的消息队列&#xff0c;Spring Boot对其进行了深度的整合&#xff0c;可以快速地实现消息的发送和接收…...

jquery基础之操作节点对象

jquery操作节点&#xff08;元素&#xff09;对象 捕获-DOM操作&#xff0c;获取内容&#xff0c;值 获取内容&#xff1a;1.text&#xff08;&#xff09;获取元素的文本内容 2.html&#xff08;&#xff09;获取元素的文档内容 …...

对于Java的深入理解及其特点--面试

前言 计算机语言千千万&#xff0c;每一种语言都有其自己的特点、擅长的领域。在学习了Java之后才对Java有了进一步的理解。 面试问一&#xff1a; 你是如何理解Java这门语言的&#xff1f; 这里我们应该从下面几个点去总结 1、Java语言具有的属性 2、他的特点在哪 Java语…...

Linux GPSD的使用

目录 1: GPSD 运行状态查看 2:停止GPSD 服务 3: GPSD运行输出(协议的识别) 4:开启的服务...

ArrayList无参构造添加元素源码解读

一、ArrayList无参构造add方法源码阅读 Test//无参构造源码阅读 public void testArrayListNoConstructorAdd(){ArrayList<Integer> arrayList new ArrayList<>();ArrayList<Integer> list new ArrayList<>();arrayList.add(1);arrayList.add(12);a…...

手写简易 Spring(二)

文章目录手写简易 Spring&#xff08;二&#xff09;1. 扩展 BeanFactory 接口2. 实现资源加载器&#xff0c;从 Spring.xml 解析和注册 Bean 对象1. 核心实现类 XmlBeanDefinitionReader3. 实现应用上下文&#xff0c;自动识别、资源加载、扩展机制1. 应用上下文2. 核心实现类…...

排列问题DFS入门

1、题目描述&#xff08;全排列&#xff09; 输入一个正整数n&#xff0c;输出1~n的全排列。 输入格式 一个正整数n。 输出格式 所有1~n的全排列&#xff0c;每个排列占一行。 样例输入 3 样例输出 1 2 3 1 3 2 2 1 3 2 3 1 3 1 2 3 2 1 算法思路 题目要求输出n的全…...

【每日一题Day159】LC1638统计只差一个字符的子串数目 | 枚举

统计只差一个字符的子串数目【LC1638】 给你两个字符串 s 和 t &#xff0c;请你找出 s 中的非空子串的数目&#xff0c;这些子串满足替换 一个不同字符 以后&#xff0c;是 t 串的子串。换言之&#xff0c;请你找到 s 和 t 串中 恰好 只有一个字符不同的子字符串对的数目。 比…...

【07 Metadata and VendorTag】

1. Metadata结构及分类 一个 metadata 通过tag,value及 type 来描述。不同的 metadata 分成三类 controls,dynamic 及 static 2. MTK Metadata IMetadata Mtk metadata containerIMetadataConverter Provide mutual conversion for Android camera_metadata and MTK Imetada…...

Golang中Model的使用

导语我们都知道在Golang中我们一般都是设置GOPATH目录&#xff0c;这个目录主要存放我们的第三方包&#xff0c;这个方式一直不是很方便&#xff0c;今天给大家介绍Go 1.11版本中推出的GoModul使用方法&#xff0c;学过java的同学&#xff0c;可能对maven包有所了解&#xff0c…...

交友项目【基础环境搭建】

目录 1&#xff1a;交友项目架构介绍 1.1&#xff1a;前后端分离的概述 1.2&#xff1a;YAPI介绍&#xff08;虚拟机中已经配好&#xff09; 基本信息 使用 安装跨域拓展&#xff08;浏览器上安装跨域处理插件&#xff09; 2&#xff1a;虚拟机工具项目搭建 2.1&#xff1…...

入职时,公司要求自己带电脑,每月给100元补贴,如果不接受就不能入职!

为了节约成本&#xff0c;公司能做出什么事&#xff1f;一位网友遇到了这样的事&#xff1a;入职时&#xff0c;公司要求自己带电脑&#xff0c;每个月给100元补贴&#xff0c;如果不接受就得放弃入职&#xff0c;这样的公司有没有坑&#xff1f;有人问&#xff1a;连基本的公司…...

20道经典Redis面试题

20道经典Redis面试题 前言 整理了20道经典Redis面试题&#xff0c;希望对大家有帮助。 1. 什么是Redis&#xff1f;它主要用来什么的&#xff1f; Redis&#xff0c;英文全称是Remote Dictionary Server&#xff08;远程字典服务&#xff09;&#xff0c;是一个开源的使用A…...

十分钟带你看懂接口测试,2023最全超大型接口测试攻略

一、什么是接口测试&#xff1f; 所谓接口&#xff0c;是指同一个系统中模块与模块间的数据传递接口、前后端交互、跨系统跨平台跨数据库的对接。而接口测试&#xff0c;则是通过接口的不同情况下的输入&#xff0c;去对比输出&#xff0c;看看是否满足接口规范所规定的功能、…...

【设计模式】创建型-单例模式

文章目录一、单例模式二、单例模式的八种实现方式2.1、饿汉式&#xff08;静态常量&#xff09;2.2、饿汉式&#xff08;静态代码块&#xff09;2.3、懒汉式&#xff08;线程不安全&#xff09;2.4、懒汉式&#xff08;线程安全&#xff0c;同步方法&#xff09;2.5、双重检查2…...

Python 练习 六

1、(最大数的出现)编写程序读取整数,找出它们中的最大值&#xff0c;然后计算它的出现次数。假设输入以数字0结束。假设你输入的是“352555 0";程序找出的最大数是5&#xff0c;而5的出现次数是4。(提示:维护两个变量max和 count。变量max存储的是当前最大数&#xff0c;而…...

「SQL面试题库」 No_22 员工奖金

&#x1f345; 1、专栏介绍 「SQL面试题库」是由 不是西红柿 发起&#xff0c;全员免费参与的SQL学习活动。我每天发布1道SQL面试真题&#xff0c;从简单到困难&#xff0c;涵盖所有SQL知识点&#xff0c;我敢保证只要做完这100道题&#xff0c;不仅能轻松搞定面试&#xff0…...

瞒不住了,Prefetch 就是一个大谎言

本文正在参加「金石计划」 Prefetch 是一个谎言 我们知道&#xff0c;现在的应用程序已经发展到可以拆分为多个 JavaScript包了&#xff0c;为了获得更好的用户体验&#xff0c;这些 bundle 包通常需要预获取&#xff0c;即 prefetch! 但是现在的prefetch 效果有多糟糕我想你…...

这个时候了,你还不会不知道JavaMail API吧

一、概述 1.1 简述 JavaMail API 顾名思义&#xff0c;提供给开发者处理电子邮件相关的编程接口&#xff0c;它是Sun发布的用来处理email的API&#xff0c;其提供独立于平台且与协议无关的框架来构建邮件和消息传递应用。JavaMail API 提供了一组抽象类&#xff0c;用于定义组…...

JavaScript var let区别

文章目录JavaScript var & let区别变量作用域变量提升变量重复声明全局对象属性for循环中的作用域JavaScript var & let区别 var和let都是用来声明变量的关键字。 变量作用域 var声明的变量作用域是函数作用域或全局作用域&#xff0c;而let声明的变量作用域是块级作…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地

借阿里云中企出海大会的东风&#xff0c;以**「云启出海&#xff0c;智联未来&#xff5c;打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办&#xff0c;现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...