RabbitMQ (4)
RabbitMQ (4)
文章目录
- 1. 死信的概念
- 2. 死信的来源
- 3. 死信代码案例
- 3.1 TTL 过期时间
- 3.2 超过队列最大长度
- 3.3 拒绝消息
前言
上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列
1. 死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
2. 死信的来源
- 消息 TTL 过期 : TTL 是 Time To Live 的缩写, TTL 就是 生存时间
- 队列达到最长长度 : 队列满了 , 无法添加数据到 MQ 中
- 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue = false
3. 死信代码案例
这里 创建一个 direct 交换机 ,两个消费者 , 一个生产者 , 两个 队列 (一个为 消息队列 , 一个为死信队列)
图:
代码 :
3.1 TTL 过期时间
生产者:
package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}
消费者 c1 (启动之后关闭该消费者, 模拟其接受不到消息)
package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
// arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}
先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息
c1 看完,我们在来写 c2 消费者 ,将进入到死信队列的消息 进行消费.
消费者c2
package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}
图:
看完 消息过期后 ,消息转发到 死信队列 被 c2 消费,下面我们来 尝试使用 死信最大长度 (队列满了,将多的消息转发到死信队列中)
3.2 超过队列最大长度
消息生产者代码 去掉 TTL 属性 , 将 basicPublish
的第三个参数改为 null
生产者:
package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10s
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;
// channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
c1 消费者 (启动之后关闭该消费者 模拟其接收不到消息)
package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
// arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}
注意:
这参数改变了(没有设置 ttl 时间,新增了 队列的 最大长度限制 为 6) ,所以 需要把原来队列删除
消费者c2 代码不变
package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}
效果:
这里 之所以要启动 c1 后在关闭,是为了展示 6个消息放到 普通队列 ,4个消息放到死信队列, 如果不这么做,发送的 10个消息 都会被 c1 消费 ,(消息发送到 队列后 , 立马 转发给 c1 导致 队列就不会达到 6 个 ,队列不会满 ,也就不会将消息转化给 死信队列).
3.3 拒绝消息
消息生产者 和 消费者 c2 与上面的代码一样
这里我们 拒绝 info7 消息 ,想要 拒绝 info7 消息,我们可以采用手动应答.
消费者c1
package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
// arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信
// arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {String msg = new String(message.getBody(), "UTF-8");if (msg.equals("info7")) {System.out.println("C1 接收到消息为: " + msg + " 此消息被 C1 拒绝");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("C1 接收到的消息为: " + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};
// channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});// 开启 手动应答channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (tag) -> {});}
}
相关文章:

RabbitMQ (4)
RabbitMQ (4) 文章目录 1. 死信的概念2. 死信的来源3. 死信代码案例3.1 TTL 过期时间3.2 超过队列最大长度3.3 拒绝消息 前言 上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列 1. 死信的概念 先从概念解释上搞清楚这…...

导入Embassy库进行爬虫
Embassy是一个基于Lua的轻量级爬虫框架,可以方便地进行网页抓取和数据提取。它提供了简单易用的接口和丰富的功能,可以帮助开发者快速构建爬虫应用。 要使用Embassy进行爬虫,首先需要安装Embassy库。可以通过Lua的包管理工具luarocks来安装E…...

GoLong的学习之路(十三)语法之标准库 log(日志包)的使用
上回书说到,flag的问题。这回说到日志。无论是软件开发的调试阶段还是软件上线之后的运行阶段,日志一直都是非常重要的一个环节,我们也应该养成在程序中记录日志的好习惯。 文章目录 log配置logger配置日志前缀配置日志输出位置自定义logger …...

别处拿来的VUE项目 npm run serve报错
问题现象: 从别处拷贝来的VUE项目,根据说明通过npm install 加载了项目依赖 ,但是运行npm run serve里报错: npm ERR! Missing script: "serve" npm ERR! npm ERR! To see a list of scripts, run: npm ERR! npm ru…...

Istio 运行错误 failed to update resource with server-side apply for obj 问题解决
Istio 环境 kubernetes version: v1.18.2 istio version: v1.10.0运行之后 istio-operator 的日志就抛出下面错误,而且会一直重启 # kubectl get iop -A NAMESPACE NAME REVISION STATUS AGE istio-system iop-pro-cluster…...

分布式事务(Seata)——Seata分布式事务XA模式、AT模式、TCC模式的介绍和对比 结合案例分析AT模式和XA模式【源码】
前言 事务(TRANSACTION)是一个不可分割的逻辑单元,包含了一组数据库操作命令,并且把所有的命令作为一个整体向系统提交,要么都执行、要么都不执行。 事务作为系统中必须考虑的问题,无论是在单体项目还是在分布式项目中都需要进行…...
GMT 格式 转 标准日期格式
需求:有一个时间格式:TUE NOV 14 08:00:00 GMT08:00 2000 我需要将这种格式的时间转换为标准日期格式,并且只修改这种时间格式的时间,不影响其他的 思路:我想到的是用正则来判断,SimpleDateFormat来进行转换…...

【蓝桥杯选拔赛真题01】C++参赛建议 青少年组蓝桥杯C++选拔赛真题 STEMA比赛真题解析
目录 C/C++参赛建议 一、题目要求 1、编程实现 2、输入输出 二、算法分析 <...

小红书为什么流量不好,小红书笔记质量评判标准有哪些?
我们都知道小红书平台强大的种草力与传播力,需要依靠优质笔记的输出来达成。但是很多时候,我们撰写了笔记,却无法被收录,获得流量,这都是因为笔记质量出现了问题。那么小红书为什么流量不好,小红书笔记质量…...

优化改进 | YOLOv2算法超详细解析(包括诞生背景+论文解析+技术原理等)
前言:Hello大家好,我是小哥谈。YOLOv2是YOLO(You Only Look Once)目标检测算法的第二个版本,它在YOLOv1的基础上做了很多改进,包括使用更深的卷积神经网络Darknet-19作为特征提取器、使用Batch Normalizati…...

作为前端开发,你应该知道的这十几个在线免费工具
偶然刷到知乎一位前端大佬 表歌 多篇优秀实用的文章,真的发现宝藏了 以下内容就是他在知乎分享的十几个在线免费工具 1. 页面设计检查清单:https://www.checklist.design/ 页面设计检查清单 通过清单可以检查一些常用容易忽略的设计要素。 2. 背景色…...

【广州华锐互动】关于物理力学的3D实验实操平台
在科学的广阔领域中,物理力学是一个至关重要的分支,它探索了物体在力作用下的运动规律。然而,传统的物理实验往往需要复杂的设备和大量的操作,这对于学生来说是一项巨大的挑战。为了解决这个问题,广州华锐互动开发了物…...

LVS负载均衡(LVS简介、三种工作模式、十种调度算法)
LVS简介 LVS(Linux Virtual Server)是一种基于Linux内核的高可用性负载均衡软件。它通过将客户端请求分发到多个后端真实服务器,提高系统性能和可靠性。LVS支持多种调度算法,如轮询、最少连接、源地址哈希等,用于决定…...

Vue响应式数据的实现原理(手写副作用函数的存储和执行过程)
1.命令式和声明式框架 命令式框架关注过程 声明式框架关注结果(底层对命令式的DOM获取和修改进行了封装) 2.vue2 Object.defineProperty()双向绑定的实现 <body><div id"app"><input type"text" /><h1>…...

内核进程的调度与进程切换
进程被创建到了链表中,如何再进行进一步的调用和调用? 进程调度 void schedule(void); 进程调度 switch_to(next); 进程切换函数 void schedule(void) {int i,next,c;struct task_struct ** p;/* check alarm, wake up any i…...

docker-rabbitmq 安装依赖
出现的问题如下: channel error; protocol method: #method(reply-code404, reply-textNOT_FOUND - no channel error; protocol method: #method<channel.close>(reply-code404, reply-textNOT_FOUND - no 查看rabbitmq 客户端是否存在如…...

(1)(1.9) HC-SR04声纳
文章目录 前言 1 连接到自动驾驶仪 2 参数说明 前言 HC-SR04 声纳是一种价格低廉但量程很短(最远只有 2m)的测距仪,主要设计用于室内,但也成功地在室外的 Copter 上使用过。极短的测距范围使其用途有限。 !Warning…...

06 MIT线性代数-列空间和零空间 Column space Nullspace
1. Vector space Vector space requirements vw and c v are in the space, all combs c v d w are in the space 但是“子空间”和“子集”的概念有区别,所有元素都在原空间之内就可称之为子集,但是要满足对线性运算封闭的子集才能成为子空间 中 2 …...
【每日一题Day360】LC1465切割后面积最大的蛋糕 | 贪心
切割后面积最大的蛋糕【LC1465】 矩形蛋糕的高度为 h 且宽度为 w,给你两个整数数组 horizontalCuts 和 verticalCuts,其中: horizontalCuts[i] 是从矩形蛋糕顶部到第 i 个水平切口的距离verticalCuts[j] 是从矩形蛋糕的左侧到第 j 个竖直切口…...

中国地名信息库
地名是社会基本公共信息,是历史文化的重要载体。 2014年至2018年,国家启动实施并完成了第二次全国地名普查工作,全国共计采集地名1320多万条,修测标绘地名图2.4万多幅,新设更新地名标志68万多块,普遍建立了…...

docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
uniapp中使用aixos 报错
问题: 在uniapp中使用aixos,运行后报如下错误: AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...

【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
Caliper 负载(Workload)详细解析
Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...

给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...