RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式)
官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。

发布订阅使用fanout的交换器,创建交换器,名称为test
channel.exchangeDeclare("test","fanout");
fanout交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。
存在一个默认的交换器。
此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。
队列名称如
amq.gen-gjKBgQ9PSmoj2YQGMOdPfA
样例代码
消费者1的代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class OneConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明的临时队列,名称由rabbitMQ自动生成String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class TwoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者3
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class ThirdConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {// 声明fanout类型交换机channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);for (int i = 0; i < 20; i++) {channel.basicPublish("ex.testfan",// 路由key"",// 属性null,// 信息("hello world fan " + i).getBytes(StandardCharsets.UTF_8));}} catch (IOException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}}
}
观察下队列的绑定的情况:
在未启动消费都队列之前:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。
启动三个消费者:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ amq.gen-UG67rAw03FGbBupHX6o18g │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ │ │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]#
当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。
启动生产者,查看消费情况:
消费者1
临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19
消费者2:
临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19
消息者3:
临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19
再停止几个消费者查看队列信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
可以看到,当客户端退出之后,临时队列也就消失了。
相关文章:
RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式) 官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html 使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个…...
JDK源码解析-Object
1. Object类 所有类的基类——java.lang.Object Object 类是所有类的基类,当一个类没有直接继承某个类时,默认继承Object类Object 类属于 java.lang 包,此包下的所有类在使用时无需手动导入,系统会在程序编译期间自动导入。 思…...
pinia——添加插件——基础积累
问题:是否给pinia添加过插件?具体添加的方式是什么? 在pinia中,我们可以为仓库添加插件,通过添加插件能够扩展以下的内容: 为 store 添加新的属性 定义 store 时增加新的选项 为 store 增加新的方法 包装现…...
软件国产化之殇
今天又看到这么一个帖子讨论一款国产化软件,属实给我震撼到了。 对于国产化产品,一直主打的都是”自研“,难道是我对”自研“这个词的理解有误? 做一个产品,别人开源了,你拿过来使用,你可以说…...
SQLyog问题处理集合
sqlyog 问题处理 1. 错误号码:1049错误: 数据库命令参数参考:数据库命令地址 检查数据库是否存在检查创建的数据库名称 与 要进行连接的数据库名称是否一致; 2. 错误号码:1819错误: MySQL授予远程连接权限时出现: …...
JavaSE【继承和多态】(1)(重点:初始化、pretected封装、组合)
一、继承 继承 (inheritance) 机制 :是面向对象程序设计使代码可以复用的最重要的手段,它允许程序员在保持原有类特 性 的基础上进行扩展,增加新功能 ,这样产生新的类,称 派生类 。 继承呈现了面向对象程序设计的层次结…...
无涯教程-Android Studio函数
第1步-系统要求 您将很高兴知道您可以在以下两种操作系统之一上开始Android应用程序的开发- MicrosoftWindows10/8/7/Vista/2003(32或64位)MacOSX10.8.5或更高版本,最高10.9(小牛) GNOME或KDE桌面 第二点是,开发Android应用程序所需的所有工具都是开源的,可以从Web上下载。以…...
CentOS8安装mysql8.0.24
一、下载mysql安装包并解压 执行以下命令: # 创建mysql安装目录 mkdir /usr/local/mysql # 进入mysql安装目录 cd /usr/local/mysql/ # 下载mysql-8.0.24 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.24-linux-glibc2.12-x86_64.tar.xz # 解压…...
Quasi-eccentricity Error Modeling and Compensation in Vision Metrology
论文:Quasi-eccentricity Error Modeling and Compensation in Vision Metrology 中文:视觉计量中准偏心误差建模与补偿 论文地址:Sci-Hub | Quasi-eccentricity error modeling and compensation in vision metrology. Measurement Scienc…...
ai智能电话机器人是人类的助手和朋友
一直以来,人工智能都是人们关注的热门话题。在以前,说到人工智能,第一想到的是“机器人”,随着人工智能的普及,AI已经渗透到我们生活的每一个角落。现在,说起人工智能,可能会想到“无人驾驶、无…...
应用TortoiseSVN的SubWCRev管理VisualStudio C#项目编译版本号
首先要安装 TortoiseSVN, 并确保TortoiseSVN的bin目录被加入到系统环境变量Path中。 1、拷贝Porperties目录下的文件AssemblyInfo.cs生成副本AssemblyInfo.template, 作为版本管理的模板文件。 2、修改模板文件中的想要管理的版本号信息 // [assembly: AssemblyVersion(&quo…...
【八股】2023秋招八股复习笔记5(计算机网络-CN)
文章目录 八股目录目录1、应用层 & HTTP一些http题HTTPS 加密原理(问过)HTTP/1.1 新特性HTTP/2.0 与 RPC(问过)GET 和 POST 比较 2、传输层 & TCPTCP三次握手 & 四次挥手(问过)为什么每次TCP 连…...
【C++】SLT——Vector详解
本片要分享的是关于STL中Vector的内容,Vector的内容于string非常相似,只要会使用string那么学习Vector时会非常流畅。 目录 1.vector介绍 2.vector的简单实用 2.1.简单的无参构造 编辑2.2.简单带参构造 2.3.迭代器区间初始化 2.4.vector的遍历 …...
企业网络安全:威胁情报解决方案
什么是威胁情报 威胁情报是网络安全的关键组成部分,可为潜在的恶意来源提供有价值的见解,这些知识可帮助组织主动识别和防止网络攻击,通过利用 STIX/TAXII 等威胁源,组织可以检测其网络中的潜在攻击,从而促进快速检测…...
为什么2G、3G、4G成功了,5G却?
你可能已经多年来一直听到关于闪电般的5G的炒作。虽然新的无线网络在美国仍然没有普及,但5G正在波士顿和西雅图到达拉斯和堪萨斯城等城市慢慢出现。随着连接速度的加快,用户的安全性和隐私保护将增加,因为无线行业试图改善3G和4G的防御。但是…...
C语言每日一练------Day(10)
本专栏为c语言练习专栏,适合刚刚学完c语言的初学者。本专栏每天会不定时更新,通过每天练习,进一步对c语言的重难点知识进行更深入的学习。 今日练习题关键字:自除数 除自身以外数组的乘积 💓博主csdn个人主页ÿ…...
发力服务业务,龙湖集团半程领跑赢在“智慧”
成立三十载,龙湖集团一直是房地产行业“特立独行”的存在。 一方面,龙湖在对外战略方面长期量入为出,从不背上过重的“包袱”。 不久前,一则消息引发市场关注:龙湖集团提前偿还17亿元债务,已基本全部还清…...
Kubernetes(七)修改 pod 网络(flannel 插件)
一、 提示 需要重启服务器 操作之前备份 k8s 中所有资源的 yaml 文件 如下是备份脚本,仅供参考 # 创建备份目录 test -d $3 || mkdir $3 # $1 命名空间 # $2 资源名称: sts deploy configMap svc 等 # $3 资源备份存放的目录名称for app in kubec…...
测试平台metersphere
metersphere可以做接口测试、UI测试、性能测试。 metersphere接口测试底层是jmeter,可以做API管理,快捷调试,接口用例管理,接口自动化场景执行一键选取用例范围,生成测试报告。 会用jmeter,metersphere会…...
论文笔记: One Fits All:Power General Time Series Analysis by Pretrained LM
1 intro 时间序列领域预训练模型/foundation 模型的研究还不是很多 主要挑战是缺乏大量的数据来训练用于时间序列分析的基础模型——>论文利用预训练的语言模型进行通用的时间序列分析 为各种时间序列任务提供了一个统一的框架 论文还调查了为什么从语言领域预训练的Transf…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用
文章目录 一、背景知识:什么是 B-Tree 和 BTree? B-Tree(平衡多路查找树) BTree(B-Tree 的变种) 二、结构对比:一张图看懂 三、为什么 MySQL InnoDB 选择 BTree? 1. 范围查询更快 2…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...
TJCTF 2025
还以为是天津的。这个比较容易,虽然绕了点弯,可还是把CP AK了,不过我会的别人也会,还是没啥名次。记录一下吧。 Crypto bacon-bits with open(flag.txt) as f: flag f.read().strip() with open(text.txt) as t: text t.read…...
门静脉高压——表现
一、门静脉高压表现 00:01 1. 门静脉构成 00:13 组成结构:由肠系膜上静脉和脾静脉汇合构成,是肝脏血液供应的主要来源。淤血后果:门静脉淤血会同时导致脾静脉和肠系膜上静脉淤血,引发后续系列症状。 2. 脾大和脾功能亢进 00:46 …...
32位寻址与64位寻址
32位寻址与64位寻址 32位寻址是什么? 32位寻址是指计算机的CPU、内存或总线系统使用32位二进制数来标识和访问内存中的存储单元(地址),其核心含义与能力如下: 1. 核心定义 地址位宽:CPU或内存控制器用32位…...
6.计算机网络核心知识点精要手册
计算机网络核心知识点精要手册 1.协议基础篇 网络协议三要素 语法:数据与控制信息的结构或格式,如同语言中的语法规则语义:控制信息的具体含义和响应方式,规定通信双方"说什么"同步:事件执行的顺序与时序…...
