当前位置: 首页 > article >正文

使用java代码操作rabbitMQ收发消息

SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:

Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

快速入门

别忘了在我们的项目中,引入spring amqp的依赖。

<dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。

在入门案例中,我们就演示这样的简单模型,如图:

也就是

  • publisher直接发送消息到队列
  • 消费者监听并处理队列中的消息

注意:这种模式一般测试使用,很少在生产中使用。

为了方便测试,我们在rabbitMQ控制台,创建名为 simple.queue 的队列。

添加队列后查看

接下来,我们就可以利用Java代码收发消息了。

消息发送

在我们的项目application.yml 中 添加关于rabbitmq的配置信息。

spring:rabbitmq:host: 123.56.247.70 # 你的虚拟机IPport: 5672          # rabbitMQ端口virtual-host: /sde  # 虚拟机名称username: sundaoen  # 用户名password: 8888888888       # 密码

编写测试类

在我们项目的publisher中创建测试类,并且利用 RabbitTemplate 发送消息。

@SpringBootTest
public class TestSendMessage {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void  testSimpleQueue(){// 1 队列名称String queueName = "simple.queue";// 2 消息String message = "hello simple.queue";// 3 发送消息rabbitTemplate.convertAndSend(queueName,message);}
}

打开控制台,可以看到消息已经发送到队列中:

看看消息内容

接下来,我们再来实现消息接收。

消息接收

同样的道理,也是先配置MQ地址。在application.yml 中

spring:rabbitmq:host: 123.56.247.70 # 你的虚拟机IPport: 5672          # rabbitMQ端口virtual-host: /sde  # 虚拟机名称username: sundaoen  # 用户名password: 8888888888       # 密码

在consumer服务中编写监听器类,并利用@RabbitListener实现消息的接收。

@Slf4j
@Component
public class SimpleQueueListener {/*利用@RabbitListener注解,可以监听到对应队列的消息一旦监听的队列有消息,就会回调当前方法,在方法中接收消息并消费处理消息*/@RabbitListener(queues = "simple.queue")public void listenerSimpleQueue(String msg){System.out.println("SpringRabbitListener 监听到 simple.queue 队列中的消息是:" + msg);}
}

WorkQueue模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

接下来,我们就来模拟这样的场景。

首先,我们在控制台创建一个新的队列,命名为work.queue:

添加后的效果

消息发送

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的WorkQueueSendTest类中添加一个测试方法:

@SpringBootTest
public class WorkQueueSendTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendWorkQueue() throws InterruptedException {// 1 队列名称String queueName = "work.queue";// 2 消息String message = "hello work.queue-";// 3 发送消息for (int i = 1; i <= 50; i++) {// 每隔20毫秒发送一条消息,相当于一秒发送50条消息。rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}}
}

可以看到在work.queue 队列中有50条消息。

消息接收

要模拟多个消费者绑定同一个队列,我们在consumer服务中的,listener包中。新增WorkQueueListener类并添加2个新的方法:

@Slf4j
@Component
public class WorkQueueListener {/*实现两个消费 work.queue的监听消费消息的方法;一个方法消费后沉睡 20毫秒;一个消息消费后沉睡200毫秒;*/@RabbitListener(queues = "work.queue")public void listenerWorkQueue1(String msg){System.out.println("消费者1接收到消息" + msg +" 时间:"+ LocalDateTime.now());try {Thread.sleep(20); // 沉睡20毫秒 1秒是1000毫秒等于1秒处理50条消息} catch (InterruptedException e) {e.printStackTrace();}}@RabbitListener(queues = "work.queue")public void listenerWorkQueue2(String msg){System.out.println("***消费者2接收到消息" + msg +" 时间:"+ LocalDateTime.now());try {Thread.sleep(200); // 沉睡200毫秒 1秒是1000毫秒等于1秒处理5条消息} catch (InterruptedException e) {e.printStackTrace();}}}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

测试看结果

消费者1接收到消息hello work.queue-1 时间:2025-02-05T14:53:00.928905400
***消费者2接收到消息hello work.queue-2 时间:2025-02-05T14:53:00.947629900
消费者1接收到消息hello work.queue-3 时间:2025-02-05T14:53:00.977764800
消费者1接收到消息hello work.queue-5 时间:2025-02-05T14:53:01.039608
消费者1接收到消息hello work.queue-7 时间:2025-02-05T14:53:01.101242200
消费者1接收到消息hello work.queue-9 时间:2025-02-05T14:53:01.160396600
***消费者2接收到消息hello work.queue-4 时间:2025-02-05T14:53:01.161396900
消费者1接收到消息hello work.queue-11 时间:2025-02-05T14:53:01.231704200
消费者1接收到消息hello work.queue-13 时间:2025-02-05T14:53:01.281879300
消费者1接收到消息hello work.queue-15 时间:2025-02-05T14:53:01.347333400
***消费者2接收到消息hello work.queue-6 时间:2025-02-05T14:53:01.376528100
消费者1接收到消息hello work.queue-17 时间:2025-02-05T14:53:01.407569700
消费者1接收到消息hello work.queue-19 时间:2025-02-05T14:53:01.464497900
消费者1接收到消息hello work.queue-21 时间:2025-02-05T14:53:01.525121200
消费者1接收到消息hello work.queue-23 时间:2025-02-05T14:53:01.587589500
***消费者2接收到消息hello work.queue-8 时间:2025-02-05T14:53:01.589591300
消费者1接收到消息hello work.queue-25 时间:2025-02-05T14:53:01.647549500
消费者1接收到消息hello work.queue-27 时间:2025-02-05T14:53:01.709757900
消费者1接收到消息hello work.queue-29 时间:2025-02-05T14:53:01.768879300
***消费者2接收到消息hello work.queue-10 时间:2025-02-05T14:53:01.801437800
消费者1接收到消息hello work.queue-31 时间:2025-02-05T14:53:01.829539900
消费者1接收到消息hello work.queue-33 时间:2025-02-05T14:53:01.895907400
消费者1接收到消息hello work.queue-35 时间:2025-02-05T14:53:01.950810
消费者1接收到消息hello work.queue-37 时间:2025-02-05T14:53:02.011575
***消费者2接收到消息hello work.queue-12 时间:2025-02-05T14:53:02.014526300
消费者1接收到消息hello work.queue-39 时间:2025-02-05T14:53:02.073814400
消费者1接收到消息hello work.queue-41 时间:2025-02-05T14:53:02.142812400
消费者1接收到消息hello work.queue-43 时间:2025-02-05T14:53:02.199522100
***消费者2接收到消息hello work.queue-14 时间:2025-02-05T14:53:02.228114600
消费者1接收到消息hello work.queue-45 时间:2025-02-05T14:53:02.255591100
消费者1接收到消息hello work.queue-47 时间:2025-02-05T14:53:02.315954800
消费者1接收到消息hello work.queue-49 时间:2025-02-05T14:53:02.377632900
***消费者2接收到消息hello work.queue-16 时间:2025-02-05T14:53:02.440855300
***消费者2接收到消息hello work.queue-18 时间:2025-02-05T14:53:02.654015100
***消费者2接收到消息hello work.queue-20 时间:2025-02-05T14:53:02.867783300
***消费者2接收到消息hello work.queue-22 时间:2025-02-05T14:53:03.080905400
***消费者2接收到消息hello work.queue-24 时间:2025-02-05T14:53:03.296731200
***消费者2接收到消息hello work.queue-26 时间:2025-02-05T14:53:03.512099400
***消费者2接收到消息hello work.queue-28 时间:2025-02-05T14:53:03.725353500
***消费者2接收到消息hello work.queue-30 时间:2025-02-05T14:53:03.939706400
***消费者2接收到消息hello work.queue-32 时间:2025-02-05T14:53:04.152588100
***消费者2接收到消息hello work.queue-34 时间:2025-02-05T14:53:04.367337200
***消费者2接收到消息hello work.queue-36 时间:2025-02-05T14:53:04.581549200
***消费者2接收到消息hello work.queue-38 时间:2025-02-05T14:53:04.793774100
***消费者2接收到消息hello work.queue-40 时间:2025-02-05T14:53:05.006103400
***消费者2接收到消息hello work.queue-42 时间:2025-02-05T14:53:05.220121400
***消费者2接收到消息hello work.queue-44 时间:2025-02-05T14:53:05.433498300
***消费者2接收到消息hello work.queue-46 时间:2025-02-05T14:53:05.645486500
***消费者2接收到消息hello work.queue-48 时间:2025-02-05T14:53:05.856447600
***消费者2接收到消息hello work.queue-50 时间:2025-02-05T14:53:06.065771700

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

能者多劳

更改一下我们的配置文件,就好了。更改的是consumer消费者服务 application.yml 配置文件。

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

重启项目,再次测试看结果。

消费者1接收到消息hello work.queue-1 时间:2025-02-05T16:19:40.610672600
***消费者2接收到消息hello work.queue-2 时间:2025-02-05T16:19:40.635078900
消费者1接收到消息hello work.queue-3 时间:2025-02-05T16:19:40.668399800
消费者1接收到消息hello work.queue-4 时间:2025-02-05T16:19:40.733468200
消费者1接收到消息hello work.queue-5 时间:2025-02-05T16:19:40.789432700
消费者1接收到消息hello work.queue-6 时间:2025-02-05T16:19:40.849740
***消费者2接收到消息hello work.queue-7 时间:2025-02-05T16:19:40.865255600
消费者1接收到消息hello work.queue-8 时间:2025-02-05T16:19:40.915186600
消费者1接收到消息hello work.queue-9 时间:2025-02-05T16:19:40.975302
消费者1接收到消息hello work.queue-10 时间:2025-02-05T16:19:41.035238100
消费者1接收到消息hello work.queue-11 时间:2025-02-05T16:19:41.098149900
***消费者2接收到消息hello work.queue-12 时间:2025-02-05T16:19:41.110162300
消费者1接收到消息hello work.queue-13 时间:2025-02-05T16:19:41.158752
消费者1接收到消息hello work.queue-14 时间:2025-02-05T16:19:41.214050800
消费者1接收到消息hello work.queue-15 时间:2025-02-05T16:19:41.275456500
消费者1接收到消息hello work.queue-16 时间:2025-02-05T16:19:41.338280900
***消费者2接收到消息hello work.queue-17 时间:2025-02-05T16:19:41.354040400
消费者1接收到消息hello work.queue-18 时间:2025-02-05T16:19:41.397333900
消费者1接收到消息hello work.queue-19 时间:2025-02-05T16:19:41.459536100
消费者1接收到消息hello work.queue-20 时间:2025-02-05T16:19:41.522984800
消费者1接收到消息hello work.queue-21 时间:2025-02-05T16:19:41.589369900
***消费者2接收到消息hello work.queue-22 时间:2025-02-05T16:19:41.595472400
消费者1接收到消息hello work.queue-23 时间:2025-02-05T16:19:41.639076100
消费者1接收到消息hello work.queue-24 时间:2025-02-05T16:19:41.702762100
消费者1接收到消息hello work.queue-25 时间:2025-02-05T16:19:41.761438700
消费者1接收到消息hello work.queue-26 时间:2025-02-05T16:19:41.823348300
***消费者2接收到消息hello work.queue-27 时间:2025-02-05T16:19:41.836398700
消费者1接收到消息hello work.queue-28 时间:2025-02-05T16:19:41.894946600
消费者1接收到消息hello work.queue-29 时间:2025-02-05T16:19:41.962451900
消费者1接收到消息hello work.queue-30 时间:2025-02-05T16:19:42.020227900
***消费者2接收到消息hello work.queue-31 时间:2025-02-05T16:19:42.066749100
消费者1接收到消息hello work.queue-32 时间:2025-02-05T16:19:42.080599800
消费者1接收到消息hello work.queue-33 时间:2025-02-05T16:19:42.143280700
消费者1接收到消息hello work.queue-34 时间:2025-02-05T16:19:42.204272700
消费者1接收到消息hello work.queue-35 时间:2025-02-05T16:19:42.270407300
***消费者2接收到消息hello work.queue-36 时间:2025-02-05T16:19:42.309818400
消费者1接收到消息hello work.queue-37 时间:2025-02-05T16:19:42.332003100
消费者1接收到消息hello work.queue-38 时间:2025-02-05T16:19:42.391974600
消费者1接收到消息hello work.queue-39 时间:2025-02-05T16:19:42.454012300
消费者1接收到消息hello work.queue-40 时间:2025-02-05T16:19:42.509398500
***消费者2接收到消息hello work.queue-41 时间:2025-02-05T16:19:42.555230800
消费者1接收到消息hello work.queue-42 时间:2025-02-05T16:19:42.570220
消费者1接收到消息hello work.queue-43 时间:2025-02-05T16:19:42.629378200
消费者1接收到消息hello work.queue-44 时间:2025-02-05T16:19:42.690519600
消费者1接收到消息hello work.queue-45 时间:2025-02-05T16:19:42.756214500
***消费者2接收到消息hello work.queue-46 时间:2025-02-05T16:19:42.797371400
消费者1接收到消息hello work.queue-47 时间:2025-02-05T16:19:42.813034800
消费者1接收到消息hello work.queue-48 时间:2025-02-05T16:19:42.876228100
消费者1接收到消息hello work.queue-49 时间:2025-02-05T16:19:42.939391
消费者1接收到消息hello work.queue-50 时间:2025-02-05T16:19:42.998590500

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了7条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

交换机类型

在之前的两个测试案例中,都没有交换机Exchange,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少

文档中,我们讲解前面的三种交换机模式。

Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

我们的计划是这样的:

  • 创建一个名为test.fanout的交换机,类型是Fanout
  • 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机test.fanout

声明交换机和队列

在控制台创建 fanout.queue1 和 fanout.queue2 两个队列。

然后在创建一个交换机

绑定两个队列到交换机

消息发送

在publisher服务的FanoutExchangeTest类中添加测试方法:

@SpringBootTest
public class FanoutExchangeTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*测试 fanout exchange;向 test.fanout 交换机发送消息,消息内容为 hello everyone!,会发送到所有绑定到该交换机的队列*/@Testpublic void testSendFanoutExchange(){// 1 交换机名称String exchangeName = "test.fanout";// 2 消息String msg = "hello everyone!";// 3 发送消息rabbitTemplate.convertAndSend(exchangeName,"",msg);}
}

注意:上述的 convertAndSend 方法的第2个参数:路由key 因为没有绑定,所以可以指定为空

看看rabbitMQ的控制台

消息接收

在consumer服务中添加FanoutQueueListener类,并新增两个方法,监听队列中的消息 作为消费者。

@Slf4j
@Component
public class FanoutQueueListener {/*** 监听fanout.queue1队列*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("【消费者1】 接收到消息:" + msg);}/*** 监听fanout.queue2队列*/@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("【消费者2】 接收到消息:" + msg);}
}

总结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。

案例需求如图

  1. 声明一个名为test.direct的交换机
  2. 声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
  3. 声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
  4. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  5. 在publisher中编写测试方法,向test.direct发送消息

声明队列和交换机

首先在控制台声明两个队列direct.queue1和direct.queue2,这里不再展示过程:

然后声明一个direct类型的交换机,命名为test.direct:

然后使用red和blue作为key,绑定direct.queue1到test.direct:

绑定diretc.queue2

看看最后的绑定关系

消息发送

在publish服务中,新增 DirectExchangeTest 类发送消息。

@SpringBootTest
public class DirectExchangeTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*测试 direct exchange;向 test.direct 交换机发送消息,会根据路由key发送到所有绑定到该交换机的队列*/@Testpublic void testSendDirectExchange(){// 1 交换机String exchangeName = "test.direct";// 2 消息String msg = "这是一条消息,并且路由key是red 红色。";// 3 发送消息 路由key为redrabbitTemplate.convertAndSend(exchangeName,"red",msg);//改变下消息msg = "这是一条消息,并且路由key是blue 蓝色。";rabbitTemplate.convertAndSend(exchangeName,"blue",msg);}}

看看rabbitMQ控制台,查看消息是否成功发送。

消息接收

在consumer服务中,添加 DirectQueueListener 类,并在里面编写两个方法。

@Slf4j
@Component
public class DirectQueueListener {/*** 监听direct.queue1队列*/@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg){log.info("【消费者1】接收到消息:{}",msg);}/*** 监听direct.queue2队列* @param msg*/@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg){log.info("【消费者2】接收到消息:{}",msg);}
}

由于 test.redirect 交换机绑定的两个队列的路由key有red;所以指定了路由key为red的消息能被两个消费者都收到。

而路由key为 blue 的队列只有direct.queue1;所以只有监听这个队列的 消费者1 能够接收到消息:

总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

Topic类型交换机

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定RoutingKey 的时候使用通配符!

RoutingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

图示:

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:

创建交换机和队列

创建test.topic 交换机

看看效果

给test.topic 交换机绑定两个队列

消息发送

在consumer服务中,新增 TopicExchangeTest类 发送消息。

@SpringBootTest
public class TopicExchangeTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendTopicExchange(){// 1 交换机String exchangeName = "test.topic";// 2 消息String msg = "我是TopicExchange交换机的消息,路由key是 china.news";// 3 发送路由key为 china.news 的消息rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);}
}

消息接收

在consumer服务中,添加 TopicExchangeListener 类,编写两个方法监听消息。

@Slf4j
@Component
public class TopicExchangeListener {/*** 监听topic.queue1队列*/@RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg) {log.info("【消费者1】监听到消息:{}", msg);}/*** 监听topic.queue2队列*/@RabbitListener(queues ="topic.queue2")public void listenTopicQueue2(String msg) {log.info("【消费者2】监听到消息:{}", msg);}}

总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的RoutingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

代码声明交换机和队列

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

基本API

SpringAMQP提供了一个Queue类,用来创建队列:

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

把之前创建的队列和交换机删除

删除后的队列

删除后的交换机

Ideal控制台报错

这是因为我们的队列和交换机都删除了,里面写的 RabbitListener 还在监听队列中的消息,但是队列没有了,所以报错。

fanout示例

在consumer服务中,新建config包。并创建FanoutConfig 类 在里面编写代码,创建test.fanout 交换机和fanout.queue1 和fanout.queue2 队列。 并启动consumer服务

@Configuration
public class FanoutConfig {// 声明 Fanout 类型的交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("test.fanout");}//声明队列,名称为 fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列和交换机@Beanpublic Binding fanoutBinding1(FanoutExchange fanoutExchange,Queue fanoutQueue1){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明队列,名称为 fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列和交换机@Beanpublic Binding fanoutBinding2(FanoutExchange fanoutExchange,Queue fanoutQueue2){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

看看rbbitMQ控制台效果

看看交换机

Direct示例

在consumer 服务中的 config包中,新建 DirectConfig 类,编写代码创建交换机和队列。direct模式由于要绑定多个key,会比较麻烦一点,因为每一个key都要写一个binding方法。

@Configuration
public class DirectConfig {//声明 test.direct 交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange("test.direct");}//声明 direct.queue1 队列@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}//绑定 direct.queue1 队列到 test.direct 交换机上 路由key是 red@Beanpublic Binding directBindingQueue1Red(DirectExchange directExchange,Queue directQueue1){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}//绑定 direct.queue1 队列到 test.direct 交换机上 路由key是 blue@Beanpublic Binding directBindingQueue1Blue(DirectExchange directExchange,Queue directQueue1){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}//声明 direct.queue2 队列@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}//绑定 direct.queue2 队列到 test.direct 交换机上 路由key是 red@Beanpublic Binding directBindingQueue2Red(DirectExchange directExchange, Queue directQueue2){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}//绑定 direct.queue2 队列到 test.direct 交换机上 路由key是 yellow@Beanpublic Binding directBindingQueue2Yellow(DirectExchange directExchange,Queue directQueue2){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

看看rabbitMQ控制台

看看交换机和绑定关系

Topic示例

在consumer 服务中的config包里面,新创建TopicConfig类,编写代码创建交换机和队列。

@Configuration
public class TopicConfig {//声明 test.topic 交换机@Beanpublic TopicExchange topicExchange(){return new TopicExchange("test.topic");}//声明 topic.queue1 队列@Beanpublic Queue topicQueue1(){return new Queue("topic.queue1");}//绑定队列和交换机 路由key是 china.#@Beanpublic Binding topicBinding1(TopicExchange topicExchange,Queue topicQueue1){return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#");}//声明 topic.queue2 队列@Beanpublic Queue topicQueue2(){return new Queue("topic.queue2");}//绑定队列和交换机 路由key是 #.news@Beanpublic Binding topicBinding2(TopicExchange topicExchange,Queue topicQueue2){return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news");}
}

看看控制台效果

交换机

基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。不过是在消息监听的时候基于注解的方式来声明。

例如,我们同样声明Direct模式的交换机和队列;用注解的方式声明下。

先把之前创建的 交换机和队列删除。

删除后的效果

Fanout示例

@Configuration
public class FanoutRabbitListener {// 监听fanout.queue1 队列的消息@RabbitListener(bindings = @QueueBinding(value = @Queue("fanout.queue1"),exchange = @Exchange(value = "test.fanout",type = ExchangeTypes.FANOUT),key = ""))public void listenFanoutQueue1(String msg){System.out.println("【消费者1】 监听到消息" + msg);}// 监听fanout.queue2 队列的消息@RabbitListener(bindings = @QueueBinding(value = @Queue("fanout.queue2"),exchange = @Exchange(value = "test.fanout",type = ExchangeTypes.FANOUT),key = ""))public void listenFanoutQueue2(String msg){System.out.println("【消费者2】 监听到消息" + msg);}
}

启动consumer服务看效果

交换机和绑定关系

Direct示例

新建 DirectRabbitListener 类,并在里面编写代码进行测试。

@Configuration
public class DirectRabbitListener {// 声明 direct.queue1@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "test.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String msg){System.out.println("【消费者1】 接收到消息:" + msg);}// 声明direct.queue2@RabbitListener(bindings =@QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "test.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg){System.out.println("【消费者2】 接收到消息:" + msg);}
}

看看效果

交换机和绑定关系

Topic示例

在consumer服务中的 config包里面,创建TopicRabbitListener类。编写代码进行测试

@Configuration
public class TopicRabbitListener {//声明topic.queue1 队列@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(value = "test.topic",type = ExchangeTypes.TOPIC),key = {"china.#"}))public void listenTopicQueue1(String msg){System.out.println("【消费者1】接收到消息:"+msg);}//声明 topic.queue2 队列@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "test.topic",type = ExchangeTypes.TOPIC),key = {"#.news"}))public void listenTopicQueue2(String msg){System.out.println("【消费者2】接收到消息:"+msg);}
}

看看效果

交换机和绑定关系

相关文章:

使用java代码操作rabbitMQ收发消息

SpringAMQP 将来我们开发业务功能的时候&#xff0c;肯定不会在控制台收发消息&#xff0c;而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议&#xff0c;因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息&#xff0c;都可以与RabbitMQ交互。并且RabbitMQ官方也…...

【数据结构】(7) 栈和队列

一、栈 Stack 1、什么是栈 栈是一种特殊的线性表&#xff0c;它只能在固定的一端&#xff08;栈顶&#xff09;进行出栈、压栈操作&#xff0c;具有后进先出的特点。 2、栈概念的例题 答案为 C&#xff0c;以C为例进行讲解&#xff1a; 第一个出栈的是3&#xff0c;那么 1、…...

【Pytorch实战教程】让数据飞轮转起来:PyTorch Dataset与Dataloader深度指南

文章目录 让数据飞轮转起来:PyTorch Dataset与Dataloader深度指南一、为什么需要数据管理组件?二、Dataset:数据集的编程接口2.1 自定义Dataset三要素2.2 实战案例:图像分类数据集三、Dataloader:高效数据流水线3.1 核心参数解析3.2 数据流可视化3.3 多卡训练支持四、综合…...

Python的秘密基地--[章节13] Python 数据分析与可视化

第13章&#xff1a;Python 数据分析与可视化 在大数据时代&#xff0c;数据分析与可视化是至关重要的技能。Python 提供了多个强大的库&#xff0c;如 NumPy、Pandas、Matplotlib 和 Seaborn&#xff0c;用于数据处理、分析和可视化。本章将介绍如何使用 Python 进行数据分析&…...

Python 入门:文件操作、读写、管理

目录 1. 引言 2. 文件基础操作 2.1 打开文件&#xff08;open()&#xff09; 2.2 读取文件内容 2.3 写入文件 3. 处理 CSV 文件 3.1 读取 CSV 文件 3.2 写入 CSV 文件 4. 处理 JSON 文件 4.1 读取 JSON 文件 4.2 写入 JSON 文件 5. 文件管理操作 5.1 删除文件 5.…...

Composo:企业级AI应用的质量守门员

在当今快速发展的科技世界中,人工智能(AI)的应用已渗透到各行各业。然而,随着AI技术的普及,如何确保其可靠性和一致性成为了企业面临的一大挑战。Composo作为一家致力于为企业提供精准AI评估服务的初创公司,通过无代码和API双模式,帮助企业监测大型语言模型(LLM)驱动的…...

crictl和ctr命令详解

一&#xff0c;crictl crictl 是 CRI 兼容的容器运行时命令行接口。 你可以使用它来检查和调试 Kubernetes 节点上的容器运行时和应用程序。 crictl 和它的源代码在 cri-tools 代码库。 1&#xff0c;安装 需要下载与kubernetes相对应的版本&#xff0c;我的k8s版本是1.30的…...

Python数据分析案例71——基于十种模型的信用违约预测实战

背景 好久没写这种基础的做机器学习流程了&#xff0c;最近过完年感觉自己代码忘了好多.....复习一下。 本次带来的是信贷违约的预测&#xff0c;即根据这个人的特征&#xff08;年龄收入什么的&#xff09;&#xff0c;预测他是不是会违约&#xff0c;会违约就拒绝贷款&…...

Lesson 131 Don‘t be so sure

Lesson 131 Don’t be so sure 词汇 Egypt n. 埃及 相关&#xff1a;Egyptian n. 埃及人&#xff0c;埃及的    camel n. 骆驼    Mummy n. 木乃伊    Pyramid n. 金字塔    Pharaoh n. 法老 例句&#xff1a;你去过埃及吗&#xff1f;    Have you been to E…...

python康威生命游戏的图形化界面实现

康威生命游戏&#xff08;Conway’s Game of Life&#xff09;是由英国数学家约翰何顿康威&#xff08;John Horton Conway&#xff09;在1970年发明的一款零玩家的细胞自动机模拟游戏。尽管它的名字中有“游戏”&#xff0c;但实际上它并不需要玩家参与操作&#xff0c;而是通…...

区块链技术:Facebook 重塑社交媒体信任的新篇章

在这个信息爆炸的时代&#xff0c;社交媒体已经成为我们生活中不可或缺的一部分。然而&#xff0c;随着社交平台的快速发展&#xff0c;隐私泄露、数据滥用和虚假信息等问题也日益凸显。这些问题的核心在于传统社交媒体依赖于中心化服务器存储和管理用户数据&#xff0c;这种模…...

【自学笔记】文言一心的基础知识点总览-持续更新

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 文心一言知识点总览一、文心一言简介二、文心一言的核心功能三、文心一言的技术特点四、文心一言的应用场景五、文心一言的使用技巧六、文心一言的未来发展 总结 文…...

UE求职Demo开发日志#25 试试网络同步和尝试打包

1 改了一些时序上的bug&#xff0c;成功运行了多端 &#xff08;UE一些网络相关的功能都弄好了&#xff0c;只需要标记哪个变量或Actor需要复制&#xff09; 2 以前遗留的bug太多了&#xff0c;改到晚上才打包好一个能跑的版本&#xff0c;而且有的特效还不显示&#xff08;悲…...

2021 年 9 月青少年软编等考 C 语言五级真题解析

目录 T1. 问题求解思路分析T2. 抓牛思路分析T3. 交易市场思路分析T4. 泳池思路分析T1. 问题求解 给定一个正整数 N N N,求最小的 M M M 满足比 N N N 大且 M M M 与 N N N 的二进制表示中有相同数目的 1 1 1。 举个例子,假如给定 N N N 为 78 78 78,二进制表示为 …...

Win10环境使用ChatBox集成Deep Seek解锁更多玩法

Win10环境使用ChatBox集成Deep Seek解锁更多玩法 前言 之前部署了14b的Deep Seek小模型&#xff0c;已经验证了命令行及接口方式的可行性。但是纯命令行或者PostMan方式调用接口显然不是那么友好&#xff1a; https://lizhiyong.blog.csdn.net/article/details/145505686 纯…...

第 26 场 蓝桥入门赛

2.对联【算法赛】 - 蓝桥云课 问题描述 大年三十&#xff0c;小蓝和爷爷一起贴对联。爷爷拿出了两副对联&#xff0c;每副对联都由 N 个“福”字组成&#xff0c;每个“福”字要么是正的&#xff08;用 1 表示&#xff09;&#xff0c;要么是倒的&#xff08;用 0 表示&#…...

CodeGPT + IDEA + DeepSeek,在IDEA中引入DeepSeek实现AI智能开发

CodeGPT IDEA DeepSeek&#xff0c;在IDEA中引入DeepSeek 版本说明 建议和我使用相同版本&#xff0c;实测2022版IDEA无法获取到CodeGPT最新版插件。&#xff08;在IDEA自带插件市场中搜不到&#xff0c;可以去官网搜索最新版本&#xff09; ToolsVersionIntelliJ IDEA202…...

【2025年更新】1000个大数据/人工智能毕设选题推荐

文章目录 前言大数据/人工智能毕设选题&#xff1a;后记 前言 正值毕业季我看到很多同学都在为自己的毕业设计发愁 Maynor在网上搜集了1000个大数据的毕设选题&#xff0c;希望对大家有帮助&#xff5e; 适合大数据毕业设计的项目&#xff0c;完全可以作为本科生当前较新的毕…...

什么是中间件中间件有哪些

什么是中间件&#xff1f; 中间件&#xff08;Middleware&#xff09;是指在客户端和服务器之间的一层软件组件&#xff0c;用于处理请求和响应的过程。 中间件是指介于两个不同系统之间的软件组件&#xff0c;它可以在两个系统之间传递、处理、转换数据&#xff0c;以达到协…...

使用docker搭建FastDFS文件服务

1.拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/fastdfs:latest2.使用docker镜像构建tracker容器&#xff08;跟踪服务器&#xff0c;起到调度的作用&#xff09; docker run -dti --networkhost --name tracker -v /data/fdfs/tracker:/var/fdfs -…...

使用OpenGL自己定义一个button,响应鼠标消息:掠过、点击、拖动

button需要有一个外观 外观 大小跟随窗口改变&#xff0c;采用纯色背景、纯色文字 文字 大小跟随窗口改变 button需要获得鼠标消息 掠过 鼠标掠过时 button 出现阴影&#xff0c;鼠标掠过后 button 阴影消失 点击 点击后进入相应事件 拖动 改变图标所在位置 需要在g…...

天津三石峰科技——汽车生产厂的设备振动检测项目案例

汽车产线有很多传动设备需要长期在线运行&#xff0c;会出现老化、疲劳、磨损等 问题&#xff0c;为了避免意外停机造成损失&#xff0c;需要加装一些健康监测设备&#xff0c;监测设备运 行状态。天津三石峰科技采用 12 通道振动信号采集卡&#xff08;下图 1&#xff09;对…...

Linux之文件IO前世今生

在 Linux之文件系统前世今生&#xff08;一&#xff09; VFS中&#xff0c;我们提到了文件的读写&#xff0c;并给出了简要的读写示意图&#xff0c;本文将分析文件I/O的细节。 一、Buffered I/O&#xff08;缓存I/O&#xff09;& Directed I/O&#xff08;直接I/O&#…...

Java中实现定时锁屏的功能(可以指定时间执行)

Java中实现定时锁屏的功能&#xff08;可以指定时间执行&#xff09; 要在Java中实现定时锁屏的功能&#xff0c;可以使用java.util.Timer或java.util.concurrent.ScheduledExecutorService来调度任务&#xff0c;并通过调用操作系统的命令来执行锁屏。下面我将给出一个基本的…...

半导体制造工艺讲解

目录 一、半导体制造工艺的概述 二、单晶硅片的制造 1.单晶硅的制造 2.晶棒的切割、研磨 3.晶棒的切片、倒角和打磨 4.晶圆的检测和清洗 三、晶圆制造 1.氧化与涂胶 2.光刻与显影 3.刻蚀与脱胶 4.掺杂与退火 5.薄膜沉积、金属化和晶圆减薄 6.MOSFET在晶圆表面的形…...

深入理解进程优先级

目录 引言 一、进程优先级基础 1.1 什么是进程优先级&#xff1f; 1.2 优先级与系统性能 二、查看进程信息 2.1 使用ps -l命令 2.2 PRI与NI的数学关系 三、深入理解Nice值 3.1 Nice值的特点 3.2 调整优先级实践 四、进程特性全景图 五、优化实践建议 结语 引言 在操…...

python中的flask框架

Flask 是一个用Python编写的轻量级Web应用框架 基于WSGI和Jinja2模板引擎 被称为“微框架”&#xff0c;其核心功能简单&#xff0c;不捆绑数据库管理、表单验证等功能&#xff0c;而是通过扩展来增加其他功能 Flask提供最基本的功能&#xff0c;不强制使用特定工具或库 通…...

微信小程序案例2——天气微信小程序(学会绑定数据)

文章目录 一、项目步骤1 创建一个无AppID的weather项目2 进入index.wxml、index.js、index.wxss文件&#xff0c;清空所有内容&#xff0c;进入App.json&#xff0c;修改导航栏标题为“中国天气网”。3进入index.wxml&#xff0c;进行当天天气情况的界面布局&#xff0c;包括温…...

【Linux网络编程】之守护进程

【Linux网络编程】之守护进程 进程组进程组的概念组长进程 会话会话的概念会话ID 控制终端控制终端的概念控制终端的作用会话、终端、bash三者的关系 前台进程与后台进程概念特点查看当前终端的后台进程前台进程与后台进程的切换 作业控制相关概念作业状态&#xff08;一般指后…...

MarkupLM:用于视觉丰富文档理解的文本和标记语言预训练

摘要 结合文本、布局和图像的多模态预训练在视觉丰富文档理解&#xff08;VRDU&#xff09;领域取得了显著进展&#xff0c;尤其是对于固定布局文档&#xff08;如扫描文档图像&#xff09;。然而&#xff0c;仍然有大量的数字文档&#xff0c;其布局信息不是固定的&#xff0…...