当前位置: 首页 > news >正文

实用篇-MQ消息队列

一、初识MQ

通讯分为同步通讯和异步通讯,同步通讯就比如我们日常生活中的打电话,看直播,能够得到及时的反馈。而异步通讯则类似于聊天软件聊天,不需要建立实时的连接,并且可以进行建立多个业务一起异步执行

1. 同步通讯的优缺点

springcloud多个微服务之间的调用(就是你请求我,我请求它,它请求它)是基于Feign,Feign就是同步的方式,就会导致下面的问题:

问题1: 如果A微服务需要请求B、C、D、E、F微服务时,必须按顺序,不能同时去请求,也就导致耗时大增,业务系统扛不住并发吞吐量

问题2: 还有一个问题就是当A请求B的时候,C、D、E、F就会干等着,相当于空闲,空闲相当于浪费系统资源,简单说就是资源利用不够充分

问题3: 当A请求B时,如果B挂了,此时C、D、E、F就执行不到,简单说就是在B那里阻塞了,也就是级联失败

同步调用(本质原因是只能一对一,不能一对多):

优点:

时效性强,可以立即得到结果

缺点:

1、耦合度高: 每次加入新的需求,都要修改原来的代码

2、性能下降: 调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和

3、资源浪费: 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

4、级联失败: 如果服务提供者出现问题,所有调用方都会跟着出问题,导致整个微服务群故障

2. 异步通讯的优缺点

异步调用常见实现就是事件驱动模式,如图。有用户支付了之后,支付服务只管把谁支付了告诉Broker,不需要等待 '订单服务、仓储服务、短信服务' 就可以马上告诉用户支付成功了,Broker得到了消息之后,就会把消息分发给 '订单服务、仓储服务、短信服务',保证了高效运行

异步相对于同步,拿上图(异步)来说的话,有如下优点

1、如果是同步,我们需要修改订单或仓储或短信业务代码时,需要到 '支付服务' 里面修改,很麻烦,不解耦。但是,如果是异步的话,我们只需要到 '订单服务、仓储服务、短信服务' 中去修改即可,修改好之后, '订单服务、仓储服务、短信服务' 重新向Broker发起订阅即可

2、如果是同步,我们需要添加 'xx服务' 时,需要到 '支付服务' 里面修改,很麻烦,不解耦。但是,如果是异步的话,我们只需要在上图的最右侧添加 'xx服务',然后跟Broker建立订阅即可

3、如果是同步,我们需要删除 '短信服务' 时,需要到 '支付服务' 里面修改,很麻烦,不解耦。但是,如果是异步的话,我们只需要在上图把 '短信服务' 与 Broker之间的订阅关系取消即可

当Broker与 '某个服务' 之间有订阅关系的时候,这些服务才会接收到来自Broker(负责分发消息)的通知

异步调用的优点:

1、服务解耦。例如修改业务代码

2、性能提升,吞吐量提高。例如请求时间,'支付业务' 不需要等待 '订单服务、仓储服务、短信服务' ,直接告诉Broker,然后就告诉用户支付成功了

3、服务没有强依赖,没有级联问题。例如 '仓储服务' 挂了,不会影响 '支付服务',会由其它没挂的服务继续响应Broker的通知

4、流量削峰。例如Broker接收到来自 '支付服务' 的庞大请求,Broker会把这些大请求抗住做缓存,众多订阅Broker的微服务可以基于自己的能力接受Broker中的请求

异步调用的缺点:

1、依赖于Broker的可靠性、安全性、吞吐能力。例如Broker挂掉了的话,就全废了

2、架构复杂,业务没有明显的流程线,不利于追踪管理

3、时效性差。例如 '支付服务' 把请求给Broker之后,'支付服务' 并不能知道什么时候会被 '订单服务、仓储服务、短信服务' 处理

如果你不需要知道用户的支付是否成功,仅仅想让 '订单服务、仓储服务、短信服务' 来响应用户的请求,那么异步调用是适合的,否择不建议异步

3. mq常见技术介绍

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker

MQ常见的具体技术实现如下

RabbitMQ

ActiveMQ

RocketMQ

Kafka

公司/社区

Rabbit

Apache

阿里

Apache

开发语言

Erlang

Java

Java

Scala&Java

协议支持

AMQP、XMPP、SMTP、STOMP

AMQP、XMPP、SMTP、STOMP、OpenWire、REST

自定义协议

自定义协议

可用性

一般

单机吞吐量

一般

非常高

消息延迟

微秒级

毫秒级

毫秒级

毫秒内

消息可靠性

一般

一般

二、RabbitMQ消息队列

官网:https://www.rabbitmq.com

RabbitMQ的结构图如下,生产者 -> 交换机 -> 信道 -> 队列 -> 消费者

publisher表示消息发送者,会把消息发送到交换

systemctl start docker    # 启动docker服务

exchange表示交换机,会把消息路由到不同的队列

queue表示队列,队列负责暂存消息

consumer表示消费者,会从队列中获取消息,并且处理消息

VirtualHost表示虚拟主机,当我们创建用户的时候,用户会有自己的虚拟主机,虚拟主机之间相互隔离,每个虚拟主机都有自己的exchange、queue

1. 下载安装启动

学习的是基于Docker,在Centos7虚拟机中使用Docker来安装RabbitMQ

第一步: 启动docker(上面已经学过Docker容器,这里启动就行)

systemctl start docker    # 启动docker服务

第二步: 在Windows下载rabbitmq镜像,然后上传到Centos7虚拟机的/temp目录。下载下来是一个名称为mq.tar的镜像压缩包

rabbitmq镜像快速下载: https://cowtransfer.com/s/a76639caf2f54e

然后执行下面那个命令,就可以把名称为mq.tar的rabbitmq镜像压缩包,导入到Docker

第三步: 在Docker主机,使用rabbitmq镜像来创建运行rabbitmq容器。

-e参数表示环境变量,例如自定义rabbitmq的默认用户名为keke,密码为123456

-name参数表示自定义rabbitmq容器的名字

-hostname参数表示自定义主机名,由于我们不是集群部署,所以主机名可写可不写,下面是写上了

-p参数表示端口映射,例如开放了两个端口,分别是15672管理后台端口、5672是服务端口

-d参数表示后台运行

rabbitmq:3-management 表示镜像名称,由于我们的rabbitmq镜像是这个版本,所以镜像名称需要带上镜像版本号

docker run \-e RABBITMQ_DEFAULT_USER=keke \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

第四步: 测试。浏览器访问http://192.168.229.128:15672

第五步: 熟悉页面

2. 消息模型介绍

MQ的官方文档RabbitMQ Tutorials — RabbitMQ

上图中的官方文档中给出了5种消息模型,分别叫 'Hello World' 、'Work Queues' 、'Publish/Subscribe' 、'Routing' 、'Topics'

根据用途和交换机类型进行如下分类,前两种是根据用途,后三种是根据交换机类型进行分类

1、基本消息队列: Hello World

2、工作消息队列: Work Queues

3、广播发布订阅(使用Fanout交换机): Publish/Subscribe

4、路由发布订阅(使用Direct交换机): Routing

5、主题发布订阅(使用Topic交换机): Topics

我们会在下面逐步学习这五种消息模型

3. Hello World 队列模型案例

Hello World 叫简单队列模型,官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括如下三个角色,如图

下面我们以项目代码来演示一下,如何在代码中使用 'RabbitMQ消息队列' 的 'Hello World 队列模型'。具体操作如下

第一步: 下载写好的项目文件 'mq-demo',下载到的是mq-demo.zip,解压会得到一个mq-demo文件夹,用IDEA打开

链接:https://pan.baidu.com/s/1z-oYXu1Q_LjuaTYfO6W-WQ 
提取码:mfkw

如果导入后,maven加载很慢,半天加载不进来,可以参见这篇博文

maven解决加载慢

第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器

systemctl start docker   # 启动docker服务
docker start mq          # 启动rabbitmq容器

第三步: 修改mq-demo工程代码,把相关参数改成自己的

第四步:运行PublisherTest

首先是建立连接工厂作用是跟RabbitMQ建立连接

然后设置一些信息,设置IP地址,MQ的消息通信端口是5672,设置虚拟主机keke用户有/这个虚拟主机的访问权限,设置用户名密码

然后是建立连接,打开控制台,有一个连接已经建立了

接着是建立通道,通道一旦建立就可以向队列中发送消息了,我们打开控制台,可以看到有一个通道

声明队列

 发送消息

最后是关闭连接,释放资源 

我们这里已经发送了一个消息,但是消息是否被消费者接受,我们不关心,这就是解耦,就是异步

第五步:运行ConsumerTest,和PulisherTest的逻辑大致一样

首先还是建立连接工厂作用是跟RabbitMQ建立连接,设置连接参数

然后是创建通道

接着是声明队列,为什么这里还要声明一次队列,刚才生产者进程不是已经声明过一次队列了吗?

这是因为我们的生产者和消费者进程的启动先后顺序是不确定的,万一消费者先启动,想要找队列去消费却找不到,所以为了避免这种情况的发生,消费者和生产者各自都去声明一个队列

当然代码的重复执行不会导致创建出来两个队列,而是一种保险措施

继续执行代码,只要消费者发送的消息一旦被消费,消息队列中的消息就会被删除

总结

三、SpringAMQP消息中间件

SpringAMQP简单说就是一个中间件,提供了模板方便我们操作各种消息模型

上面已经学了RabbitMQ消息队列是有五种消息模型,并且我们演示了其中的基本消息队列(Hello World)。用的是官方API,来实现的基本消息队列(Hello World)。会发现官方提供的API写起来麻烦,所以我们直接用的就是写好的 'mq-demo' 项目,只是简单运行了解一下运行机制

下面我们将学习SpringAMQP,可以大大简化API的书写。也就是使用SpringAMQP来分别演示五种消息模型

根据用途和交换机类型进行如下分类,前两种是根据用途,后三种是根据交换机类型进行分类

1、基本消息队列: Hello World
这个是上面使用官方API演示过一次(但是接下来也会使用SpringAMQP再演示一次),下面四个还没演示过(接下来用SpringAMQP演示)

2、工作消息队列: Work Queues

3、广播发布订阅(使用Fanout交换机): Publish/Subscribe

4、路由发布订阅(使用Direct交换机): Routing

5、主题发布订阅(使用Topic交换机): Topics

什么是AMQP ?Advanced Messaging

用于在应用程序或之间传递业务消息的开放标准。该协议与语言、平台无关,更符合微服务中独立性的要求

什么是SpringAMQP ?Advanced Message Queuing Protocol 高级消息队列协议

SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板(我们之前学习SpringRedis的时候,Spring提供了RedisTemplate)来发送和接收消息,所以api都定义好了。简单说就是一个中间件,提供了模板方便我们操作各种消息模型
SpringAMQP包含两部分内容,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。也就是spring-amqp是规范,具体由spring-rabbit来实现

SpringAmqp的官网: Spring AMQP

SpringAMQP的特点:

1、提供了监听器容器,可以用异步来处理入站消息,也就是负责接收消息

2、提供了RabbitTemplate,用于发送和接收消息,一般用于发送消息,接受用监听器容器去做

3、提供了RabbitAdmin,用于自动声明队列、交换和绑定。声明也就是创建的意思,会自动创建消息队列,不需要手动创建消息队列

1. HelloWorld模型的消息发送

Hello World 叫简单队列模型,官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括如下三个角色,如图

案例: 在 '实用篇-RabbitMQ消息队列' 的 '3. Hello World 队列模型案例' ,里面写用官方API实现的基本消息队列。因为有五种模型都要演示一次,现在我们同样是先演示Hello World 队列模型,这次就不是用官方API来写了,而是使用SpringAMQP来写,代码更加的简洁。用的项目还是mq-demo。我把下载链接拿下来了

这里我们先做消息发送,下一课在做消息接收。消息发送的具体操作过程如下

第一步(已做好可跳过): 下载写好的项目文件 'mq-demo',下载到的是mq-demo.zip,解压会得到一个mq-demo文件夹

第二步(已做好可跳过): 在idea软件,打开mq-demo,mq-demo是一个项目工程

第三步(已做好可跳过): 确保已经启动Docker、并启动Docker里面的rabbitmq容器

systemctl start docker   # 启动docker服务
docker start mq          # 启动rabbitmq容器

第四步: 打开idea,在父工程(也就是mq-demo)的pom.xml中引入spring-amqp的依赖(已做好可跳过)

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第五步: 在publisher(也就是publisher子工程,这个子工程是负责发消息的)中,的application.yml写入如下

spring:rabbitmq:host: 192.168.229.128port: 5672 #端口username: keke  #连接rabbitmq的用户名password: 123456 #连接rabbitmq的密码virtual-host: / #huanfqc对应的虚拟主机

第六步: 在publisher子工程的src/test/java目录新建cn.itcast.mq.spring.SpringAmqpTest类,写入如下

package cn.itcast.mq.spring;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageSimpleQueue(){String queueName = "simple-queue2";String message = "我是使用amqp模板,往队列里面发消息,这里是keke";rabbitTemplate.convertAndSend(queueName,message);}}

第六步: 浏览器访问 http://你的ip:15672。例如如下。打开浏览器后手动在页面创建一个叫simple-queue2的队列,不然上面运行找不到该往哪个队列发消息

 

第七步: 运行publisher子工程的SpringAmqpTest类,此时就会向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息

第八步: 我们还没有消费者,所以消息会一直在队列里面放着,下面我们将来学习消息接收,也就是编写消费者,去消费队列里面的信息。如下

2. HelloWorld模型的消息接收

注意: 消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能,也就是你找不到已经被消费了的消息

上面学了如何在publisher子工程,向simple.queue2队列,发送消息。下面我们就学习如何让consumer子工程去处理simple.queue2队列里面的消息

第一步: 在consumer子工程的application.yml,添加如下

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.229.128port: 5672username: kekepassword: 123456virtual-host: /

第二步:由于消费者要做的事情是监听消息,那么spring已经帮我们跟mq建立了连接,唯一要做的就是要监听哪个队列,监听到队列后应该做什么操作?所以我们把操作封装到方法就可以了。在consumer子工程的src/main/java目录新建cn.itcast.mq.listener.SpringRabbitListener类,写入如下

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue2")//指定监听哪个消息队列public void listenSimpleQueue(String msg){System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");}}

第三步: 运行consumer子工程的ConsumerApplication引导类,此时consumer子工程的SpringRabbitListener类就会去消费来自simple.queue2队列的消息

拿到了之前生产者在队列中放的消息 

3. WorkQueue模型的消息发接

WorkQueue也叫工作队列,模型图如下

案例: 实现一个队列绑定多个消费者,要求如下

1、在publisher子工程中定义测试方法,每秒产生50条信息,发送到simple.queue2队列

2、在consumer子工程中定义两个消息监听者(也就是消息消费者),都监听simple.queue队列

3、一个消费者每秒处理50条消息,另一个消费者每秒处理10条消息

具体过程如下

第一步: 由于在上面的 '1. HelloWorld模型的消息发送' 已经有一个SpringAmqpTest测试类,我们把这个测试类修改为如下

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageSimpleQueue() throws InterruptedException {String queueName = "simple-queue2";String message = "我是使用amqp模板,往队列里面发消息,这里是keke";for(int i = 0; i < 50; i++){rabbitTemplate.convertAndSend(queueName,message + i + "条消息");//每隔20毫秒发送一次,共需要1秒才能发完50条消息Thread.sleep(20);}}
}

第二步: 由于在上面的 '1. HelloWorld模型的消息发送' 已经有一个SpringRabbitListener类,我们把这个类修改为如下

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue(String msg){
//          System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
//     }@RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列public void listenSimpleQueue1(String msg) throws InterruptedException {System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());//每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息Thread.sleep(20);}@RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列public void listenSimpleQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());//每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息Thread.sleep(200);}}

第三步: 运行consumer子工程的ConsumerApplication引导类,此时consumer子工程的SpringRabbitListener类就会去消费来自simple-queue2队列的消息

第四步:运行publisher子工程的SpringAmqpTest类,此时就会向名为simple-queue2的队列发送一条(每运行一次就发送一条)消息

从上面的控制台日志信息中,我们发现消费者1处理的都是奇数消息,消费者2处理的都是偶数消息,看一下总的50条信息的分配情况,可以发现50条信息竟然是平均分配(默认是消息预取机制:当有大量的消息到达队列时,队列把消息投递给消费者,这叫做消息预取机制)给这两个消费者进行处理,但是我们给消费者1处理的速度设置的更快(每20毫秒就可以处理一条消息),消费者2的处理速度设置的慢(每100毫秒才能处理一条消息),那我们就希望队列给消费者1多分点消息,给消费者2少分点消息,这样50条消息就能被更快处理完。如何实现呢,如下

把publisher子工程的application.yml,修改为如下,主要就是添加了preFetch参数(默认为无限),表示最大预取数。分析:这两个消费者默认的预取值是"总消息条数/总消费者数量",也就是上面我们有50条消息,然后被两个消费者分别拿了25条,而消费者2处理的慢但是却拿了这么多消息,所以我们通过修改最大预取值,如果把它修改为1的话,那么两个消费者每次最多向队列里面拿一条消息,拿完就处理,处理之后才能继续向队列拿数据

logging:pattern:dateformat: MM-dd HH:mm:ss:SSSspring:rabbitmq:host: 192.168.229.129 #虚拟机ip地址port: 5672  # 端口username: keke # 用户名password: 123456 #密码virtual-host: /listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一条消息

然后重新运行consumer子工程的ConsumerApplication引导类,此时这两个消费者就准备就绪了,如果队列有消息就会被消费

最后运行运行publisher子工程的SpringAmqpTest类,此时就会又向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息

重点看一下现在consumer子工程的两个消费者的消费情况

谁处理消息快,谁就分配的消息多

总结:Work模型的使用

多个消费者绑定一个队列,同一条消息只会被一个消费者处理,一旦消费完就会从队列中删除

通过设置preftch来控制消费者预取的消息数量

4. Fanout交换机的消息发接

Fanout交换机,其中也就是Publish/Subscribe,是消息订阅模式的其中一种,共三种

'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下

如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下

1、Fanout: 广播

2、Direct: 路由

3、Topic: 话题

我们前面已经学了前两种消息队列,接下来学习后三种(其实就是交换机的三种类型),后三种统称为 '发布订阅模式',刚刚也简单介绍了 '发布订阅模式'

案例: 利用AMQP演示FanoutExchange的使用

实现思路如下:

1、在consumer子工程,利用代码声明(声明就是创建的意思)队列、交换机,并将两者绑定

2、在consumer子工程,编写两个消费者方法,分别监听fanout.queue1队列和fanout.queue2队列

3、在publisher子工程中编写测试方法,向publisher.fanout交换机发送消息(publisher.fanout交换机接收到消息之后,会自动向队列路由消息)

案例结构图如下

SpringAMQP提供了声明交换机、队列、绑定关系的API,如下继承关系图。下面需要用到的Exchange、FanoutExchange这两个API

具体操作如下

第一步: 在consumer子工程的src/test/java/cn.itcast.mq目录,新建config.FanoutConfig类,写入如下

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("keke.fanout");}//声明一个队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout-queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明一个队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout-queue2");}//绑定队列2和交换机@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

第二步: 运行consumer子工程的ConsumerApplication引导类

第三步: 消息接收功能。在consumer子工程的SpringRabbitListener类修改为如下

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue(String msg){
//          System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
//     }//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue1(String msg) throws InterruptedException {
//          System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
//          Thread.sleep(20);
//     }
//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
//          Thread.sleep(200);
//     }@RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列public void listenFanoutQueue1(String msg) throws InterruptedException {System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");}@RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列public void listenFanoutQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");}}

第四步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;//     @Test
//     public void testSendMessageSimpleQueue() throws InterruptedException {
//          String queueName = "simple-queue2";
//          String message = "我是使用amqp模板,往队列里面发消息,这里是keke";
//          for(int i = 0; i < 50; i++){
//               rabbitTemplate.convertAndSend(queueName,message + i + "条消息");
//               //每隔20毫秒发送一次,共需要1秒才能发完50条消息
//               Thread.sleep(20);
//          }
//     }@Testpublic void testSendMessageFanoutExchange(){//交换机String exchange = "keke-fanout";//发送什么信息String message = "发送信息fanout";//发送rabbitTemplate.convertAndSend(exchange,"",message);}
}

第五步: 重新运行consumer子工程(消费者)的ConsumerApplication引导类,且保持运行,等待处理消息

第六步: 运行publisher子工程(生产者)的SpringAmqpTest类,表示向消费者发送消息

第七步: 查看consumer子工程的控制台,是否能处理消息。是可以处理消息的,并且我们发现两个队列处理的是同一条消息,也就是交换机会把同一条消息路由到两个不同的队列

总结:

交换机的作用是什么?

做消息的转发,不做消息存储,如果路由不成功,消息会丢失

FanoutExchange会将消息路由到每个绑定的队列

5. Direct交换机的消息发接

案例: 利用SpringAMQP演示DirectExchange(Direct类型的交换机)的使用。

实现思路如下:

1、利用@RabbitListener声明Exchange、Queue、RoutingKey

2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

3、在publisher中编写测试方法,向keke-direct交换机发送消息

具体操作如下

第一步: 把consumer子工程的SpringRabbitListener修改为如下

package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue(String msg){
//          System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
//     }//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue1(String msg) throws InterruptedException {
//          System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
//          Thread.sleep(20);
//     }
//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
//          Thread.sleep(200);
//     }
//
//     @RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列
//     public void listenFanoutQueue1(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");
//     }
//
//     @RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列
//     public void listenFanoutQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");
//     }//演示direct交换机@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct-queue1"),exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),key = {"red","blue"})) //指定交换机,监听哪个队列,指定keypublic void listenDirectQueue1(String msg) throws InterruptedException {System.err.println("我接收并处理了direct-queue1队列的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct-queue2"),exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了direct-queue2队列的消息:【"+msg+"】");}
}

第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器

systemctl start docker   # 启动docker服务
docker start mq          # 启动rabbitmq容器

第三步: 运行consumer子工程的ConsumerApplication引导类

 

第四步: 运行publisher子工程的SpringAmqpTest类,作用仅仅是发送消息,才能触发 'consumer子工程的FanoutConfig类' 的绑定

第六步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下。表示发送到交换机的消息是带了一个key为blue的消息

第七步: 再一次验证,发一条带有key值为yellow的消息,查看控制台是否只有direct-queue2队列才能获取交换机里面带有对应key值为yellow的消息

第八步: 我们发现,direct.queue1和direct.queue2队列都带有值为red的key,那请问在生产者往交换机发送一条key值为red的消息,那么会被direct.queue1和direct.queue2队列同时接收吗。答案是肯定的

6. Topics交换机的消息发接

Topics是消息订阅模式的其中一种,共三种

'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下

如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下

1、Fanout类型: 广播 (上面在 '4. Publish/Subscribe模型的消息发接' 学过)

2、Direct类型: 路由 (现在正在学的),也就是Routing

3、Topic类型: 话题

Topic交换机和上一节学的Direct交换机类似,区别在于Topic交换机的key必须是多个单词的列表,并且以小数点.分隔。结构图如下

 

案例: 利用SpringAMQP演示TopicExchange的使用

实现思路如下:

1、利用@RabbitListener声明Exchange、Queue、RoutingKey。在上面学Direct交换机时,也是使用@RabbitListener来声明

2、在consumer子工程中,编写两个消费者方法,分别监听topic-queue1和topic-queue2

3、在publisher子工程中编写测试方法,向keke-topic发送消息

案例结构图如下

第一步: 把consumer子工程的SpringRabbitListener修改为如下

package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue(String msg){
//          System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
//     }//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue1(String msg) throws InterruptedException {
//          System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
//          Thread.sleep(20);
//     }
//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
//          Thread.sleep(200);
//     }
//
//     @RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列
//     public void listenFanoutQueue1(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");
//     }
//
//     @RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列
//     public void listenFanoutQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");
//     }//     //演示direct交换机
//     @RabbitListener(bindings = @QueueBinding(
//             value = @Queue(name = "direct-queue1"),
//             exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
//             key = {"red","blue"}
//     )) //指定交换机,监听哪个队列,指定key
//     public void listenDirectQueue1(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了direct-queue1队列的消息:【"+msg+"】");
//     }
//
//     @RabbitListener(bindings = @QueueBinding(
//             value = @Queue(name = "direct-queue2"),
//             exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
//             key = {"red","yellow"}
//     ))
//     public void listenDirectQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了direct-queue2队列的消息:【"+msg+"】");
//     }//演示topic交换机@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic-queue1"),exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),//china有关的一切信息都可以接收到key = {"china.#"}))public void listenTopicQueue1(String msg) throws InterruptedException {System.err.println("我接收并处理了topic-queue1队列的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic-queue2"),exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),//一切有关新闻的都可以接收到key = {"#.news"}))public void listenTopicQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了topic-queue2队列的消息:【"+msg+"】");}
}

第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器

systemctl start docker   # 启动docker服务
docker start mq          # 启动rabbitmq容器

第三步: 运行consumer子工程的ConsumerApplication引导类

 

第六步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下。表示发送到交换机的消息是带了一个key为china.weather的消息

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;//     @Test
//     public void testSendMessageSimpleQueue() throws InterruptedException {
//          String queueName = "simple-queue2";
//          String message = "我是使用amqp模板,往队列里面发消息,这里是keke";
//          for(int i = 0; i < 50; i++){
//               rabbitTemplate.convertAndSend(queueName,message + i + "条消息");
//               //每隔20毫秒发送一次,共需要1秒才能发完50条消息
//               Thread.sleep(20);
//          }
//     }//     //测试fanout交换机
//     @Test
//     public void testSendMessageFanoutExchange(){
//          //交换机
//          String exchange = "keke-fanout";
//          //发送什么信息
//          String message = "发送信息fanout";
//          //发送
//          rabbitTemplate.convertAndSend(exchange,"",message);
//     }//     //测试direct交换机
//     @Test
//     public void testSendMessageDirectExchange(){
//          //往哪个交换机发送消息
//          String exchangeName = "keke-direct";
//          //要发送什么消息
//          String message = "来自生产者向你发送的消息hello";
//          //发送,需要指定key,例如发送到交换机的消息带了一个key为blue的消息,只有对应的队列才能从交换机获取这个消息
//          rabbitTemplate.convertAndSend(exchangeName,"red",message);
//     }@Testpublic void testSendMessageTopicExchange(){String exchangeName = "keke-topic";String message = "今天天气阴,大风";rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);}}

修改生产者为,再次发送消息,看消费者接受情况

 @Testpublic void testSendMessageTopicExchange(){String exchangeName = "keke-topic";String message = "本台报道,日本把核污水排海";rabbitTemplate.convertAndSend(exchangeName,"japan.news",message);}

 修改生产者为,再次发送消息,看消费者接受情况

    @Testpublic void testSendMessageTopicExchange(){String exchangeName = "keke-topic";String message = "本台报道:2023年11月3日,S13全球LOL总决赛中,来自lpl的二号种子BLG战队(全华班)抬走韩国一号种子GEN(全韩班)";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

7. 消息转换器的消息发送

案例: 测试发送Object类型的消息

说明: 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

从上面的学习中,我们知道有两种方式可以创建队列(交换机也同理),一种是通过注解(例如我们在FanoutConfig类里面写的代码),

另一种是通过代码(例如我们在SpringRabbitListener类里面写的代码)。在这个案例中,我们将使用通过注解的方式来创建队列

第一步: 创建队列。在consumer子工程的FanoutConfig类,写入如下。然后,重新运行consumer子工程的ConsumerApplication引导类

     //创建一个演示发送Object队列@Beanpublic Queue ObjectQueue(){return new Queue("Object-Queue");}

第二步: 向队列发送消息。在publisher子工程的SpringAmqpTest类,写入如下,然后运行

     @Testpublic void testSendObjectMessage(){Map<String,Object> map = new HashMap<>();map.put("name","张三");map.put("age","20");rabbitTemplate.convertAndSend("Object-Queue",map);}

原因是rabbit只支持字节,在发送object对象时,spring会默认根据jdk序列化方式进行对象的序列化,这种方式性能差,安全性差,冗长繁杂

解决:用json的方式进行序列化

我们已经实现了把对象类型的消息发送给队列。下面是解决序列化问题,spring默认的序列化方式是基于jdk的,缺陷较多

如何使用基于json的序列化方式呢,具体操作过程如下

第一步: 由于consumer子工程和publisher子工程都需要使用这个依赖,所以我们直接在mq-demo父工程的pom.xml添加如下

<!--Java对象转换为JSON格式数据-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

 第二步: 在publisher子工程,声明bean去覆盖默认bean,需要新建一个配置类,或者直接在引导类写。例如把PublisherApplication引导类修改为如下

package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Bean//这里只添加这个方法public MessageConverter messageConverter(){//使用Jackson消息转换工具,将消息转换为JSON格式数据return new Jackson2JsonMessageConverter();}
}

第三步: 为了不影响测试,先去浏览器清理刚刚的那条消息

第四步: 重新运行consumer子工程的ConsumerApplication引导类,并且重新运行publisher子工程的SpringAmqpTest类(运行testSendObjectMessage方法)

然后打开浏览器

总结

SpringAMQP中消息的 序列化(消息发送) 和 反序列化(消息接收) 是怎么实现的

1、利用MessageConverter实现的,默认是JDK的序列化

2、注意发送方与接收方必须使用相同的MessageConverter(下面学消息接收的时候会才学)

上面只是学习发送对象类型的消息,下面会学如何接收对象类型的消息

8. 消息转换器的消息接收

案例: 测试接收Object类型的消息

说明: 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

上述 '7. 消息转换器的消息发送' 我们已经成功向队列发送了Object类型(Map类型的对象)的消息,该消息是以json序列化的形式存放在队列,我们怎么让消费者能从队列中获取该消息呢,也就是我们下面要学的

第一步(做好可跳过): 由于consumer子工程和publisher子工程都需要使用这个依赖,所以我们直接在mq-demo父工程的pom.xml添加如下

<!--Java对象转换为JSON格式数据-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

第二步: 在consumer子工程,声明bean去覆盖默认bean,需要新建一个配置类,或者直接在引导类写。例如把ConsumerApplication引导类修改为如下

package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Bean//这里只添加这个方法public MessageConverter messageConverter(){//使用Jackson消息转换工具,将消息转换为JSON格式数据return new Jackson2JsonMessageConverter();}
}

第三步: 把consumer子工程的SpringRabbitListener修改为如下,增加新增接收对象消息的方法

package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue(String msg){
//          System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
//     }//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue1(String msg) throws InterruptedException {
//          System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
//          Thread.sleep(20);
//     }
//     @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
//     public void listenSimpleQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
//          //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
//          Thread.sleep(200);
//     }
//
//     @RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列
//     public void listenFanoutQueue1(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");
//     }
//
//     @RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列
//     public void listenFanoutQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");
//     }//     //演示direct交换机
//     @RabbitListener(bindings = @QueueBinding(
//             value = @Queue(name = "direct-queue1"),
//             exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
//             key = {"red","blue"}
//     )) //指定交换机,监听哪个队列,指定key
//     public void listenDirectQueue1(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了direct-queue1队列的消息:【"+msg+"】");
//     }
//
//     @RabbitListener(bindings = @QueueBinding(
//             value = @Queue(name = "direct-queue2"),
//             exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
//             key = {"red","yellow"}
//     ))
//     public void listenDirectQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了direct-queue2队列的消息:【"+msg+"】");
//     }//     //演示topic交换机
//     @RabbitListener(bindings = @QueueBinding(
//             value = @Queue(name = "topic-queue1"),
//             exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),
//             //china有关的一切信息都可以接收到
//             key = {"china.#"}
//     ))
//     public void listenTopicQueue1(String msg) throws InterruptedException {
//          System.out.println("我接收并处理了topic-queue1队列的消息:【"+msg+"】");
//     }
//
//
//     @RabbitListener(bindings = @QueueBinding(
//             value = @Queue(name = "topic-queue2"),
//             exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),
//             //一切有关新闻的都可以接收到
//             key = {"#.news"}
//     ))
//     public void listenTopicQueue2(String msg) throws InterruptedException {
//          System.err.println("我接收并处理了topic-queue2队列的消息:【"+msg+"】");
//     }//演示接收Object类型的消息@RabbitListener(queues = "Object-Queue")public void listenObjectQueue(Map<String,Object> map){System.out.println("接收到Object类型的消息" + map);}
}

第四步: 测试。先运行publisher子工程的SpringAmqpTest表示向队列发送消息,再运行consumer子工程的ConsumerApplication引导类

查看子工程是否能接收到消息

总结

SpringAMQP中消息的 序列化(消息发送) 和 反序列化(消息接收) 是怎么实现的

1、利用MessageConverter实现的,默认是JDK的序列化

2、注意发送方与接收方必须使用相同的MessageConverter

相关文章:

实用篇-MQ消息队列

一、初识MQ 通讯分为同步通讯和异步通讯&#xff0c;同步通讯就比如我们日常生活中的打电话&#xff0c;看直播&#xff0c;能够得到及时的反馈。而异步通讯则类似于聊天软件聊天&#xff0c;不需要建立实时的连接&#xff0c;并且可以进行建立多个业务一起异步执行 1. 同步通…...

springboot打包时依赖jar和项目jar分开打包;jar包瘦身

概述 最近感觉项目在部署时时jar包传输太慢了&#xff1b; 看了下jar包内容&#xff0c;除了项目代码&#xff0c;其余大部分都是依赖jar&#xff1b; 平时改动较多的只是项目代码&#xff0c;依赖jar改动比较少&#xff1b; 所以就在想能不能分开打包&#xff1b;这样只部署项…...

嵌入式系统的元素

注意&#xff1a;关于嵌入式系统的元素这一块儿内容&#xff0c;定义太多了。例如&#xff1a;吉姆莱丁 著&#xff0c;陈会翔 译&#xff0c;由清华大学出版社出版的《构建高性能嵌入式系统》中提到&#xff1a;嵌入式系统通常由电源、时基、数字处理、内存、软件和固件、专用…...

提升ChatGPT答案质量和准确性的方法Prompt engineering实用的prompt灵感和技巧

文章目录 1. 实用的prompt灵感和技巧小技巧常用prompt保存到输入法中普通promptprompt通用公式保存到输入法快捷指令中尝试用英语去写prompt沉浸式翻译软件3. 补充1. 实用的prompt灵感和技巧 解释***,并且给出暗喻/隐喻/类比(解释术语、专业名称,用一个词或短语指出常见的一…...

[Machine Learning] Learning with Noisy Labels

文章目录 随机分类噪声 (Random Classification Noise, RCN)类别依赖的标签噪声 (Class-Dependent Noise, CCN)二分类多分类 实例和类别依赖的标签噪声 (Instance and Label-Dependent Noise, ILN) 标签噪声是指分类任务中的标签被错误地标记。这可能是由于各种原因&#xff0c…...

集简云slack(自建)无需API开发轻松连接OA、电商、营销、CRM、用户运营、推广、客服等近千款系统

slack是一个工作效率管理平台&#xff0c;让每个人都能够使用无代码自动化和 AI 功能&#xff0c;还可以无缝连接搜索和知识共享&#xff0c;并确保团队保持联系和参与。在世界各地&#xff0c;Slack 不仅受到公司的信任&#xff0c;同时也是人们偏好使用的平台。 官网&#x…...

Idea 对容器中的 Java 程序断点远程调试

第一种&#xff1a;简单粗暴型 直接在java程序中添加log.info()&#xff0c;根据需要打印信息然后打包覆盖&#xff0c;根据日志查看相关信息 第二种&#xff1a;远程调试 在IDEA右上角点击编辑配置设置相关参数在Dockerfile中加入 "-jar", "-agentlib:jdwp…...

vscode设置保存后,自动格式化代码

第一步&#xff1a;打开setting.json文件 第二步&#xff1a;在setting.json中加入以下代码 "editor.formatOnType": true, "editor.formatOnSave": true, "editor.formatOnPaste": true...

datagrip出现 java.net.ConnectException: Connection refused: connect.

出现这样的情况要看一下hadoop有没有启动 start-all.sh nohup /export/server/apache-hive-3.1.2-bin/bin/hive --service hiveserver2 & scp -r /export/server/apache-hive-3.1.2-bin/ node3:/export/server/ /export/server/apache-hive-3.1.2-bin/bin/hive show databa…...

Docker 安装ELK7.7.1

(在安装之前&#xff0c;本方法必须安装jdk1.8以上版本) 一、安装elasticsearch 1、下载elasticsearch7镜像&#xff1a;docker pull elasticsearch:7.7.1 2、创建挂载目录&#xff1a;mkdir -p /data/elk/es/{config,data,logs} 3、赋予权限&#xff1a;chown -R 1000:100…...

决策树算法

决策树算法是一种用于分类和回归问题的机器学习算法。它通过构建树形结构来进行决策&#xff0c;每个内部节点代表一个特征或属性&#xff0c;每个叶子节点代表一个类别或值。 下面是决策树算法的一般步骤&#xff1a; 数据准备&#xff1a;收集相关的训练数据&#xff0c;并对…...

maven之pom文件详解

一、maven官网 maven官网 maven官网pom文件详解链接 二、maven之pom 1、maven项目的目录结构 pom文件定于了一个maven项目的maven配置&#xff0c;一般pom文件的放在项目或者模块的根目录下。 maven的遵循约定大于配置&#xff0c;约定了如下的目录结构&#xff1a; 目录目…...

深度学习之基于Python+OpenCV+dlib的考生信息人脸识别系统(GUI界面)

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习在人脸识别领域的应用已经取得了显著的进展。Python是一种常用的编程语言&#xff0c;它提供了许多强大的库…...

创建javaEE项目(无maven),JSP(九大内置对象)、Servlet(生命周期)了解

一、Servlet和jsp 0.创建web项目(无maven)&#xff1a; 1.创建一个普通的java项目 2.项目根目录右键&#xff0c;添加模板 3.配置tomcat服务器 4.配置项目tomcat依赖 1.Servlet(Server Applet)服务端小程序 用户通过浏览器发送一个请求&#xff0c;服务器tomcat接收到后&…...

BIOS开发笔记 - HDA Audio

在PC中,音频输出是一个重要的功能之一,目前大多数采用的是英特尔高清晰音效(英语:Intel High Definition Audio,简称为HD Audio或IHD)方案,它是由Intel于2004年所提出的音效技术,能够展现高清晰度的音质效果,且能进行多声道的播放,在音质(音效质量)上超越过去的其他…...

C语言——选择排序

完整代码&#xff1a; //选择排序 // 选择排序是一种简单直观的排序算法。它的工作原理如下:首先在未排序序列中找到最小&#xff08;大&#xff09;元素&#xff0c;存放到排序序列的起始位置&#xff0c;然后&#xff0c;再从剩余未排序元素中继续寻找最小&#xff08;大&am…...

vue详细安装教程

这里写目录标题 一、下载和安装node二、创建全局安装目录和缓存日志目录三、安装vue四、创建一个应用程序五、3x版本创建六、创建一个案例 一、下载和安装node 官网下载地址&#xff1a;https://nodejs.org/en/download 选择适合自己的版本&#xff0c;推荐LTS&#xff0c;长久…...

Java 正则表达式字符篇

精确匹配一个字符 精确匹配字符串 abc &#xff0c; //精确匹配字符串 "abc"String regexabc "abc";System.out.println("abc".matches(regexabc));// trueSystem.out.println("ABC".matches(regexabc));// falseSystem.out.println…...

shell脚本代码混淆

文章目录 起因安装 Bashfuscator安装BashfuscatorBashfuscator的使用 起因 很多时候我并不希望自己的shell脚本被别人看到&#xff0c;于是我在想有没有什么玩意可以把代码加密而又正常执行&#xff0c;于是我想到了代码混淆&#xff0c;简单来看一下&#xff1a; 现在我的目…...

【MATLAB第81期】基于MATLAB的LSTM长短期记忆网络预测模型时间滞后解决思路(更新中)

【MATLAB第81期】基于MATLAB的LSTM长短期记忆网络预测模型时间滞后解决思路&#xff08;更新中&#xff09; 在LSTM预测过程中&#xff0c;极易出现时间滞后&#xff0c;类似于下图&#xff0c;与一个以上的样本点结果错位&#xff0c;产生滞后的效果。 在建模过程中&#xf…...

订单业务和系统设计(一)

一、背景简介 订单其实很常见&#xff0c;在电商购物、外卖点餐、手机话费充值等生活场景中&#xff0c;都能见到它的影子。那么&#xff0c;一笔订单的交易过程是什么样子的呢&#xff1f;文章尝试从订单业务架构和产品功能流程&#xff0c;描述对订单的理解。 二、订单业务…...

安全模型的分类与模型介绍

安全模型的分类 基本模型&#xff1a;HRU机密性模型&#xff1a;BLP、Chinese Wall完整性模型&#xff1a;Biba、Clark-Wilson BLP模型 全称&#xff08;Bell-LaPadula&#xff09;模型&#xff0c;是符合军事安全策略的计算机安全模型。 BLP模型的安全规则&#xff1a; 简…...

I/O多路转接之select

承接上文&#xff1a;I/O模型之非阻塞IO-CSDN博客 简介 select函数原型介绍使用 一个select简单的服务器的代码书写 select的缺点 初识select 系统提供select函数来实现多路复用输入/输出模型 select系统调用是用来让我们的程序监视多个文件描述符的状态变化的; 程序会停在s…...

“如何对TXT文件的内容进行连续行删除?实现一键文件整理!

如果你有一个TXT文件&#xff0c;需要删除其中的连续行&#xff0c;这可能是为了整理文件、去除重复信息或清除不需要的文本。尽管手动删除每一行可能很耗时&#xff0c;但幸运的是&#xff0c;有一个简单而高效的方法可以帮助你实现这个目标。 首先&#xff0c;在首助编辑高手…...

stable diffusion公司发布4款LLM大语言模型,为何大家都喜爱LLM?

stable diffusion模型是Stability AI开源的一个text-to-image的扩散模型&#xff0c;其模型在速度与质量上面有了质的突破&#xff0c;玩家们可以在自己消费级GPU上面来运行此模型&#xff0c;本模型基于CompVis 和 Runway 团队的Latent Diffusion Models。本期我们不介绍stabl…...

堆排序--C++实现

1. 简介 堆排序利用的是堆序性&#xff0c;最小堆进行从大到小的排序。 先建初堆&#xff0c;保证堆序性。将堆顶元素与最后一个元素交换&#xff0c; 就将当前堆中的最大(小)的元素放到了最后后。堆大小递减&#xff0c;再重新调整堆选出第二大&#xff0c;重复上述过程。 2…...

【数据结构】数组和字符串(十四):字符串匹配1:朴素的模式匹配算法(StringMatching)

文章目录 4.3 字符串4.3.1 字符串的定义与存储4.3.2 字符串的基本操作4.3.3 模式匹配算法1. 算法原理2. ADL语言3. 伪代码4. C语言实现5 时间复杂度 4.3 字符串 字符串(String)是由零个或多个字符(char)顺序排列组成的有限序列&#xff0c;简称为串。例如 “good morning”就是…...

VMWare虚拟机问题

镜像下载 阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区...

代码随想录算法训练营第23期day39 |62.不同路径、63. 不同路径 II

目录 一、&#xff08;leetcode 62&#xff09;不同路径 1.动态规划 1&#xff09;确定dp数组&#xff08;dp table&#xff09;以及下标的含义 2&#xff09;确定递推公式 3&#xff09;dp数组的初始化 4&#xff09;确定遍历顺序 5&#xff09;举例推导dp数组 2.数论方…...

白帽黑客入门,“每天一个黑客技巧”实现黑客的自我突破 !(附工具包!)

年底了&#xff0c;不少朋友都是在总结一年的学习成果。最后发现完成情况与自己最初定下的目标相去甚远。 同时也针对粉丝和网上大部分存在的问题进行了整理&#xff1a; “为什么我感觉学安全好难&#xff1f;” “渗透测试到底该怎么学&#xff1f;” “为什么总是挖不到漏…...