203、RabbitMQ 之 使用 direct 类型的 Exchange 实现 消息路由 (RoutingKey)
目录
- ★ 使用direct实现消息路由
- 代码演示这个情况二
- ConstantUtil 常量工具类
- ConnectionUtil 连接RabbitMQ的工具类
- Publisher 消息生产者
- 测试消息生产者
- Consumer01 消息消费者01
- 测试消费者结果:
- Consumer02 消息消费者02
- 测试消费者结果:
- 完整代码:
- ConstantUtil
- ConnectionUtil
- Publisher
- Consumer01
- Consumer02
- pom.xml
★ 使用direct实现消息路由
direct类型的Exchange: 生产者发送给Exchange 消息的路由key 要和 Exchange 绑定 Queue 消息队列的路由key一致,这个生产者发送的消息才会被Exchange 分发到那个消息队列里面去。
direct类型的Exchange会根据消息的路由key将消息分发给指定的Queue。
▲ 一个队列能与一个Exchange绑定多个路由key,
Q1队列与Exchange就绑定的路由key:orange
比如Q2队列与Exchange就绑定了两个路由key:black和green。
若消息生产者发送路由key为orange的消息到Exchange时,该消息将会被分发到Q1队列;
若发送路由key为black或green的消息到Exchange时,该消息都将被分发到Q2队列。
▲ 情况一
RabbitMQ也允许多个队列绑定相同的路由key,此时又变成了Pub-Sub模型。
比如C1、C2两个队列都绑定了black作为路由key,这意味着若消息生产者发送路由key为black的消息时,该消息将会被分发Q1和Q2两个队列,Q1、Q2两个队列将会收到各自不同副本——这就是Pub-Sub模型。
▲ 情况二
C1队列绑定了error作为路由key,而Q2则绑定了info、error和warning作为路由key,
这意味着若消息生产者发送路由key为error的消息时,该消息将会被分发Q1和Q2两个队列,Q1、Q2两个队列将会同时收到各自的副本;
若消息生产者发送路由key为info或warning的消息时,该消息只会被分发给Q2队列,Q1队列不会收到任何消息。
代码演示这个情况二
需求:就是演示这张图
过程:
1、创建一个消息生产者,2个消息消费者,
2、生产者声明一个Exchange,2个消息队列,
然后如图,Exchange绑定Q1这个消息队列用1个路由key,Exchange绑定Q2这个消息队列用3个路由key,
3、生产者发送30条消息给Exchange,每个路由key发送10条。
结果应该是:Q1消息队列得到10条消息,Q2消息队列得到30条消息。
4、创建2个消费者,分别消费Q1和Q2.
ConstantUtil 常量工具类
ConnectionUtil 连接RabbitMQ的工具类
Publisher 消息生产者
测试消息生产者
如图:结果正确。
消息队列queue_01,因为只绑定一个路由key–error,所以exchange分发10条路由key为error的消息给它。
消息队列queue_01,因为只绑定3个路由key–error、info、warning,所以exchange分发30条消息给它。
Consumer01 消息消费者01
消息消费者就没啥好说的,
Consumer01 消费 queue_01 消息队列,Consumer02 消费 queue_02 消息队列。
代码差不多。
测试消费者结果:
正常成功,符合期望
Consumer02 消息消费者02
测试消费者结果:
正常成功,符合期望
完整代码:
ConstantUtil
package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{// 消息队列的名称public final static String QUEUE01 = "queue_01";public final static String QUEUE02 = "queue_02";// Exchange的名称public static final String EXCHANGE_NAME = "myex02.direct";// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS = {"info", "error", "warning"};}
ConnectionUtil
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}
Publisher
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用 direct 类型的exchange------就是广播模式
public class Publisher
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange,--------调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定channel.exchangeDeclare(ConstantUtil.EXCHANGE_NAME,/* Exchange名字 */BuiltinExchangeType.DIRECT,/* Exchange 类型 */true,/* 是否持久化 */false,/* 是否自动栅除 */false,/* 是否为内部的 Exchange */null /* 指定 Exchange 的额外属性 */);//调用queueDeclare()方法声明队列----声明多个消息队列------声明第1个消息队列---------路由key是 errorchannel.queueDeclare(ConstantUtil.QUEUE01, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第一个消息队列channel.queueBind(ConstantUtil.QUEUE01, ConstantUtil.EXCHANGE_NAME,ConstantUtil.ROUTING_KEYS[1] /* exchange类型是direct,这里指定路由key */,null /* 指定 Exchange 的额外属性 */);//声明第2个消息队列--------这个exchange绑定这个queue,使用了三个路由key---info,error,warningchannel.queueDeclare(ConstantUtil.QUEUE02, true, false, false, null);//用循环为第2个消息队列绑定三个路由keyfor (int i = 0; i < ConstantUtil.ROUTING_KEYS.length; i++){//把 Exchange 和 Queue 绑定起来,绑定第2个消息队列channel.queueBind(ConstantUtil.QUEUE02,ConstantUtil.EXCHANGE_NAME,ConstantUtil.ROUTING_KEYS[i] /* 循环绑定路由key */,null /* 指定 Exchange 的额外属性 */);}//生产者发送30条消息for (int i = 1; i <= 30; i++){//用嵌套的三元运算符,动态的让消息的发送使用到不同的路由keyString routingKey = i <= 10 ? ConstantUtil.ROUTING_KEYS[0] :(i <= 20 ? ConstantUtil.ROUTING_KEYS[1] : ConstantUtil.ROUTING_KEYS[2]);//要发送的消息String message = "生产者发送的第【 " + i + " 】条消息的内容";//4、调用Channel 的 basicPublish() 方法发送消息channel.basicPublish(ConstantUtil.EXCHANGE_NAME /* 指定向这个Exchange发送消息 */,routingKey /* 动态指定路由key */,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);System.out.println("生产者发送【 " + i + " 】条消息完成,发送到Exchange的路由key是:"+routingKey);}//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}
Consumer01
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
//(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
//(2)通过Connection获取Channel。
//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布
// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//消息消费者1
public class Consumer01
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(ConstantUtil.QUEUE01, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(ConstantUtil.QUEUE01 /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}
}
Consumer02
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者2
public class Consumer02
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(ConstantUtil.QUEUE02, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(ConstantUtil.QUEUE02 /*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>direct</artifactId><version>1.0.0</version><name>direct</name><!-- 属性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依赖 --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies></project>
相关文章:

203、RabbitMQ 之 使用 direct 类型的 Exchange 实现 消息路由 (RoutingKey)
目录 ★ 使用direct实现消息路由代码演示这个情况二ConstantUtil 常量工具类ConnectionUtil 连接RabbitMQ的工具类Publisher 消息生产者测试消息生产者 Consumer01 消息消费者01测试消费者结果: Consumer02 消息消费者02测试消费者结果: 完整代码&#x…...

微服务+Java+Spring Cloud +UniApp +MySql智慧工地综合管理云平台源码,SaaS模式
智慧工地围绕工程现场人、机、料、法、环及施工过程中质量、安全、进度、成本等各项数据满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效. 智慧工地综合管理云平台源码,PC监管端、项目端;APP监管端、项目端、数据可视化大屏端源码…...

QMidi Pro for Mac:打造您的专属卡拉OK体验
你是否曾经厌倦于在KTV里与朋友们争夺麦克风?是否想要在家中享受自定义的卡拉OK体验?现在,有了QMidi Pro for Mac,一切变得简单而愉快! QMidi Pro是一款功能强大的卡拉OK播放器,专为Mac用户设计。它充分利…...

bindtap和catchtap的区别?
bindtap和catchtap都是小程序中用于绑定点击事件的方法。 1.bindtap的作用是绑定一个触摸事件并指定对应的处理函数。当用户点击或触摸相关元素时,会触发该事件,并执行相应的处理逻辑。 示例: <button bindtap"handleTap">…...

IDEA—java: 常量字符串过长问题解决
问题描述: Error: java: 常量字符串过长 问题分析: 字符串长度过长,导致 idea 默认使用的 javac 编译器编译不了。 解决办法: Javac 编译器改为 Eclipse 编译器。 File -> Settings -> Build,Execution,Deployment -&…...

云原生SIEM解决方案
云原生(Cloud Native)是一种基于云计算的软件开发和部署方法论,它强调将应用程序和服务设计为云环境下的原生应用,以实现高可用性、可扩展性和灵活性。 云原生的优势有哪些 高可用性:云原生可以实现应用程序的高可用…...

工艺边与定位孔设计经验规则总结
🏡《总目录》 目录 1,什么是工艺边和定位孔2,工艺边的设计经验原则2.1,避免尖锐角2.2,工艺边宽度设置2.3,工艺边的方向2.4,定位孔尺寸2.5,定位孔的位置3,去除工艺边的方法注意事项4,总结1,什么是工艺边和定位孔 工艺边是在SMT焊接时,为了PCB和导轨接触预留的PCB边…...

软件架构设计(业务架构、应用架构、数据架构、技术架构)
一、架构相关概念 1、系统 系统:由一群有关联的个体组成,根据某种规则运作,能完成个别原件不能独立完成的工作的群体。大的系统可以嵌套小系统,被嵌套的小系统往往称为大系统的子系统。 2、模块 模块是从逻辑上将系统分解&#…...

我们又组织了一次欧洲最大开源社区活动,Hugging Face 博客欢迎社区成员发帖、Hugging Chat 功能更新!...
每一周,我们的同事都会向社区的成员们发布一些关于 Hugging Face 相关的更新,包括我们的产品和平台更新、社区活动、学习资源和内容更新、开源库和模型更新等,我们将其称之为「Hugging News」。本期 Hugging News 有哪些有趣的消息࿰…...

学信息系统项目管理师第4版系列26_项目绩效域(下)
1. 项目工作绩效域 1.1. 涉及项目工作相关的活动和职能 1.2. 预期目标 1.2.1. 高效且有效的项目绩效 1.2.2. 适合项目和环境的项目过程 1.2.3. 干系人适当的沟通和参与 1.2.4. 对实物资源进行了有效管理 1.2.5. 对采购进行了有效管理 1.2.6. 有效处理了变更 1.2.7. 通…...

SQL sever中的索引
目录 一、索引定义 二、索引结构 2.1. B-树索引结构: 2.2. 哈希索引结构: 三、索引作用 四、索引与约束区别 五、索引级别 六、索引分类 6.1. 聚集索引(Clustered Index): 6.2. 非聚集索引(Noncl…...

多目标鳟海鞘算法(Multi-objective Salp Swarm Algorithm,MSSA)求解微电网优化MATLAB
一、微网系统运行优化模型 微电网优化模型介绍: 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 参考文献: [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、多目标鳟海鞘算法MSSA 多…...

软件测试之概念篇(需求,测试用例,BUG描述,产品的生命周期)
目录 1.什么是需求 2.什么是测试用例 3.什么是BUG 4.软件的生命周期 5.测试的生命周期 1.什么是需求 在大多数软件公司,一般会有两部分需求: 用户需求:可以理解为就是甲方提出需求,如果没有甲方,那么就是终端用…...

jwt详细介绍
jwt详细介绍 1.jwt 简介:2.jwt 工具类介绍3.案列演示:3.1并在web.xml进行配置过滤器 3.2过滤3.3全局响应设置 1.jwt 简介: 。JWT(JSON Web Token) 是一种用于安全传输信息的开放标准(RFC 7519)…...

电子笔记真的好用吗?手机上适合记录学习笔记的工具
提及笔记,不少人都会和学习挂钩,的确学习过程中我们经常会遇到很多难题,而经常记录笔记可以有效地帮助大家记住很多知识,而且时常拿出笔记查看一下,可方便巩固过去学习的知识。 手机作为大家日常随身携带的工具&#…...

用 SQL 找出某只股票连续上涨的最长天数
涉及多张中间表: SELECT MAX(consecutive_day) FROM (SELECT COUNT(*) as consecutive_dayFROM (SELECT trade_date, SUM(rise_mark) OVER (ORDER BY trade_date) AS days_no_gainFROM (SELECT trade_date,CASEWHEN closing_price > LAG(closing_price) OVER (ORDER BY tra…...

Vue 绑定 class 与 style
在应用界面中,某些元素的样式是动态的。class 与 style 绑定就是专门用来实现动态样式效果的技术。 如果需要动态绑定 class 或 style 样式,可以使用 v-bind 绑定。 绑定 class 样式【字符串写法】 适用于:类名不确定,需要动态指…...

【微服务部署】九、使用Docker Compose搭建高可用双机热备MySQL数据库
通常,一般业务我们使用云服务器提供的数据库,无论是MySQL数据库还是其他数据库,云服务厂商都提供了主备功能,我们不需要自己配置处理。而如果需要我们自己搭建数据库,那么考虑到数据的高可用性、故障恢复和扩展性&…...

HTTP Basic 认证
HTTP Basic 认证 难度等级:【初级】 由RFC7617定义的HTTP Basic认证是一种非常基础而简单的认证模式,因此叫他Basic认证。他本质上就是浏览器提供的一个接口,能够根据HTTP返回值,自动弹出一个登录框,让用户输入ID和密码…...

计算机网络第2章-HTTP和Web协议(2)
Web和HTTP 一个新型应用即万维网(World Wide Web)Web。 HTTP概况 Web的应用层协议是超文本传输协议(HTPP),它是Web的核心。 HTTP由两个程序实现:一个用户程序和一个服务器程序。 Web页面(W…...

css3 table表格
使用CSS3来美化HTML表格(table)可以提高表格的外观和可读性 表格样式: table { width: 100%; border-collapse: collapse; } width: 100%; 使表格宽度充满其容器。border-collapse: collapse; 合并相邻的表格边框,使表格看起来更整…...

【【萌新的SOC学习之AXI DMA环路测试介绍】】
萌新的SOC学习之AXI DMA环路测试介绍 AXI DMA环路测试 DMA(Direct Memory Access,直接存储器访问)是计算机科学中的一种内存访问技术。它允许某些计算机内部的硬件子系统可以独立地直接读写系统内存,而不需中央处理器(CPU)介入处…...

07 | @Entity 之间的关联关系注解如何正确使用?
实体与实体之间的关联关系一共分为四种,分别为 OneToOne、OneToMany、ManyToOne 和 ManyToMany;而实体之间的关联关系又分为双向的和单向的。实体之间的关联关系是在 JPA 使用中最容易发生问题的地方,接下来我将一一揭晓并解释。我们先看一下…...

深入理解AQS之ReentrantLock源码分析
开题:如何自己生成一把独占锁? 1. 管程 — Java同步的设计思想 管程:指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。 互斥:同一时刻只允许一个线程访问共享资源; 同步:线程之间…...

微软宣布延长Azure支持Apache Cassandra 3.11时间到2024年
近日微软表示为缓解管理员不适应升级节奏,将Azure托管实例对Apache Cassandra 3.11 的支持延长1年,从而时间将持续到2024年年底。 Multiable万达宝汽车ERP(www.multiable.com.cn/solutions_qc)支持自定义栏位,实时生产排产,提高生产效率 此…...

cv_bridge和opencv 记录
过程记录 背景 实验室笔记本上想跑一下vins-fusion。但是因为是有毕业师兄的代码,不敢随意破坏环境。 电脑环境: ubuntu 20.04 opencv 3.3.1 和 4.2.0 Error: vins-fusion中修改CMakeLists.txt,find_package(OpenCV 3.3.1 REQUIRED)&…...

关于OWL-carousel插件在ajax调用后需要重新实例化问题(页面无轮播效果)
维护公司老项目,发现问题,记录一下~ 1.产生原因 owl 已经实例已经存在,在ajax请求成功后并更改完页面数据后, 但是没有销毁之前实例,并重新生成新的实例,导致没有owl插件没有轮播效果. 2.解决方案 html: <div class"owl-slider …...

day4作业
1,判断一个整数是奇数还是偶数,至少有两种方式实现 #1,判断一个整数是奇数还是偶数,至少有两种方式实现 #1) number int(input("请输入一个数:"))if number % 2 0:print("偶数") else:print("奇数&qu…...

SSMS中的SQL sever代理
目录 一、用途: 二、用法 SQL Server代理(SQL Server Agent)是SQL Server Management Studio (SSMS) 2008中的一个功能模块,它用于执行和调度自动化任务、作业和脚本,如作业和警报。SQL Server代理允许在指定的时间间…...

估算总体标准差的极差均值估计法sigma = R/d2
总体标准差的估算值可以通过将平均极差除以合适的常数因子d2来计算。这个估算方法是用于估算总体标准差的一种常见方法,尤其在质量控制和过程监控中经常使用。 总体标准差的估算值 (平均极差) / d2 其中: "总体标准差的估算值" 表示用极差…...