202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型
目录
- ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
- 代码演示:
- 生产者:producer
- 消费者:Consumer01
- 消费者:Consumer02
- 测试结果
- 完整代码
- ConnectionUtil
- Publisher
- Consumer01
- Consumer02
- pom.xml
★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费
fanout 类型就是广播模式。
fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。
生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的一条副本,
而消费者也能分别从不同的队列中读取消息,互不干扰。
▲ fanout类型的Exchange可以很好地模拟JMS的Pub-Sub消息模型。

代码演示:
都是在前面一篇的代码基础上修改的。
需求:使用 fanout 类型的Exchange ,实行发布-订阅的功能,其实就是创建一个生产者和两个消费者,实现广播模式的消息分发。

生产者:producer
在生产者中声明Exchange ,然后声明两个消息队列 Queue,
然后给这个Exchange 绑定 这个两个Queue


消费者:Consumer01
两个消费者的代码没啥区别,
消费方法的参数 autoAck 都是true, 都是自动确认消费。
两个消费者各自消费自己指定的消息队列。


消费者:Consumer02


测试结果
消费生产者发送10条消息,两个消费者都能各自消费到10条消息就是正确的。
消息生产者使用fanout这个广播的类型发送消息。

两个消费者都能消费到10条消息,正确。

完整代码
ConnectionUtil
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}
Publisher
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用fanout类型的exchange------就是广播模式
public class Publisher
{//常量:定义个Exchange的名字作为常量public static final String EXCHANGE_NAME = "myex01.fanout";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange名字 */BuiltinExchangeType.FANOUT,/* Exchange 类型 */true,/* 是否持久化 */false,/* 是否自动栅除 */false,/* 是否为内部的 Exchange */null /* 指定 Exchange 的额外属性 */);//声明多个消息队列------声明第1个消息队列channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第一个消息队列channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,"" /* 因为Exchange 是fanout类型,所以无需 路由key */,null /* 指定 Exchange 的额外属性 */);//声明第2个消息队列channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第2个消息队列channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,"" /* 因为Exchange 是fanout类型,所以无需 路由key */,null /* 指定 Exchange 的额外属性 */);//生产者发送10条消息for (int i = 1; i <= 10; i++){String message = "生产者发送的第【 " + i + " 】条消息的内容";//4、调用Channel的basicPublish()方法发送消息channel.basicPublish(EXCHANGE_NAME /* 向这个 fanout类型的 Exchange 发送消息 */,"" /* 因为 Exchange 是fanout 类型,所以有没有路由key都无所谓 */,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);System.out.println("生产者发送【 "+i+" 】条消息完成");}//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}
Consumer01
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者1
public class Consumer01
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE01 = "firstQueue";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(QUEUE01, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE01 /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}
}
Consumer02
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者2
public class Consumer02
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE02 = "secondQueue";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(QUEUE02, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE02 /*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmq_fanout</artifactId><version>1.0.0</version><name>rabbitmq_fanout</name><!-- 属性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依赖 --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>
相关文章:
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型
目录 ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型代码演示:生产者:producer消费者:Consumer01消费者:Consumer02测试结果 完整代码ConnectionUtilPublisherConsumer01Consumer02pom.xml ★ 使用 fanout 类型的Exchange …...
web 性能优化详解(Lighthouse工具、优化方式、强缓存和协商缓存、代码优化、算法优化)
1.性能优化包含的方面 优化性能概念宽泛,可以从信号、系统、计算机原理、操作系统、网络通信、DNS解析、负载均衡、页面渲染。只要结合一个实际例子讲述清楚即可。 2.什么是性能? Web 性能是客观的衡量标准,是用户对加载时间和运行时的直观…...
docker-compose部署elk(8.9.0)并开启ssl认证
docker部署elk并开启ssl认证 docker-compose部署elk部署所需yml文件 —— docker-compose-elk.yml部署配置elasticsearch和kibana并开启ssl配置基础数据认证配置elasticsearch和kibana开启https访问 配置logstash创建springboot项目进行测试kibana创建视图,查询日志…...
解决java.lang.IllegalArgumentException: servlet映射中的<url pattern>[demo1]无效
当我使用tomcat启动使用servlet项目时,出现了报错: java.lang.IllegalArgumentException: servlet映射中的<url pattern>[demo1]无效 显示路径错误,于是去检查Web.xml中的配置,发现是配置文件的路径写错了,少写了…...
软件测试学习(三)易用性测试、测试文档、软件安全性测试、网站测试
目录 易用性测试 用户界面测试 优秀Ul由什么构成 符合标准和规范 直观 一致 灵活 舒适 正确 实用 为有残疾障碍的人员测试:辅助选项测试 测试文档 软件文档的类型 文档测试的重要性 软件安全性测试 了解黑客的动机 威胁模式分析 网站测试 网页基…...
Java中,对象一定在堆中分配吗?
在我们的日常编程实践中,我们经常会遇到各种类型的对象,比如字符串、列表、自定义类等等。这些对象在内存中是如何存储的呢? 你可能会毫不犹豫地回答:“在堆中!”如果你这样回答了,那你大部分情况下是正确…...
AI:38-基于深度学习的抽烟行为检测
🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…...
Hadoop 配置 Kerberos 认证
1、安装 Kerberos 服务器和客户端 1.1 规划 服务端: bigdata3 客户端(Hadoop集群): bigdata0 bigdata1 bigdata2 192.168.50.7 bigdata0.example.com bigdata0 192.168.50.8 bigdata1.example.com bigdata1 192.168.50.9 b…...
在 Elasticsearch 中实现自动完成功能 2:n-gram
在第一部分中,我们讨论了使用前缀查询,这是一种自动完成的查询时间方法。 在这篇文章中,我们将讨论 n-gram - 一种索引时间方法,它在基本标记化后生成额外的分词,以便我们稍后在查询时能够获得更快的前缀匹配。 但在此…...
美客多、亚马逊卖家如何运用自养账号进行有效测评?
到了10月,卖家朋友们都在忙着准备Q4旺季吧! 首先,祝愿所有看到这条推文的卖家朋友,今年旺季都能爆单,赚得盆满钵满! 测评是珑哥常谈,一直备受关注,不论是新老卖家都是一个逃不开的…...
MyBatis的缓存,一级缓存,二级缓存
10、MyBatis的缓存 10.1、MyBatis的一级缓存 一级缓存是SqlSession级别的,通过同一个SqlSession对象 查询的结果数据会被缓存,下次执行相同的查询语句,就 会从缓存中(缓存在内存里)直接获取,不会重新访问…...
GitLab(1)——GitLab安装
目录 一、使用设备 二、使用rpm包安装 Gitlab国内清华源下载地址: ①下载命令如下: ②安装命令如下: ③删除rpm包 ④配置 ⑤重载 ⑥重启 ⑦配置自启动 ⑧打开8989端口并重启防火墙 三、GitLab登录 ①访问GitLab的URL ②输入用户…...
退税政策线上VR互动科普展厅为税收工作带来了强大活力
缴税纳税是每个公民应尽的义务和责任,由于很多人缺乏专业的缴税纳税操作专业知识和经验,因此为了提高大家的缴税纳税办事效率和好感度,越来越多地区税务局开始引进VR虚拟现实、web3d开发和多媒体等技术手段,基于线上为广大公民提供…...
centos 7.9离线安装wget
1.下载安装包 登录到wget官网上下载最新的wget的rpm安装包到本地 http://mirrors.163.com/centos/7/os/x86_64/Packages/ 2.上传安装包到服务器 3.安装 rpm -ivh wget-1.14-18.el7_6.1.x86_64.rpm 4.查看版本 wget -V...
【Java学习之道】网络编程的基本概念
引言 这一章我们将一同进入网络编程的世界。在开始学习网络编程之前,我们需要先了解一些基本概念。那么,我们就从“什么是网络编程”这个问题开始吧。 一、网络编程的基本概念 1.1 什么是网络编程 网络编程,顾名思义,就是利用…...
Restful API 设计示例
Restful API 设计示例 一 ,HTTP状态码 ✔️正例: 200: 返回成功 说明:200表示成功,4xx表示客户端异常,5xx表示服务端异常,参见HTTP 的返回码含义 ❌反例: 除了200就是500 说明࿱…...
为知笔记一个日记模板
<!DOCTYPE HTML><html><head> <meta http-equiv"Content-Type" content"text/html; charsetunicode"> <title>日记:</title><style id"wiz_custom_css">html, .wiz-editor-body {font-siz…...
软件测试中如何测试算法?
广义的算法是指解决问题的方案,小到求解数学题,大到制定商业策略,都可以叫做算法。而我们 今天讨论的软件测试中的算法,对应的英文单词为Algorithm ,专指计算机处理复杂问题的程序或 指令。 随着最近几年人工智能等领域的快速发展,算法受到前所未有的重视,算法测试也随之兴起。…...
CMOS图像传感器——Sony Ta-Kuchi图像传感器
2023 年国际图像传感器研讨会于 5 月在苏格兰克里夫举行,第四场会议重点关注汽车传感器,汽车应用中 CMOS 图像传感器 (CIS) 的技术要求与消费(移动)设备中的要求不同。毕竟,很少有人关心车载摄像头的像素数或图像美观度。主要驱动因素是安全性、可靠性和成本。 而汽车领域…...
一文理解登录鉴权(Cookie、Session、Jwt、CAS、SSO)
1 前言 登录鉴权是任何一个网站都无法绕开的部分,当系统要正式上线前都会要求接入统一登陆系统,一方面能够让网站只允许合法的用户访问,另一方面,当用户在网站上进行操作时也需要识别操作的用户,用作后期的操作审计。…...
嵌入式硬件设计核心要点与实战技巧
嵌入式硬件设计关键要点解析1. 嵌入式系统硬件架构概述嵌入式系统的硬件架构以CPU为核心,所有外围设备都围绕CPU进行配置。这种架构最显著的特点是硬件可裁剪性,设计者可以根据具体应用需求灵活调整系统组成。在典型的嵌入式硬件设计中,需要重…...
终极指南:如何用VideoDownloadHelper快速下载网页视频
终极指南:如何用VideoDownloadHelper快速下载网页视频 【免费下载链接】VideoDownloadHelper Chrome Extension to Help Download Video for Some Video Sites. 项目地址: https://gitcode.com/gh_mirrors/vi/VideoDownloadHelper 还在为无法保存网页视频而烦…...
OpenClaw+nanobot:个人学习计划智能生成与跟踪
OpenClawnanobot:个人学习计划智能生成与跟踪 1. 为什么需要AI驱动的学习计划助手 去年备考PMP认证时,我陷入了典型的学习规划困境:教材有600多页,模拟题库超过2000题,而我的备考时间只有8周。传统学习计划工具&…...
在 Windows 11 家庭版安装 Docker Desktop解决虚拟化问题
目录 前言 环境说明 架构原理 第一步:启用 Windows 虚拟化功能 第二步:修复 Hypervisor 启动配置 第三步:安装 WSL 2 与 Ubuntu 第四步:启动 Docker Desktop 第五步:验证安装 常见问题 总结 前言 Docker 是目…...
华为NPU上跑大模型?手把手教你用vLLM-Ascend插件部署Qwen2
华为NPU实战:基于vLLM-Ascend插件的高效大模型部署指南 1. 环境准备与基础配置 在华为Ascend NPU上部署大模型,首先需要确保硬件和软件环境满足基本要求。Atlas 800I A2或Atlas A2 Training系列设备是当前官方推荐的选择,操作系统需为Linux发…...
如何使用Aimeos构建高效产品目录:从基础商品到复杂配置型产品的完整指南
如何使用Aimeos构建高效产品目录:从基础商品到复杂配置型产品的完整指南 【免费下载链接】aimeos Integrated online shop based on Laravel 10 and the Aimeos e-commerce framework for ultra-fast online shops, scalable marketplaces, complex B2B application…...
从零到一:在Windows系统上部署JDK11与Neo4j 4.3.5开发环境
1. 环境准备:JDK11与Neo4j 4.3.5的版本选择 刚开始接触Java和图数据库时,我踩过不少版本不兼容的坑。比如有一次装了最新版JDK17,结果Neo4j死活启动不了,折腾半天才发现是版本冲突。所以现在每次搭建环境,我都会先确认…...
Leather Dress Collection多场景落地:独立设计师IP开发、虚拟试衣、NFT服饰创作
Leather Dress Collection多场景落地:独立设计师IP开发、虚拟试衣、NFT服饰创作 1. 项目概述 Leather Dress Collection 是一个基于Stable Diffusion 1.5的LoRA模型集合,专门用于生成各种皮革服装风格的图像。这个系列由Stable Yogi开发,包…...
腾讯混元翻译模型惊艳展示:HY-MT1.5-1.8B多语言翻译案例集
腾讯混元翻译模型惊艳展示:HY-MT1.5-1.8B多语言翻译案例集 1. 引言:当翻译遇见大模型,语言不再是障碍 想象一下,你正在阅读一篇最新的科技论文,原文是英文,但你的母语是中文。或者,你收到一封…...
QLVideo终极指南:三步让Mac视频预览功能全面升级
QLVideo终极指南:三步让Mac视频预览功能全面升级 【免费下载链接】QuickLookVideo This package allows macOS Finder to display thumbnails, static QuickLook previews, cover art and metadata for most types of video files. 项目地址: https://gitcode.com…...
