RabbitMQ-交换机
文章目录
- 交换机
- fanout
- Direct
- topic
- Headers
- RPC
交换机
**交换机 **是消息队列中的一个组件,其作用类似于网络路由器。它负责将我们发送的消息转发到相应的目标,就像快递站将快递发送到对应的站点,或者网络路由器将网络请求转发到相应的服务器或客户端一样。交换机的主要功能是提供转发消息的能力,根据消息的路由规则将消息投递到合适的队列或绑定的消费者。
我们可以理解为,如果说一个快递站已经承受不了那么多的快递了,就建多个快递站。
fanout
扇出,广播
特点:消息会被转发到所有绑定到该交换机的队列
场景:很适用于发布订阅的场景,比如写日志,可以多个系统间共享

示例场景:

生产者代码:
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class FanoutProducer {// 交换机名字private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 创建交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
消费者代码:
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class FanoutConsumer {//交换机名字private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();//创建频道Channel channel1 = connection.createChannel();// 声明交换机channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");// 创建队列1,连接到交换机上String queueName = "xiaowang_queue";channel1.queueDeclare(queueName, true, false, false, null);channel1.queueBind(queueName, EXCHANGE_NAME, "");// 创建队列2,连接到交换机上String queueName2 = "xiaoli_queue";channel1.queueDeclare(queueName2, true, false, false, null);channel1.queueBind(queueName2, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建交付回调函数1DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [小王] Received '" + message + "'");};// 创建交付回调函数2DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [小李] Received '" + message + "'");};// 开始消费消息队列1channel1.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });// 开始消费消息队列2channel1.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });}
}
Direct
官方教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
特点:消息会根据路由键转发到指定的队列
场景:特定的消息只交给特定的系统(程序)来处理

注意:不同队列可以绑定相同的路由键

示例场景:
老板在发送消息同时会带上路由键,根据路由键找对应的队列来发送

生产者代码:
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class DirectProducer {private static final String EXCHANGE_NAME = "direct-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//声明交换机是directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");//输入消息 和 路由键Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length < 1) {continue;}String message = strings[0];String routingKey = strings[1];//发布消息的时候注意指定路由键channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");}}}
}
消费者代码:
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class DirectConsumer {private static final String EXCHANGE_NAME = "direct-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明交换机,不过生成者已经声明过了,消费者声不声明都可以channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建队列String queueName = "xiaoyu_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "xiaoyu"); //指定2交换机和路由键// 创建队列,随机分配一个队列名称String queueName2 = "xiaopi_queue";channel.queueDeclare(queueName2, true, false, false, null);channel.queueBind(queueName2, EXCHANGE_NAME, "xiaopi");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaoyu] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaopi] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {});channel.basicConsume(queueName2, true, xiaopiDeliverCallback, consumerTag -> {});}
}
topic
官方教程:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
特点:消息会根据一个模糊的路由键转发到指定的队列
场景:特定的一类消息可以交给特定的一类系统(程序)来处理
规则:
- :匹配一个单词,比如.orange,那么 abc.orange、ikun.orange 都能匹配
- #:匹配0个或多个单词,比如orange.#,那么orange,orange.abc.ikun都能匹配

应用场景:
老板要下发一个任务,让多个组来处理

生产者代码:
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class TopicProducer {private static final String EXCHANGE_NAME = "topic-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length < 1) {continue;}String message = strings[0];String routingKey = strings[1];channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");}}}
}
消费者代码:
package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class TopicConsumer {private static final String EXCHANGE_NAME = "topic-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建队列String queueName = "frontend_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "#.前端.#");// 创建队列String queueName2 = "backend_queue";channel.queueDeclare(queueName2, true, false, false, null);channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");// 创建队列String queueName3 = "product_queue";channel.queueDeclare(queueName3, true, false, false, null);channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback xiaoaDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaoa] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaobDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaob] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaocDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaoc] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, xiaoaDeliverCallback, consumerTag -> {});channel.basicConsume(queueName2, true, xiaobDeliverCallback, consumerTag -> {});channel.basicConsume(queueName3, true, xiaocDeliverCallback, consumerTag -> {});}
}
这样生产者发消息:前端.后端

就可以匹配到前端和后端两个队列

Headers
可以根据headers中的内容来指定发送到哪个队列,由于性能差,比较复杂,一般不推荐使用
RPC
支持用消息队列来模拟RPC的调用,但是一般没必要,直接用 Dubbo、GRPC 等 RPC 框架就好了。
相关文章:
RabbitMQ-交换机
文章目录 交换机fanoutDirecttopicHeadersRPC 交换机 **交换机 **是消息队列中的一个组件,其作用类似于网络路由器。它负责将我们发送的消息转发到相应的目标,就像快递站将快递发送到对应的站点,或者网络路由器将网络请求转发到相应的服务器…...
mapreduce中的MapTask工作机制(Hadoop)
MapTask工作机制 MapReduce中的Map任务是整个计算过程的第一阶段,其主要工作是将输入数据分片并进行处理,生成中间键值对,为后续的Shuffle和Sort阶段做准备。 1. 输入数据的划分: 输入数据通常存储在分布式文件系统(…...
景区文旅剧本杀小程序亲子公园寻宝闯关系统开发搭建
要开发景区文旅剧本杀小程序亲子公园寻宝闯关系统,您需要考虑以下步骤: 1. 设计游戏场景和规则:根据亲子公园的主题和特点,设计适合亲子游玩的游戏场景和规则。您需要考虑游戏的安全性、趣味性和互动性,确保孩子们能够…...
性能优化---webpack优化
1、如何提高webpack打包速度 a、优化Loader--影响Loader打包速度的首要元素是Babel,Babel 会将代码转为字符串生成 AST,然后对 AST 继续进行转变最后再生成新的代码,项目越大,转换代码越多,效率就越低。先优化 Loader …...
YOLOv9改进策略 | 损失函数篇 | EIoU、SIoU、WIoU、DIoU、FocusIoU等二十余种损失函数
一、本文介绍 这篇文章介绍了YOLOv9的重大改进,特别是在损失函数方面的创新。它不仅包括了多种IoU损失函数的改进和变体,如SIoU、WIoU、GIoU、DIoU、EIOU、CIoU,还融合了“Focus”思想,创造了一系列新的损失函数。这些组合形式的…...
贪心算法-跳跃游戏
给你一个非负整数数组 nums ,你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标,如果可以,返回 true ;否则,返回 false 。 示例 1: 输…...
sql知识总结二
一.报错注入 1.什么是报错注入? 这是一种页面响应形式,响应过程如下: 用户在前台页面输入检索内容----->后台将前台输入的检索内容无加区别的拼接成sql语句,送给数据库执行------>数据库将执行的结果返回给后台ÿ…...
VSCode和CMake实现C/C++开发
VSCode和CMake实现Ubuntu下C/C++开发总结 目录 0.简介1.Linux系统介绍2.开发环境搭建2.1 编译器,调试器安装2.2 CMake安装3.GCC编译器3.1 编译过程3.2 g++重要编译参数4.g++编译实战4.0 编译前4.1 直接编译4.2 生成库文件并编译4.3 编译后4.3.1 编译完成后的目录结构4.3.2 运行…...
【机器学习300问】74、如何理解深度学习中L2正则化技术?
深度学习过程中,若模型出现了过拟合问题体现为高方差。有两种解决方法: 增加训练样本的数量采用正则化技术 增加训练样本的数量是一种非常可靠的方法,但有时候你没办法获得足够多的训练数据或者获取数据的成本很高,这时候正则化技…...
C语言程序设计每日一练(4)
完全平方数 首先,我们需要明确什么是完全平方数。完全平方数是指一个整数,它可以表示为另一个整数的平方。例如,1、4、9、16等都是完全平方数,因为它们分别是1、2、3、4的平方。 现在,让我们回到这个问题。我们知道这…...
m4p转换mp3格式怎么转?3个Mac端应用~
M4P文件格式的诞生伴随着苹果公司引入FairPlay版权管理系统,该系统旨在保护音频的内容。M4P因此而生,成为受到FairPlay系统保护的音频格式,常见于苹果设备的iTunes等平台。 MP3文件格式的多个优点 MP3格式的优点显而易见。首先,其…...
全国产化无风扇嵌入式车载电脑在车队管理嵌入式车载行业应用
车队管理嵌入式车载行业应用 车队管理方案能有效解决车辆繁多管理困难问题,配合调度系统让命令更加精确有效执行。实时监控车辆状况、行驶路线和位置,指导驾驶员安全有序行驶,有效降低保险成本、事故概率以及轮胎和零部件的磨损与损坏。 方…...
爬虫入门——Request请求
目录 前言 一、Requests是什么? 二、使用步骤 1.引入库 2.请求 3.响应 三.总结 前言 上一篇爬虫我们已经提及到了urllib库的使用,为了方便大家的使用过程,这里为大家介绍新的库来实现请求获取响应的库。 一、Requests是什么࿱…...
创建一个javascript公共方法的npm包,js-tool-big-box,发布到npm上,一劳永逸
前端javascript的公共方法太多了,时间日期的,数值的,字符串的,搞复制的,搞网络请求的,搞数据转换的,几乎就是每个新项目,有的拷一拷,没有的继续写,放个utils目…...
【在线OJ系统】自定义注解实现分布式ID无感自增
实现思路 首先自定义参数注解,然后根据AOP思想,找到该注解作用的切点,也就是mapper层对于mapper层的接口在执行前都会执行该aop操作:获取到对于的方法对象,根据方法对象获取参数列表,根据参数列表判断某个…...
35. UE5 RPG制作火球术技能
接下来,我们将制作技能了,总算迈进了一大步。首先回顾一下之前是如何实现技能触发的,然后再进入正题。 如果想实现我之前的触发方式的,请看此栏目的31-33篇文章,讲解了实现逻辑,这里总结一下: …...
计算机网络 TCP/IP体系 物理层
一. TCP/IP体系 物理层 1.1 物理层的基本概念 物理层作为TCP/IP网络模型的最低层,负责直接与传输介质交互,实现比特流的传输。 要完成物理层的主要任务,需要确定以下特性: 机械特性:物理层的机械特性主要涉及网络…...
微服务相关
1. 微服务主要七个模块 中央管理平台:生产者、消费者注册,服务发现,服务治理,调用关系生产者消费者权限管理流量管理自定义传输协议序列化反序列化 2. 中央管理平台 生产者A在中央管理平台注册后,中央管理平台会给他…...
虚拟机下如何使用Docker(完整版)
Docker详细介绍: Docker 是一款开源的应用容器引擎,由Docker公司最初开发并在2013年发布。Docker的核心理念源自于操作系统级别的虚拟化技术,尤其是Linux上的容器技术(如LXC),它为开发人员和系统管理员提供…...
asp.net core 依赖注入后的服务生命周期
ASP.NET Core 依赖注入(DI)容器支持三种服务的生命周期选项,它们定义了服务实例的创建和销毁的时机。理解这三种生命周期对于设计健壯且高效的应用程序非常重要: 瞬时(Transient): 瞬时服务每次…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
