当前位置: 首页 > news >正文

【SpringCloud学习笔记】RabbitMQ(中)

1. 交换机概述

前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ,当时采用的是生产者直接将消息发布给队列,但是实际开发中不建议这么做,更加推荐生产者将消息发布到交换机(exchange),然后由exchange路由到队列,其架构如下所示:

可以看出,在发布-订阅模型中新增一个"交换机"角色,此后各个角色的任务如下:

  • publisher:不再是将message直接转发到queue,而是将message转发给exchange
  • exchange:一方面接收来自publisher生产的消息;另一方面,依据route key以及type将消息路由给绑定的不同的队列
  • queue:与以前一样,暂存消息,供消费者消费,另外还需要同交换机建立绑定关系
  • consumer:与以前一样,订阅queue中的消息,并进行业务处理消费消息

注意:由于我们的exchange不暂存消息,只做消息的路由,因此如果没有queue与exchange绑定或者routing key设置错误,就会导致消息丢失!!!

2. 交换机类型

RabbitMQ提供的交换机类型有如下四种:

  1. Fanout Exchange:扇出交换机,形象来说就是"广播交换机",会将消息路由给所有绑定的queue
  2. Direct Exchange:定向交换机,基于RoutingKey发给订阅的queue
  3. Topic Exchange: 通配符订阅,在Direct的基础上引入通配符
  4. Headers Exchange: 头匹配,基于MQ的消息头匹配,使用场景较少(此处不讲解)

2.1 Fanout Exchange

下面是Fanout Exchange的工作流程图:

特征:Fanout Exchange将消息路由给全部跟它绑定的queue
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:fanout.queue1、fanout.queue2image.png
  2. 在RabbitMQ控制台中新建一个Fanout类型的Exchange:fanout.exchange

image.png

  1. 将fanout.exchange与fanout.queue1、fanout.queue2分别建立binding关系

image.png

  1. 新建两个方法用于模拟consumer,分别监听fanout.queue1以及fanout.queue2队列
/*** 订阅fanout.queue1队列* @param msg 消息*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {log.info("listener1 从【fanout.queue1】接收到消息:" + msg);
}/*** 订阅fanout.queue2队列* @param msg 消息*/
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {log.info("listener2 从【fanout.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给fanout.exchange
/*** 测试FanoutExchange交换机类型*/
@Test
public void testFanoutExchange() {// 1. 定义exchange名称String exchangeName = "fanout.exchange";// 2. 定义消息体String msg = "震惊!某大学频频被曝出食堂安全问题";// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
  1. 观察结果

image.png
结果如上图所示:说明fanout.exchange雀氏将消息广播给了所有与之绑定的queue

2.2 Direct Exchange

特点:Direct Exchange要求在与queue建立binding关系的时候定义一个BindingKey,之后publisher生产者携带消息的同时也会指定RoutingKey,只有RoutingKey与BindingKey一致的queue才会被路由消息

工作流程如上图所示,其中queue1与exchange的Binding Key为"blue"以及"red",queue2与exchange的Binding Key为"yellow"以及"red",此时当Routing Key为"blue",Direct Exchange只会将消息路由给queue1
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:direct.queue1、direct.queue2

image.png

  1. 在RabbitMQ控制台中新建一个Direct类型的Exchange:direct.exchange

image.png

  1. 将direct.exchange与direct.queue1、direct.queue2分别建立binding关系,其中与queue1的binding key为"blue"与"red",与queue2的binding key为"yellow"与"red"

image.png

  1. 新建两个方法用于模拟consumer,分别监听direct.queue1以及direct.queue2队列
/*** 订阅direct.queue1队列* @param msg 消息*/
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {log.info("listener1 从【direct.queue1】接收到消息:" + msg);
}/*** 订阅direct.queue2队列* @param msg 消息*/
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {log.info("listener2 从【direct.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给direct.exchange,并指定routing key为"blue"
/*** 测试DirectExchange交换机类型*/
@Test
public void testDirectExchange() {// 1. 定义交换机名称String exchangeName = "direct.exchange";// 2. 定义消息体String msg = "今日份消息只交给幸运色为blue的哦~";// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
  1. 观察结果

image.png
结果符合预期,只有direct.queue1能够接受到消息!

2.3 Topic Exchange

Topic Exchange与Direct Exchange非常类似,都可以依据BindingKey以及RoutingKey的匹配程度进而路由给特定符合条件的queue,但是Topic Exchange定义Binding Key可以为一组词,中间用"."进行分隔,并且支持使用通配符,规则如下:

  • #:匹配0个或者多个词
  • *:匹配1个单词

例如现在queue1的BindingKey为"china.#“,而queue2的BindingKey为”#.news",而RoutingKey为"china.reports",此时可以路由给queue1,但是无法路由给queue2,如果RoutingKey为"china.news"则queue1、queue2均可以被路由
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:topic.queue1、topic.queue2

image.png

  1. 在RabbitMQ控制台中新建一个Topic类型的Exchange:topic.exchange

image.png

  1. 将topic.exchange与topic.queue1、topic.queue2分别建立binding关系,其中与queue1的binding key为"china.#“,与queue2的binding key为”#.news"

image.png

  1. 新建两个方法用于模拟consumer,分别监听topic.queue1以及topic.queue2队列
/*** 订阅topic.queue1队列* @param msg 消息*/
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {log.info("listener1 从【topic.queue1】接收到消息:" + msg);
}/*** 订阅topic.queue2队列* @param msg 消息*/
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {log.info("listener2 从【topic.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给topic.exchange,并指定routing key为"china.news"
/*** 测试TopicExchange交换机类型*/
@Test
public void testTopicExchange() {// 1. 定义交换机名称String exchangeName = "topic.exchange";// 2. 定义消息体String msg = "中国新闻报,快来买呀!";// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
  1. 观察结果

image.png
证明通配符生效!

3. 声明队列和交换机

前面我们收发消息的过程是使用Java代码实现的,但是创建Queues以及Exchanges仍然需要我们在RabbitMQ提供的控制台实现,那么如何使用Java代码来创建Queue以及Exchange呢?
SpringAMQP API:

  • 声明队列:使用new Queue("队列名称")创建
  • 声明交换机:使用new FanoutExchange("交换机名称")(以FanoutExchange为例)
  • 声明绑定关系:使用BindingBuilder.bind(队列对象).to(交换机对象)构建

3.1 Fanout声明

步骤:

  1. 编写一个配置类,使用@Configuration 声明
  2. 内部配置Queue、Exchange、Binding,并使用@Bean声明
@Configuration
public class FanoutConfig {/*** 声明FanoutExchange交换机* @return 返回FanoutExchange对象*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("code.fanout.exchange");}/*** 声明FanoutQueue队列* @return 返回FanoutQueue队列*/@Beanpublic Queue fanoutQueue() {return new Queue("code.fanout.queue");}/*** 声明绑定关系* @param fanoutExchange 交换机* @param fanoutQueue 队列* @return 绑定关系*/@Beanpublic Binding fanoutBinding(FanoutExchange fanoutExchange, Queue fanoutQueue) {return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}
}

3.2 Direct声明

步骤:

  1. 编写一个配置类,使用@Configuration 声明
  2. 内部配置Queue、Exchange、Binding,并使用@Bean声明
@Configuration
public class DirectConfig {/*** 声明一个DirectExchange交换机* @return 返回一个DirectExchange类型对象*/@Beanpublic DirectExchange directExchange() {return new DirectExchange("code.direct.exchange");}/*** 声明一个Queue队列* @return 返回一个Queue类型对象*/@Beanpublic Queue directQueue() {return new Queue("code.direct.queue");}/*** 声明一个绑定关系* @return 返回Binding对象*/@Beanpublic Binding directBinding(DirectExchange directExchange, Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with("");}
}

3.3 基于注解声明

注解声明格式:

@Component
@Slf4j
public class AnnotateRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue("annotate.direct.queue"),key = {"blue", "red"},exchange = @Exchange(name = "annotate.direct.exchange", type = ExchangeTypes.DIRECT)))public void listenAnnotateDirect(String msg) {log.info("接收到消息:" + msg);}
}

4. 消息转换器

4.1 现象演示

前面我们都是将字符串类型的数据作为消息进行传输,那么如果是对象类型的消息呢,我们尝试发送一个自定义User类型作为消息传输:

/*** 自定义User类型* @author 米饭好好吃*/
@Data
@AllArgsConstructor
public class User implements Serializable {private String name;private Integer age;
}
@Test
public void testSendObject() {// 1. 声明队列名称String queueName = "work.queue";// 2. 定义消息体User user = new User("jack", 22);// 3. 发送消息rabbitTemplate.convertAndSend(queueName, user);
}

从RabbitMQ控制台中查看消息内容如下:
image.png

4.2 追踪源码

image.png
我们发现实际调用了convertMessageIfNecessary(object)方法,我们继续追踪进去:
image.png
该方法判断object是否为Message类型,如果不是就调用getRequiredMessageConverter()获取所需的消息转换器,继续追踪进去:
image.png
image.png
该方法返回了一个SimpleMessageConverter实例对象,因此我们回到上一层,获取到MessageConverter实例后又调用了toMessage方法,我们继续追踪进去观察是如何转换消息的:
image.png
在AbstruectMessageConverter中实现了toMessage方法,而createMessage方法在子类 SimpleMessageConverter重写了该方法:
image.png
可以看出调用了SerialzationUtils.serialize(object)进行了序列化,继续追踪观察到底是如何序列化的:
image.png
可以看出是借助ObjectOutputStream进行序列化的,而这这个是JDK默认的序列化方式,该方式有如下缺点:

  • 序列化过程不够安全,可能存在注入风险
  • 序列化结果可读性较差
  • 序列化结果占用体积较大

因此我们需要重写消息转换器中的序列化机制:

4.3 自定义JSON序列化器

因此JDK原生序列化器有诸多确定,因此我们需要使用自定义的JSON序列化器,此处需要引入jackson-databind相关依赖

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
/*** 消息转换器配置* @author 米饭好好吃*/
@Configuration
public class MessageConvertConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

验证结果:
image.png
在控制台中我们可以发现消息格式就是熟悉的JSON格式了

相关文章:

【SpringCloud学习笔记】RabbitMQ(中)

1. 交换机概述 前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ&#xff0c;当时采用的是生产者直接将消息发布给队列&#xff0c;但是实际开发中不建议这么做&#xff0c;更加推荐生产者将消息发布到交换机(exchange)&#xff0c;然后由exchange路由…...

【C++】类和对象的引入

文章目录 前言一、类的定义二、类的访问控制与封装三、类的作用域四、类的实例化五、类的存储方式及大小计算六、隐藏的this指针 前言 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题。 C是基于面向对象的&…...

11.5.k8s中pod的调度-cordon,drain,delete

目录 一、概念 二、使用 1.cordon 停止调度 1.1.停止调度 1.2.解除恢复 2.drain 驱逐节点 2.1.驱逐节点 2.2.参数介绍 2.3.解除恢复 3.delete 删除节点 一、概念 cordon节点&#xff0c;drain驱逐节点&#xff0c;delete 节点&#xff0c;在对k8s集群节点执行维护&am…...

Java中线程的创建方式

一、继承Thread类&#xff0c;重写run方法 public class MyThread{public static void main(String[] args) {Thread threadDome new ThreadDome();threadDome.start();} }class ThreadDome extends Thread{Overridepublic void run() {for (int i 0; i < 5; i) {try {Th…...

猫头虎推荐20个值得体验的通用大模型

猫头虎推荐20个值得体验的通用大模型 &#x1f680; 大家好&#xff0c;我是猫头虎&#xff0c;一名专注于科技领域的自媒体博主。今天是周一&#xff0c;新的开始&#xff0c;我们来深入探讨一下当前最值得体验的通用大模型。这些AI模型不仅功能强大&#xff0c;而且在各自领…...

Novartis诺华制药社招综合能力性格动机问卷入职测评笔试题库答案及包过助攻

【华东同舟求职】由资深各行业从业者建立的一站式人才服务网络平台&#xff0c;现阶段目标是“提升全市场各行业岗位信息的流动性和透明度”。我们接受众多行业机构的直接委托发布&#xff0c;并尽力通过各种方法搜寻高价值岗位信息。事实上&#xff0c;我们以发现不为人知的优…...

Adam优化算法

Adam优化算法 Adam&#xff08;Adaptive Moment Estimation&#xff09;是一种用于训练深度学习模型的优化算法&#xff0c;由Diederik P. Kingma和Jimmy Ba在2014年提出。Adam结合了动量和自适应学习率的方法&#xff0c;具有高效、稳定和适应性强的特点&#xff0c;被广泛应…...

MYSQL 三、mysql基础知识 7(MySQL8其它新特性)

一、mysql8新特性概述 MySQL从5.7版本直接跳跃发布了8.0版本 &#xff0c;可见这是一个令人兴奋的里程碑版本。MySQL 8版本在功能上 做了显著的改进与增强&#xff0c;开发者对MySQL的源代码进行了重构&#xff0c;最突出的一点是多MySQL Optimizer优化器进行了改进。不仅在速度…...

git error: does not have a commit checked out fatal: adding files failed

git add net error: net/ does not have a commit checked out fatal: adding files failed这个错误是因为尝试将一个尚未被提交的文件夹添加到Git中。解决这个问题的方法是先将文件夹中的文件提交到Git仓库中&#xff0c;然后再将文件夹添加到Git中。 首先&#xff0c;需要进…...

Java Websocket分片发送

一、分片发送和接收(复杂) 如果数据量太大&#xff0c;需要分多次发送&#xff0c; 需要考虑数据划分和重组的问题。 二、具体思路 每次发送和接收用一个布尔值变量指定是否为最后一个分片。 三、具体使用 (一)字符串分片发送&#xff1a; sendText(文本&#xff0c; 布尔值)…...

vivado NODE、PACKAGE_PIN

节点是Xilinx部件上用于路由连接或网络的设备对象。它是一个 WIRE集合&#xff0c;跨越多个瓦片&#xff0c;物理和电气 连接在一起。节点可以连接到单个SITE_&#xff0c; 而是简单地将NETs携带进、携带出或携带穿过站点。节点可以连接到 任何数量的PIP&#xff0c;并且也可以…...

JavaEE、SSM基础框架、JavaWeb、MVC(认识)

目录 一、引言 &#xff08;0&#xff09;简要介绍 &#xff08;1&#xff09;主要涉及的学习内容 &#xff08;2&#xff09;学习的必要性 &#xff08;3&#xff09;适用学习的人群&#xff08;最好有这个部分的知识基础&#xff09; &#xff08;4&#xff09;这个基础…...

【漏洞复现】飞企互联-FE企业运营管理平台 treeXml.jsp SQL注入漏洞

0x01 产品简介 飞企互联-FE企业运营管理平台是一个基于云计算、智能化、大数据、物联网、移动互联网等技术支撑的云工作台。这个平台可以连接人、链接端、联通内外&#xff0c;支持企业B2B、C2B与020等核心需求&#xff0c;为不同行业客户的互联网转型提供支持。其特色在于提供…...

Android基础-运行时权限

一、引言 随着智能手机和移动互联网的普及&#xff0c;Android操作系统作为其中的佼佼者&#xff0c;其安全性问题日益受到关注。为了保障用户数据的安全和隐私&#xff0c;Android系统引入了权限机制来管理和控制应用程序对系统资源和用户数据的访问。特别是在Android 6.0&am…...

postman断言及变量及参数化

1&#xff1a;postman断言 断言&#xff1a;判断接口是否执行成功的过程 针对接口请求完成之后&#xff0c;针对他的响应状态码及响应信息进行判断,代码如下&#xff1a; //判断响应信息状态码是否正确 pm.test("Status code is 200", function () { pm.response.…...

安装和使用TrinityCore NPCBot

安装TrinityCore NPCBot 官网&#xff1a;GitHub - trickerer/Trinity-Bots: NPCBots for TrinityCore and AzerothCore 3.3.5 基本安装方法 Follow TrinityCore Installation Guide (https://TrinityCore.info/) to install the server firstDownload NPCBots.patch and put …...

Hvv--知攻善防应急响应靶机--Linux2

HW–应急响应靶机–Linux2 所有靶机均来自 知攻善防实验室 靶机整理&#xff1a; 夸克网盘&#xff1a;https://pan.quark.cn/s/4b6dffd0c51a#/list/share百度云盘&#xff1a;https://pan.baidu.com/s/1NnrS5asrS1Pw6LUbexewuA?pwdtxmy 官方WP&#xff1a;https://mp.weixin.…...

replaceAll is not a function 详解

先说说原因&#xff1a; 在chrome 浏览器中使用 replaceAll 报这个错误&#xff0c;是因为chrome 版本过低&#xff0c; 在chrome 85 以上版本才支持 用法 replaceAll(pattern, replacement)const paragraph "I think Ruths dog is cuter than your dog!"; console…...

如何设置天锐绿盾的数据防泄密系统

设置天锐绿盾的数据防泄密系统&#xff0c;可以按照以下步骤进行&#xff1a; 一、系统安装与初始化 在线或离线安装天锐绿盾数据防泄密系统&#xff0c;确保以管理员身份运行安装包&#xff0c;并按照安装向导的提示完成安装。输入序列号进行注册&#xff0c;激活系统。 二…...

003 gitee怎样将默认的私有仓库变成公开仓库

先点击“管理”&#xff0c; 再点击“基本信息” 在“是否开源”里&#xff0c; 选择&#xff1a;开源...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...