RabbitMQ消息可靠性保证机制6--可靠性分析
在使用消息中间件的过程中,难免会出现消息错误或者消息丢失等异常情况。这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。
在RabbitMQ中可以使用Firehose实现消息的跟踪,Firehose可以记录每一次发送或者消息的记录,方便RabbitMQ的使用都进行调试、排错等。
FireHose的原理是将生产者投递给RabbitMQ的消息,或者RabbitMQ投递给消费者的消息按照指定的格式,发送到默认交换器上,这个默认交换器的名称是:amq.rabbitmq.trace它是一个topic类型的交换器。发送到交换器的消息的路由键为publis.{exchangename}和deliver.{queuename}。其中exchangename和queuename为交换器和队列名字。分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

上图是一个样例。生产者将消息发送至trace.ex交换器,交换器将消息路由至trace.qu这个队列,然后由消息者将消息取走。当消息到达trace.ex这个队列后,消息就会投递一份到名称为amq.rabbitmq.trace的交换器,按收到交换器的名称加上一个前缀变更publish.trace.ex作为路由的KEY,投递一份至publishtrace这个队列中;接收消息也样如此,当消费都取走消息时,会将消息发送一份到名称为amq.rabbitmq.trace的交换器,按消费者队列的名称加一个前缀变成deliver.trace.qu作为路由的KEY,投递至delivertrace这个队列中。
Firehose命令:
# 开启命令
rabbitmqctl trace_on [-p vhose]
# [-p vhose]是可选参数,用来指定虚拟主机的vhose# 关闭命令
rabbitmqctl trace_off [-p vhose]
Firehose默认情况下处于关闭状态,并且Firehose的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose开启之后会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储 。
7.9.1 Firehose验证
首先开启追溯
[root@nullnull-os rabbitmq]# rabbitmqctl trace_on -p /
Starting tracing for vhost "/" ...
Trace enabled for vhost /
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class TraceProduce {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换机channel.exchangeDeclare("trace.ex", BuiltinExchangeType.DIRECT, false, true, null);// 定义队列channel.queueDeclare("trace.qu", false, false, true, null);// 队列绑定channel.queueBind("trace.qu", "trace.ex", "");// 定义保留数据队列channel.queueDeclare("publishtrace", false, false, false, null);// 绑定channel.queueBind("publishtrace", "amq.rabbitmq.trace", "publish.trace.ex");for (int i = 0; i < 100; i++) {String msg = "这是发送的消息:" + i;channel.basicPublish("trace.ex", "", null, msg.getBytes(StandardCharsets.UTF_8));}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
检查队列的信息:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ publishtrace │ 100 │ 0 │ 100 │ 0 │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ trace.qu │ 100 │ 0 │ 100 │ 0 │
└──────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
这样生产者发送的消息就已经被保存至publishtrace中了,后缀便可以通过检查队列中的消息,检查消息内容。
消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.nio.charset.StandardCharsets;public class TraceConsumer {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换机channel.exchangeDeclare("trace.ex", BuiltinExchangeType.DIRECT, false, true, null);// 定义队列channel.queueDeclare("trace.qu", false, false, true, null);// 队列绑定channel.queueBind("trace.qu", "trace.ex", "");// 定义队列channel.queueDeclare("delivertrace", false, false, true, null);// 队列绑定channel.queueBind("delivertrace", "amq.rabbitmq.trace", "deliver.trace.qu");// 接收消息for (int i = 0; i < 25; i++) {GetResponse getResponse = channel.basicGet("trace.qu", true);String msg = new String(getResponse.getBody(), StandardCharsets.UTF_8);System.out.println("收到的消息:" + msg);}} catch (Exception e) {e.printStackTrace();}}
}
此处采用的是拉模式,从队列中获取了25条记录,也就是说队列中还剩余75条记录。
首先查看控制台输出:
收到的消息:这是发送的消息:0
收到的消息:这是发送的消息:1
收到的消息:这是发送的消息:2
收到的消息:这是发送的消息:3
......
收到的消息:这是发送的消息:22
收到的消息:这是发送的消息:23
收到的消息:这是发送的消息:24
检查队列的情况:
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ delivertrace │ 25 │ 0 │ 25 │ 0 │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ publishtrace │ 100 │ 0 │ 100 │ 0 │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ trace.qu │ 75 │ 0 │ 75 │ 0 │
└──────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]#
可以发现,拉的消息,都已经被推送到了delivertrace中了。
最后关闭Tracehose
[root@nullnull-os rabbitmq]# rabbitmqctl trace_off -p /
Stopping tracing for vhost "/" ...
Trace disabled for vhost /
使用Firehose验证完成。
相关文章:
RabbitMQ消息可靠性保证机制6--可靠性分析
在使用消息中间件的过程中,难免会出现消息错误或者消息丢失等异常情况。这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。 在RabbitMQ中可以使用Firehose实现消息的跟踪,Firehose可…...
k8s容器存储接口 CSI 相关知识
容器存储接口 CSI 相关知识 参考: https://blog.csdn.net/lovely_nn/article/details/122880876 https://developer.aliyun.com/article/783464 https://www.cnblogs.com/varden/p/15139819.html存储商需实现 CSI 插件的 NodeGetVolumeStats 接口,Kube…...
jmeter基础_打开1个jmeter脚本(.jmx文件)
课程大纲 方法1.菜单栏“打开” 菜单栏“文件” - “打开” (或快捷键,mac为“⌘ O”),打开文件选择窗口 - 选择脚本文件,点击“open”,即可打开脚本。 方法2.工具栏“打开”图标 工具栏点击“打开”图标&…...
Linux---对时/定时服务
文章目录 目录 文章目录 前言 一.对时服务 服务端配置 客户端配置 二.定时服务 单次定时任务 循环定时任务 前言 在当今信息化高速发展的时代,时间的准确性和任务的定时执行对于各种系统和服务来说至关重要。Linux操作系统,凭借其强大的功能和灵活的…...
Agent
Agent核心 1、自主性 2、交互性 3、适应性 4、目的性 ReAct Reasoning and Acting范式 模型的推理过程分为 推理 Reason 和行动 Action 两个步骤,交替执行,直至获得最终结果。 推理 Reason 生成分析步骤,解释当前任务的上下文或状态…...
Oracle 数据库执行增删改查命令的原理与过程
摘要: 本文深入探讨当向 Oracle 数据库发送一个增删改查(CRUD)命令时,数据库内部的执行机制与详细过程。从用户发起命令开始,逐步剖析命令在 Oracle 数据库体系结构各组件中的流转、解析、优化以及执行路径,…...
HBase难点
查询优化 一次Scan会返回大量数据,客户端向HBase发送一次Scan请求,实际上并不会将所有数据加载到本地,而是通过多次RPC请求进行加载,防止客户端OOM。禁止缓存优化:批量读取数据时会全表扫描一次业务表,这种…...
Y20030023 PHP+thinkphp+MYSQL+LW+基于PHP的健身房管理系统的设计与实现 源代码 配置 初稿
基于PHP的健身房管理系统 1.项目摘要2. 系统开发的背景及意义3.项目功能4.界面展示5.源码获取 1.项目摘要 近年来,随着社会发展和科技进步,人们越来越重视健康养生并关注电子商务对日常交流方式的影响。随着健身行业消费人群的增加,竞争变得…...
mongodb下载与使用
下载地址: Install MongoDB Community Kubernetes Operator | MongoDB 1、安装MongoDB (5.0.30) 将压缩包移动到C:\Program Files 下,然后解压创建C:\data\db目录,mongodb 会将数据默认保存在这个文件夹以mongodb 中 bin目录作为工作目录&…...
【Linux基础】Linux基本指令
目录 1、pwd 指令 2、clear 指令 3、ls 指令 1、ls 显示当前路径下的文件或者目录名称 2、ls -l 显示当前路径下的文件或者目录的更详细的属性信息 3、ls -a :显示所有文件,包括隐藏文件 4、ls -d 将目录像文件一样显示,而不是显示其…...
【RISC-V CPU debug 专栏 3 -- Debugging RISC-V Cores】
文章目录 RISC-V 调试规范开源与多样性挑战调试规范的重要性外部调试支持的主要组件调试功能Lauterbach 的贡献RISC-V 调试规范 调试 RISC-V 内核涉及许多独特的挑战,这是由 RISC-V 的开源特性和多样化的生态系统所决定的。为了避免专有调试接口的泛滥,RISC-V 基金会内的工作…...
思科实现网络地址转换(NAT)和访问控制列表(ACL)和动态路由配置并且区分静态路由和动态路由配置。
实验拓扑(分为静态路由和动态路由两种) 静态路由互通 动态路由互通 实验背景 这个是想实现外网与内网的连接跟网络的探讨,最终实现互通以及使用并且在网络地址转换后能使用网络然后再这个基础上再配置访问控制列表和网络地址转换的的学习过程。 实验需了解的知识…...
基于 Python、OpenCV 和 PyQt5 的人脸识别上课打卡系统
大家好,我是Java徐师兄,今天为大家带来的是基于 Python、OpenCV 和 PyQt5 的人脸识别上课签到系统。该系统采用 Python 语言开发,开发过程中采用了OpenCV框架,Sqlite db 作为数据库,系统功能完善 ,实用性强…...
MySQL 复合查询
实际开发中往往数据来自不同的表,所以需要多表查询。本节我们用一个简单的公司管理系统,有三张表EMP,DEPT,SALGRADE 来演示如何进行多表查询。表结构的代码以及插入的数据如下: DROP database IF EXISTS scott; CREATE database IF NOT EXIST…...
【Leetcode】19. 删除链表的第N个节点
【Leetcode】19. 删除链表的第N个节点 1. 题目介绍2. 方法一:计算链表长度逻辑流程:代码复杂度分析 1. 题目介绍 题目描述 给你一个链表,删除链表的倒数第 n 个结点,并且返回链表的头结点。 示例 1: 输入:head [1,2,…...
flutter底部导航栏中间按钮凸起,导航栏中间部分凹陷效果
关键代码: Scaffold中设置floatingActionButtonLocation: FloatingActionButtonLocation.centerDocked和extendBody: true, BottomAppBar中设置shape: const CircularNotchedRectangle() Scaffold(extendBody: true,//body是否延伸脚手架底部,在底部导航…...
<项目代码>YOLOv8 红绿灯识别<目标检测>
YOLOv8是一种单阶段(one-stage)检测算法,它将目标检测问题转化为一个回归问题,能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法(如Faster R-CNN),YOLOv8具有更高的…...
HTMLHTML5革命:构建现代网页的终极指南 - 4. 使用元素
HTML&HTML5革命:构建现代网页的终极指南 4. 使用元素 大家好,我是莫离老师。 到目前为止,我们已经了解了 HTML 和 HTML5 的基础概念,并且选择了适合自己的开发工具。 今天,我们将迈出实际开发的第一步,…...
使用 Selenium 和 Python 爬取腾讯新闻:从基础到实践
使用 Selenium 和 Python 爬取腾讯新闻:从基础到实践 在这篇博客中,我们将介绍如何利用 Selenium 和 Python 爬取腾讯新闻的内容,并将结果保存到 CSV 文件中。本教程包含以下内容: 项目简介依赖安装实现功能的代码实现中的关键技…...
CAD深度清理工具-AVappsDrawingPurge9.0.0(2024.8.27版本) 支持版本CAD2022-2025-供大家学习研究参考
图形文件DWG体积很大:通常没有明显的数据。同时,还其他症状包括: (1)无法复制和粘贴图元。 (2)悬挂较长时间选择文本与 “特性”选项板上打开。 (3)图形文件需要很长时间…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 设置网关 打开VMware虚拟机,点击编辑…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
