实用篇-MQ消息队列
一、初识MQ
通讯分为同步通讯和异步通讯,同步通讯就比如我们日常生活中的打电话,看直播,能够得到及时的反馈。而异步通讯则类似于聊天软件聊天,不需要建立实时的连接,并且可以进行建立多个业务一起异步执行
1. 同步通讯的优缺点
springcloud多个微服务之间的调用(就是你请求我,我请求它,它请求它)是基于Feign,Feign就是同步的方式,就会导致下面的问题:
问题1: 如果A微服务需要请求B、C、D、E、F微服务时,必须按顺序,不能同时去请求,也就导致耗时大增,业务系统扛不住并发吞吐量
问题2: 还有一个问题就是当A请求B的时候,C、D、E、F就会干等着,相当于空闲,空闲相当于浪费系统资源,简单说就是资源利用不够充分
问题3: 当A请求B时,如果B挂了,此时C、D、E、F就执行不到,简单说就是在B那里阻塞了,也就是级联失败
同步调用(本质原因是只能一对一,不能一对多):
优点:
时效性强,可以立即得到结果
缺点:
1、耦合度高: 每次加入新的需求,都要修改原来的代码
2、性能下降: 调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
3、资源浪费: 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
4、级联失败: 如果服务提供者出现问题,所有调用方都会跟着出问题,导致整个微服务群故障
2. 异步通讯的优缺点
异步调用常见实现就是事件驱动模式,如图。有用户支付了之后,支付服务只管把谁支付了告诉Broker,不需要等待 '订单服务、仓储服务、短信服务' 就可以马上告诉用户支付成功了,Broker得到了消息之后,就会把消息分发给 '订单服务、仓储服务、短信服务',保证了高效运行
异步相对于同步,拿上图(异步)来说的话,有如下优点
1、如果是同步,我们需要修改订单或仓储或短信业务代码时,需要到 '支付服务' 里面修改,很麻烦,不解耦。但是,如果是异步的话,我们只需要到 '订单服务、仓储服务、短信服务' 中去修改即可,修改好之后, '订单服务、仓储服务、短信服务' 重新向Broker发起订阅即可
2、如果是同步,我们需要添加 'xx服务' 时,需要到 '支付服务' 里面修改,很麻烦,不解耦。但是,如果是异步的话,我们只需要在上图的最右侧添加 'xx服务',然后跟Broker建立订阅即可
3、如果是同步,我们需要删除 '短信服务' 时,需要到 '支付服务' 里面修改,很麻烦,不解耦。但是,如果是异步的话,我们只需要在上图把 '短信服务' 与 Broker之间的订阅关系取消即可
当Broker与 '某个服务' 之间有订阅关系的时候,这些服务才会接收到来自Broker(负责分发消息)的通知
异步调用的优点:
1、服务解耦。例如修改业务代码
2、性能提升,吞吐量提高。例如请求时间,'支付业务' 不需要等待 '订单服务、仓储服务、短信服务' ,直接告诉Broker,然后就告诉用户支付成功了
3、服务没有强依赖,没有级联问题。例如 '仓储服务' 挂了,不会影响 '支付服务',会由其它没挂的服务继续响应Broker的通知
4、流量削峰。例如Broker接收到来自 '支付服务' 的庞大请求,Broker会把这些大请求抗住做缓存,众多订阅Broker的微服务可以基于自己的能力接受Broker中的请求
异步调用的缺点:
1、依赖于Broker的可靠性、安全性、吞吐能力。例如Broker挂掉了的话,就全废了
2、架构复杂,业务没有明显的流程线,不利于追踪管理
3、时效性差。例如 '支付服务' 把请求给Broker之后,'支付服务' 并不能知道什么时候会被 '订单服务、仓储服务、短信服务' 处理
如果你不需要知道用户的支付是否成功,仅仅想让 '订单服务、仓储服务、短信服务' 来响应用户的请求,那么异步调用是适合的,否择不建议异步
3. mq常见技术介绍
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker
MQ常见的具体技术实现如下
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | AMQP、XMPP、SMTP、STOMP、OpenWire、REST | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
二、RabbitMQ消息队列
官网:https://www.rabbitmq.com
RabbitMQ的结构图如下,生产者 -> 交换机 -> 信道 -> 队列 -> 消费者
publisher表示消息发送者,会把消息发送到交换
systemctl start docker # 启动docker服务
机
exchange表示交换机,会把消息路由到不同的队列
queue表示队列,队列负责暂存消息
consumer表示消费者,会从队列中获取消息,并且处理消息
VirtualHost表示虚拟主机,当我们创建用户的时候,用户会有自己的虚拟主机,虚拟主机之间相互隔离,每个虚拟主机都有自己的exchange、queue
1. 下载安装启动
学习的是基于Docker,在Centos7虚拟机中使用Docker来安装RabbitMQ
第一步: 启动docker(上面已经学过Docker容器,这里启动就行)
systemctl start docker # 启动docker服务
第二步: 在Windows下载rabbitmq镜像,然后上传到Centos7虚拟机的/temp目录。下载下来是一个名称为mq.tar的镜像压缩包
rabbitmq镜像快速下载: https://cowtransfer.com/s/a76639caf2f54e
然后执行下面那个命令,就可以把名称为mq.tar的rabbitmq镜像压缩包,导入到Docker
第三步: 在Docker主机,使用rabbitmq镜像来创建运行rabbitmq容器。
-e参数表示环境变量,例如自定义rabbitmq的默认用户名为keke,密码为123456
-name参数表示自定义rabbitmq容器的名字
-hostname参数表示自定义主机名,由于我们不是集群部署,所以主机名可写可不写,下面是写上了
-p参数表示端口映射,例如开放了两个端口,分别是15672管理后台端口、5672是服务端口
-d参数表示后台运行
rabbitmq:3-management 表示镜像名称,由于我们的rabbitmq镜像是这个版本,所以镜像名称需要带上镜像版本号
docker run \-e RABBITMQ_DEFAULT_USER=keke \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
第四步: 测试。浏览器访问http://192.168.229.128:15672
第五步: 熟悉页面
2. 消息模型介绍
MQ的官方文档RabbitMQ Tutorials — RabbitMQ
上图中的官方文档中给出了5种消息模型,分别叫 'Hello World' 、'Work Queues' 、'Publish/Subscribe' 、'Routing' 、'Topics'
根据用途和交换机类型进行如下分类,前两种是根据用途,后三种是根据交换机类型进行分类
1、基本消息队列: Hello World
2、工作消息队列: Work Queues
3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
4、路由发布订阅(使用Direct交换机): Routing
5、主题发布订阅(使用Topic交换机): Topics
我们会在下面逐步学习这五种消息模型
3. Hello World 队列模型案例
Hello World 叫简单队列模型,官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括如下三个角色,如图
下面我们以项目代码来演示一下,如何在代码中使用 'RabbitMQ消息队列' 的 'Hello World 队列模型'。具体操作如下
第一步: 下载写好的项目文件 'mq-demo',下载到的是mq-demo.zip,解压会得到一个mq-demo文件夹,用IDEA打开
链接:https://pan.baidu.com/s/1z-oYXu1Q_LjuaTYfO6W-WQ
提取码:mfkw
如果导入后,maven加载很慢,半天加载不进来,可以参见这篇博文
maven解决加载慢
第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
systemctl start docker # 启动docker服务
docker start mq # 启动rabbitmq容器
第三步: 修改mq-demo工程代码,把相关参数改成自己的
第四步:运行PublisherTest
首先是建立连接工厂作用是跟RabbitMQ建立连接
然后设置一些信息,设置IP地址,MQ的消息通信端口是5672,设置虚拟主机keke用户有/这个虚拟主机的访问权限,设置用户名密码
然后是建立连接,打开控制台,有一个连接已经建立了
接着是建立通道,通道一旦建立就可以向队列中发送消息了,我们打开控制台,可以看到有一个通道
声明队列
发送消息
最后是关闭连接,释放资源
我们这里已经发送了一个消息,但是消息是否被消费者接受,我们不关心,这就是解耦,就是异步
第五步:运行ConsumerTest,和PulisherTest的逻辑大致一样
首先还是建立连接工厂作用是跟RabbitMQ建立连接,设置连接参数
然后是创建通道
接着是声明队列,为什么这里还要声明一次队列,刚才生产者进程不是已经声明过一次队列了吗?
这是因为我们的生产者和消费者进程的启动先后顺序是不确定的,万一消费者先启动,想要找队列去消费却找不到,所以为了避免这种情况的发生,消费者和生产者各自都去声明一个队列
当然代码的重复执行不会导致创建出来两个队列,而是一种保险措施
继续执行代码,只要消费者发送的消息一旦被消费,消息队列中的消息就会被删除
总结
三、SpringAMQP消息中间件
SpringAMQP简单说就是一个中间件,提供了模板方便我们操作各种消息模型
上面已经学了RabbitMQ消息队列是有五种消息模型,并且我们演示了其中的基本消息队列(Hello World)。用的是官方API,来实现的基本消息队列(Hello World)。会发现官方提供的API写起来麻烦,所以我们直接用的就是写好的 'mq-demo' 项目,只是简单运行了解一下运行机制
下面我们将学习SpringAMQP,可以大大简化API的书写。也就是使用SpringAMQP来分别演示五种消息模型
根据用途和交换机类型进行如下分类,前两种是根据用途,后三种是根据交换机类型进行分类
1、基本消息队列: Hello World
这个是上面使用官方API演示过一次(但是接下来也会使用SpringAMQP再演示一次),下面四个还没演示过(接下来用SpringAMQP演示)
2、工作消息队列: Work Queues
3、广播发布订阅(使用Fanout交换机): Publish/Subscribe
4、路由发布订阅(使用Direct交换机): Routing
5、主题发布订阅(使用Topic交换机): Topics
什么是AMQP ?Advanced Messaging
用于在应用程序或之间传递业务消息的开放标准。该协议与语言、平台无关,更符合微服务中独立性的要求
什么是SpringAMQP ?Advanced Message Queuing Protocol 高级消息队列协议
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板(我们之前学习SpringRedis的时候,Spring提供了RedisTemplate)来发送和接收消息,所以api都定义好了。简单说就是一个中间件,提供了模板方便我们操作各种消息模型
SpringAMQP包含两部分内容,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。也就是spring-amqp是规范,具体由spring-rabbit来实现
SpringAmqp的官网: Spring AMQP
SpringAMQP的特点:
1、提供了监听器容器,可以用异步来处理入站消息,也就是负责接收消息
2、提供了RabbitTemplate,用于发送和接收消息,一般用于发送消息,接受用监听器容器去做
3、提供了RabbitAdmin,用于自动声明队列、交换和绑定。声明也就是创建的意思,会自动创建消息队列,不需要手动创建消息队列
1. HelloWorld模型的消息发送
Hello World 叫简单队列模型,官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括如下三个角色,如图
案例: 在 '实用篇-RabbitMQ消息队列' 的 '3. Hello World 队列模型案例' ,里面写用官方API实现的基本消息队列。因为有五种模型都要演示一次,现在我们同样是先演示Hello World 队列模型,这次就不是用官方API来写了,而是使用SpringAMQP来写,代码更加的简洁。用的项目还是mq-demo。我把下载链接拿下来了
这里我们先做消息发送,下一课在做消息接收。消息发送的具体操作过程如下
第一步(已做好可跳过): 下载写好的项目文件 'mq-demo',下载到的是mq-demo.zip,解压会得到一个mq-demo文件夹
第二步(已做好可跳过): 在idea软件,打开mq-demo,mq-demo是一个项目工程
第三步(已做好可跳过): 确保已经启动Docker、并启动Docker里面的rabbitmq容器
systemctl start docker # 启动docker服务
docker start mq # 启动rabbitmq容器
第四步: 打开idea,在父工程(也就是mq-demo)的pom.xml中引入spring-amqp的依赖(已做好可跳过)
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第五步: 在publisher(也就是publisher子工程,这个子工程是负责发消息的)中,的application.yml写入如下
spring:rabbitmq:host: 192.168.229.128port: 5672 #端口username: keke #连接rabbitmq的用户名password: 123456 #连接rabbitmq的密码virtual-host: / #huanfqc对应的虚拟主机
第六步: 在publisher子工程的src/test/java目录新建cn.itcast.mq.spring.SpringAmqpTest类,写入如下
package cn.itcast.mq.spring;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageSimpleQueue(){String queueName = "simple-queue2";String message = "我是使用amqp模板,往队列里面发消息,这里是keke";rabbitTemplate.convertAndSend(queueName,message);}}
第六步: 浏览器访问 http://你的ip:15672。例如如下。打开浏览器后手动在页面创建一个叫simple-queue2的队列,不然上面运行找不到该往哪个队列发消息
第七步: 运行publisher子工程的SpringAmqpTest类,此时就会向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息
第八步: 我们还没有消费者,所以消息会一直在队列里面放着,下面我们将来学习消息接收,也就是编写消费者,去消费队列里面的信息。如下
2. HelloWorld模型的消息接收
注意: 消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能,也就是你找不到已经被消费了的消息
上面学了如何在publisher子工程,向simple.queue2队列,发送消息。下面我们就学习如何让consumer子工程去处理simple.queue2队列里面的消息
第一步: 在consumer子工程的application.yml,添加如下
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.229.128port: 5672username: kekepassword: 123456virtual-host: /
第二步:由于消费者要做的事情是监听消息,那么spring已经帮我们跟mq建立了连接,唯一要做的就是要监听哪个队列,监听到队列后应该做什么操作?所以我们把操作封装到方法就可以了。在consumer子工程的src/main/java目录新建cn.itcast.mq.listener.SpringRabbitListener类,写入如下
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue2")//指定监听哪个消息队列public void listenSimpleQueue(String msg){System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");}}
第三步: 运行consumer子工程的ConsumerApplication引导类,此时consumer子工程的SpringRabbitListener类就会去消费来自simple.queue2队列的消息
拿到了之前生产者在队列中放的消息
3. WorkQueue模型的消息发接
WorkQueue也叫工作队列,模型图如下
案例: 实现一个队列绑定多个消费者,要求如下
1、在publisher子工程中定义测试方法,每秒产生50条信息,发送到simple.queue2队列
2、在consumer子工程中定义两个消息监听者(也就是消息消费者),都监听simple.queue队列
3、一个消费者每秒处理50条消息,另一个消费者每秒处理10条消息
具体过程如下
第一步: 由于在上面的 '1. HelloWorld模型的消息发送' 已经有一个SpringAmqpTest测试类,我们把这个测试类修改为如下
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageSimpleQueue() throws InterruptedException {String queueName = "simple-queue2";String message = "我是使用amqp模板,往队列里面发消息,这里是keke";for(int i = 0; i < 50; i++){rabbitTemplate.convertAndSend(queueName,message + i + "条消息");//每隔20毫秒发送一次,共需要1秒才能发完50条消息Thread.sleep(20);}}
}
第二步: 由于在上面的 '1. HelloWorld模型的消息发送' 已经有一个SpringRabbitListener类,我们把这个类修改为如下
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue(String msg){
// System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
// }@RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列public void listenSimpleQueue1(String msg) throws InterruptedException {System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());//每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息Thread.sleep(20);}@RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列public void listenSimpleQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());//每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息Thread.sleep(200);}}
第三步: 运行consumer子工程的ConsumerApplication引导类,此时consumer子工程的SpringRabbitListener类就会去消费来自simple-queue2队列的消息
第四步:运行publisher子工程的SpringAmqpTest类,此时就会向名为simple-queue2的队列发送一条(每运行一次就发送一条)消息
从上面的控制台日志信息中,我们发现消费者1处理的都是奇数消息,消费者2处理的都是偶数消息,看一下总的50条信息的分配情况,可以发现50条信息竟然是平均分配(默认是消息预取机制:当有大量的消息到达队列时,队列把消息投递给消费者,这叫做消息预取机制)给这两个消费者进行处理,但是我们给消费者1处理的速度设置的更快(每20毫秒就可以处理一条消息),消费者2的处理速度设置的慢(每100毫秒才能处理一条消息),那我们就希望队列给消费者1多分点消息,给消费者2少分点消息,这样50条消息就能被更快处理完。如何实现呢,如下
把publisher子工程的application.yml,修改为如下,主要就是添加了preFetch参数(默认为无限),表示最大预取数。分析:这两个消费者默认的预取值是"总消息条数/总消费者数量",也就是上面我们有50条消息,然后被两个消费者分别拿了25条,而消费者2处理的慢但是却拿了这么多消息,所以我们通过修改最大预取值,如果把它修改为1的话,那么两个消费者每次最多向队列里面拿一条消息,拿完就处理,处理之后才能继续向队列拿数据
logging:pattern:dateformat: MM-dd HH:mm:ss:SSSspring:rabbitmq:host: 192.168.229.129 #虚拟机ip地址port: 5672 # 端口username: keke # 用户名password: 123456 #密码virtual-host: /listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一条消息
然后重新运行consumer子工程的ConsumerApplication引导类,此时这两个消费者就准备就绪了,如果队列有消息就会被消费
最后运行运行publisher子工程的SpringAmqpTest类,此时就会又向名为simple.queue2的队列发送一条(每运行一次就发送一条)消息
重点看一下现在consumer子工程的两个消费者的消费情况
谁处理消息快,谁就分配的消息多
总结:Work模型的使用
多个消费者绑定一个队列,同一条消息只会被一个消费者处理,一旦消费完就会从队列中删除
通过设置preftch来控制消费者预取的消息数量
4. Fanout交换机的消息发接
Fanout交换机,其中也就是Publish/Subscribe,是消息订阅模式的其中一种,共三种
'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下
如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下
1、Fanout: 广播
2、Direct: 路由
3、Topic: 话题
我们前面已经学了前两种消息队列,接下来学习后三种(其实就是交换机的三种类型),后三种统称为 '发布订阅模式',刚刚也简单介绍了 '发布订阅模式'
案例: 利用AMQP演示FanoutExchange的使用
实现思路如下:
1、在consumer子工程,利用代码声明(声明就是创建的意思)队列、交换机,并将两者绑定
2、在consumer子工程,编写两个消费者方法,分别监听fanout.queue1队列和fanout.queue2队列
3、在publisher子工程中编写测试方法,向publisher.fanout交换机发送消息(publisher.fanout交换机接收到消息之后,会自动向队列路由消息)
案例结构图如下
SpringAMQP提供了声明交换机、队列、绑定关系的API,如下继承关系图。下面需要用到的Exchange、FanoutExchange这两个API
具体操作如下
第一步: 在consumer子工程的src/test/java/cn.itcast.mq目录,新建config.FanoutConfig类,写入如下
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("keke.fanout");}//声明一个队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout-queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明一个队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout-queue2");}//绑定队列2和交换机@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
第二步: 运行consumer子工程的ConsumerApplication引导类
第三步: 消息接收功能。在consumer子工程的SpringRabbitListener类修改为如下
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue(String msg){
// System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
// }// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue1(String msg) throws InterruptedException {
// System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
// Thread.sleep(20);
// }
// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
// Thread.sleep(200);
// }@RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列public void listenFanoutQueue1(String msg) throws InterruptedException {System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");}@RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列public void listenFanoutQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");}}
第四步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;// @Test
// public void testSendMessageSimpleQueue() throws InterruptedException {
// String queueName = "simple-queue2";
// String message = "我是使用amqp模板,往队列里面发消息,这里是keke";
// for(int i = 0; i < 50; i++){
// rabbitTemplate.convertAndSend(queueName,message + i + "条消息");
// //每隔20毫秒发送一次,共需要1秒才能发完50条消息
// Thread.sleep(20);
// }
// }@Testpublic void testSendMessageFanoutExchange(){//交换机String exchange = "keke-fanout";//发送什么信息String message = "发送信息fanout";//发送rabbitTemplate.convertAndSend(exchange,"",message);}
}
第五步: 重新运行consumer子工程(消费者)的ConsumerApplication引导类,且保持运行,等待处理消息
第六步: 运行publisher子工程(生产者)的SpringAmqpTest类,表示向消费者发送消息
第七步: 查看consumer子工程的控制台,是否能处理消息。是可以处理消息的,并且我们发现两个队列处理的是同一条消息,也就是交换机会把同一条消息路由到两个不同的队列
总结:
交换机的作用是什么?
做消息的转发,不做消息存储,如果路由不成功,消息会丢失
FanoutExchange会将消息路由到每个绑定的队列
5. Direct交换机的消息发接
案例: 利用SpringAMQP演示DirectExchange(Direct类型的交换机)的使用。
实现思路如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey
2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3、在publisher中编写测试方法,向keke-direct交换机发送消息
具体操作如下
第一步: 把consumer子工程的SpringRabbitListener修改为如下
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue(String msg){
// System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
// }// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue1(String msg) throws InterruptedException {
// System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
// Thread.sleep(20);
// }
// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
// Thread.sleep(200);
// }
//
// @RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列
// public void listenFanoutQueue1(String msg) throws InterruptedException {
// System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");
// }
//
// @RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列
// public void listenFanoutQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");
// }//演示direct交换机@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct-queue1"),exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),key = {"red","blue"})) //指定交换机,监听哪个队列,指定keypublic void listenDirectQueue1(String msg) throws InterruptedException {System.err.println("我接收并处理了direct-queue1队列的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct-queue2"),exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了direct-queue2队列的消息:【"+msg+"】");}
}
第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
systemctl start docker # 启动docker服务
docker start mq # 启动rabbitmq容器
第三步: 运行consumer子工程的ConsumerApplication引导类
第四步: 运行publisher子工程的SpringAmqpTest类,作用仅仅是发送消息,才能触发 'consumer子工程的FanoutConfig类' 的绑定
第六步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下。表示发送到交换机的消息是带了一个key为blue的消息
第七步: 再一次验证,发一条带有key值为yellow的消息,查看控制台是否只有direct-queue2队列才能获取交换机里面带有对应key值为yellow的消息
第八步: 我们发现,direct.queue1和direct.queue2队列都带有值为red的key,那请问在生产者往交换机发送一条key值为red的消息,那么会被direct.queue1和direct.queue2队列同时接收吗。答案是肯定的
6. Topics交换机的消息发接
Topics是消息订阅模式的其中一种,共三种
'发布订阅模式' 与之前案例的区别就是允许将同一个消息发送给多个消费者(之前案例也就是前两种消息队列,只能允许同一个消息发送给一个消费者)。实现方式是加入了exchange(交换机)。'发布订阅模式' 的结构图如下
如上图,交换机可以把同一个消息只发给其中一个队列,也可以把同一个消息发送给多个队列,是由交换机的类型来决定,不同的交换机类型,消息转发规则不同,相同点是都是负责转发消息的。交换机如果路由失败的话,消息就会丢失,因为交换机不负责存储消息。交换机的常见类型如下
1、Fanout类型: 广播 (上面在 '4. Publish/Subscribe模型的消息发接' 学过)
2、Direct类型: 路由 (现在正在学的),也就是Routing
3、Topic类型: 话题
Topic交换机和上一节学的Direct交换机类似,区别在于Topic交换机的key必须是多个单词的列表,并且以小数点.分隔。结构图如下
案例: 利用SpringAMQP演示TopicExchange的使用
实现思路如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey。在上面学Direct交换机时,也是使用@RabbitListener来声明
2、在consumer子工程中,编写两个消费者方法,分别监听topic-queue1和topic-queue2
3、在publisher子工程中编写测试方法,向keke-topic发送消息
案例结构图如下
第一步: 把consumer子工程的SpringRabbitListener修改为如下
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue(String msg){
// System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
// }// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue1(String msg) throws InterruptedException {
// System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
// Thread.sleep(20);
// }
// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
// Thread.sleep(200);
// }
//
// @RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列
// public void listenFanoutQueue1(String msg) throws InterruptedException {
// System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");
// }
//
// @RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列
// public void listenFanoutQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");
// }// //演示direct交换机
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(name = "direct-queue1"),
// exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
// key = {"red","blue"}
// )) //指定交换机,监听哪个队列,指定key
// public void listenDirectQueue1(String msg) throws InterruptedException {
// System.err.println("我接收并处理了direct-queue1队列的消息:【"+msg+"】");
// }
//
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(name = "direct-queue2"),
// exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
// key = {"red","yellow"}
// ))
// public void listenDirectQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了direct-queue2队列的消息:【"+msg+"】");
// }//演示topic交换机@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic-queue1"),exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),//china有关的一切信息都可以接收到key = {"china.#"}))public void listenTopicQueue1(String msg) throws InterruptedException {System.err.println("我接收并处理了topic-queue1队列的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic-queue2"),exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),//一切有关新闻的都可以接收到key = {"#.news"}))public void listenTopicQueue2(String msg) throws InterruptedException {System.err.println("我接收并处理了topic-queue2队列的消息:【"+msg+"】");}
}
第二步: 确保已经启动Docker、并启动Docker里面的rabbitmq容器
systemctl start docker # 启动docker服务
docker start mq # 启动rabbitmq容器
第三步: 运行consumer子工程的ConsumerApplication引导类
第六步: 消息发送功能。在publisher子工程的SpringAmqpTest类修改为如下。表示发送到交换机的消息是带了一个key为china.weather的消息
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
//表明Test测试类要使用注入的类,
// 比如@Autowired注入的类,
// 有了@RunWith(SpringRunner.class)
// 这些类才能实例化到spring容器中,自动注入才能生效。
@SpringBootTest
public class SpringAmqpTest {//Rabbit模板注入@Autowiredprivate RabbitTemplate rabbitTemplate;// @Test
// public void testSendMessageSimpleQueue() throws InterruptedException {
// String queueName = "simple-queue2";
// String message = "我是使用amqp模板,往队列里面发消息,这里是keke";
// for(int i = 0; i < 50; i++){
// rabbitTemplate.convertAndSend(queueName,message + i + "条消息");
// //每隔20毫秒发送一次,共需要1秒才能发完50条消息
// Thread.sleep(20);
// }
// }// //测试fanout交换机
// @Test
// public void testSendMessageFanoutExchange(){
// //交换机
// String exchange = "keke-fanout";
// //发送什么信息
// String message = "发送信息fanout";
// //发送
// rabbitTemplate.convertAndSend(exchange,"",message);
// }// //测试direct交换机
// @Test
// public void testSendMessageDirectExchange(){
// //往哪个交换机发送消息
// String exchangeName = "keke-direct";
// //要发送什么消息
// String message = "来自生产者向你发送的消息hello";
// //发送,需要指定key,例如发送到交换机的消息带了一个key为blue的消息,只有对应的队列才能从交换机获取这个消息
// rabbitTemplate.convertAndSend(exchangeName,"red",message);
// }@Testpublic void testSendMessageTopicExchange(){String exchangeName = "keke-topic";String message = "今天天气阴,大风";rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);}}
修改生产者为,再次发送消息,看消费者接受情况
@Testpublic void testSendMessageTopicExchange(){String exchangeName = "keke-topic";String message = "本台报道,日本把核污水排海";rabbitTemplate.convertAndSend(exchangeName,"japan.news",message);}
修改生产者为,再次发送消息,看消费者接受情况
@Testpublic void testSendMessageTopicExchange(){String exchangeName = "keke-topic";String message = "本台报道:2023年11月3日,S13全球LOL总决赛中,来自lpl的二号种子BLG战队(全华班)抬走韩国一号种子GEN(全韩班)";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
7. 消息转换器的消息发送
案例: 测试发送Object类型的消息
说明: 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
从上面的学习中,我们知道有两种方式可以创建队列(交换机也同理),一种是通过注解(例如我们在FanoutConfig类里面写的代码),
另一种是通过代码(例如我们在SpringRabbitListener类里面写的代码)。在这个案例中,我们将使用通过注解的方式来创建队列
第一步: 创建队列。在consumer子工程的FanoutConfig类,写入如下。然后,重新运行consumer子工程的ConsumerApplication引导类
//创建一个演示发送Object队列@Beanpublic Queue ObjectQueue(){return new Queue("Object-Queue");}
第二步: 向队列发送消息。在publisher子工程的SpringAmqpTest类,写入如下,然后运行
@Testpublic void testSendObjectMessage(){Map<String,Object> map = new HashMap<>();map.put("name","张三");map.put("age","20");rabbitTemplate.convertAndSend("Object-Queue",map);}
原因是rabbit只支持字节,在发送object对象时,spring会默认根据jdk序列化方式进行对象的序列化,这种方式性能差,安全性差,冗长繁杂
解决:用json的方式进行序列化
我们已经实现了把对象类型的消息发送给队列。下面是解决序列化问题,spring默认的序列化方式是基于jdk的,缺陷较多
如何使用基于json的序列化方式呢,具体操作过程如下
第一步: 由于consumer子工程和publisher子工程都需要使用这个依赖,所以我们直接在mq-demo父工程的pom.xml添加如下
<!--Java对象转换为JSON格式数据-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
第二步: 在publisher子工程,声明bean去覆盖默认bean,需要新建一个配置类,或者直接在引导类写。例如把PublisherApplication引导类修改为如下
package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Bean//这里只添加这个方法public MessageConverter messageConverter(){//使用Jackson消息转换工具,将消息转换为JSON格式数据return new Jackson2JsonMessageConverter();}
}
第三步: 为了不影响测试,先去浏览器清理刚刚的那条消息
第四步: 重新运行consumer子工程的ConsumerApplication引导类,并且重新运行publisher子工程的SpringAmqpTest类(运行testSendObjectMessage方法)
然后打开浏览器
总结
SpringAMQP中消息的 序列化(消息发送) 和 反序列化(消息接收) 是怎么实现的
1、利用MessageConverter实现的,默认是JDK的序列化
2、注意发送方与接收方必须使用相同的MessageConverter(下面学消息接收的时候会才学)
上面只是学习发送对象类型的消息,下面会学如何接收对象类型的消息
8. 消息转换器的消息接收
案例: 测试接收Object类型的消息
说明: 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
上述 '7. 消息转换器的消息发送' 我们已经成功向队列发送了Object类型(Map类型的对象)的消息,该消息是以json序列化的形式存放在队列,我们怎么让消费者能从队列中获取该消息呢,也就是我们下面要学的
第一步(做好可跳过): 由于consumer子工程和publisher子工程都需要使用这个依赖,所以我们直接在mq-demo父工程的pom.xml添加如下
<!--Java对象转换为JSON格式数据-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
第二步: 在consumer子工程,声明bean去覆盖默认bean,需要新建一个配置类,或者直接在引导类写。例如把ConsumerApplication引导类修改为如下
package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Bean//这里只添加这个方法public MessageConverter messageConverter(){//使用Jackson消息转换工具,将消息转换为JSON格式数据return new Jackson2JsonMessageConverter();}
}
第三步: 把consumer子工程的SpringRabbitListener修改为如下,增加新增接收对象消息的方法
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue(String msg){
// System.out.println("我接收并处理了simple.queue2队列的消息:【"+msg+"】");
// }// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue1(String msg) throws InterruptedException {
// System.out.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠20毫秒,也就是1秒能处理50条消息
// Thread.sleep(20);
// }
// @RabbitListener(queues = "simple-queue2")//指定监听哪个消息队列
// public void listenSimpleQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了simple-queue2队列的消息:【"+msg+"】" + "当前时间是" + LocalTime.now());
// //每处理一条消息就休眠100毫秒,也就是1秒能处理10条消息
// Thread.sleep(200);
// }
//
// @RabbitListener(queues = "fanout-queue1")//指定监听哪个消息队列
// public void listenFanoutQueue1(String msg) throws InterruptedException {
// System.err.println("我接收并处理了fanout-queue1队列的消息:【"+msg+"】");
// }
//
// @RabbitListener(queues = "fanout-queue2")//指定监听哪个消息队列
// public void listenFanoutQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了fanout-queue2队列的消息:【"+msg+"】");
// }// //演示direct交换机
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(name = "direct-queue1"),
// exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
// key = {"red","blue"}
// )) //指定交换机,监听哪个队列,指定key
// public void listenDirectQueue1(String msg) throws InterruptedException {
// System.err.println("我接收并处理了direct-queue1队列的消息:【"+msg+"】");
// }
//
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(name = "direct-queue2"),
// exchange = @Exchange(name = "keke-direct",type = ExchangeTypes.DIRECT),
// key = {"red","yellow"}
// ))
// public void listenDirectQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了direct-queue2队列的消息:【"+msg+"】");
// }// //演示topic交换机
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(name = "topic-queue1"),
// exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),
// //china有关的一切信息都可以接收到
// key = {"china.#"}
// ))
// public void listenTopicQueue1(String msg) throws InterruptedException {
// System.out.println("我接收并处理了topic-queue1队列的消息:【"+msg+"】");
// }
//
//
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(name = "topic-queue2"),
// exchange = @Exchange(name = "keke-topic",type = ExchangeTypes.TOPIC),
// //一切有关新闻的都可以接收到
// key = {"#.news"}
// ))
// public void listenTopicQueue2(String msg) throws InterruptedException {
// System.err.println("我接收并处理了topic-queue2队列的消息:【"+msg+"】");
// }//演示接收Object类型的消息@RabbitListener(queues = "Object-Queue")public void listenObjectQueue(Map<String,Object> map){System.out.println("接收到Object类型的消息" + map);}
}
第四步: 测试。先运行publisher子工程的SpringAmqpTest表示向队列发送消息,再运行consumer子工程的ConsumerApplication引导类
查看子工程是否能接收到消息
总结
SpringAMQP中消息的 序列化(消息发送) 和 反序列化(消息接收) 是怎么实现的
1、利用MessageConverter实现的,默认是JDK的序列化
2、注意发送方与接收方必须使用相同的MessageConverter
相关文章:

实用篇-MQ消息队列
一、初识MQ 通讯分为同步通讯和异步通讯,同步通讯就比如我们日常生活中的打电话,看直播,能够得到及时的反馈。而异步通讯则类似于聊天软件聊天,不需要建立实时的连接,并且可以进行建立多个业务一起异步执行 1. 同步通…...

springboot打包时依赖jar和项目jar分开打包;jar包瘦身
概述 最近感觉项目在部署时时jar包传输太慢了; 看了下jar包内容,除了项目代码,其余大部分都是依赖jar; 平时改动较多的只是项目代码,依赖jar改动比较少; 所以就在想能不能分开打包;这样只部署项…...

嵌入式系统的元素
注意:关于嵌入式系统的元素这一块儿内容,定义太多了。例如:吉姆莱丁 著,陈会翔 译,由清华大学出版社出版的《构建高性能嵌入式系统》中提到:嵌入式系统通常由电源、时基、数字处理、内存、软件和固件、专用…...
提升ChatGPT答案质量和准确性的方法Prompt engineering实用的prompt灵感和技巧
文章目录 1. 实用的prompt灵感和技巧小技巧常用prompt保存到输入法中普通promptprompt通用公式保存到输入法快捷指令中尝试用英语去写prompt沉浸式翻译软件3. 补充1. 实用的prompt灵感和技巧 解释***,并且给出暗喻/隐喻/类比(解释术语、专业名称,用一个词或短语指出常见的一…...
[Machine Learning] Learning with Noisy Labels
文章目录 随机分类噪声 (Random Classification Noise, RCN)类别依赖的标签噪声 (Class-Dependent Noise, CCN)二分类多分类 实例和类别依赖的标签噪声 (Instance and Label-Dependent Noise, ILN) 标签噪声是指分类任务中的标签被错误地标记。这可能是由于各种原因,…...

集简云slack(自建)无需API开发轻松连接OA、电商、营销、CRM、用户运营、推广、客服等近千款系统
slack是一个工作效率管理平台,让每个人都能够使用无代码自动化和 AI 功能,还可以无缝连接搜索和知识共享,并确保团队保持联系和参与。在世界各地,Slack 不仅受到公司的信任,同时也是人们偏好使用的平台。 官网&#x…...

Idea 对容器中的 Java 程序断点远程调试
第一种:简单粗暴型 直接在java程序中添加log.info(),根据需要打印信息然后打包覆盖,根据日志查看相关信息 第二种:远程调试 在IDEA右上角点击编辑配置设置相关参数在Dockerfile中加入 "-jar", "-agentlib:jdwp…...

vscode设置保存后,自动格式化代码
第一步:打开setting.json文件 第二步:在setting.json中加入以下代码 "editor.formatOnType": true, "editor.formatOnSave": true, "editor.formatOnPaste": true...

datagrip出现 java.net.ConnectException: Connection refused: connect.
出现这样的情况要看一下hadoop有没有启动 start-all.sh nohup /export/server/apache-hive-3.1.2-bin/bin/hive --service hiveserver2 & scp -r /export/server/apache-hive-3.1.2-bin/ node3:/export/server/ /export/server/apache-hive-3.1.2-bin/bin/hive show databa…...

Docker 安装ELK7.7.1
(在安装之前,本方法必须安装jdk1.8以上版本) 一、安装elasticsearch 1、下载elasticsearch7镜像:docker pull elasticsearch:7.7.1 2、创建挂载目录:mkdir -p /data/elk/es/{config,data,logs} 3、赋予权限:chown -R 1000:100…...
决策树算法
决策树算法是一种用于分类和回归问题的机器学习算法。它通过构建树形结构来进行决策,每个内部节点代表一个特征或属性,每个叶子节点代表一个类别或值。 下面是决策树算法的一般步骤: 数据准备:收集相关的训练数据,并对…...
maven之pom文件详解
一、maven官网 maven官网 maven官网pom文件详解链接 二、maven之pom 1、maven项目的目录结构 pom文件定于了一个maven项目的maven配置,一般pom文件的放在项目或者模块的根目录下。 maven的遵循约定大于配置,约定了如下的目录结构: 目录目…...

深度学习之基于Python+OpenCV+dlib的考生信息人脸识别系统(GUI界面)
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习在人脸识别领域的应用已经取得了显著的进展。Python是一种常用的编程语言,它提供了许多强大的库…...

创建javaEE项目(无maven),JSP(九大内置对象)、Servlet(生命周期)了解
一、Servlet和jsp 0.创建web项目(无maven): 1.创建一个普通的java项目 2.项目根目录右键,添加模板 3.配置tomcat服务器 4.配置项目tomcat依赖 1.Servlet(Server Applet)服务端小程序 用户通过浏览器发送一个请求,服务器tomcat接收到后&…...

BIOS开发笔记 - HDA Audio
在PC中,音频输出是一个重要的功能之一,目前大多数采用的是英特尔高清晰音效(英语:Intel High Definition Audio,简称为HD Audio或IHD)方案,它是由Intel于2004年所提出的音效技术,能够展现高清晰度的音质效果,且能进行多声道的播放,在音质(音效质量)上超越过去的其他…...

C语言——选择排序
完整代码: //选择排序 // 选择排序是一种简单直观的排序算法。它的工作原理如下:首先在未排序序列中找到最小(大)元素,存放到排序序列的起始位置,然后,再从剩余未排序元素中继续寻找最小(大&am…...

vue详细安装教程
这里写目录标题 一、下载和安装node二、创建全局安装目录和缓存日志目录三、安装vue四、创建一个应用程序五、3x版本创建六、创建一个案例 一、下载和安装node 官网下载地址:https://nodejs.org/en/download 选择适合自己的版本,推荐LTS,长久…...
Java 正则表达式字符篇
精确匹配一个字符 精确匹配字符串 abc , //精确匹配字符串 "abc"String regexabc "abc";System.out.println("abc".matches(regexabc));// trueSystem.out.println("ABC".matches(regexabc));// falseSystem.out.println…...

shell脚本代码混淆
文章目录 起因安装 Bashfuscator安装BashfuscatorBashfuscator的使用 起因 很多时候我并不希望自己的shell脚本被别人看到,于是我在想有没有什么玩意可以把代码加密而又正常执行,于是我想到了代码混淆,简单来看一下: 现在我的目…...

【MATLAB第81期】基于MATLAB的LSTM长短期记忆网络预测模型时间滞后解决思路(更新中)
【MATLAB第81期】基于MATLAB的LSTM长短期记忆网络预测模型时间滞后解决思路(更新中) 在LSTM预测过程中,极易出现时间滞后,类似于下图,与一个以上的样本点结果错位,产生滞后的效果。 在建模过程中…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...

自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...

均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...

OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...