203、RabbitMQ 之 使用 direct 类型的 Exchange 实现 消息路由 (RoutingKey)
目录
- ★ 使用direct实现消息路由
- 代码演示这个情况二
- ConstantUtil 常量工具类
- ConnectionUtil 连接RabbitMQ的工具类
- Publisher 消息生产者
- 测试消息生产者
- Consumer01 消息消费者01
- 测试消费者结果:
- Consumer02 消息消费者02
- 测试消费者结果:
- 完整代码:
- ConstantUtil
- ConnectionUtil
- Publisher
- Consumer01
- Consumer02
- pom.xml
★ 使用direct实现消息路由
direct类型的Exchange: 生产者发送给Exchange 消息的路由key 要和 Exchange 绑定 Queue 消息队列的路由key一致,这个生产者发送的消息才会被Exchange 分发到那个消息队列里面去。

direct类型的Exchange会根据消息的路由key将消息分发给指定的Queue。
▲ 一个队列能与一个Exchange绑定多个路由key,
Q1队列与Exchange就绑定的路由key:orange
比如Q2队列与Exchange就绑定了两个路由key:black和green。
若消息生产者发送路由key为orange的消息到Exchange时,该消息将会被分发到Q1队列;
若发送路由key为black或green的消息到Exchange时,该消息都将被分发到Q2队列。
▲ 情况一
RabbitMQ也允许多个队列绑定相同的路由key,此时又变成了Pub-Sub模型。
比如C1、C2两个队列都绑定了black作为路由key,这意味着若消息生产者发送路由key为black的消息时,该消息将会被分发Q1和Q2两个队列,Q1、Q2两个队列将会收到各自不同副本——这就是Pub-Sub模型。

▲ 情况二
C1队列绑定了error作为路由key,而Q2则绑定了info、error和warning作为路由key,
这意味着若消息生产者发送路由key为error的消息时,该消息将会被分发Q1和Q2两个队列,Q1、Q2两个队列将会同时收到各自的副本;
若消息生产者发送路由key为info或warning的消息时,该消息只会被分发给Q2队列,Q1队列不会收到任何消息。

代码演示这个情况二
需求:就是演示这张图

过程:
1、创建一个消息生产者,2个消息消费者,
2、生产者声明一个Exchange,2个消息队列,
然后如图,Exchange绑定Q1这个消息队列用1个路由key,Exchange绑定Q2这个消息队列用3个路由key,
3、生产者发送30条消息给Exchange,每个路由key发送10条。
结果应该是:Q1消息队列得到10条消息,Q2消息队列得到30条消息。
4、创建2个消费者,分别消费Q1和Q2.
ConstantUtil 常量工具类

ConnectionUtil 连接RabbitMQ的工具类

Publisher 消息生产者



测试消息生产者
如图:结果正确。
消息队列queue_01,因为只绑定一个路由key–error,所以exchange分发10条路由key为error的消息给它。
消息队列queue_01,因为只绑定3个路由key–error、info、warning,所以exchange分发30条消息给它。


Consumer01 消息消费者01
消息消费者就没啥好说的,
Consumer01 消费 queue_01 消息队列,Consumer02 消费 queue_02 消息队列。
代码差不多。


测试消费者结果:
正常成功,符合期望

Consumer02 消息消费者02


测试消费者结果:
正常成功,符合期望

完整代码:
ConstantUtil
package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{// 消息队列的名称public final static String QUEUE01 = "queue_01";public final static String QUEUE02 = "queue_02";// Exchange的名称public static final String EXCHANGE_NAME = "myex02.direct";// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS = {"info", "error", "warning"};}
ConnectionUtil
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}
Publisher
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用 direct 类型的exchange------就是广播模式
public class Publisher
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange,--------调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定channel.exchangeDeclare(ConstantUtil.EXCHANGE_NAME,/* Exchange名字 */BuiltinExchangeType.DIRECT,/* Exchange 类型 */true,/* 是否持久化 */false,/* 是否自动栅除 */false,/* 是否为内部的 Exchange */null /* 指定 Exchange 的额外属性 */);//调用queueDeclare()方法声明队列----声明多个消息队列------声明第1个消息队列---------路由key是 errorchannel.queueDeclare(ConstantUtil.QUEUE01, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第一个消息队列channel.queueBind(ConstantUtil.QUEUE01, ConstantUtil.EXCHANGE_NAME,ConstantUtil.ROUTING_KEYS[1] /* exchange类型是direct,这里指定路由key */,null /* 指定 Exchange 的额外属性 */);//声明第2个消息队列--------这个exchange绑定这个queue,使用了三个路由key---info,error,warningchannel.queueDeclare(ConstantUtil.QUEUE02, true, false, false, null);//用循环为第2个消息队列绑定三个路由keyfor (int i = 0; i < ConstantUtil.ROUTING_KEYS.length; i++){//把 Exchange 和 Queue 绑定起来,绑定第2个消息队列channel.queueBind(ConstantUtil.QUEUE02,ConstantUtil.EXCHANGE_NAME,ConstantUtil.ROUTING_KEYS[i] /* 循环绑定路由key */,null /* 指定 Exchange 的额外属性 */);}//生产者发送30条消息for (int i = 1; i <= 30; i++){//用嵌套的三元运算符,动态的让消息的发送使用到不同的路由keyString routingKey = i <= 10 ? ConstantUtil.ROUTING_KEYS[0] :(i <= 20 ? ConstantUtil.ROUTING_KEYS[1] : ConstantUtil.ROUTING_KEYS[2]);//要发送的消息String message = "生产者发送的第【 " + i + " 】条消息的内容";//4、调用Channel 的 basicPublish() 方法发送消息channel.basicPublish(ConstantUtil.EXCHANGE_NAME /* 指定向这个Exchange发送消息 */,routingKey /* 动态指定路由key */,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);System.out.println("生产者发送【 " + i + " 】条消息完成,发送到Exchange的路由key是:"+routingKey);}//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}
Consumer01
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
//(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
//(2)通过Connection获取Channel。
//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布
// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//消息消费者1
public class Consumer01
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(ConstantUtil.QUEUE01, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(ConstantUtil.QUEUE01 /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}
}
Consumer02
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者2
public class Consumer02
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(ConstantUtil.QUEUE02, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(ConstantUtil.QUEUE02 /*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>direct</artifactId><version>1.0.0</version><name>direct</name><!-- 属性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依赖 --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies></project>
相关文章:
203、RabbitMQ 之 使用 direct 类型的 Exchange 实现 消息路由 (RoutingKey)
目录 ★ 使用direct实现消息路由代码演示这个情况二ConstantUtil 常量工具类ConnectionUtil 连接RabbitMQ的工具类Publisher 消息生产者测试消息生产者 Consumer01 消息消费者01测试消费者结果: Consumer02 消息消费者02测试消费者结果: 完整代码&#x…...
微服务+Java+Spring Cloud +UniApp +MySql智慧工地综合管理云平台源码,SaaS模式
智慧工地围绕工程现场人、机、料、法、环及施工过程中质量、安全、进度、成本等各项数据满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效. 智慧工地综合管理云平台源码,PC监管端、项目端;APP监管端、项目端、数据可视化大屏端源码…...
QMidi Pro for Mac:打造您的专属卡拉OK体验
你是否曾经厌倦于在KTV里与朋友们争夺麦克风?是否想要在家中享受自定义的卡拉OK体验?现在,有了QMidi Pro for Mac,一切变得简单而愉快! QMidi Pro是一款功能强大的卡拉OK播放器,专为Mac用户设计。它充分利…...
bindtap和catchtap的区别?
bindtap和catchtap都是小程序中用于绑定点击事件的方法。 1.bindtap的作用是绑定一个触摸事件并指定对应的处理函数。当用户点击或触摸相关元素时,会触发该事件,并执行相应的处理逻辑。 示例: <button bindtap"handleTap">…...
IDEA—java: 常量字符串过长问题解决
问题描述: Error: java: 常量字符串过长 问题分析: 字符串长度过长,导致 idea 默认使用的 javac 编译器编译不了。 解决办法: Javac 编译器改为 Eclipse 编译器。 File -> Settings -> Build,Execution,Deployment -&…...
云原生SIEM解决方案
云原生(Cloud Native)是一种基于云计算的软件开发和部署方法论,它强调将应用程序和服务设计为云环境下的原生应用,以实现高可用性、可扩展性和灵活性。 云原生的优势有哪些 高可用性:云原生可以实现应用程序的高可用…...
工艺边与定位孔设计经验规则总结
🏡《总目录》 目录 1,什么是工艺边和定位孔2,工艺边的设计经验原则2.1,避免尖锐角2.2,工艺边宽度设置2.3,工艺边的方向2.4,定位孔尺寸2.5,定位孔的位置3,去除工艺边的方法注意事项4,总结1,什么是工艺边和定位孔 工艺边是在SMT焊接时,为了PCB和导轨接触预留的PCB边…...
软件架构设计(业务架构、应用架构、数据架构、技术架构)
一、架构相关概念 1、系统 系统:由一群有关联的个体组成,根据某种规则运作,能完成个别原件不能独立完成的工作的群体。大的系统可以嵌套小系统,被嵌套的小系统往往称为大系统的子系统。 2、模块 模块是从逻辑上将系统分解&#…...
我们又组织了一次欧洲最大开源社区活动,Hugging Face 博客欢迎社区成员发帖、Hugging Chat 功能更新!...
每一周,我们的同事都会向社区的成员们发布一些关于 Hugging Face 相关的更新,包括我们的产品和平台更新、社区活动、学习资源和内容更新、开源库和模型更新等,我们将其称之为「Hugging News」。本期 Hugging News 有哪些有趣的消息࿰…...
学信息系统项目管理师第4版系列26_项目绩效域(下)
1. 项目工作绩效域 1.1. 涉及项目工作相关的活动和职能 1.2. 预期目标 1.2.1. 高效且有效的项目绩效 1.2.2. 适合项目和环境的项目过程 1.2.3. 干系人适当的沟通和参与 1.2.4. 对实物资源进行了有效管理 1.2.5. 对采购进行了有效管理 1.2.6. 有效处理了变更 1.2.7. 通…...
SQL sever中的索引
目录 一、索引定义 二、索引结构 2.1. B-树索引结构: 2.2. 哈希索引结构: 三、索引作用 四、索引与约束区别 五、索引级别 六、索引分类 6.1. 聚集索引(Clustered Index): 6.2. 非聚集索引(Noncl…...
多目标鳟海鞘算法(Multi-objective Salp Swarm Algorithm,MSSA)求解微电网优化MATLAB
一、微网系统运行优化模型 微电网优化模型介绍: 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 参考文献: [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、多目标鳟海鞘算法MSSA 多…...
软件测试之概念篇(需求,测试用例,BUG描述,产品的生命周期)
目录 1.什么是需求 2.什么是测试用例 3.什么是BUG 4.软件的生命周期 5.测试的生命周期 1.什么是需求 在大多数软件公司,一般会有两部分需求: 用户需求:可以理解为就是甲方提出需求,如果没有甲方,那么就是终端用…...
jwt详细介绍
jwt详细介绍 1.jwt 简介:2.jwt 工具类介绍3.案列演示:3.1并在web.xml进行配置过滤器 3.2过滤3.3全局响应设置 1.jwt 简介: 。JWT(JSON Web Token) 是一种用于安全传输信息的开放标准(RFC 7519)…...
电子笔记真的好用吗?手机上适合记录学习笔记的工具
提及笔记,不少人都会和学习挂钩,的确学习过程中我们经常会遇到很多难题,而经常记录笔记可以有效地帮助大家记住很多知识,而且时常拿出笔记查看一下,可方便巩固过去学习的知识。 手机作为大家日常随身携带的工具&#…...
用 SQL 找出某只股票连续上涨的最长天数
涉及多张中间表: SELECT MAX(consecutive_day) FROM (SELECT COUNT(*) as consecutive_dayFROM (SELECT trade_date, SUM(rise_mark) OVER (ORDER BY trade_date) AS days_no_gainFROM (SELECT trade_date,CASEWHEN closing_price > LAG(closing_price) OVER (ORDER BY tra…...
Vue 绑定 class 与 style
在应用界面中,某些元素的样式是动态的。class 与 style 绑定就是专门用来实现动态样式效果的技术。 如果需要动态绑定 class 或 style 样式,可以使用 v-bind 绑定。 绑定 class 样式【字符串写法】 适用于:类名不确定,需要动态指…...
【微服务部署】九、使用Docker Compose搭建高可用双机热备MySQL数据库
通常,一般业务我们使用云服务器提供的数据库,无论是MySQL数据库还是其他数据库,云服务厂商都提供了主备功能,我们不需要自己配置处理。而如果需要我们自己搭建数据库,那么考虑到数据的高可用性、故障恢复和扩展性&…...
HTTP Basic 认证
HTTP Basic 认证 难度等级:【初级】 由RFC7617定义的HTTP Basic认证是一种非常基础而简单的认证模式,因此叫他Basic认证。他本质上就是浏览器提供的一个接口,能够根据HTTP返回值,自动弹出一个登录框,让用户输入ID和密码…...
计算机网络第2章-HTTP和Web协议(2)
Web和HTTP 一个新型应用即万维网(World Wide Web)Web。 HTTP概况 Web的应用层协议是超文本传输协议(HTPP),它是Web的核心。 HTTP由两个程序实现:一个用户程序和一个服务器程序。 Web页面(W…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
