当前位置: 首页 > news >正文

RabbitMQ学习(七):交换器

〇、前言

在之前的内容中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消 费者(工作进程)。在今天的内容中,我们将做一些完全不同的事情——我们将消息传达给多个消费者。这种模式 称为 “发布/订阅”。

为了说明这种模式,我们将以一个简单的日志系统为例。它将由两个程序组成:第一个程序将发出日志消 息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘, 另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费 者 。

一、Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange)。

交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

1.1 Exchanges 的类型

总共有以下类型: 直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

1.2 无名 exchange

在前面部分中我们对 exchange 没有任何操作,但仍然能够将消息发送到队列。 原因是我们使用的是默认交换,我们通过空字符串(“”)进行标识。

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换机的名称。空字符串表示默认或无名称交换机。消息能路由发送到队列中其实 是由routingKey(bindingkey)绑定 key 指定的,如果它存在的话。

1.3 绑定

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定 。

二、fanout类型

2.1 fanout概述

Fanout 这种类型非常简单。正如名称,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange类型。

2.2 fanout实战

消费者1将接受到的消息打印在控制台,核心代码:

//绑定fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//创建一个临时的队列 队列的名称是随机的,当消费者断开和该队列的连接时 队列自动删除
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");

消费者2将接受到的消息保存在磁盘,核心代码:

//绑定fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//生成一个临时的队列 队列的名称是随机的,当消费者断开和该队列的连接时 队列自动删除
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");//写入磁盘
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");File file = new File("C:\\work\\rabbitmq_info.txt");FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("数据写入文件成功");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

生产者核心代码:

/*** 声明一个 exchange* 1.exchange 的名称* 2.exchange 的类型*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发消息
while (sc.hasNext()) {channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);
}

结果是两个消费者都能收到生产者发送的消息。

需要注意的是,交换机把消息分发给了两个队列,在每个队列中一个消息仍然只是被消费了一次

三、direct类型

3.1 direct概述

首先我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");绑定之后的 意义由其交换类型决定。

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性——它只能进行无意识的 广播。在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定 routingKey 队列中去。

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green。

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blac和kgreen 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。这也被称为多重绑定。

3.2 direct实战

消费者1(处理error)核心代码:

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk", false, false, false, null);
//绑定error
channel.queueBind(queueName, EXCHANGE_NAME, "error");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;FileUtils.writeStringToFile(new File("C:\\work\\rabbitmq_info.txt"), message, "UTF-8");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

消费者2(处理info和warning)核心代码:

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console", false, false, false, null);
//绑定info和warning
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {})

生产者核心代码:

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug","调试 debug 信息");//发消息
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));
}

四、topic类型

4.1 topic类型的作用

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是 使用了 direct 交换机,从而有能实现有选择性地接收日志。

尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型。

4.2 topic类型的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse"、"nyse.vmw"、 "quick.orange.rabbit"这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的 :*(星号)可以代替一个单词 、#(井号)可以替代零个或多个单词。

4.3 topic匹配案例

下图绑定关系如下 :

Q1-->绑定的是 :中间带 orange 带 3 个单词的字符串(*.orange.*)

Q2-->绑定的是 :最后一个单词是 rabbit 的 3 个单词(*.*.rabbit) 、

第一个单词是 lazy 的多个单词(lazy.#)

匹配结果如下表:

quick.orange.rabbit

被队列 Q1、Q2 接收到

lazy.orange.elephant

被队列 Q1、Q2 接收到

quick.orange.fox

被队列 Q1 接收到

lazy.brown.fox

被队列 Q2 接收到

lazy.pink.rabbit

虽然满足两个绑定,但只被队列 Q2 接收一次

quick.brown.fox

不匹配任何绑定,被丢弃

quick.orange.male.rabbit

是四个单词,不匹配任何绑定,被丢弃

lazy.orange.male.rabbit

是四个单词,但匹配 Q2

4.4 topic类型与fanout、direct的关系

当队列绑定关系是下列这种情况时需要引起注意 :

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,其实是fanout类型

  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 类型

4.5 topic实战

消费者1核心代码:

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q1 队列与绑定关系
channel.queueDeclare("Q1", false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("接收队列:"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

消费者2核心代码:

channel.exchangeDeclare(EXCHANGE_NAME, "topic");//声明 Q2 队列与绑定关系
channel.queueDeclare("Q2", false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");System.out.println("接收队列:"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

生产者核心代码:

//交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");// Q1-->绑定的是 中间带 orange 带 3 个单词的字符串(*.orange.*)
// Q2-->绑定的是 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
//                 第一个单词是 lazy 的多个单词(lazy.#)
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");//发消息
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
}

相关文章:

RabbitMQ学习(七):交换器

〇、前言在之前的内容中&#xff0c;我们创建了一个工作队列。我们假设的是工作队列背后&#xff0c;每个任务都恰好交付给一个消 费者(工作进程)。在今天的内容中&#xff0c;我们将做一些完全不同的事情——我们将消息传达给多个消费者。这种模式 称为 “发布/订阅”。为了说…...

cmd命令大全

文章目录变量输入输出逻辑命令符控制语句函数注释变量 在批处理中&#xff0c;变量全部是弱类型的&#xff0c;通常可以当做字符串处理 1.初始化定义 set varthis a var 2.获取变量值 %var% 3.链接 set varcat%var1%%var2% 4.截取 %var:~n,m% n是起点&#xff0c;m是长度&…...

Git的使用方法(保姆级)

一、安装git二、创建凭据 ①打开电脑的凭据管理器git:https://gitee.com是固定写法用户名、密码是你创建gitee的用户名、密码三、在gitee中创建一个仓库四、项目提交到仓库的方法①选择一个项目交由git管理按照步骤一中召唤小黑窗口输入 git init 就可以出现.git文件夹②右键选…...

TypeScript 学习之泛型

泛型使用 组件不仅能够支持当前的数据类型&#xff0c;同时也能支持未来的数据类型。就需要使用泛型。使用泛型就不会丢失类型信息&#xff0c;使用any会丢失类型信息。 function identity<T>(arg: T): T {return arg; }identity 添加了类型变量T, T 捕获用户传入的类型…...

新手学习node.js基础,node.js安装过程,node.js运行环境及javascript运行环境.

学习node.js1.什么是node.js?2.node.js中的javaScript运行环境3.node.js可以做什么&#xff1f;4. node.js学习思路5.node.js环境的安装6.如何在node.js中执行JavaScript代码1.什么是node.js? node.js是一个基于Chrome v8 引擎的JavaScript运行环境(后端) node.js官网 &…...

Maven的安装步骤(保姆级安装教程)

一、安装本地Maven 选择你需要的maven版本下载&#xff1a;官网下载传送门 我使用的是3.6.1版本&#xff1a;maven-3.6.1-bin.zip 二、安装 把下载好的maven压缩包解压到一个没有中文&#xff0c;空格或其他特殊字符的文件夹&#xff0c;如&#xff1a; 三、配置环境变量…...

Axure教程(一)——线框图与高保真原型图制作

前面我们学习了制作网页的技能&#xff0c;从这里开始我们来学习前端必备技能&#xff0c;就是用Axure来制作原型图&#xff0c;一方面我们能提前绘制出我们所需的页面&#xff0c;这在我们开发的时候能节省大量的时间&#xff0c;另一方面我们能通过给用户进行体验从而能够发现…...

wholeaked:一款能够追责数据泄露的文件共享工具

关于wholeaked wholeaked是一款功能强大的文件共享工具&#xff0c;该工具基于go语言开发&#xff0c;可以帮助广大系统管理员和安全研究人员在组织发生数据泄露的时候&#xff0c;迅速找出数据泄露的“始作俑者”。 wholeaked可以获取被共享的文件信息以及接收人列表&#x…...

动态规划——股票问题全解

引入 股票问题是一类动态问题&#xff0c;我们需要对其状态进行判定分析来得出答案 但其实&#xff0c;我们只需要抓住两个点&#xff0c;持有和不持有&#xff0c;在这两种状态下分析问题会简单清晰许多 下面将会对各个问题进行分析讲解&#xff0c;来解释什么是持有和不持…...

想做游戏开发要深入c/c++还是c#?

根据题主描述提三点建议&#xff1a; 先选择一个语言、选择一个引擎能入行确保精通一个及已入行的情况下&#xff0c;技多不压身不必想日后的”退而求其次“&#xff0c;现在的事情还没有开始做就想以后&#xff0c;太过虚无及功利了 下面是这三点的详细说明&#xff1a; 【选…...

【JMeter】【Mac】如何在Mac上打开JMeter

平常我们在Windows电脑里打开JMeter&#xff0c;只要双击JMeter.bat即可打开&#xff0c;那我换了Mac后&#xff0c;该怎么打开JMeter呢 一、命令行打开JMeter 1、打开JMeter路径 cd /Users/work/apache-jmeter-5.2/bin 2、运行JMeter sh jmeter 3、如果涉及到一些权限无…...

JAVA面试八股文一(并发与线程)

并发的三大特性原子性&#xff1a;cpu在执行过程不可以暂停然后再调度&#xff0c;不可以中断&#xff0c;要不全部执行完&#xff0c;要不全部不执行。可见性&#xff1a;当多个线程访问同一个变量时&#xff0c;一个线程改变了这个变量的值&#xff0c;其他线程能够立即看到修…...

C语言二级指针

目录一、1. 指针的作用2.二级指针3. 为什么要用二级指针一、 1. 指针的作用 内存的存储区就像一池湖水&#xff0c;数据就像池水里面的鱼&#xff0c;如果不用内存寻址的方式&#xff0c;那么当你找某个特定数据的时候&#xff0c;就相当于在一池湖水里找某一条叫做“张三”的…...

[java-面试]初级、中级、高级具备的技术栈和知识点

&#x1f31f;1.java初级1. Java基础知识&#xff1a;语法、包装类、泛型、数据结构和继承&#xff0c;以及基础API。2. Java开发工具&#xff1a;如Eclipse&#xff0c;NetBeans&#xff0c;Maven等。3. Java Web开发技术&#xff1a;如Servlet&#xff0c;JSP&#xff0c;Str…...

「5」线性代数(期末复习)

&#x1f680;&#x1f680;&#x1f680;大家觉不错的话&#xff0c;就恳求大家点点关注&#xff0c;点点小爱心&#xff0c;指点指点&#x1f680;&#x1f680;&#x1f680; 目录 第四章 向量组的线性相关性 &5&#xff09;向量空间 第五章 相似矩阵及二次型 &a…...

记一次20撸240的沙雕威胁情报提交(2019年老文)

0x01 起因 这是一篇沙雕文章&#xff0c;没什么技术含量&#xff0c;大家娱乐一下就好 前几个月&#xff0c;我的弟弟突然QQ给我发来了一条消息&#xff0c;说要买个QQ飞车的cdk&#xff0c;我作为一个通情达理的好哥哥&#xff0c;自然不好意思回绝&#xff0c;直接叫他发来…...

佳能镜头EOS系统EF协议逆向工程(三)解码算法

目录 数据结构 解码算法 解码效果 这篇文章基于上两篇文章继续&#xff0c; 佳能镜头EOS系统EF协议逆向工程&#xff08;一&#xff09;转接环电路设计_佳能ef自动对焦协议_岬淢箫声的博客-CSDN博客本文属于专栏——工业相机。此专栏首先提供我人工翻译的法语文档部分&…...

搞互联网吧,线下生意真不是人干的

搞互联网吧&#xff0c;线下生意真不是人干的 应该是正月初几里吧&#xff0c;好巧不巧的被迫去参加了一下我们初中同学的聚会。其实毕业这么多年&#xff0c;无论大学&#xff0c;高中还是中学&#xff0c;类似的聚会我都是能躲则躲&#xff0c;有特别想见的同学也都是私下单…...

MySQL性能调优与设计——MySQL中的索引

MySQL中的索引 InnoDB存储引擎支持以下几种常见索引&#xff1a;B树索引、全文索引、哈希索引&#xff0c;其中比较关键的是B树索引。 B树索引 InnoDB中的索引自然也是按照B树来组织的&#xff0c;B树的叶子节点用来存放数据。 聚集索引/聚簇索引 InnoDB中使用了聚集索引&…...

这5个代码技巧,让我的 Python 加速了很多倍

Python作为一种功能强大的编程语言&#xff0c;因其简单易学而受到很多初学者的青睐。它的应用领域又非常广泛&#xff1a;科学计算、游戏开发、爬虫、人工智能、自动化办公、Web应用开发等等。 而在数据科学领域中&#xff0c;Python 是使用最广泛的编程语言&#xff0c;并且…...

HTML 语义化

目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案&#xff1a; 语义化标签&#xff1a; <header>&#xff1a;页头<nav>&#xff1a;导航<main>&#xff1a;主要内容<article>&#x…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

Swagger和OpenApi的前世今生

Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章&#xff0c;二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑&#xff1a; &#x1f504; 一、起源与初创期&#xff1a;Swagger的诞生&#xff08;2010-2014&#xff09; 核心…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...

解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist

现象&#xff1a; android studio报错&#xff1a; [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决&#xff1a; 不要动CMakeLists.…...

LOOI机器人的技术实现解析:从手势识别到边缘检测

LOOI机器人作为一款创新的AI硬件产品&#xff0c;通过将智能手机转变为具有情感交互能力的桌面机器人&#xff0c;展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家&#xff0c;我将全面解析LOOI的技术实现架构&#xff0c;特别是其手势识别、物体识别和环境…...

stm32wle5 lpuart DMA数据不接收

配置波特率9600时&#xff0c;需要使用外部低速晶振...

API网关Kong的鉴权与限流:高并发场景下的核心实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中&#xff0c;API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关&#xff0c;Kong凭借其插件化架构…...