RabbitMQ基础篇
文章目录
- 1 RabbitMQ概述
- 1.1 消息队列
- 1.2 RabbitMQ体系结构
- 2 RabbitMQ工作模式
- 2.1 简单模式(Simple Queue)
- 2.2 工作队列模式(Work Queues)
- 2.3 发布/订阅模式(Publish/Subscribe)
- 2.4 路由模式(Routing)
- 2.5 通配符模式(Topics)
- 3 RabbitMQ整合SpringBoot
- 3.1 @RabbitListener注解属性
- 3.2 消费者工程
- 3.3 生产者工程
- 4 消息可靠性投递
- 4.1 什么是消息可靠投递?
- 4.2 消息的可靠发送
- 4.2.1 消息确认机制
- ①模块准备
- ②配置类说明
- ③配置类示例
- ④测试代码
- 4.2.2 备用交换机
- ①备用交换机配置
- ②备用交换机测试
- 4.3 消息的可靠存储
- 4.3.1 非持久化交换机和队列
- 4.3.2 持久化交换机和消息队列
- ①@Queue注解分析
- ②@Exchange注解分析
- 4.4 消息的可靠消费
- 4.4.1 模块准备
- 4.4.2 手动确认思路
- ①basicAck()方法
- ②basicNack()方法
- ③basicReject()方法
- 4.4.3 可靠消费代码
- 4.5 消息可靠性投递架构
- 5 消费端限流
- 5.1 未设置prefetch
- 5.2 设置prefetch
- 6 消息超时
- 6.1 队列层面设置过期时间
- 6.2 消息层面设置过期时间
- 7 死信和死信队列
- 7.1 准备工作
- 7.1.1 正常交换机和正常消息队列
- 7.1.2 死信交换机和死信队列
- 7.1.3 常量声明
- 7.2 死信--拒绝
- 7.3 死信--超时和溢出
- 8 延时队列
- 8.2 延迟插件的使用
- 8.2.1 生产者端
- 8.2.2 消费者端
- ① ui界面创建延迟交换机和队列
- ②代码创建延迟交换机和队列
- 8.2.3 效果展示
- 9 事务消息
- 9.1 什么是事务消息?
- 9.2 Springboot发送事务消息
- 9.2.1 准备工作
- 9.2.2 没有事务消息
- 9.2.3 使用事务消息
- 9.3 Channel发送事务消息
- 10 惰性队列
- 10.1 队列策略设定
- 10.3 `queue.declare`参数设定
- 11 优先级队列
- 11.1 准备工作
- 11.2 使用优先级队列
1 RabbitMQ概述
RabbitMQ简易安装教程
# 拉取镜像
docker pull rabbitmq:3.13-management# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-managementdocker exec -it 5129c41ad3d8 /bin/bash # 数字是rabbitmq的id,可以通过docker ps查看rabbitmq-plugins enable rabbitmq_management #启用 RabbitMQ Management 插件,使得你可以轻松地监控和管理 RabbitMQ 服务器
访问登录:http://192.168.145.130:15672,账号密码就是上面指定的
1.1 消息队列
消息队列是实现应用程序之间通信的中间件
消息队列的好处
- 消息的发送者和接收者进行异步通信
- 流量高峰保证服务稳定,消息队列可以暂存大量消息,达到流量削峰
- 扩展性高,可以水平扩展以支持更多的发送者和接收者,相应地增加或减少资源处理(功能处理)
- 解耦:消息的发送者和接收者只专注于消息,无需关系彼此细节
主流MQ对比
1.2 RabbitMQ体系结构

-
Channel(信道)
:信道是生产者消费者和RabbitMQ服务器之间通信的桥梁。所有的消息发布和消费都由信道来完成的- 建立在TCP连接上的虚拟连接,允许在单个TCP连接上建立多个信道,从而实现多线程处理
- 每个线程对应一个信道,信道在RabbitMQ中具有唯一的ID,保证了信道的私有性
- 引入信道的概念是为了减少建立和销毁TCP连接的开销,提高系统性能
-
Exchange(交换机)
:负责接收消息并根据路由键将消息转发到绑定的队列 -
Queue(队列)
:队列是RabbitMQ
中用于存储消息的容器,消息按照先进先出的顺序进行处理 -
Virtual Host(虚拟主机)
:是RabbitMQ中的命名空间(理解为分组),用于隔离不同的环境或应用程序。每个虚拟主机都有自己的队列、交换机和绑定关系 -
Broker(代理服务器)
:指RabbitMQ服务器本身,多个Broker组合成一个RabbitMQ集群
2 RabbitMQ工作模式
- 简单模式:生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
- 工作队列模式:生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
- 发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列
- 消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
- 消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列
项目导入依赖:采用原生的方式,开发中都是集成框架的
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
</dependencies>
封装连接工具类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static final String HOST_ADDRESS = "192.168.145.160";public static Connection getConnection() throws Exception { // 定义连接工厂 ConnectionFactory factory = new ConnectionFactory();// 设置服务地址 factory.setHost(HOST_ADDRESS);// 端口 factory.setPort(5672);//设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456");// 通过工程获取连接 Connection connection = factory.newConnection();return connection; }
}
2.1 简单模式(Simple Queue)
生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
生产者:发送消息
public class Producer { public static void main(String[] args) throws Exception {// 获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 // queue 参数1:队列名称 // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在 // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列 // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除 // arguments 参数5:队列其它参数 channel.queueDeclare("simple_queue", true, false, false, null); // 要发送的信息 String message = "你好;小兔子!"; // 参数1:交换机名称,如果没有指定则使用默认Default Exchange // 参数2:路由key,简单模式可以传递队列名称 // 参数3:配置信息 // 参数4:消息内容 channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); // 关闭资源 channel.close(); connection.close();}
}
运行效果:新增队列:simple_queue
消费者
public class Consumer { public static void main(String[] args) throws Exception {// 获取连接Connection connection = ConnectionUtil.getConnection();// 创建ChannelChannel channel = connection.createChannel(); // 创建队列// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建 // 参数1. queue:队列名称 // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在 // 参数3. exclusive:是否独占。 // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 // 参数5. arguments:其它参数。 channel.queueDeclare("simple_queue",true,false,false,null); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){// 回调方法,当收到消息后,会自动执行该方法 // 参数1. consumerTag:标识 // 参数2. envelope:获取一些信息,交换机,路由key... // 参数3. properties:配置信息 // 参数4. body:数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body));}};// 参数1. queue:队列名称 // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息 // 参数3. callback:回调对象 // 消费者类似一个监听程序,主要是用来监听消息 channel.basicConsume("simple_queue",true,consumer);}
}
控制台打印:
消息被消费掉了,所以RabbitMQ服务器上没有了
2.2 工作队列模式(Work Queues)
生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
生产者
public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) {String body = i+"hello rabbitmq~~~";channel.basicPublish("",QUEUE_NAME,null,body.getBytes());} channel.close();connection.close();}
}
发送消息:
消费者1:
public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
消费者2:
public class Consumer2 {static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序
运行结果:两个消费者竞争消息队列中消息
2.3 发布/订阅模式(Publish/Subscribe)
rabbitmq消息通讯过程:消息生产者将消息发送给交换机,由交换机处理消息。Exchange(交换机)只负责转发消息,不存储消息,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
常见的交换机类型
- Fanout Exchange(扇出交换机),将消息发送给所有绑定到交换机的队列
- Direct Exchange(直连交换机),把消息交给符合指定routing key的队列
- Topic Exchange(主题交换机),把消息交给符合routing pattern(路由模式)的队列
- Default Exchange(默认交换机),把消息发送给指定队列
发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列
生产者:
public class Producer {public static void main(String[] args) throws Exception {// 1、获取连接 Connection connection = ConnectionUtil.getConnection();// 2、创建频道 Channel channel = connection.createChannel(); // 参数1. exchange:交换机名称 // 参数2. type:交换机类型 // DIRECT("direct"):定向 // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。 // TOPIC("topic"):通配符的方式 // HEADERS("headers"):参数匹配 // 参数3. durable:是否持久化 // 参数4. autoDelete:自动删除 // 参数5. internal:内部使用。一般false // 参数6. arguments:其它参数 String exchangeName = "test_fanout"; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout,routingKey设置为"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; // 6、发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close();}
}
消费者1:
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");} };channel.basicConsume(queue1Name,true,consumer);}
}
消费者2
public class Consumer2 { public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");}};channel.basicConsume(queue2Name,true,consumer);}
}
先启动两个消费者,再启动生产者发送消息
交换机和队列的绑定关系如下图所示:

发布订阅模式与工作队列模式的区别:
- 工作队列模式消息由默认交换机处理,发布订阅模式消息由指定交换机处理
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
2.4 路由模式(Routing)
消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
当Direct交换机用相同的路由键routing key绑定多个队列,就会有广播效果(类似发布订阅)
生产者:
public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_direct";// 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);// 创建队列 String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";// 声明(创建)队列 channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name, exchangeName, "error");// 队列2绑定info error warning channel.queueBind(queue2Name, exchangeName, "info");channel.queueBind(queue2Name, exchangeName, "error");channel.queueBind(queue2Name, exchangeName, "warning");String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";// 发送消息 channel.basicPublish(exchangeName, "warning", null, message.getBytes());System.out.println(message);// 释放资源 channel.close();connection.close();}
}
消费者1:
public class Consumer1 { public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name = "test_direct_queue1";channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body)); System.out.println("Consumer1 将日志信息打印到控制台.....");}};channel.basicConsume(queue1Name,true,consumer);}
}
消费者2:
public class Consumer2 { public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name = "test_direct_queue2";channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer2 将日志信息存储到数据库.....");}};channel.basicConsume(queue2Name,true,consumer);}
}
先启动两个消费者,再启动生产者
绑定关系:
消费者2接受到消息,消费者1没有消息

2.5 通配符模式(Topics)
消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列
(通配符规则:
#
:匹配零个或多个词,*
:匹配一个词)
生产者:
public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout ,routingKey设置为"" // routing key 常用格式:系统的名称.日志的级别。 // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name, exchangeName, "#.error");channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*");// 分别发送消息到队列:order.info、goods.info、goods.error String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";channel.basicPublish(exchangeName, "order.info", null, body.getBytes());body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());channel.close();connection.close();}
}
消费者1监听消息队列1
public class Consumer1 { public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue1";channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
消费者2监听消息队列2
public class Consumer2 { public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue2";channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
先启动两个消费者,接着启动生产者发送消息
3 RabbitMQ整合SpringBoot
项目基本四步骤基本步骤:建module,改POM,写YAML,主启动
3.1 @RabbitListener注解属性
-
bindings
属性:指定交换机和队列之间的绑定关系,指定当前方法要监听的队列隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们
-
queues
属性@RabbitListener(queues = {QUEUE_LINZHUOWEI})
- 作用:指定当前方法要监听的队列
- 此时框架不会创建相关交换机和队列,必须提前创建好
3.2 消费者工程
-
建module:
module06-boot-consumer
-
改POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--引入web模块为了保证项目一直运行,持久监听消息队列消息--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> </dependencies>
-
写YAML
spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /logging:level:com.linzhuowei.mq.listener.MyMessageListener: info
-
主启动:正常添加
@SpringBootApplication
-
监听器:
import lombok.extern.slf4j.Slf4j; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @Slf4j public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; @RabbitListener(bindings = @QueueBinding(//队列信息value = @Queue(value = QUEUE_NAME, durable = "true"),//交换机信息exchange = @Exchange(value = EXCHANGE_DIRECT),//路由键信息,赋值为字符串数组key = {ROUTING_KEY}))public void processMessage(//对应消息数据本身,形参类型需要和发送消息的数据类型对应String data,//对应发送的消息本身,可以通过message获取消息数据,包括dataMessage message,//频道对象Channel channel) {log.info(data);}}
运行查看后台管理:

如图:交换机exchange.direct.order
通过order
路由键绑定消息队列
题外话:
- 使用
@RabbitListener
的bindings
属性能绑定交换机和队列的关系并监听队列消息,如果RabbitMQ
服务中没有交换机和队列,则会自动创建该队列- 使用
@RabbitListener
的queues
属性,监听指定消息队列所以如果只是单纯监听消息队列,不考虑交换机和队列的创建以及绑定(因为这些创建操作可以在后台页面点击完成嘿嘿),消费者代码也可以这样写:
@RabbitListener(queues = {QUEUE_LINZHUOWEI}) public void processMessage(//对应消息数据本身,形参类型需要和发送消息的数据类型对应String data,//对应发送的消息本身,可以通过message获取消息数据,包括dataMessage message,//频道对象Channel channel) {log.info(data); }
但建议还是写第一种
3.3 生产者工程
-
新建模块:
module05-boot-producer
-
改POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency> </dependencies>
-
写YAML
spring: rabbitmq: host: 192.168.145.130port: 5672 username: guest password: 123456 virtual-host: /
-
主启动
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args); } }
-
测试程序
@SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order";@Autowired private RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello linzhuowei"); } }
4 消息可靠性投递
4.1 什么是消息可靠投递?
消息可靠投递是确保消息从生产者发送到消息队列,再从消息队列消费到消费者的过程中,不丢失消息或重复处理消息
消息可靠投递主要三个方面:
- 消息的可靠发送(生产者 -> 消息队列)
- 消息的可靠存储(消息队列内部存储)
- 消息的可靠消费(消息队列 -> 消费者)
下面分别说这三个部分
4.2 消息的可靠发送
通过消息发送回调接口或备用交换机保证消息从生产者成功发送到消息队列中
4.2.1 消息确认机制
应答确认+ 失败重试
生产者发送消息后等待消息队列的响应,确保消息成功送达,如果发送失败可以尝试重新发送
①模块准备
-
新建模块:
module07-confirm-producer
-
改POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> </dependencies>
-
主启动
-
写YAML:启用消息确认机制
spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认 logging:level:com.linzhuowei.mq.config.MQProducerAckConfig: info
②配置类说明
通过配置类设置RabbitTemplate的回调接口,通过回调方法获取RabbitMQ服务器返回的确认信息,实现消息确认机制
代码实现过程:配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中
两个接口两个回调方法,是否发送到交换机和是否发送到消息队列
方法名 | 方法功能 | 所属接口 | 接口所属类 |
---|---|---|---|
confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
-
ConfirmCallback接口(RabbitTemplate内部的接口)
/*** A callback for publisher confirmations.**/ @FunctionalInterface public interface ConfirmCallback {/*** Confirmation callback.* @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);}
生产者端发送消息之后,回调
confirm()
方法- ack参数值为true:表示消息成功发送到了交换机
- ack参数值为false:表示消息没有发送到交换机
-
ReturnCallback接口(RabbitTemplate内部的接口)
/*** A callback for returned messages.** @since 2.3*/ @FunctionalInterface public interface ReturnsCallback {/*** Returned message callback.* @param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);}
接口中的
returnedMessage()
方法仅在消息没有发送到队列时调用
ReturnedMessage
类中主要属性含义如下:
属性名 | 类型 | 含义 |
---|---|---|
message | org.springframework.amqp.core.Message | 消息以及消息相关数据 |
replyCode | int | 应答码,类似于HTTP响应状态码 |
replyText | String | 应答码说明 |
exchange | String | 交换机名称 |
routingKey | String | 路由键名称 |
③配置类示例
配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功!数据:" + correlationData);} else {log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息发送到消息队列失败...");log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}
④测试代码
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowired private RabbitTemplate rabbitTemplate;@Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello atguigu"); } }
通过调整代码,测试如下三种情况:
- 交换机正确、路由键正确
- 交换机正确、路由键不正确,无法发送到队列
- 交换机不正确,无法发送到交换机
回顾交换机作用:接收消息并路由到消息队列
4.2.2 备用交换机
①备用交换机配置
当消息在队列中未被处理时(如消息过期、消息被拒绝或达到最大重试次数,无匹配队列等),这些消息就会转发到备用交换机
本次案例模拟交换机没有匹配的消息队列,消息转至备用交换机
-
首先创建备用交换机(扇出类型):
exchange.direct.order.backup
-
创建备用消息队列
queue.order.backup
-
将备用消息队列绑定备用交换机
\
-
重新创建原交换机(置顶备用交换机)
exchange.direct.order
需要删除原来的直连交换机,重新创建直连交换机,并设置备用交换机
exchange.direct.order.backup
-
原交换机绑定原队列
②备用交换机测试
消息发送端:
@SpringBootTest
public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order";//exchange.direct.order交换机绑定的路由键为order,这里order路由键错误,会转到备用交换机public static final String ROUTING_KEY = "order1";@Autowired private RabbitTemplate rabbitTemplate;@Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY+"11","Hello 备用交换机");} }
结果:消息发送成功,消息先发往直连交换机exchange.direct.order
,由于路由键无效,没有匹配的消息队列,所以消息发往备用的扇出交换机exchange.direct.order.backup
,最终发送到消息队列queue.test.backup
4.3 消息的可靠存储
通过将消息持久化到硬盘上防止消息队列宕机导致内存中消息丢失(交换机默认持久化,消息队列有指定也是默认持久化)
4.3.1 非持久化交换机和队列
即消息在内存存储,重启消息丢失
-
创建非持久化交换机
-
创建非持久化消息队列
-
绑定交换机和消息队列的关系
测试:发送消息后,队列成功收到消息。
docker restart rabbitmq
重启rabbitmq,内存的消息丢失,内存掉电设备
4.3.2 持久化交换机和消息队列
先来看卡监听消息队列的写法
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);
}
关注@RabbitListener
中,@QueueBinding
中的value和exchange两个注解,分别是Queue
和Exchange
类型
①@Queue注解分析
@Queue
注解抽出关注的部分
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {/*** Specifies if this queue should be durable.* By default if queue name is provided it is durable.* @return true if the queue is to be declared as durable.* @see org.springframework.amqp.core.Queue#isDurable()*/String durable() default "";/*** Specifies if this queue should be auto deleted when not used.* By default if queue name is provided it is not auto-deleted.* @return true if the queue is to be declared as auto-delete.* @see org.springframework.amqp.core.Queue#isAutoDelete()*/String autoDelete() default "";
}
durable
属性:By default if queue name is provided it is durable
autoDelete
属性:By default if queue name is provided it is not auto-deleted
翻译就是:只要消息队列指定,默认持久化且不自动删除
②@Exchange注解分析
@Exchange
注解抽出有用的部分
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {/*** @return false if the exchange is to be declared as non-durable.*/String durable() default TRUE;/*** @return true if the exchange is to be declared as auto-delete.*/String autoDelete() default FALSE;
}
durable
属性默认true:false if the exchange is to be declared as non-durable
autoDelete
属性默认false:true if the exchange is to be declared as auto-delete
交换机默认持久化
4.4 消息的可靠消费
消息确认机制
- 自动确认:消费者接收消息后自动返回ACK确认,RabbitMQ删除消息。自动确认机制,消息处理失败会导致消息丢失(因为消息已删)
- 手动确认:消费者处理消息成功后,显式发送ACK给消息队列,通知RabbitMQ消息成功消费删除消息,消费者处理消息失败后,显示发送NACK给消息队列,通知RabbitMQ消息消费失败,执行相应的失败策略。手动确认机制保证消息的可靠消费
4.4.1 模块准备
-
POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> </dependencies>
-
YAML:开启手动确认机制
spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认
-
主启动
-
消息监听:其实
durable
和autoDelete
可以不设置,默认值就是这样的@Component public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) {}}
4.4.2 手动确认思路
- 步骤1:YAML配置文件把消息确认模式改为手动确认
- 步骤2:调用Channel对象的方法返回信息
- ACK:Acknowledgement,表示消息处理成功
- NACK:Negative Acknowledgement,表示消息处理失败
- Reject:拒绝,同样表示消息处理失败
- 步骤3:拒绝或者消息处理失败的后续操作
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
①basicAck()方法
- 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
②basicNack()方法
- 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
③basicReject()方法
- 方法功能:根据指定的deliveryTag,对该消息表示拒绝
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
basicNack()和basicReject()有啥区别?
- basicNack()有批量操作
- basicReject()没有批量操作
Fanout交换机,同一个消息广播到不同的队列,deliveryTag会重复吗?不会,deliveryTag在Broker范围内唯一
4.4.3 可靠消费代码
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {// 1、获取当前消息的 deliveryTag 值备用long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 2、正常业务操作log.info("消费端接收到消息内容:" + dataString);// System.out.println(10 / 0);// 3、给 RabbitMQ 服务器返回 ACK 确认信息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 4、获取信息,看当前消息是否曾经被投递过Boolean redelivered = message.getMessageProperties().getRedelivered();if (!redelivered) {// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列channel.basicReject(deliveryTag, false);}}}
}
4.5 消息可靠性投递架构
MQ是系统解耦利器,能很好解除消息发送者和消息接收者之间的耦合。如何保证消息可靠性投递?通过前面我们知道主要分消息可靠发送,消息可靠存储,消息可靠消费。这一小节我们用另一个角度分析
要保证消息可靠性投递,我们分上下两个半场
-
上半场123分别对应:1发送方调用主动API发送消息,2MQ服务端收到消息并将消息落库(持久化),3发送方收到回调ACK(确认消息成功投递到MQ服务器)
timer起作用:步骤 3,如果发送方没有收到回调确认(比如服务端由于网络问题或者其他原因未能正确发送 ACK),则发送方会启动一个定时器,尝试重新发送消息。如果多次发送失败(超时),发送方会向业务方回调发送失败,这通常是在重试机制达到最大次数或超时后触发的
-
下半场456分别对应:4消费端接收消息处理业务逻辑,5接收方(消费者)发送 ACK 回应消息处理成功,6MQ服务端收到ACK并将库中的消息删除
timer起作用:步骤 5,消费者没有及时发送 ACK(比如消费者处理超时或发生了异常),MQ 服务端会启动定时器等待 ACK
如果 MQ 服务端在规定时间内没有收到消费者的 ACK,
timer
会触发重试机制,可能重新将消息投递到消费者,只到确认消息被处理并收到 ACK 后,消息才会从 MQ 服务端的持久化存储中删除,以确保消息的可靠性
上下半场均有重发,重发策略有定时重发(如每个10s重发直到超出次数)和指数退避(X秒重发,2X秒重发,4X秒重发)
综合来看关键点在于如何保证消息幂等
- 上半场消息幂等:发送方没有收到回调ACK,会重新发送消息到MQ服务器。上半场的消息幂等性有MQ服务器完成,MQ会为每条消息生成全局唯一的message ID用作去重和幂等依据(上半场消息幂等由MQ服务器完成无需关注)
- 下半场消息幂等:MQ服务端超时未收到ACK,导致MQ重复投递消息。业务方会收到重复消息,业务方需要保证消息幂等性。比如消息携带全局唯一id用于保证幂等,再处理消息前判断即可
5 消费端限流
利用消息队列的削峰限流,平滑流量避免大量请求涌入,限制请求数量,避免对后端服务造成过大的压力
常见的削峰限流策略有:
通过prefetch来设置消费者**同时接收未确认的消息的数量**,每次预取的消息数量来实现流量削峰
5.1 未设置prefetch
首先向消息队列中发送100个消息
public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}
- Ready100
- Unack0
- Total100
消息消费者监听对应的消息队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dataString,Message message,Channel channel) throws InterruptedException, IOException {// 正常业务操作log.info("消费端接收到消息内容:" + dataString);//如果不睡1秒,瞬间为0TimeUnit.SECONDS.sleep(1);//手动确认ACKchannel.basicAck(deliveryTag, false);
}
显示结果:Ready直接为0,Unack和Total逐渐减少直到0
5.2 设置prefetch
修改YAML
spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 # 设置每次最多从消息队列服务器取回多少消息(同时接收未确认的消息的数量)
首先发送消息:
public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}
消息消费者监听对应的消息队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dataString,Message message,Channel channel) throws InterruptedException, IOException {// 正常业务操作log.info("消费端接收到消息内容:" + dataString);//如果不睡1秒,瞬间为0TimeUnit.SECONDS.sleep(1);//手动确认ACKchannel.basicAck(deliveryTag, false);
}
效果,监听者每次只取一个消息消费,同时未确认消息只有prefetch
个
6 消息超时
设置过期时间,消息超过过期时间自动删除(更准确的说超时消息会变成死信)
可通过两个层面设置过期时间
- 队列层面:设置队列的消息过期时间,队列内的消息超出过期时间自动删除
- 消息层面:设置具体某个消息的过期时间,消息超出过期时间自动删除
如果两个层面都有设置,以过期时间短的为准
6.1 队列层面设置过期时间
创建交换机
创建消息队列,并设置过期时间10000毫秒
绑定交换机
发送消息,不启动消费端,等待消息过期
6.2 消息层面设置过期时间
MessagePostProcessor 是 Spring Framework 的接口,在消息发送前对消息进行处理和修改。通过接口MessagePostProcessor接口在消息层面设置过期时间
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;@Test
public void testSendMessageTTL() { // 1、创建消息后置处理器对象 MessagePostProcessor messagePostProcessor = (Message message) -> { // 设定 TTL 时间,以毫秒为单位message.getMessageProperties().setExpiration("5000"); return message;};// 2、发送消息 rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello linzhuowei", messagePostProcessor);
}
原来消息队列queue.test.timeout
过期时间10000毫秒,消息层面设置过期时间5000毫秒,以短的过期时间为标准,发送消息,等待消息过期
7 死信和死信队列
无法正常被消费的消息就称为死信
死信的原因有三种(就是消息没有被正常消费):
- 拒绝:消费者拒绝消息,
basicNack()/basicReject()
,并且不把消息重新放回原目标队列(requeue=false
) - 超时:消息达到超时时间未被消费
- 溢出:队列中消息数量达到最大限制,根据队列先进先出原理,后来再进入一条消息,队列中最早的消息会变成死信
死信的处理方式大致三种:
- 丢弃:不处理,死信直接丢弃
- 入库:死信写入数据库,日后处理
- 监听:死信进入死信队列,消费端监听死信队列,做后序处理(通常采用)
下面分别演示三种死信成因
7.1 准备工作
7.1.1 正常交换机和正常消息队列
- 正常交换机:exchange.normal.video
- 正常队列:queue.normal.video
- 正常路由键:routing.key.normal.video
-
创建正常交换机
-
创建正常队列,写好死信队列和死信交换机
-
绑定正常消息队列和正常交换机
完成设施后设置如下
7.1.2 死信交换机和死信队列
- 死信交换机:exchange.dead.letter.video
- 死信队列:queue.dead.letter.video
- 死信路由键:routing.key.dead.letter.video
-
创建死信交换机
-
创建死信队列
-
绑定死信队列和死信交换机
7.1.3 常量声明
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video"; public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video"; public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
7.2 死信–拒绝
-
发送端发送消息到正常交换机
@Test public void testSendMessageButReject() { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "★[normal]发送消息--正常交换机--正常消息队列..."); }
-
消费端监听正常消息队列和死信队列
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component @Slf4j public class DeadLetterListener {public static final String QUEUE_NORMAL = "queue.normal.video";public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";@RabbitListener(queues = {QUEUE_NORMAL})public void processMessageNormal(Message message, Channel channel) throws IOException {// 消费端监听正常消息队列,接收并拒绝消息log.info("★[normal]接收消息,但拒绝消息且不重新放入队列...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {QUEUE_DEAD_LETTER})public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {// 消费端监听死信队列,接收并成功消费消息log.info("★[dead letter]监听死信队列,接收到死信消息...");log.info("★[dead letter]dataString = " + dataString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
先启动消费端监听死信队列和正常队列,再向正常消息队列发送消息
过程:发送端将消息发送到正常消息队列,监听正常消息队列的消费者接收消息并拒绝,消息通过死信交换机路由到死信队列,监听死信队列的消费者接收并成功消费。
正常消息队列:queue.normal.video
,由于消息刚到达就被消费者接收,所以Queued messages
没有变化
同一时间,死信队列也是刚接收消息就被消费端消费,所以Queued messages
没有变化
消费端控制台打印:
★[normal]接收消息,但拒绝消息且不重新放入队列...
★[dead letter]监听死信队列,接收到死信消息...
★[dead letter]dataString = ★[normal]发送消息--正常交换机--正常消息队列...
7.3 死信–超时和溢出
前面创建正常消息队列时就置顶了正常消息队列最大消息数为10(
x-max-length=10
)且最大生存时间为10s(x-message-ttl=10000
)
先关闭消费者,向正常消息队列发送20条消息
@Test
public void testSendMessageButReject() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"★[normal]发送消息--正常交换机--正常消息队列...");}
}
- 发送者发送20条消息(m1,m2,…,m19,m20)
- 前十条消息(m1,m2,…,m9,m10)正常进入消息队列,到达最大消息数
- 后十条消息(m11,m12,…,m19,m20)进入消息队列,根据队列先进先出,前十条消息(m1,m2,…,m9,m10)溢出
- 这十条消息(m1,m2,…,m9,m10)通过死信交换机进入死信队列(对应死信队列第一个上坡)
- 后十条消息(m11,m12,…,m19,m20)超过10s未被消费,超时,后十条消息(m11,m12,…,m19,m20)也进入死信队列(对应死信队列第二个上坡)
消费者端省略,就还是监听然后消费…
8 延时队列
延时队列有两种实现思路
- 借助超时时间+死信队列来实现延时队列
- 通过RabbitMQ插件来完成延时队列
插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
需要将插件放入rabbitmq中容器的?/plugins
目录,我们来看看该目录映射到宿主机的哪个目录?
docker inspect rabbitmq
运行结果:
"Mounts": [{"Type": "volume","Name": "rabbitmq-plugin","Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data","Destination": "/plugins","Driver": "local","Mode": "z","RW": true,"Propagation": ""},{"Type": "volume","Name": "0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4","Source": "/var/lib/docker/volumes/0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4/_data","Destination": "/var/lib/rabbitmq","Driver": "local","Mode": "","RW": true,"Propagation": ""}],
和容器内/plugins
目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data
## 8.1 下载延迟插件
RabbitMQ社区插件:https://www.rabbitmq.com/community-plugins.html
延迟插件:
下载插件安装文件,并移动到对应目录
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
启用插件
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 退出Docker容器
exit# 重启Docker容器
docker restart rabbitmq
延迟插件启动成功:
8.2 延迟插件的使用
8.2.1 生产者端
通过MessageProcessor来设置延迟时间
@Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",messageProcessor -> {// 设置延迟时间:以毫秒为单位messageProcessor.getMessageProperties().setHeader("x-delay", "10000");return messageProcessor;});
}
8.2.2 消费者端
① ui界面创建延迟交换机和队列
使用插件创建交换机exchange.delay.happy
使用rabbitmq_delayed_message_exchange
插件要求交换机type=x-delayed-message
,并通过x-delayed-type
设置交换机的类型(direct、fanout、topic
),创建方式如下:
创建消息队列queue.delay.video
并绑定exchange.delay.happy
交换机
@Component
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException { log.info("[生产者]" + dataString);log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
②代码创建延迟交换机和队列
@Component
@Slf4j
public class MyDelayMessageListener { public static final String EXCHANGE_DELAY = "exchange.delay.video";public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"), exchange = @Exchange( value = EXCHANGE_DELAY, durable = "true", autoDelete = "false", type = "x-delayed-message", arguments = @Argument(name = "x-delayed-type", value = "direct")), key = {ROUTING_KEY_DELAY} )) public void process(String dataString, Message message, Channel channel) throws IOException { log.info("[生产者]" + dataString); log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
8.2.3 效果展示
前面消息可靠投递中说过,消息发送后回调
confirm()
,而returnMessage()
只有在消息发送失败才会回调,但是使用rabbitmq_delayed_message_exchange
插件后,即使消息成功发送到队列上,也会导致returnedMessage()
方法执行(问题不大嘛)
消费端效果:
[生产者]测试基于插件的延迟消息 [12:41:29]
[消费者]12:41:39
9 事务消息
9.1 什么是事务消息?
将生产者发送消息的操作打包成一个原子操作,要么全部成功要么全部失败,通过事务消息保证消息发送的原子性
RabbitMQ 的事务消息有点类似 Spring 的事务,分为开始事务、提交事务、回滚事务。
txSelect()
:开始事务,使用txSelect()
开启事务。txCommit()
:提交事务,如果txCommit()
提交事务成功了,则消息一定会发送到RabbitMQ
。txRollback()
:回滚事务,如果在执行txCommit()
之前RabbitMQ
发生了异常,txRollback()
会捕获异常进行回滚。
RabbitMQ
发送事务消息流程:txSelect
开启事务,消息发送到 RabbitMQ
缓存,接着 txCommit
提交事务,txCommit
成功后则消息一定发送到了 RabbitMQ。
如果在 txCommit
完成前出现任何异常,我们就捕获这个异常然后执行 txRollback
进行回滚操作,整个过程跟 Spring 的事务机制没太大的区别。因此,我们可以通过 RabbitMQ 事务机制保证消息一定可以发送成功。
了解了 RabbitMQ 的事务消息机制,接下来我们就分享两种方式来实现 RabbitMQ 事务消息
9.2 Springboot发送事务消息
9.2.1 准备工作
-
改pom.xml
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> </dependencies>
-
写yaml
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
-
主启动
-
事务配置
@Configuration @Data public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;} }
9.2.2 没有事务消息
没有事务消息,无法保证消息发送原子性
@SpringBootTest
@Slf4j
public class RabbitMQTest {public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");// 2、抛出异常log.info("do bad:" + 10 / 0);// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");}}
抛出异常前的消息发送了,抛异常后的消息没有发送:
9.2.3 使用事务消息
因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控
@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");log.info("do bad:" + 10 / 0);// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}
9.3 Channel发送事务消息
工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RabbitMqUtil {public static Channel getChannel() {// 创建一个连接工厂,并设置MQ的相关信息ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxxx");factory.setUsername("xxx");factory.setPassword("xxx");factory.setVirtualHost("/xxx");Channel channel = null;try {// 创建连接Connection connection = factory.newConnection();// 获取信道channel = connection.createChannel();} catch (Exception e) {log.error("创建 RabbitMQ Channel 失败", e);e.printStackTrace();}return channel;}
}
Channel发送事务消息
import com.rabbitmq.client.Channel;
import com.user.service.util.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Slf4j
@Component
public class RabbitTransactionChannelProducer {@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")public void sendTransactionaChannelMessage(String message) {//获取 ChannelChannel channel = RabbitMqUtil.getChannel();try {//开启事务channel.txSelect();//发送消息channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "1").getBytes(StandardCharsets.UTF_8));channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "2").getBytes(StandardCharsets.UTF_8));//发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了//int a = 1 / 0;channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "3").getBytes(StandardCharsets.UTF_8));//提交事务channel.txCommit();} catch (Exception e) {//回滚事务try {channel.txRollback();} catch (IOException ex) {log.error("txRollback error", e);ex.printStackTrace();}e.printStackTrace();} finally {try {channel.close();} catch (Exception e) {log.error("channel close error", e);e.printStackTrace();}}}
}
10 惰性队列
创建队列分两种:
-
default默认消息队列:消息存储在内存,当队列内存限制触发才会将部分消息移到磁盘
-
lazy惰性消息队列:消息尽可能地保存在磁盘,内存中只保持必要的元数据
惰性队列,将消息尽可能地保存在磁盘,减少内存的使用。有效防止由于队列消息过多导致的内存溢出,是处理需要处理大量消息但内存有限的场景。但是由于消息存于磁盘,生产者发送消息和消费者消费比普通队列慢,尤其在高吞吐场景
队列创建置顶模式方式有:使用队列策略(建议)和设置queue.declare
参数
如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
10.1 队列策略设定
# 登录Docker容器
docker exec -it rabbitmq /bin/bash# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
-
rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
-
set_policy是子命令,表示设置策略
-
Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的
-
"^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
-
'{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
-
–-apply-to参数指定该策略将应用于队列(queues)级别
-
命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列
如果需要修改队列模式可以执行如下命令(不必删除队列再重建):
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
10.3 queue.declare
参数设定
参数
x-queue-mode
设定队列创建模式,lazy
和default
(默认)
Java代码原生API设置方式:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
Java代码注解设置方式:
@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")
})
11 优先级队列
优先级队列允许你根据消息的优先级来处理消息。消息默认先进先出,通过设置不同的优先级值,消费者可以优先处理重要或紧急的消息,而延迟处理优先级较低的消息
11.1 准备工作
-
创建交换机:
exchange.test.priority
-
创建消息队列:
queue.test.priority
RabbitMQ消息优先级范围 1到255 ,建议使用 1到5(数字越大优先级越高)
通过设置
x-max-priority
来指定消息队列的最大优先级,默认为0。而消息的优先级不能大于x-max-priority
,所以使用优先级队列一定要指定x-max-priority
,这里指定为x-max-priority=10
-
改POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> </dependencies>
-
YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
-
主启动
11.2 使用优先级队列
不要启动消费者程序,让多条不同优先级的消息滞留在队列中
- 第一次发送优先级为1的消息
- 第二次发送优先级为2的消息
- 第三次发送优先级为3的消息
先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息
消息生产者:
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_PRIORITY = "exchange.test.priority";public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {//第一次发送优先级为1的消息rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{message.getMessageProperties().setPriority(1);return message;});//第二次发送优先级为2的消息//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{// message.getMessageProperties().setPriority(2);// return message;//});//第三次发送优先级为3的消息//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{// message.getMessageProperties().setPriority(3);// return message;//});}}
消费端:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyMessageProcessor {public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
效果:
I am a message with priority 3.
I am a message with priority 2.
I am a message with priority 1.
相关文章:

RabbitMQ基础篇
文章目录 1 RabbitMQ概述1.1 消息队列1.2 RabbitMQ体系结构 2 RabbitMQ工作模式2.1 简单模式(Simple Queue)2.2 工作队列模式(Work Queues)2.3 发布/订阅模式(Publish/Subscribe)2.4 路由模式(R…...

GPT-5 传言:一场正在幕后发生的 AI 变革
新的一年,让我们从一个引人入胜的话题开始:如果我告诉你,GPT-5 并非虚构,而是真实存在呢?它不仅真实存在,而且正在你看不见的地方悄然塑造着世界。我的基本假设是:OpenAI 已经秘密开发出 GPT-5&…...

CSS布局与响应式
学习链接 Grid网格布局 前端五大主流网页布局 flex布局看这一篇就够了 grid布局看这一篇就够了 用六个案例学会响应式布局 伸缩盒响应式页面布局实战 实现响应式布局的五种方式 - csdn 如何完成响应式布局,有几种方法?看这个就够了 响应式布局总…...

C++的auto_ptr智能指针:从诞生到被弃用的历程
C作为一种功能强大的编程语言,为开发者提供了众多便捷的特性和工具,其中智能指针是其重要特性之一。智能指针能够自动管理内存,有效避免内存泄漏等常见问题。然而,并非所有智能指针都尽善尽美,auto_ptr便是其中的一个例…...
iOS - Objective-C 底层实现中的哈希表
1. 关联对象存储(AssociationsHashMap) // 关联对象的哈希表实现 typedef DenseMap<const void *, ObjcAssociation> ObjectAssociationMap; typedef DenseMap<DisguisedPtr<objc_object>, ObjectAssociationMap> AssociationsHashMa…...
什么是软件架构
什么是软件架构 程序员说,软件架构是要决定编写哪些C程序或OO类、使用哪些库和框架 程序经理说,软件架构就是模块的划分和接口的定义 系统分析员说,软件架构就是为业务领域对象的关系建模 配置管理员说,软件架构就是开发出来的…...

【Golang/nacos】nacos配置的增删查改,以及服务注册的golang实例及分析
前言 本文分析的实例来源于nacos在github上的开源仓库 nacos配置的增删查改 先具体来看一段代码,我将逐步分析每一段的作用 package mainimport ("fmt""time""github.com/nacos-group/nacos-sdk-go/clients""github.com/naco…...

RabbitMQ集群安装rabbitmq_delayed_message_exchange
1、单节点安装rabbitmq安装延迟队列 安装延迟队列rabbitmq_delayed_message_exchange可以参考这个文章: rabbitmq安装延迟队列-CSDN博客 2、集群安装rabbitmq_delayed_message_exchange 在第二个节点 join_cluster 之后,start_app 就会报错了 (CaseC…...

Linux UDP 编程详解
一、引言 在网络编程领域,UDP(User Datagram Protocol,用户数据报协议)作为一种轻量级的传输层协议,具有独特的优势和适用场景。与 TCP(Transmission Control Protocol,传输控制协议࿰…...

【2024年华为OD机试】(B卷,100分)- 计算最接近的数 (Java JS PythonC/C++)
一、问题描述 题目解析 我们需要找到一个下标 i,使得表达式 X[i] - X[i 1] - ... - X[i K - 1] 的结果最接近于数组的中位数。如果有多个 i 满足条件,则返回最大的 i。 关键点: 中位数计算: 将数组排序后,中位数…...
Pytorch 自学笔记(三):利用自定义文本数据集构建Dataset和DataLoader
Pytorch 自学笔记(三) 1. Dataset与DataLoader1.1 torch.utils.data.Dataset1.2 torch.utils.data.DataLoader Pytorch 自学笔记系列的第三篇。针对Pytorch的Dataset和DataLoader进行简单的介绍,同时,介绍如何使用自定义文本数据集…...

QT 使用QSqlTableModel对数据库进行创建,插入,显示
文章目录 效果图概述功能点代码分析初始数据插入数据数据显示 总结 效果图 概述 本案例用于对数据库中的数据进行显示等其他操作,其他表格筛选,过滤等功能可看此博客 框架:数据模型使用QSqlTableModel,视图使用QTableView&#x…...
如何学习Transformer架构
Transformer架构自提出以来,在自然语言处理领域引发了革命性的变化。作为一种基于注意力机制的模型,Transformer解决了传统序列模型在并行化和长距离依赖方面的局限性。本文将探讨Transformer论文《Attention is All You Need》与Hugging Face Transform…...

浅谈云计算22 | Kubernetes容器编排引擎
Kubernetes容器编排引擎 一、Kubernetes管理对象1.1 Kubernetes组件和架构1.2 主要管理对象类型 二、Kubernetes 服务2.1 服务的作用与原理2.2 服务类型 三、Kubernetes网络管理3.1 网络模型与目标3.2 网络组件3.2.1 kube-proxy3.2.2 网络插件 3.3 网络通信流程 四、Kubernetes…...
计算 SAMOut V3 在将词汇表从1万 增加到6千万的情况下能够减少多少参数
当我们将词汇表从 60,000,000(六千万)减少到 10,000 时,实际上是在缩小模型的词嵌入层及其共享的语言模型头(LM Head)的规模。这将导致参数量显著减少。我们可以通过以下步骤来计算具体的参数减少量。 参数量减少计算…...
03.选择排序
一、题目思路 选择排序是一种简单直观的排序算法。它的工作原理是:首先在未排序序列中找到最小(或最大)元素,存放到排序序列的起始位置,然后,再从剩余未排序元素中继续寻找最小(或最大ÿ…...

02_登录窗口
新建场景 重命名为GameRoot 双击GameRoot进入新场景 同样摄像机清除格式 删除平行光并关闭渲染灯光的天空盒 新建空节点重命名为GameRoot GameRoot为游戏的根节点 在整个游戏中都不会被删除 在游戏的根节点下创建UI的根节点Canvas 创建一个空节点 作为UI根节点下的 登录场景UI…...

NodeJS | 搭建本地/公网服务器 live-server 的使用与安装
目录 介绍 安装 live-server 安装方法 安装后的验证 环境变量问题 Node.js 环境变量未配置正确 全局安装的 live-server 路径未添加到环境变量 运行测试 默认访问主界面 访问文件 报错信息与解决 问题一:未知命令 问题二:拒绝脚本 公网配置…...

SystemUI 实现音量条同步功能
需求:SystemUI 实现音量条同步功能 具体问题 以前在SystemUI 下拉框添加了音量条控制,目前发现在SystemUI下拉框显示状态的情况下, 按键或者底部虚拟导航点击音量加减时候,SystemUI音量条不更新。 如下图:两个Syste…...

嵌入式知识点总结 C/C++ 专题提升(一)-关键字
针对于嵌入式软件杂乱的知识点总结起来,提供给读者学习复习对下述内容的强化。 目录 1.C语言宏中"#“和"##"的用法 1.1.(#)字符串化操作符 1.2.(##)符号连接操作符 2.关键字volatile有什么含意?并举出三个不同的例子? 2.1.并行设备的硬件寄存…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...

vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...

零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...