Java真的不难(五十四)RabbitMQ的入门及使用
RabbitMQ的入门及使用
一、什么是RabbitMQ?
MQ全称为Message Queue,即消息队列。消息队列是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦
二、RabbitMQ与Kafka的全面对比
| 对比项 | Kafka | RabbitMQ | 
|---|---|---|
| 开发语言 | scala、Java | erlang | 
| 是否支持多租户 | 2.x.x支持 | 支持 | 
| 是否支持topic优先级 | 不支持 | 支持 | 
| 是否支持消息全局有序 | 不支持 | 支持 | 
| 是否支持消息分区有序 | 支持 | 支持 | 
| 是否有内置监控 | 无 | 有 | 
| 是否支持多个生产者 | 支持 | 支持 | 
| 是否支持多个消费者 | 支持 | 支持 | 
| 是否支持一个分区多个消费者 | 不支持 | 不支持 | 
| 是否支持JMX | 支持 | 不支持 | 
| 是否支持加密 | 支持 | 支持 | 
| 消息队列协议支持 | 仅支持自定义协议 | 支持AMQP、MQTT、STOMP协议 | 
| 客户端语言支持 | 支持 | 支持 | 
| 是否支持消息追踪 | 不支持 | 支持 | 
| 是否支持消费者推模式 | 不支持 | 支持 | 
| 否支持消费者拉模式 | 支持 | 支持 | 
| 是否支持广播消息 | 支持 | 支持 | 
| 是否支持消息回溯 | 支持消息回溯,因为消息持久化,消息被消费后会记录offset和timstamp | 不支持,消息确认被消费后,会被删除 | 
| 是否支持消息数据持久化 | 支持 | 支持 | 
| 是否支持流量控制 | 支持 | 支持 | 
| 是否支持事务性消息 | 支持 | 不支持 | 
| 元数据管理 | 通过zookeeper进行管理 | 支持 | 
| 默认服务端口 | 9092 | 5672 | 
| 默认监控端口 | kafka web console 9000;kafka manager 9000; | 15672 | 
| 相对网络开销 | 较小 | 较大 | 
| 相对内存消耗 | 较小 | 较大 | 
| 相对cpu消耗 | 较大 | 较小 | 
实际场景选择:
Kafka :
 常作为消息传输的数据管道 ,优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,严谨性上不如 RabbitMQ,但 kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况 ,若消息吞吐量极大则Kafka
RabbitMQ:
 RabbitMQ金融场景中经常使用 ,常作为交易数据作为数据传输管道, 具有较高的严谨性,数据丢失的可能性更小,具备更高的实时性,和Spring是统一厂商开发,后期支持比较好,目前最流行的,对容错性的处理比较完善
RabbitMQ 支持发布订阅、轮询分发、公平分发、重发、消息拉取
 Kafka 不支持重发、事务
三、Linux上安装RabbitMQ
这次安装将RabbitMQ部署在Linux上,可以在电脑本地安装一台Linux,也可以购买云服务器,若购买云服务器,则需要在安全组内把后面要用到的端口给打开!
1、使用Docker安装(最简单的方法)
拉取RabbitMQ:
以下均为Linux指令
#拉取RabbitMQ:
docker pull RabbitMQ#启动rabbitmq
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq#查看docker目前在运行的容器,是否有rabbitmq
docker ps
2、图形化安装插件
#进入运行中的容器
docker exec -it 镜像ID /bin/bash#rabbitmq图形化安装插件
rabbitmq-plugins enable rabbitmq_management
3、WEB页面开启资源监控
#进入容器
docker exec -it rabbitmq /bin/bash#切到对应目录
cd /etc/rabbitmq/conf.d/#修改 management_agent.disable_metrics_collector = false
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf#退出容器
exit#重启容器
docker restart rabbitmq
然后在浏览器内输入服务器地址+端口(15672)即可进入WEB管理页面,默认账号密码均为:guest

四、Docker配置RabbitMQ集群
先停止在运行的MQ
docker stop 运行容器ID
#启动三个容器
docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -p 15672:15672 -p 5672:5672 -p 1883:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmqdocker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -p 15673:15672 -p 5673:5672 -p 1884:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 rabbitmqdocker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -p 15674:15672 -p 5674:5672 -p 1885:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02 rabbitmq#Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。因为 RabbitMQ 是用Erlang实现的,Erlang Cookie 相当于不同节点之间通讯的密钥,Erlang节点通过交换 Erlang Cookie 获得认证#进入第二个容器
docker exec -it rabbitmqCluster02 bash
rabbitmqctl stop_app
rabbitmqctl reset#加入集群
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit#进入第三个容器
docker exec -it rabbitmqCluster03 bash
rabbitmqctl stop_app
rabbitmqctl reset#加入集群
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit#主要参数
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
-p 1883:1883 mqtt  访问端口
然后依次运行2、3的步骤安装插件和开启资源监控
 上诉15672、15673、15674端口均需要在服务器安全组内开启
完成后即可的WEB页面看到MQ的集群
五、SpringBoot整合使用RabbitMQ
依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml文件的配置:
spring:rabbitmq:username: guestpassword: guestvirtual-host: /host: 通道运行的地址port: 5672
生产者配置类Config:
模拟美团外卖下单,正常下单后若马上被接单则被这条信息直接消费,若5秒内没人接单,该订单消息将进入加急派单队列(死信队列)
所以我们需要两台交换机和两个通道
@Configuration
public class mtRabbitConfig {//正常通道的交换机@Beanpublic FanoutExchange mtExchange(){return new FanoutExchange("mt_fanout_exchange",true,false);}//死信通道的交换机@Beanpublic FanoutExchange mtDeadExchange(){return new FanoutExchange("mt_fanout_dead_exchange",true,false);}//正常通道,给消息设计过期时间,超过该时间未被消费,则进入指定的mt_fanout_dead_exchange@Beanpublic Queue mtQueue(){Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","mt_fanout_dead_exchange");return new Queue("mt_queue",true,false,false,args);}@Beanpublic Queue mtDaedQueue(){return new Queue("mt_dead_queue",true,false,false);}//交换机与通道绑定@Beanpublic Binding mtBinding(){return BindingBuilder.bind(mtQueue()).to(mtExchange());}@Beanpublic Binding mtDeadBinding(){return BindingBuilder.bind(mtDaedQueue()).to(mtDeadExchange());}
}
生产者(模拟用户下单):
    @Autowiredprivate RabbitTemplate rabbitTemplate;//模拟美团订单下单,若5秒不接单(消费),则进入死信队列(加急派单)public void mtTakeOutOrder(String name, String food, String number) {UUID takeOutId = UUID.randomUUID();String orderTime = DateFormat.getDateTimeInstance().format(new Date());String exchangeName = "mt_fanout_exchange";String takeOutMes = "美团订单编号:" + takeOutId + " " + orderTime + " " + name + " " + food + " " + number;String routingKey = "";//将消息放入通道内Object result = rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, takeOutMes);System.out.println("配送中心响应:"+result);}
测试类:
    @Testvoid mtTakeOutOrder() throws InterruptedException {takeOutOrder.mtTakeOutOrder("小张"+i, "麻辣烫", "10086");}
消费者(外卖接单中心):
正常接单消费者:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "mt_queue", autoDelete = "false"),exchange = @Exchange(value = "mt_fanout_exchange", type = ExchangeTypes.FANOUT)))
@Component
public class mtTakeOutDelivery {@RabbitHandlerpublic String buyTrainTickets(String message) {System.out.println("正常美团外卖订单已接单:" + message);return "配送中心已接单";}
}
加急接单消费者(死信队列内的消息):
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "mt_dead_queue", autoDelete = "false"),exchange = @Exchange(value = "mt_fanout_dead_exchange", type = ExchangeTypes.FANOUT)))
@Component
public class mtDeadTakeOutDelivery {@RabbitHandlerpublic String buyTrainTickets(String message) {System.out.println("加急饿了么外卖订单已接单:" + message);return "配送中心已接单";}
}
消费者运行后,只要监听的两个通道内有消息,就会被消费
六、RabbitMQ的手动ACK
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
自动应答就是上面的案例,只要被消费者取出,通道内就会删除这个消息,万一这个消息在消费者那边处理异常,因为通道里已经没用这条消息了,就会出现消息丢失。所以在有些场景需要改为手动应答ACK,就是消费者把这条消息确认处理完毕后,再告诉通道删除消息,若异常,这条消息将返回通道内可以重新处理,这就是手动应答。
还是一样,来配置两个通道:
生产者配置:
@Configuration
public class TestQueueConfig {@Beanpublic FanoutExchange TestExchange() {return new FanoutExchange("test_exchange", true, false);}@Beanpublic FanoutExchange TestDeadExchange() {return new FanoutExchange("test_dead_exchange", true, false);}@Beanpublic Queue TestDeadQueue() {return new Queue("test_dead_queue", true, false, false);}@Beanpublic Queue TestQueue() {Map<String, Object> args = new HashMap<>();//20秒钟未消费转到死信队列args.put("x-message-ttl", 5000);args.put("x-dead-letter-exchange", "test_dead_exchange");return new Queue("test_queue", true, false, false, args);}@Beanpublic Binding TestBinding() {return BindingBuilder.bind(TestQueue()).to(TestExchange());}@Beanpublic Binding TestDeadBinding() {return BindingBuilder.bind(TestDeadQueue()).to(TestDeadExchange());}
}生产者业务代码不变,与上个案例一致也行
消费者配置类:
@Configuration
public class MyselfReceiverConfig {@Autowiredprivate CachingConnectionFactory cachingConnectionFactory;@Autowiredprivate MyselfReceiver myselfReceiver;@Autowiredprivate MyselfDeadReceiver myselfDeadReceiver;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(10);//手动确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setQueueNames("test_queue");container.setMessageListener(myselfReceiver);return container;}@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer_Dead(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(10);//手动确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setQueueNames("test_dead_queue");container.setMessageListener(myselfDeadReceiver);return container;}
}
正常消费者:
@Component
public class MyselfReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte[] bytes = message.getBody();String mes = new String(bytes);String substring = mes.replace("\\\"","'");System.out.println("正常通道内消息:"+substring);//业务主体//若业务处理无异常,则回复通道删除消息channel.basicAck(deliveryTag,true);}catch (Exception e){channel.basicReject(deliveryTag,true);}}
}
死信通道消费者:
@Component
public class MyselfDeadReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte[] bytes = message.getBody();String mes = new String(bytes);String result = mes.replace("\\\"", "'");System.out.println("死信通道内消息:" + result);//业务主体//若业务处理无异常,则回复通道删除消息channel.basicAck(deliveryTag, true);} catch (Exception e) {//有异常把消息返回通道channel.basicReject(deliveryTag, true);}}
}
这样就完成了手动ACK,若消费者处理没有异常,将使用channel.basicAck(deliveryTag, true);
 若出现了异常,将使用channel.basicReject(deliveryTag, true);,此消息将重新进入通道内,这也确保了未消费成功的消息不会出现丢失的情况。

相关文章:
 
Java真的不难(五十四)RabbitMQ的入门及使用
RabbitMQ的入门及使用 一、什么是RabbitMQ? MQ全称为Message Queue,即消息队列。消息队列是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。…...
 
Unity | Script Hot Reload
官网地址:https://hotreload.net/ 一、作用 Unity在运行时,可以直接修改代码,避免等待过长的编译时间。 二、说明 1、支持的平台? Windows、MacOS、Linux 2、支持的Unity版本? 2018.4 (LTS)2019.4 (LTS)2020.3 (L…...
 
3|射频识别技术|第五讲:数据通信和编码技术|第九章:编码与调制|重点理解掌握传输介质中的有线传输介质
计算机网络部分:https://blog.csdn.net/m0_57656758/article/details/128943949传输介质分为有线传输介质和无线传输介质两大类;有线传输介质通常包含双绞线、同轴电缆和光导纤维;无线传输介质包含微波、红外线等。传输介质的选择和连接是网络…...
 
【遇见青山】基于Redis的Feed流实现案例
【遇见青山】基于Redis的Feed流实现案例1.关注推送2.具体代码实现1.关注推送 关注推送也叫做Feed流,直译为投喂。为用户持续的提供"沉浸式”的体验,通过无限下拉刷新获取新的信息。 Feed流产品有两种常见模式: 这里我们实现基本的TimeL…...
【芯片前端】一文搞定|寄存器组织生成与uvm ral_model环境全流程
前言 本文以组织一个系统(或模块)寄存器为例,进行寄存器与ral生成相关的全流程展示。内容包括如下几个部分: 寄存器文档组织 描述文件与辅助RTL代码结构 ralf/ral/rtl文件代码结构 UVM RAL访问环境组织 寄存器文档组织 在windows路径下组织寄存器文档,格式为excel表格。…...
 
Leetcode力扣秋招刷题路-0061
从0开始的秋招刷题路,记录下所刷每道题的题解,帮助自己回顾总结 61. 旋转链表 给你一个链表的头节点 head ,旋转链表,将链表每个节点向右移动 k 个位置。 示例 1: 输入:head [1,2,3,4,5], k 2 输出&…...
 
xilinx srio ip学习笔记之axistream接口
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 xilinx srio ip学习笔记之axistream接口前言接口转化前言 srio 的IQ接口都是基于axistream的,以前没怎么用过axistream的接口,或者说没怎么用过复杂条…...
 
轨迹误差评估指标[APE/RPE]和EVO
轨迹误差评估指标[APE/RPE]和EVO1. ATE/APE2. RPE3. EVO3.1 评估指标3.2 使用3.2.1 轨迹可视化3.2.2 APE3.2.3 RPEReference: 高翔,张涛 《视觉SLAM十四讲》视觉SLAM基础:算法精度评价指标(ATE、RPE) 在实际工程中,我…...
 
uni-app 消息推送功能UniPush
uni-app 消息推送功能UniPush,这里用的是uni-app自带的UniPush1.0(个推服务),所以只针对UniPush1.0介绍实现步骤。 建议查阅的文章: UniPush 1.0 使用指南[2] Unipush 常见问题[3] 当然现在已经出了UniPush2.0(HBuilde…...
面试题(二十六)场景应用
1. 场景应用 1.1 微信红包相关问题 参考答案 概况:2014年微信红包使用数据库硬抗整个流量,2015年使用cache抗流量。 微信的金额什么时候算? 微信红包的金额是拆的时候实时算出来,不是预先分配的,采用的是纯内存计…...
 
密码技术在车联网安全中的应用与挑战
随着智慧交通和无人驾驶的快速发展,车联网产业呈现蓬勃发展态势,车与云、车与车、车与路、车与人等综合网络链接的融合程度越来越高,随之而来的安全挑战也更加严峻。解决车联网的安全问题需要一个整体的防护体系,而密码技术凭借技…...
 
富媒体数据管理解决方案:简化、优化、自动化
富媒体数据管理解决方案:简化、优化、自动化 适用于富媒体的 NetApp 解决方案有助于简化和降低数据管理成本,优化全球媒体工作流并自动执行媒体资产管理。这将有助于减轻您的负担。 为什么选择 NetApp 的富媒体数据管理解决方案? 成本更低…...
 
QT入门Input Widgets之QFontComboBox、QTextEdit、QPlainTextEdit、QDial、QKeySequenceEdit
目录 一、QFontComboBox的相关介绍 1、实际使用 二、QTextEdit与QPlainTextEdit 三、QDial的相关介绍 四、QKeySequenceEdit的相关介绍 此文为作者原创,创作不易,转载请标明出处! 一、QFontComboBox的相关介绍 1、实际使用 一般使用较…...
 
Java企业级开发学习笔记
文章目录一、Spring1.1、Slay Dragon1.2、RescueDamselQuest一、Spring 第一周写了两个小项目均使用了原始调用和容器的方法 两个项目:<斩杀大龙与上路保卫战> 配一张文件位置图 1.1、Slay Dragon BraveKnight package net.sherry.spring.day01;public c…...
【算法基础】(一)基础算法 ---高精度
✨个人主页:bit me ✨当前专栏:算法基础 🔥专栏简介:该专栏主要更新一些基础算法题,有参加蓝桥杯等算法题竞赛或者正在刷题的铁汁们可以关注一下,互相监督打卡学习 🌹 🌹 dz…...
 
电源口防雷器电路设计方案
电源口防雷电路的设计需要注意的因素较多,有如下几方面:1、防雷电路的设计应满足规定的防护等级要求,且防雷电路的残压水平应能够保护后级电路免受损坏。2、在遇到雷电暂态过电压作用时,保护装置应具有足够快的动作响应速度&#…...
 
【零基础入门前端系列】—表单(七)
【零基础入门前端系列】—表单(七) 一、什么是表单 表单在Web网页中用来给访问者填写信息,从而采集客户信息端,使得网页具有交互功能。一般是将表单设计在一个HTML文档中,当用户填写完信息后做提交操作,于…...
 
Linux安装python3
Linux安装python3一.介绍二.下载三.配置1.文件夹2.安装依赖3.安装4.配置4.1python关系4.2配置测试-映射python3文件4.2.1 不用设置默认python3为默认版本4.2.2 将python3设置默认版本一.介绍 因为我的Centos7虚拟机里面只有python2.7.5,我想安装一个python3但是还要…...
怎么通过中级职称有窍门吗?
中级职称评审对人才加薪、升职自然不必说,更重要的是职称证书对于公司和企业同样具有重要的价值和意义,因此只要是说公司办理资质或者有项目招投标的公司对于人才参加中级职称评审毫无疑问会给予大力支持,既然工程师职称有这么多的好处&#…...
 
SAP ABAP根据事务码查找增强最直接的方法
下面是为任意事务代码查找用户出口的步骤: 方法一: 第 1 步:使用 事务代码:SE93。输入您要搜索用户出口的 事务代码。 在我们的场景中,我们将使用 CO11N。 第 2 步:点击显示: 第 3 步…...
 
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
 
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
安卓基础(aar)
重新设置java21的环境,临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的: MyApp/ ├── app/ …...
 
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
 
从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...
 
自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
 
论文阅读:Matting by Generation
今天介绍一篇关于 matting 抠图的文章,抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法,已经有很多的工作和这个任务相关。这两年 diffusion 模型很火,大家又开始用 diffusion 模型做各种 CV 任务了&am…...
[USACO23FEB] Bakery S
题目描述 Bessie 开了一家面包店! 在她的面包店里,Bessie 有一个烤箱,可以在 t C t_C tC 的时间内生产一块饼干或在 t M t_M tM 单位时间内生产一块松糕。 ( 1 ≤ t C , t M ≤ 10 9 ) (1 \le t_C,t_M \le 10^9) (1≤tC,tM≤109)。由于空间…...
 
yaml读取写入常见错误 (‘cannot represent an object‘, 117)
错误一:yaml.representer.RepresenterError: (‘cannot represent an object’, 117) 出现这个问题一直没找到原因,后面把yaml.safe_dump直接替换成yaml.dump,确实能保存,但出现乱码: 放弃yaml.dump,又切…...
 
算法—栈系列
一:删除字符串中的所有相邻重复项 class Solution { public:string removeDuplicates(string s) {stack<char> st;for(int i 0; i < s.size(); i){char target s[i];if(!st.empty() && target st.top())st.pop();elsest.push(s[i]);}string ret…...
