快速搞定分布式RabbitMQ---RabbitMQ进阶与实战
本篇内容是本人精心整理;主要讲述RabbitMQ的核心特性;RabbitMQ的环境搭建与控制台的详解;RabbitMQ的核心API;RabbitMQ的高级特性;RabbitMQ集群的搭建;还会做RabbitMQ和Springboot的整合;内容会比较多,将原理跟应用相结合,希望大家能有所收获!
1.初始RabbitMQ核心概念
RabbitMQ是一个开源的消息代理与队列服务器,用来通过普通协议在完全不同
 的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ
 是基于AMQP协议的。
 MQ使用的场景:
 电商系统跟物流体系:体系之间(为什么可以使用MQ,
 因为对于消息的传递无非就是几种模式,rpc通信(包括http,主流的java框架
 Springcloud,springboot都是;),)
 逻辑:即时性很强,比如付款马上知道是否成功,一般来讲都会采用rpc
 或者http请求
 服务解耦;削峰填谷;异步化;
本身也可完成消息堆积的这件事情。消息堆积能力有限。
 解决方式:要么有足够的consumer;消费处理能力比较快
语言分析:
 Erlang语言:
 一种交换机语言,强大点在于:数据同步非常快(节点与节点之间)
 **AMQP协议:**高级消息队列协议;
 定义:是具有现代特征的二进制协议,是一个提供统一消息服务的
 应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向
 消息的中间件设计。
 AMQP核心概念:
 Server:又称Broker,接受客户端的连接,实现AMQP实体服务
 Connection:连接,应用程序与broker的网络连接
 Channel:网络信道,几乎所有的操作都在channel中进行,Channel是
 进行消息读写的通道。客户端可以建立多个channel,每个channel
 代表一个会话任务。
 Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
协议模型:
 生产者产生通过rabbitMQ去把消息投递到rabbitMQ,
 通过复杂的exchange放入Message Queue中;消费者从messageQueue
 中获取消息
 
 server:代表rabbitMQ
 virtualhost: 虚拟主机;可以进行作用域的划分;虚拟地址,用于进行逻辑隔离,最上层的消息路由。
 一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual
 Host 里面不能有相同名称的Exchange 或 Queue。
 Exchange:交换机,接收消息,根据路由键转单消息到绑定队列(主题。与message queue相关,决定数据发送到那个message)
 Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing key
 Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)
 Queue:消息队列。多对多的关系,应用场景下多为一对多的关系).
Rabbit的整理架构:
 实际工作中一个consumer消费一个queue最好。
 应用场景下,一个exchange对多个queue;

 消息是如何流转的:
 
2.RabbltMQ环境搭建与控制详解
(1)RabbltMQ安装教程:
 这里使用RabbitMQ3.6.5版本进行操作
 环境搭建:
 官网地址:http://www.rabbitmq.com/
 环境描述:linux (centos 7 RedHat7)
 1.首先在linux上进行一些软件的准备工作,yum下来一些基础的软件包
 注:总共三个节点需要配置
 yum install build-essential openssl openssl-devel unixOOBC-idevel make gcc
 gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
配置好主机名称
 /etc/hosts /etc/hostname
2.下载RabbitMQ所需软件包(这里使用的是RabbitMQ 3.6.5稳定版本)
3.安装服务命令:
 rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm(命令报错)
 错误信息:依赖检测失败:tcp_wrappers 被 socat-1.7.3.2-1.1.el7.x86_64 需要
 使用 rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
4.修改用户登录与连接心跳检测,注意修改
 vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
 修改点1:loopback_users 中的<<”guest”>>,只保留guest(用于用户登录)
 (修改前:{loopback_users, [<<“guest”>>]}, 这里只需要将<<>>删除即可)
 修改点2:heartbeat 为10(用于心跳连接)
5.安装控制插件
5.1首先启动服务(后面|包含了停止,查看状态以及重启的命令)
 /etc/init.d/rabbitmq-server start | stop | status |restart
5.2 查看服务有没有启动:lsof -i:5672(5672是rabbit的默认端口)
 rabbitmq-plugins enable rabbitmq_management
5.3可查看管理端口有没有启动:
 lsof -i:15672 或者netstat -tnlp| grep 15672
 6. 一切ok,我们访问地址,输入用户名密码均为 guest:
 http://你的ip地址:15672/
总结:
 1.防火墙
 需要添加5672以及15672端口,防火墙允许其开放
 第一个命令:firewall-cmd --add-port=5672/tcp --permanent
 显示执行成功:success
 但是使用:firewall-cmd --query-port=5672/tcp查询的时候未查询到
 2.hostname
 修改涉及集群的测试;Hostname,修改后需要重启才能够生效;
(2)rabbitMQ控制台相关的内容:
 rabbitMQ自带的交换机:
 
 看不到相应的文件在哪,可以查看控制台:
 
 安装文档中,配置文件的地址也是有的:这两个是有优先级的,
 默认是控制台的优先级比较高
 /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
测试:
 
 1.创建exchange:
 exchange的type实际工作中使用的比较多的是direct(直连)和topic(主题)。
 fanout是广播.
 Durability:持久化设置durable:持久化,transient:不持久化。
 不持久化的时候,服务重启exchange就会消失。
 arguments:表示参数
创建队列:
 
 如何将exchange与queue建立关系:
 可以在控制台选择exchange操作,也可以在控制台的queue中选择对应的
 queue添加exchange就可。
绑定成功:
 
 发布消息:
 使用特定routing key消息才能够发送过来。
 通过binding实现数据发送
 使用queue对应的routing key,数据发送成功。
 
 此时,数据显示为两条。
 
 当不使用对应routing key:
 
 Butnot routed:表示发布失败。
 
 数据的总数没变。
3.RabbitMQ急速入门Helloworld
ConnectionFactory:获取连接工厂
 Connection:一个连接
 Channel:数据通信信道,可发送和接收消息
 Queue:具体消息存储队列
 Prodecer&Consumer:生产和消费者
核心:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version>
</dependency>  
 
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来
 发送和接收消息,包含两部分,其中spring-amqp是基础抽象,
 Spring-rabbit是底层的默认实现。
 SpringAMQP提供了三个功能:
 自动声明队列、交换机及其绑定关系
 基于注解的监听器模式,异步接收消息
 封装了RabbitTemplate工具,用于发送消息
生产者Sender:
ConnectionFactory connectionFactory  = new ConnectionFactory();
connectionFactory.setHost("192.168.56.107");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建ConnectionFactory
Connection connection = connectionFactory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
// 4 声明
String queueName = "testone";
///  参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时
是否自动删除, 其他参数
channel.queueDeclare(queueName,false,false,false,null);
Map<String, Object> headers = new HashMap<>();
//deliveryMode 1:消息非持久化,2:消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("UTF-8").headers(headers).build();
for(int i = 0; i < 5;i++) {String msg = "Hello World RabbitMQ " + i;//channel第一没有设置exchange的名称,是按照queueName来发送消息//使用的是默认的exchange,props就是上面的props。channel.basicPublish("", queueName , props , msg.getBytes());
}
 
备注:队列持久化与消息持久化是区分开的。
 只有队列跟消息都持久化时,重启之后消息才能够保存。
是否自动ack:ack就是确认收到消息发送的一个ack确认。
 Offset:下一条需要处理的消息的编号。
 
 补充:同步的情况下,没有ack;只有在异步的时候,才能回发ACK,ack会将offset传回去。
正常的生产环境,一般使用的是手动ack,因为不确定消息是否成功的传输,所以一定要手动ack
 消费者:Receiver
ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.56.107");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");//服务如果宕机的话,可以进行自动切换。
connectionFactory.setAutomaticRecoveryEnabled(true);
//设置网络恢复间隔
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName = "testone";
// durable是否持久化消息
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称,是否自动ACK,Consumer,相当于设置监听
channel.basicConsume(queueName,true,consumer);
//循环获取消息while (true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息"+msg);}
 
4.RabbltMQ核心API
(1)API-exchange之Direct
 Exchange:发送和接收消息;并根据路由键转发消息
 所绑定的队列
 
 消息是根据你的路由key加上你的exchange所对应的
 决定他到底传到那个队列;
交换机属性:
 Name:交换机名称;
 Type:交换机类型 direct,topic,fanout,headers
 Durability:是否需要持久化,true为持久化
 AutoDelete:当最后一个绑定到Exchange上的队列删除后,
 自动删除该exchange(一般设置不自动);
 Internal:当前exchange是否用于RabbitMQ内部使用,默认为False
 Arguments:扩展参数,用于扩展AMQP协议自制定义使用,
 
 
 首先设置exchange的name,然后设置queue信息。
 最后通过queueBind实现三者的binding关系,生产者
 只关注发到那个exchange和携带的routingKey,
 direct模式要求exchangename必须与routingkey完全匹配,不能模糊匹配。
 
 当routingkey和队列的名字一样,也能发过去;
 走的是amqp.defaultexchanage
(2)API-exchange之Topic
 所有发送的TopicExchange 的消息都被转发到所有关心RouteKey
 中指定的Topic的Queue上。
 Exchange将RouteKey和某topic进行模糊匹配,此时队列需要
 绑定一个topic
 注:可以使用通配符进行模糊匹配
 
 
 最好别用上面的匹配规则,建议一种消息就搞一种匹配规则就好了。
问题:
 consumer2为什么也能够消费user.delete.abc.
 
因为对于统一个queue绑定了两个不同的规则同一个队列,绑定了两个规则,所以这两个消费者无论谁都可以随机消费,
 但如果两个规则对应两个不同的队列,两个消费者
 也消费各自的队列,这时候就按匹配规则来了,
另外,如果两个消费者监听同一个队列,那么他俩会均摊消息不回重复消费,但如果监听各自不同的队列,而且发送方也都能发送给这两个队列,消费者也消费两个不同的队列,那这个时候就各自消费各自的了
备注:现实情况,最好使用控制台去创建台,去绑定key,queue
 以及对应的exchange之间的关系。
(3)API-exchange之Fanout
 不处理路由键,只需要简单的将队列绑定到交换机上。
 发送到交换机的消息会被转发到与该 交换机绑定的所有队列上
 Fanout交换机转发消息是最快的。
 
 (4)API-其他关键概念讲解
 Binding-绑定
 Exchange和exchange,queue之间的连接关系;
 Binding可以包含RoutingKey和参数
 队列:Queue
Message:
 服务器和应用程序之间传送的数据
 本质上就是一段数据,由properties和Payload(Body)组成
 常用属性:delivery mode.headers(自定义属性);
 自定义属性:content_type,content_encoding,piority;
 correlation_id,reply_to,expiration,message_id;
 Timestamp,type,user_id,app_id,cluster_id;
 备注:correlation_id可以作为唯一标记,message_id也是可以当成
 唯一的id.expiration:可以用来设置过期时间。
 Virtual host:虚拟主机
 虚拟地址:用于进行逻辑隔离,最上层的消息路由;
 一个virtual host里面可以有若干个Exchange和Queue;
 同一个Virtual host里面不能有相同的名称和Exchange和Queue。
5.RabbitMQ高级特性
(1)生产端可靠性投递与消费端幂等性
 需要解决的问题:
 如何保障消息100%的投递成功?
 幂等性概念详解
 在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
 Confirm确认消息,return返回消息;
 消息的ACK与重回队列
 消息的限流;
 TTL消息;生存时间
 死信队列;
如何保障消息100%的投递成功?
 什么是生产端的可靠性投递?
 保障消息的成功发出;保障MQ节点成功接收;
 发送端收到MQ节点(Broker)确认应答;完善的消息补偿机制;
BAT/TMD互联网大厂的解决方案:
 机制一:消息落库,对消息的状态进行打标;机制二:分两次发送
 
 BIZDB:就是自己的业务,MSGDB一个记录(理解为log日志记录)。
 理解:
 业务入库要做一个log日志。(同源:一个链接可以去操作
 一个地址下面多个数据库,一个connection,它在一个事务内的)
 Step1:业务入库;step2:日志记录;(上面两个step需要实现原子性)
 Step3:将消息发送到MQ Broker上。(需要broker返回ack,确认消息发送成功)
 Step4:传递ack;
 Producer Component:监听,接受broker端给与的ACK,
 Step5:更新MSGDB,更新状态,表示发送成功。
 (当一直没有ack,这时你可以设置超时时间,用定时任务去扫描日志表
 里状态一直是0的,然后再将对应的数据进行重发。)
幂等性概念:
 类比数据库的乐观锁机制;sql语句
 
 在海量订单产生的业务高峰期,如何避免消息的重复消费问题:
 消费端实现幂等性,意味着,我们的消息永远不会消费多次;
 即使我们收到了多条一样的消息;
业界主流的幂等性操作:
 业务唯一ID或者指纹码机制,利用数据库主键去重
 
 好处:实现简单
(2)生产端特性讲解-确认机制和返回机制
 理解Confirm消息确认机制:
 消息的确认,是指生产者投递消息后,如果Broker收到消息,
 则会给生产者一个应答;
 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,
 进行这种方式也是消息的可靠性投递的核心保障
Confirm消息的流程:brokerconfirm就是ack。
 
 
 Return消息机制:
 ReturnListener:用于处理一些不可路由的消息;
 我们消息生产者,通过制定一个exchange和Routingkey,
 把消息送达到某队列中,然后消费者监听队列,进行消费处理;
 但是在某些情况下,如果我们再发送消息的时候,当时的exchange不存在
 或者执行的路由key路由不到,这个时候我们需要监听这种不可达的消息
 就需要return Listener!
 基础api中有一个关键的配置项;
 Mandatory:如果为true,则监听器会收到路由不可达的消息,然后进行
 后续处理;如果为false,则,broker端会自动删除该消息。
 流程:
 Producer发送消息;路由不可达或者exchange不存在的时候;
 broker会丢弃掉这条消息;producer要知道消息发送情况,这
 就存在Return Listener机制。
代码示例:
 Confirm(Sender)核心部分:
//采用confirm模式,异步实现
channel.confirmSelect();
//添加监听,等待broker应答
channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("------- error ---------");}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("------- ok ---------");}
});channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
 
不能保证百分之百发送成功,因为可能消息发到broker之后,
 Broker发送的ack没收到,网络中断了。压根没收到回调函数。就需要补偿机制。
Return在满足对应机制时是没有回应的。不调用handleReturn
 Return(Sender)核心部分:
//5 监听
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException {System.out.println("**************handleReturn**********");System.out.println("replyCode: " + replyCode);System.out.println("replyText: " + replyText);System.out.println("exchange: " + exchange);System.out.println("routingKey: " + routingKey);System.out.println("body: " + new String(body));}});
 
(3)流控服务和ACK重回队列
 消费端限流:
 假设有个场景:首先,我们rabbitMQ服务器有上万条未处理的消息
 我们随便打开一个消费者客户端,会出现下面的情况:
巨量的消息全部推送过来,但是我们单个客户端无法同时处理这么
 多数据!
 RabbitMQ提供了一种qos(服务质量保证),即在非自动确认消息的
 前提下,如果一定数目的消息(通过基于consume或者channel设置
 Qos的值)未被确认前,不进行消费消息。
Void BasicQos(unit prefetchSize ,unshort prefetchCount,bool global);
 
参数:prefetchSize:报文大小
 prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于
 N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,
 知道有消息ack
 Global:true/false:是否将上面设置应用于channel。
 **理解:**global就是上面限制是channel级别还是consumer级别
 
 消费端的手工ACK与NACK
 消费端进行消费的时候,如果由于业务异常,我们可以进行日志标记,
 然后进行补偿(尽量不要做重回队列)
 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK,
 保障消费端消费成功
消息的重回队列:
 消息的重回队列是为了对没有处理成功的消息,把消息重新递给Broker!
 在实际工作中,都会关闭重回队列,设置为false(避免多次重回)
代码示例:
 核心部分(Receiver):
//  参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);  
// 循环获取消息  
while(true){  // 获取消息,如果没有消息,这一步将会一直阻塞  Delivery delivery = consumer.nextDelivery();  String msg = new String(delivery.getBody());    System.out.println("收到消息:" + msg);  Thread.sleep(1000);if((Integer)delivery.getProperties().getHeaders().get("flag") == 0) {//throw new RuntimeException("异常");channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);} else {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
} 
 
在这里,nack时候可以使用重回队列(与ack的主要区别)。
 设置重回队列的话,一直会打印消息,因为一直在重回队列。
关于流控的内容:如下示例。
 
 在传递消息前添加basicQos设置就可。
(4)TTL消息与死信队列详解
TTL队列/消息:(生存时间,Time to Live)
 RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
 RabbiitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了
 队列的超时时间配置,那么消息会自动删除。
**死信队列:**DLX,Dead-Letter-Exchange.
 利用DLX,当消息在某一个队列中变成死信(dead message)之后,
 它能被重新publish到另一个exchange,这个exchange就是DLX.
消息变成死信有几种情况:
 消息被拒绝(basic.reject/basic.nack),并且requeue=false(重回队列=false);
 消息的TTL过期;队列达到最大长度;
 DLX是一个正常的exchange,和一般的Exchange没有区别,它能在任何队列
 上被指定,实际上就是设置某个队列的属性。
 当某个队列中有死信时,rabbitMQ会自动的将这个消息发布到这只的exchange上去。
 进而被路由到另一个队列。
 可以监听这个队列中消息做相应处理,这个特性弥补rabbitMq3.0
 以前支持的immediate参数的功能。
死信队列的设置:
 首先,需要设置死信队列的exchange和queue,然后进行绑定。
 Exchange:dlx.exchange; Queue:dlx.queue;RoutingKey:#;(自己定义)
 然后我们进行正常声明交换机,队列,绑定,只不过我们需要在队列加上
 一个参数即可。Arguments.put(“x-dead-letter-exchange”,dlx.exchange);
 这样消息在过期,requeue,队列在达到最大长度时,消息就可以直接
 路由到死信队列!
代码示例:dlx
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.107");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();  
//4 声明
String exchangeName = "test_dlx_exchange";
String routingKey = "group.bfxy";
//5 发送Map<String, Object> headers = new HashMap<String, Object>();AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
// TTL
.expiration("6000")
.headers(headers).build();String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , props , msg.getBytes()); 
 
6.RabbitMQ集群搭建-镜像队列集群搭建环境搭建实操
镜像模式:
 集群模式非常经典的就是Mirror镜像模式,保证数据100%
 不丢失,在实际工作中也是用的最多的,并且实现集群非常简单。
 一般互联网大厂都会构建这种镜像集群模式。
 Mirror镜像队列,目的是为了保证rabbitmq数据的高可靠性的解决方案。
 主要是实现数据的同步,一般来讲是2-3个节点实现数据同步
 (对于100%数据可靠性解决方案一般是3个节点)集群架构如下:
 
 具体安装步骤参考安装文档:
 Haproxy+keepalived可以参考我之前写的文章,执行安装!
 参考地址:
 https://blog.csdn.net/TOMORROW6COME/article/details/140296869
 节点参考:分配节点,定义安装集群;
 修改hostname即可。
安装成功:
 
 
 镜像模式配置成功:+2表示两个节点。可以在控制台看到具体的信息
7.RabbitMQ与Springboot整合生产端
依赖,参考上面的demo;
 编写测试RabbitSender
 确认消息,需要一个回调的监听,这个监听就是生产者把消息发出去之后,
 Borker收到消息,然后给我们回一个confirm应答,根据应答的状态是成功
 还是失败,来确认消息是发送成功还是失败。这样的话需要一个Listener。
代码示例:
public class RabbitSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 这里就是确认消息的回调监听,用于确认消息是否被broker所收到*/final RabbitTemplate.ConfirmCallback  confirmCallback = new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 作为唯一的标识* @param ack broker是否落盘成功* @param cause 失败的一些异常信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {}};/***  对外发送消息的方法* @param message 具体消息的内容* @param properties 额外的附加属性* @throws Exception*/public void send(Object message, Map<String, Object> properties)throws Exception{MessageHeaders mhs = new MessageHeaders(properties);//构造消息Message<?>  msg = MessageBuilder.createMessage(message,mhs);//设置监听rabbitTemplate.setConfirmCallback(confirmCallback);MessagePostProcessor mpp = new MessagePostProcessor() {@Overridepublic org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)throws AmqpException {System.err.println("---> post to do: " + message);return message;}};//指定业务唯一的idCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//发送消息,correlationData作为唯一的标记rabbitTemplate.convertAndSend("exchange-1","springboot.rabbit",msg,mpp,correlationData);}
 
8.RabbitMQ与springboot整合-消费端
代码示例:
 编写consumer类
@Component
public class RabbitReceive {/***     组合使用监听*     @RabbitListener @QueueBinding @Queue @Exchange* @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable = "true"),exchange = @Exchange(name = "exchange-1",durable = "true",type = "topic",ignoreDeclarationExceptions = "true"),key = "springboot.*"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {// 1. 收到消息以后进行业务端消费处理System.err.println("-----------------------");System.err.println("消费消息:" + message.getPayload());//  2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收// spring.rabbitmq.listener.simple.acknowledge-mode=manualLong deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}
}
 
测试类:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {@Autowiredprivate RabbitSender reRabbitSender;@Testpublic void testSender() throws Exception {Map<String, Object> properties = new HashMap<String, Object>();properties.put("attr1", "12345");properties.put("attr2", "abcde");reRabbitSender.send("hello rabbitmq!", properties);Thread.sleep(10000);} 
}
相关文章:
快速搞定分布式RabbitMQ---RabbitMQ进阶与实战
本篇内容是本人精心整理;主要讲述RabbitMQ的核心特性;RabbitMQ的环境搭建与控制台的详解;RabbitMQ的核心API;RabbitMQ的高级特性;RabbitMQ集群的搭建;还会做RabbitMQ和Springboot的整合;内容会比较多&#…...
5万字长文吃透快手大数据面试题及参考答案(持续更新)
目录 Flink为什么用aggregate()不用process() 为什么使用aggregate() 为什么不用process() 自定义UDF, UDTF实现步骤,有哪些方法?UDTF中的ObjectInspector了解吗? 自定义UDF实现步骤 自定义UDTF实现步骤 UDTF中的ObjectInspector Spark Streaming和Flink的区别 Flu…...
WordPress原创插件:启用关闭经典编辑器和小工具
WordPress原创插件:启用关闭经典编辑器和小工具 主要功能 如图所示,用于启用或禁用经典编辑器和经典小工具,以替代Gutenberg编辑器。 插件下载 https://download.csdn.net/download/huayula/89592822...
萝卜快跑:自动驾驶的先锋与挑战
萝卜快跑:自动驾驶的先锋与挑战 萝卜快跑作为自动驾驶领域的重要参与者,被视为自动驾驶的先锋。它代表了自动驾驶技术在实际应用中的重要突破,为人们的出行方式带来了革新。萝卜快跑的发展展示了自动驾驶技术的巨大潜力,如提高交通…...
得到xml所有label 名字和数量 get_xml_lab.py,get_json_lab.py
import os import xml.etree.ElementTree as ETrootdir2 r"F:\images3\xmls" file_list os.listdir(rootdir2) # 列出文件夹下所有的目录与文件# 初始化字典 classes_dict {}for file_name in file_list:path os.path.join(rootdir2, file_name)if os.path.isfi…...
数据结构算法-排序(二)
插入排序 插入排序核心 假设数组中的一部数据已经排好序,要插入的数据和这些数据进行比较,直到找到合适的位置插入新数据。 插入排序步骤 插入排序主要有以下步骤构成: 假设有序,我们假设**a[0]**已经排好序待插入的数据为a[j]…...
Linux安装与配置
下载VMware 首先我们需要下载一个叫VMware的软件: 进入官方下载,地址:https://www.vmware.com/cn/products/workstation-pro/workstation-pro-evaluation.html选择与自己电脑版本适配的VMware版本【 输入许可证密钥 MC60H-DWHD5-H80U9-6V85…...
AI赋能交通治理:非机动车监测识别技术在城市街道安全管理中的应用
引言 城市交通的顺畅与安全是城市管理的重要组成部分。非机动车如自行车、电动车、摩托车等在城市交通中扮演着重要角色,但同时也带来了管理上的挑战。尤其是在机动车道上误入非机动车的现象,不仅影响交通秩序,还可能引发交通事故。思通数科…...
水电站泄洪放水预警广播系统解决方案
一、背景 在现代水利工程管理中,水电站泄洪放水预警广播系统扮演着至关重要的角色。这一系统不仅关系到水电站的安全运行,也直接关系到下游地区人民群众的生命财产安全。因此,设计一套完善、高效、可靠的泄洪放水预警广播系统显得尤为必要。…...
【Django】ajax和django接口交互(获取新密码)
文章目录 一、需求1. 效果图 二、实验1. 写get接口后端2. 写html后端3. 写前端4. 测试 一、需求 1. 效果图 二、实验 1. 写get接口后端 写views import string import random def getnewpwd(request):words list(string.ascii_lowercasestring.ascii_uppercasestring.digi…...
Logback 日志打印导致程序崩溃的实战分析
在软件开发和运维中,日志记录是必不可少的一环,帮我们追踪程序的行为,定位问题所在。然而,有时日志本身却可能成为问题的根源。本文将通过一个真实的案例来探讨 Logback 日志系统中的一个常见问题,当并发量大ÿ…...
新加坡 Numen Cyber 与香港光环云数据有限公司达成战略合作
新加坡本土网络安全公司 Numen Cyber 宣布与香港光环云数据有限公司(简称“光环云香港”)建立战略合作伙伴关系。此次合作将重点放在云服务器和云服务业务场景的安全领域。 Numen Cyber,作为一家致力于为客户提供专业网络安全服务和一体化安…...
Laravel魔术方法:框架的隐秘力量
Laravel魔术方法:框架的隐秘力量 引言 Laravel是一个充满魔力的PHP框架,它通过许多巧妙的设计让Web开发变得简洁而优雅。在Laravel中,魔术方法(Magic Methods)是这些魔力的体现之一。魔术方法是PHP预定义的、可以在类…...
系统复习Java日志体系
一,我们采用硬编码体验一下几个使用比较多的日志 分别导入几种日志的 jar 包 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSch…...
网络管理linux命令
在Linux系统中,有许多常用的网络命令用于检查网络配置、诊断网络问题以及管理网络连接。以下是一些常用的网络命令及其简要说明: ifconfig 显示或配置网络接口。 ifconfigip 用于显示和操作路由、设备、策略路由和隧道。 ip addr show ip link show ip …...
PowerDNS架构解析与安装部署指南
1、背景介绍 目前公司使用PowerDNS进行DNS管理,但由于采用的是单节点架构,存在不可用的风险。为提升系统的稳定性和可靠性,我们计划对现有架构进行重构。通过引入高可用性设计,我们将优化系统架构,使其能够在故障情况…...
Ubuntu 20.04.6 安装 Elasticsearch
1.准备 -- 系统更新 sudo apt update sudo apt upgrade -- 安装vim 文本编辑器 sudo apt install vim-- jdk 版本确认 java -versionjdk 安装可以参照:https://blog.csdn.net/CsethCRM/article/details/140768670 2.官方下载Elasticsearch 官方地址:h…...
Python for循环迭代原理(迭代器 Iterator)
在使用Python时,我们经常会使用for循环来访问容器对象(列表、字符、字典等)中的元素。其幕后实际是通过迭代协议来完成的,迭代是一种依次访问对象中元素的方式,for循环在对象上调用iter()函数生成一个迭代器࿰…...
通信原理-思科实验四:静态路由项配置实验
实验四 静态路由项配置实验 一:实验内容 二:实验目的 三、实验原理 四、实验步骤 选择三个2811型号的路由器 R1、R2、R3 路由器默认只有两个快速以太网接口,为路由器R1和R3增加快速以太网接口模块NM-1FE-TX,安装后检查路由器的接…...
ngzero使用外部的svg图标
1.将图标svg下下来,放到项目中,路径如下所示 之后 <span nz-icon [nzIconfont]“‘icon-zhibiao’”>使用 2.直接使用阿里的图标 先将你要用的图标放入购物车,再将购物车的图标添加到你主页的我的项目中 之后代码中在startupService…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
