RabbitMQ (4)
RabbitMQ (4)
文章目录
- 1. 死信的概念
- 2. 死信的来源
- 3. 死信代码案例
- 3.1 TTL 过期时间
- 3.2 超过队列最大长度
- 3.3 拒绝消息
前言
上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列
1. 死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
2. 死信的来源
- 消息 TTL 过期 : TTL 是 Time To Live 的缩写, TTL 就是 生存时间
- 队列达到最长长度 : 队列满了 , 无法添加数据到 MQ 中
- 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue = false
3. 死信代码案例
这里 创建一个 direct 交换机 ,两个消费者 , 一个生产者 , 两个 队列 (一个为 消息队列 , 一个为死信队列)
图:
代码 :
3.1 TTL 过期时间
生产者:
package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}
消费者 c1 (启动之后关闭该消费者, 模拟其接受不到消息)
package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
// arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}
先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息
c1 看完,我们在来写 c2 消费者 ,将进入到死信队列的消息 进行消费.
消费者c2
package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}
图:
看完 消息过期后 ,消息转发到 死信队列 被 c2 消费,下面我们来 尝试使用 死信最大长度 (队列满了,将多的消息转发到死信队列中)
3.2 超过队列最大长度
消息生产者代码 去掉 TTL 属性 , 将 basicPublish
的第三个参数改为 null
生产者:
package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10s
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;
// channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
c1 消费者 (启动之后关闭该消费者 模拟其接收不到消息)
package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
// arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}
注意:
这参数改变了(没有设置 ttl 时间,新增了 队列的 最大长度限制 为 6) ,所以 需要把原来队列删除
消费者c2 代码不变
package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}
效果:
这里 之所以要启动 c1 后在关闭,是为了展示 6个消息放到 普通队列 ,4个消息放到死信队列, 如果不这么做,发送的 10个消息 都会被 c1 消费 ,(消息发送到 队列后 , 立马 转发给 c1 导致 队列就不会达到 6 个 ,队列不会满 ,也就不会将消息转化给 死信队列).
3.3 拒绝消息
消息生产者 和 消费者 c2 与上面的代码一样
这里我们 拒绝 info7 消息 ,想要 拒绝 info7 消息,我们可以采用手动应答.
消费者c1
package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
// arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信
// arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {String msg = new String(message.getBody(), "UTF-8");if (msg.equals("info7")) {System.out.println("C1 接收到消息为: " + msg + " 此消息被 C1 拒绝");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("C1 接收到的消息为: " + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};
// channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});// 开启 手动应答channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (tag) -> {});}
}
相关文章:

RabbitMQ (4)
RabbitMQ (4) 文章目录 1. 死信的概念2. 死信的来源3. 死信代码案例3.1 TTL 过期时间3.2 超过队列最大长度3.3 拒绝消息 前言 上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列 1. 死信的概念 先从概念解释上搞清楚这…...

导入Embassy库进行爬虫
Embassy是一个基于Lua的轻量级爬虫框架,可以方便地进行网页抓取和数据提取。它提供了简单易用的接口和丰富的功能,可以帮助开发者快速构建爬虫应用。 要使用Embassy进行爬虫,首先需要安装Embassy库。可以通过Lua的包管理工具luarocks来安装E…...

GoLong的学习之路(十三)语法之标准库 log(日志包)的使用
上回书说到,flag的问题。这回说到日志。无论是软件开发的调试阶段还是软件上线之后的运行阶段,日志一直都是非常重要的一个环节,我们也应该养成在程序中记录日志的好习惯。 文章目录 log配置logger配置日志前缀配置日志输出位置自定义logger …...

别处拿来的VUE项目 npm run serve报错
问题现象: 从别处拷贝来的VUE项目,根据说明通过npm install 加载了项目依赖 ,但是运行npm run serve里报错: npm ERR! Missing script: "serve" npm ERR! npm ERR! To see a list of scripts, run: npm ERR! npm ru…...

Istio 运行错误 failed to update resource with server-side apply for obj 问题解决
Istio 环境 kubernetes version: v1.18.2 istio version: v1.10.0运行之后 istio-operator 的日志就抛出下面错误,而且会一直重启 # kubectl get iop -A NAMESPACE NAME REVISION STATUS AGE istio-system iop-pro-cluster…...

分布式事务(Seata)——Seata分布式事务XA模式、AT模式、TCC模式的介绍和对比 结合案例分析AT模式和XA模式【源码】
前言 事务(TRANSACTION)是一个不可分割的逻辑单元,包含了一组数据库操作命令,并且把所有的命令作为一个整体向系统提交,要么都执行、要么都不执行。 事务作为系统中必须考虑的问题,无论是在单体项目还是在分布式项目中都需要进行…...
GMT 格式 转 标准日期格式
需求:有一个时间格式:TUE NOV 14 08:00:00 GMT08:00 2000 我需要将这种格式的时间转换为标准日期格式,并且只修改这种时间格式的时间,不影响其他的 思路:我想到的是用正则来判断,SimpleDateFormat来进行转换…...

【蓝桥杯选拔赛真题01】C++参赛建议 青少年组蓝桥杯C++选拔赛真题 STEMA比赛真题解析
目录 C/C++参赛建议 一、题目要求 1、编程实现 2、输入输出 二、算法分析 <...

小红书为什么流量不好,小红书笔记质量评判标准有哪些?
我们都知道小红书平台强大的种草力与传播力,需要依靠优质笔记的输出来达成。但是很多时候,我们撰写了笔记,却无法被收录,获得流量,这都是因为笔记质量出现了问题。那么小红书为什么流量不好,小红书笔记质量…...

优化改进 | YOLOv2算法超详细解析(包括诞生背景+论文解析+技术原理等)
前言:Hello大家好,我是小哥谈。YOLOv2是YOLO(You Only Look Once)目标检测算法的第二个版本,它在YOLOv1的基础上做了很多改进,包括使用更深的卷积神经网络Darknet-19作为特征提取器、使用Batch Normalizati…...

作为前端开发,你应该知道的这十几个在线免费工具
偶然刷到知乎一位前端大佬 表歌 多篇优秀实用的文章,真的发现宝藏了 以下内容就是他在知乎分享的十几个在线免费工具 1. 页面设计检查清单:https://www.checklist.design/ 页面设计检查清单 通过清单可以检查一些常用容易忽略的设计要素。 2. 背景色…...

【广州华锐互动】关于物理力学的3D实验实操平台
在科学的广阔领域中,物理力学是一个至关重要的分支,它探索了物体在力作用下的运动规律。然而,传统的物理实验往往需要复杂的设备和大量的操作,这对于学生来说是一项巨大的挑战。为了解决这个问题,广州华锐互动开发了物…...

LVS负载均衡(LVS简介、三种工作模式、十种调度算法)
LVS简介 LVS(Linux Virtual Server)是一种基于Linux内核的高可用性负载均衡软件。它通过将客户端请求分发到多个后端真实服务器,提高系统性能和可靠性。LVS支持多种调度算法,如轮询、最少连接、源地址哈希等,用于决定…...

Vue响应式数据的实现原理(手写副作用函数的存储和执行过程)
1.命令式和声明式框架 命令式框架关注过程 声明式框架关注结果(底层对命令式的DOM获取和修改进行了封装) 2.vue2 Object.defineProperty()双向绑定的实现 <body><div id"app"><input type"text" /><h1>…...

内核进程的调度与进程切换
进程被创建到了链表中,如何再进行进一步的调用和调用? 进程调度 void schedule(void); 进程调度 switch_to(next); 进程切换函数 void schedule(void) {int i,next,c;struct task_struct ** p;/* check alarm, wake up any i…...

docker-rabbitmq 安装依赖
出现的问题如下: channel error; protocol method: #method(reply-code404, reply-textNOT_FOUND - no channel error; protocol method: #method<channel.close>(reply-code404, reply-textNOT_FOUND - no 查看rabbitmq 客户端是否存在如…...

(1)(1.9) HC-SR04声纳
文章目录 前言 1 连接到自动驾驶仪 2 参数说明 前言 HC-SR04 声纳是一种价格低廉但量程很短(最远只有 2m)的测距仪,主要设计用于室内,但也成功地在室外的 Copter 上使用过。极短的测距范围使其用途有限。 !Warning…...

06 MIT线性代数-列空间和零空间 Column space Nullspace
1. Vector space Vector space requirements vw and c v are in the space, all combs c v d w are in the space 但是“子空间”和“子集”的概念有区别,所有元素都在原空间之内就可称之为子集,但是要满足对线性运算封闭的子集才能成为子空间 中 2 …...
【每日一题Day360】LC1465切割后面积最大的蛋糕 | 贪心
切割后面积最大的蛋糕【LC1465】 矩形蛋糕的高度为 h 且宽度为 w,给你两个整数数组 horizontalCuts 和 verticalCuts,其中: horizontalCuts[i] 是从矩形蛋糕顶部到第 i 个水平切口的距离verticalCuts[j] 是从矩形蛋糕的左侧到第 j 个竖直切口…...

中国地名信息库
地名是社会基本公共信息,是历史文化的重要载体。 2014年至2018年,国家启动实施并完成了第二次全国地名普查工作,全国共计采集地名1320多万条,修测标绘地名图2.4万多幅,新设更新地名标志68万多块,普遍建立了…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...

02.运算符
目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&:逻辑与 ||:逻辑或 !:逻辑非 短路求值 位运算符 按位与&: 按位或 | 按位取反~ …...