Rabbit MQ整合springBoot
- 一、pom依赖
- 二、消费端
- 2.1、application.properties 配置文件
- 2.2、消费端核心组件
- 三、生产端
- 3.1、application.properties 配置文件
- 2.2、生产者 MQ消息发送组件
- 四、测试
- 1、生产端控制台
- 2、消费端控制台
一、pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--spring整合MQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
二、消费端
2.1、application.properties 配置文件
server.port=8002
#上下文路径
server.servlet.context-path=/
spring.application.name=rabbit_consumer# MQ配置
spring.rabbitmq.addresses=192.168.220.3:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
# 虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时 15秒
spring.rabbitmq.connection-timeout=15000
# 设置消费端消费成功消息后手动签收消息,默认auto自动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=6
# 最大消费线程数,并发数
spring.rabbitmq.listener.simple.max-concurrency=11
# prefetch为限制一次传送给消费者的消息数
spring.rabbitmq.listener.simple.prefetch=1# 自定义属性配置 MQ
spring.rabbitmq.listener.test.exchange=test_topic_exchange
spring.rabbitmq.listener.test.exchange.type=topic
spring.rabbitmq.listener.test.queue=test_topic1
spring.rabbitmq.listener.test.key=test_topic1.*
2.2、消费端核心组件
package com.xiao.component;import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;@Component
public class RabbitMQReceived {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "${spring.rabbitmq.listener.test.exchange}",type = "${spring.rabbitmq.listener.test.exchange.type}",durable = "true",ignoreDeclarationExceptions = "true"),value = @Queue(value = "${spring.rabbitmq.listener.test.queue}",durable = "true"),key = "${spring.rabbitmq.listener.test.key}"/*,admins = "root"*/))/*** 监听消息* @param message 消息* @param channel 通道*/@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws IOException {System.err.println("=====================================");System.err.println("消费端 RabbitMQReceived 消费消息:" + message.getPayload());Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//由于消费端配置手动消费消息后签收机制 spring.rabbitmq.listener.simple.acknowledge-mode=manual
// channel.basicAck(deliveryTag,false);System.err.println("消费端 RabbitMQReceived ack:yes deliveryTag:" + deliveryTag);}
}
三、生产端
3.1、application.properties 配置文件
server.port=8001
#上下文路径
server.servlet.context-path=/
spring.application.name=rabbit_produce# MQ配置
spring.rabbitmq.addresses=192.168.220.3:5672
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
# 虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时 15秒
spring.rabbitmq.connection-timeout=15000
# 开启produce发送给broker的消息确认模式,可靠性投递
spring.rabbitmq.publisher-confirms=true
#spring.rabbitmq.publisher-confirm-type=true #有点问题
# 针对于broker未接收的消息return机制,需要结合mandatory一起使用
#spring.rabbitmq.template.mandatory=true
#spring.rabbitmq.publisher-returns=true
2.2、生产者 MQ消息发送组件
package com.xiao.component;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;@Component
public class RabbitMQSender {@Autowiredprivate RabbitTemplate rabbitTemplate;//生产者发送消息到broker确认回调接口private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 消息的唯一标识* @param ack broke broker是否签收成功* @param cause 失败异常信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String formatStr = String.format("生产端 confirmCallback 相关数据:%s," +"broker签收情况 ack=%s,异常信息:%s" ,correlationData.toString(),ack,cause);System.err.println(formatStr);/*System.out.println("生产端 confirmCallback 相关数据:" + correlationData);System.out.println("生产端 confirmCallback broker签收情况:" + ack);System.out.println("生产端 confirmCallback 异常信息:" + cause);*/}};/*** 发送消息* @param message 消息* @param properties 消息对应的属性,如时间*/public void send(Object message, Map<String,Object> properties) {MessageHeaders messageHeaders = new MessageHeaders(properties);Message<?> msg = MessageBuilder.createMessage(message, messageHeaders);rabbitTemplate.setConfirmCallback(confirmCallback);//消息发送完后置处理器MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {System.err.println("生产端 RabbitMQSender send后置处理:" + message);return message;}@Overridepublic org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message, Correlation correlation) {System.err.println("生产端 RabbitMQSender send后置处理:" + message+" 消息标识:" + correlation);return message;}};//消息唯一属性CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("test_topic_exchange",//exchange,"test_topic1.xiao",// routingKey,msg, //message,messagePostProcessor,correlationData);}
}
四、测试
package com.xiao;import com.xiao.component.RabbitMQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
import java.util.Map;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class SendMessageTest {@Autowiredprivate RabbitMQSender rabbitMQSender;@Testpublic void send() throws InterruptedException {Map<String,Object> properties = new HashMap<>(2);properties.put("userName","xiao");rabbitMQSender.send("hello world!",properties);Thread.sleep(5000);//10秒}
}
1、生产端控制台
生产端 RabbitMQSender send后置处理:(Body:'[B@3a6045c6(byte[535])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=535, deliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 消息标识:CorrelationData [id=8c78e89d-80f3-4f3d-ba8b-13e863c6295c]
2023-07-21 20:05:37.611 INFO 4536 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.220.3:5672]
2023-07-21 20:05:37.653 INFO 4536 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#6f38a289:0/SimpleConnection@6215366a [delegate=amqp://root@192.168.220.3:5672/, localPort= 4712]
生产端 confirmCallback 相关数据:CorrelationData [id=8c78e89d-80f3-4f3d-ba8b-13e863c6295c],broker签收情况 ack=true,异常信息:null
2、消费端控制台
=====================================
消费端 RabbitMQReceived 消费消息:hello world!
消费端 RabbitMQReceived ack:yes deliveryTag:1

相关文章:
Rabbit MQ整合springBoot
一、pom依赖二、消费端2.1、application.properties 配置文件2.2、消费端核心组件 三、生产端3.1、application.properties 配置文件2.2、生产者 MQ消息发送组件四、测试1、生产端控制台2、消费端控制台 一、pom依赖 <dependency><groupId>org.springframework.boo…...
Golang 中的 time 包详解(一):time.Time
在日常开发过程中,会频繁遇到对时间进行操作的场景,使用 Golang 中的 time 包可以很方便地实现对时间的相关操作。接下来的几篇文章会详细讲解 time 包,本文先讲解一下 time 包中的结构体 time.Time。 time.Time time.Time 类型用来表示一个…...
CMU 15-445 -- Database Recovery - 18
CMU 15-445 -- Database Recovery - 18 引言ARIESLog Sequence NumbersNormal ExecutionTransaction CommitTransaction AbortCompensation Log Records Non-fuzzy & fuzzy CheckpointsSlightly Better CheckpointsFuzzy Checkpoints ARIES - Recovery PhasesAnalysis Phas…...
HTTP Header定制,客户端使用Request,服务器端使用Response
在服务器端通过request.getHeaders()是无效的,只能使用response.getHeaders()。 Overridepublic Object beforeBodyWrite(Object body, MethodParameter returnType, MediaType mediaType,Class selectedConverterType, ServerHttpRequest request, ServerHttpRespo…...
Vue 3编写的父子组件示例,包括传递数据和调用父组件方法
下面是一个使用Vue 3编写的父子组件示例,包括传递数据和调用父组件方法: ChildComponent.vue: <template><div><p>Child Component</p><p>Message: {{ message }}</p><button click"updateMes…...
[ 容器 ] Docker 的数据管理
目录 一、Docker 的数据管理1.1 数据卷2. 数据卷容器 二、 端口映射三、容器互联(使用centos镜像)四、Docker 镜像的创建1.基于现有镜像创建2.基于本地模板创建3.基于Dockerfile 创建3.1 联合文件系统(Unio…...
【环境配置】使用Docker搭建LAMP环境
这篇文章不是介绍DOCKER是什么,也不是阐述DOCKER的核心:镜像/容器和仓库之间的关系,它只是一篇让刚刚接触DOCKER的初学者,在没有完全了解DOCKER是什么之前,也能尽快的在Linux系统下面通过DOCKER来搭建一个LAMP环境,这是其一&#…...
MLIR (Multi-Level Intermediate Representation)
MLIR(Multi-Level Intermediate Representation)是一种多级中间表示的编译器基础架构,旨在提供通用的、可扩展的编译器基础设施。它最初由谷歌开发,并且现在已经成为一个开源项目,受到广泛关注和采用。 MLIR 的设计理…...
VR全景在酒店的发展状况如何?酒店该如何做营销?
现阶段,VR全景技术已经被酒店、民宿、旅游景区、房产楼盘、校园等行业所应用,每天都有不少人通过VR全景展示来了解酒店的设施环境,而酒店也可以借此机会,详细展示自身优势,更大范围吸引顾客。 VR酒店拥有真实、立体的全…...
Winform使用PictureBox控件显示图片并且自适应
一.首先我们只需要在项目文件中的/bin/Debug 下面创建一个文件夹保存你的照片。我这里文件夹名字叫Resources.。如图: 二. 然后我们把我们的照片放入Resources文件夹中即可。如图: 三.在构造器中添加picturebox控件。如图: 四.我们到初始化代…...
HTML中的焦点管理
前言 焦点作为页面交互中的重要一环,涉及到的知识点也比较多,有必要做一个统一的总结。 HTML 中的可获取焦点的元素 具有 href 属性的 HTMLAnchorElement/HTMLAreaElement非禁用态的 HTMLInputElement/HTMLSelectElement/HTMLTextAreaElement/HTMLBut…...
如何区分接口测试和功能测试
接口测试和功能测试的区别: 2023最新Jmeter接口测试从入门到精通(全套项目实战教程) 本文主要分为两个部分: 第一部分:主要从问题出发,引入接口测试的相关内容并与前端测试进行简单对比,总结两者…...
limit分页查询
controller层 ApiOperation("员工分页查询")GetMapping("/page")public Result<PageResult> page(EmployeePageQueryDTO employeePageQueryDTO){log.info("员工分页查询,参数为{}",employeePageQueryDTO);PageResult pageResul…...
mysql null 值查询不出来问题
最新遇到mysql null 值查询的问题,当查询这个字段有的为null 有的不为null 该字段查询条件查询为null值得将不显示。 举例 新建表 test_user name和phone得值默认值为null 我们添加一些数据 查询下name 不是张三得数据 select * from test_user where name !张…...
面试之CurrentHashMap的底层原理
首先回答HashMap的底层原理? HashMap是数组链表组成。数字组是HashMap的主体,链表则是主要为了解决哈希冲突而存在的。要将key 存储到(put)HashMap中,key类型实现必须计算hashcode方法,默认这个方法是对象的地址。接…...
Error in onLoad hook: “ReferenceError: plus is not defined“ found in
项目场景: 项目背景如下所示: 使用 HBuilder X 开发 项目, 调整页面时,直接运行到 浏览器查看页面设置效果,导致控制台出现下述报错信息 例如: 问题描述 遇到的问题如下所示: APP 中接收数据…...
ansible自动化运维(二)剧本、角色编写实战
😘作者简介:一名运维工作人员。 👊宣言:人生就是B(birth)和D(death)之间的C(choise),做好每一个选择。 🙏创作不易,动动小…...
【Spring框架】@Resource注入以及与@Autowired的区别
目录 使用Resource设置name的方式来重命名注入的对象区别 使用Resource设置name的方式来重命名注入的对象 package com;import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.spr…...
FTP服务器的搭建和配置上传脚本
文章目录 前言一、配置本地用户可上传权限ftp服务器1、用户登录ftp 二、配置FTP上传脚本文件1.脚本代码如下 补充知识 前言 vsftpd(Very Secure FTP Daemon)是一个在 Linux/Unix 系统上运行的一款开源免费的 FTP 服务器软件。vsftpd 支持支持 匿名用户、…...
Ubuntu22.04上部署Lua开发环境
需求背景 想在Ubuntu22.04上搭建一下Lua的开发环境,其实步骤比较简单的,此文章也适用于Ubuntu主机环境搭建Lua,如果想在在Ubuntu内部署一个容器,然后在容器内搭建Lua的环境,可以先参考容器的创建过程 ubuntu22.04上如何创建有pri…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
是否存在路径(FIFOBB算法)
题目描述 一个具有 n 个顶点e条边的无向图,该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序,确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数,分别表示n 和 e 的值(1…...
