202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型
目录
- ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
- 代码演示:
- 生产者:producer
- 消费者:Consumer01
- 消费者:Consumer02
- 测试结果
 
- 完整代码
- ConnectionUtil
- Publisher
- Consumer01
- Consumer02
- pom.xml
 
★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费
 fanout 类型就是广播模式。
fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。
生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的一条副本,
 而消费者也能分别从不同的队列中读取消息,互不干扰。
▲ fanout类型的Exchange可以很好地模拟JMS的Pub-Sub消息模型。

代码演示:
都是在前面一篇的代码基础上修改的。
 需求:使用 fanout 类型的Exchange ,实行发布-订阅的功能,其实就是创建一个生产者和两个消费者,实现广播模式的消息分发。

生产者:producer
在生产者中声明Exchange ,然后声明两个消息队列 Queue,
 然后给这个Exchange 绑定 这个两个Queue
 

消费者:Consumer01
两个消费者的代码没啥区别,
 消费方法的参数 autoAck 都是true, 都是自动确认消费。
 两个消费者各自消费自己指定的消息队列。


消费者:Consumer02

 
测试结果
消费生产者发送10条消息,两个消费者都能各自消费到10条消息就是正确的。
消息生产者使用fanout这个广播的类型发送消息。
 
 两个消费者都能消费到10条消息,正确。
 
完整代码
ConnectionUtil
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory =  new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}Publisher
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用fanout类型的exchange------就是广播模式
public class Publisher
{//常量:定义个Exchange的名字作为常量public static final String EXCHANGE_NAME = "myex01.fanout";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange名字 */BuiltinExchangeType.FANOUT,/* Exchange 类型 */true,/* 是否持久化 */false,/* 是否自动栅除 */false,/* 是否为内部的 Exchange */null /* 指定 Exchange 的额外属性 */);//声明多个消息队列------声明第1个消息队列channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第一个消息队列channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,"" /* 因为Exchange 是fanout类型,所以无需 路由key */,null /* 指定 Exchange 的额外属性 */);//声明第2个消息队列channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第2个消息队列channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,"" /* 因为Exchange 是fanout类型,所以无需 路由key */,null /* 指定 Exchange 的额外属性 */);//生产者发送10条消息for (int i = 1; i <= 10; i++){String message = "生产者发送的第【 " + i + " 】条消息的内容";//4、调用Channel的basicPublish()方法发送消息channel.basicPublish(EXCHANGE_NAME /* 向这个 fanout类型的 Exchange 发送消息 */,"" /* 因为 Exchange 是fanout 类型,所以有没有路由key都无所谓 */,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);System.out.println("生产者发送【 "+i+" 】条消息完成");}//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}
Consumer01
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者1
public class Consumer01
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布//    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE01 = "firstQueue";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,//   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(QUEUE01, /* 声明的队列名 */true,    /* 消息队列是否持久化 */false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null   /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE01 /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}
}Consumer02
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者2
public class Consumer02
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布//    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE02 = "secondQueue";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,//   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(QUEUE02, /* 声明的队列名 */true,    /* 消息队列是否持久化 */false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null   /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE02 /*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmq_fanout</artifactId><version>1.0.0</version><name>rabbitmq_fanout</name><!--  属性  --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!--  依赖  --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>
相关文章:
 
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型
目录 ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型代码演示:生产者:producer消费者:Consumer01消费者:Consumer02测试结果 完整代码ConnectionUtilPublisherConsumer01Consumer02pom.xml ★ 使用 fanout 类型的Exchange …...
 
web 性能优化详解(Lighthouse工具、优化方式、强缓存和协商缓存、代码优化、算法优化)
1.性能优化包含的方面 优化性能概念宽泛,可以从信号、系统、计算机原理、操作系统、网络通信、DNS解析、负载均衡、页面渲染。只要结合一个实际例子讲述清楚即可。 2.什么是性能? Web 性能是客观的衡量标准,是用户对加载时间和运行时的直观…...
 
docker-compose部署elk(8.9.0)并开启ssl认证
docker部署elk并开启ssl认证 docker-compose部署elk部署所需yml文件 —— docker-compose-elk.yml部署配置elasticsearch和kibana并开启ssl配置基础数据认证配置elasticsearch和kibana开启https访问 配置logstash创建springboot项目进行测试kibana创建视图,查询日志…...
 
解决java.lang.IllegalArgumentException: servlet映射中的<url pattern>[demo1]无效
当我使用tomcat启动使用servlet项目时,出现了报错: java.lang.IllegalArgumentException: servlet映射中的<url pattern>[demo1]无效 显示路径错误,于是去检查Web.xml中的配置,发现是配置文件的路径写错了,少写了…...
 
软件测试学习(三)易用性测试、测试文档、软件安全性测试、网站测试
目录 易用性测试 用户界面测试 优秀Ul由什么构成 符合标准和规范 直观 一致 灵活 舒适 正确 实用 为有残疾障碍的人员测试:辅助选项测试 测试文档 软件文档的类型 文档测试的重要性 软件安全性测试 了解黑客的动机 威胁模式分析 网站测试 网页基…...
 
Java中,对象一定在堆中分配吗?
在我们的日常编程实践中,我们经常会遇到各种类型的对象,比如字符串、列表、自定义类等等。这些对象在内存中是如何存储的呢? 你可能会毫不犹豫地回答:“在堆中!”如果你这样回答了,那你大部分情况下是正确…...
 
AI:38-基于深度学习的抽烟行为检测
🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…...
 
Hadoop 配置 Kerberos 认证
1、安装 Kerberos 服务器和客户端 1.1 规划 服务端: bigdata3 客户端(Hadoop集群): bigdata0 bigdata1 bigdata2 192.168.50.7 bigdata0.example.com bigdata0 192.168.50.8 bigdata1.example.com bigdata1 192.168.50.9 b…...
 
在 Elasticsearch 中实现自动完成功能 2:n-gram
在第一部分中,我们讨论了使用前缀查询,这是一种自动完成的查询时间方法。 在这篇文章中,我们将讨论 n-gram - 一种索引时间方法,它在基本标记化后生成额外的分词,以便我们稍后在查询时能够获得更快的前缀匹配。 但在此…...
 
美客多、亚马逊卖家如何运用自养账号进行有效测评?
到了10月,卖家朋友们都在忙着准备Q4旺季吧! 首先,祝愿所有看到这条推文的卖家朋友,今年旺季都能爆单,赚得盆满钵满! 测评是珑哥常谈,一直备受关注,不论是新老卖家都是一个逃不开的…...
 
MyBatis的缓存,一级缓存,二级缓存
10、MyBatis的缓存 10.1、MyBatis的一级缓存 一级缓存是SqlSession级别的,通过同一个SqlSession对象 查询的结果数据会被缓存,下次执行相同的查询语句,就 会从缓存中(缓存在内存里)直接获取,不会重新访问…...
 
GitLab(1)——GitLab安装
目录 一、使用设备 二、使用rpm包安装 Gitlab国内清华源下载地址: ①下载命令如下: ②安装命令如下: ③删除rpm包 ④配置 ⑤重载 ⑥重启 ⑦配置自启动 ⑧打开8989端口并重启防火墙 三、GitLab登录 ①访问GitLab的URL ②输入用户…...
 
退税政策线上VR互动科普展厅为税收工作带来了强大活力
缴税纳税是每个公民应尽的义务和责任,由于很多人缺乏专业的缴税纳税操作专业知识和经验,因此为了提高大家的缴税纳税办事效率和好感度,越来越多地区税务局开始引进VR虚拟现实、web3d开发和多媒体等技术手段,基于线上为广大公民提供…...
 
centos 7.9离线安装wget
1.下载安装包 登录到wget官网上下载最新的wget的rpm安装包到本地 http://mirrors.163.com/centos/7/os/x86_64/Packages/ 2.上传安装包到服务器 3.安装 rpm -ivh wget-1.14-18.el7_6.1.x86_64.rpm 4.查看版本 wget -V...
 
【Java学习之道】网络编程的基本概念
引言 这一章我们将一同进入网络编程的世界。在开始学习网络编程之前,我们需要先了解一些基本概念。那么,我们就从“什么是网络编程”这个问题开始吧。 一、网络编程的基本概念 1.1 什么是网络编程 网络编程,顾名思义,就是利用…...
Restful API 设计示例
Restful API 设计示例 一 ,HTTP状态码 ✔️正例: 200: 返回成功 说明:200表示成功,4xx表示客户端异常,5xx表示服务端异常,参见HTTP 的返回码含义 ❌反例: 除了200就是500 说明࿱…...
 
为知笔记一个日记模板
<!DOCTYPE HTML><html><head> <meta http-equiv"Content-Type" content"text/html; charsetunicode"> <title>日记:</title><style id"wiz_custom_css">html, .wiz-editor-body {font-siz…...
 
软件测试中如何测试算法?
广义的算法是指解决问题的方案,小到求解数学题,大到制定商业策略,都可以叫做算法。而我们 今天讨论的软件测试中的算法,对应的英文单词为Algorithm ,专指计算机处理复杂问题的程序或 指令。 随着最近几年人工智能等领域的快速发展,算法受到前所未有的重视,算法测试也随之兴起。…...
CMOS图像传感器——Sony Ta-Kuchi图像传感器
2023 年国际图像传感器研讨会于 5 月在苏格兰克里夫举行,第四场会议重点关注汽车传感器,汽车应用中 CMOS 图像传感器 (CIS) 的技术要求与消费(移动)设备中的要求不同。毕竟,很少有人关心车载摄像头的像素数或图像美观度。主要驱动因素是安全性、可靠性和成本。 而汽车领域…...
 
一文理解登录鉴权(Cookie、Session、Jwt、CAS、SSO)
1 前言 登录鉴权是任何一个网站都无法绕开的部分,当系统要正式上线前都会要求接入统一登陆系统,一方面能够让网站只允许合法的用户访问,另一方面,当用户在网站上进行操作时也需要识别操作的用户,用作后期的操作审计。…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
 
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
 
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
 
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
 
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
 
USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
 
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
 
LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf
FTP 客服管理系统 实现kefu123登录,不允许匿名访问,kefu只能访问/data/kefu目录,不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...
 
使用Spring AI和MCP协议构建图片搜索服务
目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式(本地调用) SSE模式(远程调用) 4. 注册工具提…...
 
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
