当前位置: 首页 > news >正文

B080-RabbitMQ

目录

      • RabbitMQ认识
        • 概念
        • 使用场景
        • 优点
        • AMQP协议
        • JMS
      • RabbitMQ安装
        • 安装elang
        • 安装RabbitMQ
        • 安装管理插件
        • 登录RabbitMQ
        • 消息队列的工作流程
      • RabbitMQ常用模型
        • HelloWorld-基本消息模型
          • 生产者发送消息
            • 导包
            • 获取链接工具类
            • 消息的生产者
          • 消费者消费消息
            • 模拟消费者
            • 手动签收消息
        • Work Queues
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-FANOUT-广播
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Direct-定向
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Topic-通配符
          • Sender
          • Consume1
          • Consume2
        • 总结
      • SpringBoot集成RabbitMQ
        • 导包
        • yml
        • config
        • producer
        • consumer

RabbitMQ认识

概念

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/

使用场景

在这里插入图片描述

优点

任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)

消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)

服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)

AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)

JMS

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)

RabbitMQ安装

安装elang

otp_win64_20.2.exe
配置环境变量

安装RabbitMQ

rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务

安装管理插件

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)

重启MQ

登录RabbitMQ

进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
在这里插入图片描述

消息队列的工作流程

在这里插入图片描述

RabbitMQ常用模型

HelloWorld-基本消息模型

一个生产者与一个消费者
在这里插入图片描述

生产者发送消息
导包
<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><!--和springboot2.0.5对应--><version>5.4.1</version></dependency>
</dependencies>
获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");//集群的时候才用这个参数factory.setUsername("guest");factory.setPassword("guest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}
消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//消息的生产者
public class Sender {public static  final  String  HELLO_QUEUE="hello_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列(hello这里用默认的交换机)/*  String queue :队列的名字,可自定义,   boolean durable: 持久化,   boolean exclusive:是否独占;大家都能用,传false,boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,Map<String, Object> arguments:没有其他要传的属性就传false          */channel.queueDeclare(HELLO_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}

在这里插入图片描述

消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");}};//3.监听队列/*queue :队列名字autoAck:自动签收Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}

在这里插入图片描述
在这里插入图片描述
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息

手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
//                System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收不等于消费成功,处理逻辑走完没有报错才算签收成功Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}

Work Queues

在这里插入图片描述
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳

Sender
//消息的生产者
/*如果有多个消费者监听同一个队列,默认轮询*/
public class Sender {public static  final  String  WORK_QUEUE="work_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列/*String queue :队列的名字boolean durable: 持久化boolean exclusive:是否独占boolean autoDelete: 用完即删Map<String, Object> arguments*/channel.queueDeclare(WORK_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
public class Consume1 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
//        channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}
Consume2
//模拟消费者
public class Consume2 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
//        channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}

订阅模型-FANOUT-广播

在这里插入图片描述
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

Sender
//消息的生产者
/*变化1.不创建 队列2.创建交换机3.给交换机发送消息*/
public class Sender {public static  final  String  FANOUT_EXCHANGE="fanout_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume1 {public  static final  String FANOUT_QUEUE1="fanout_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume2 {public  static final  String FANOUT_QUEUE2="fanout_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE2,false,callback);}
}

订阅模型-Direct-定向

在这里插入图片描述
把消息交给符合指定routing key 的队列 一堆或一个

Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static  final  String  DIRECT_EXCHANGE="direct_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public  static final  String DIRECT_QUEUE1="direct_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public  static final  String DIRECT_QUEUE2="direct_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE2,false,callback);}
}

订阅模型-Topic-通配符

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static  final  String  TOPIC_EXCHANGE="topic_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public  static final  String TOPIC_QUEUE1="topic_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public  static final  String TOPIC_QUEUE2="topic_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE2,false,callback);}
}

总结

01_hello生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)02_workqueue	默认轮询		可修改(能者多劳)生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)03_fanout	广播	将消息交给所有绑定到交换机的队列(多个消费者都能收到)生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)04_direct	定向	把消息交给符合指定 routing key 的队列 一堆或一个生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)05_topic		通配符	把消息交给符合routing pattern (路由模式) 的队列 一堆或一个生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

SpringBoot集成RabbitMQ

导包

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--spirngboot集成rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

yml

server:port: 44000
spring:application:name: test‐rabbitmq‐producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /listener:simple:acknowledge-mode: manual #手动签收prefetch: 1 #消费者的消息并发处理数量#publisher-confirms: true #消息发送到交换机失败回调#publisher-returns: true  #消息发送到队列失败回调template:mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

config

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";public static final String QUEUE1_SPRINGBOOT="queue1_springboot";public static final String QUEUE2_SPRINGBOOT="queue2_springboot";//创建一个交换机@Beanpublic Exchange createExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();}//创建两个队列@Beanpublic Queue createQueue1(){return  new Queue(QUEUE1_SPRINGBOOT,true);}@Beanpublic Queue createQueue2(){return  new Queue(QUEUE2_SPRINGBOOT,true);}//把交换机和队列绑定到一起@Beanpublic Binding bind1(){return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();}@Beanpublic Binding bind2(){return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();}//消费者 还原对象方式(从MQ里取出json转为对象)@Bean("rabbitListenerContainerFactory")public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setPrefetchCount(1);return factory;}//放到消息队列里面的转换(转为json存进MQ)@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}

producer

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test(){/*问题:多系统之间 信息交互  传递对象解决方案:转换为json存储实现:1.fastjson    对象 - josn  (作业)2.重写转换器模式*/for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT, "email.save", new User(1L,"文达"));}System.out.println("消息发送完毕");}
}

consumer

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;//消费者
@Component
public class Consu {@RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接public void user(@Payload User user, Channel channel, Message message) throws IOException {System.out.println(message);System.out.println("user队列:"+user);//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})public void email(@Payload User user,Channel channel,Message message ) throws IOException {System.out.println(message);System.out.println("email队列:"+user);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

队列内容可传string,entity序列化对象,json对象,

相关文章:

B080-RabbitMQ

目录 RabbitMQ认识概念使用场景优点AMQP协议JMS RabbitMQ安装安装elang安装RabbitMQ安装管理插件登录RabbitMQ消息队列的工作流程 RabbitMQ常用模型HelloWorld-基本消息模型生产者发送消息导包获取链接工具类消息的生产者 消费者消费消息模拟消费者手动签收消息 Work QueuesSen…...

关于岛屿的三道leetcode原题:岛屿周长、岛屿数量、统计子岛屿

题1&#xff1a;岛屿周长 给定一个 row x col 的二维网格地图 grid &#xff0c;其中&#xff1a;gridi 1 表示陆地&#xff0c; gridi 0 表示水域。 网格中的格子 水平和垂直 方向相连&#xff08;对角线方向不相连&#xff09;。整个网格被水完全包围&#xff0c;但其中恰…...

lintcode 1081 · 贴纸拼单词【hard 递归+记忆化搜索才能通过】

题目 https://www.lintcode.com/problem/1081/ 给出N种不同类型的贴纸。 每个贴纸上都写有一个小写英文单词。 通过裁剪贴纸上的所有字母并重排序来拼出字符串target。 每种贴纸可以使用多次&#xff0c;假定每种贴纸数量无限。 拼出target最少需要多少张贴纸&#xff1f;如果…...

HarmonyOS/OpenHarmony(Stage模型)应用开发单一手势(二)

三、拖动手势&#xff08;PanGesture&#xff09; .PanGestureOptions(value?:{ fingers?:number; direction?:PanDirection; distance?:number}) 拖动手势用于触发拖动手势事件&#xff0c;滑动达到最小滑动距离&#xff08;默认值为5vp&#xff09;时拖动手势识别成功&am…...

计算机毕设之基于Python+django+MySQL可视化的学习系统的设计与实现

系统阐述的是使用可视化的学习系统的设计与实现&#xff0c;对于Python、B/S结构、MySql进行了较为深入的学习与应用。主要针对系统的设计&#xff0c;描述&#xff0c;实现和分析与测试方面来表明开发的过程。开发中使用了 django框架和MySql数据库技术搭建系统的整体架构。利…...

Kotlin inline、noinline、crossinline 深入解析

主要内容&#xff1a; inline 高价函数的原理分析Non-local returns noinlinecrossinline inline 如果有C语言基础的&#xff0c;inline 修饰一个函数表示该函数是一个内联函数。编译时&#xff0c;编译器会将内联函数的函数体拷贝到调用的地方。我们先看下在一个普通的 kot…...

在 CentOS 7 / RHEL 7 上安装 Python 3.11

原文链接&#xff1a;https://computingforgeeks.com/install-python-3-on-centos-rhel-7/ Python 是一种高级解释性编程语言&#xff0c;已被用于各种应用程序开发&#xff0c;并在近年来获得了巨大的流行。Python 可用于编写广泛的应用程序&#xff0c;包括 Web 开发、数据分…...

SVN基本使用笔记——广州云科

简介 SVN是什么? 代码版本管理工具 它能记住你每次的修改 查看所有的修改记录 恢复到任何历史版本 恢复己经删除的文件 SVN跟Git比&#xff0c;有什么优势 使用简单&#xff0c;上手快 目录级权限控制&#xff0c;企业安全必备 子目录Checkout&#xff0c;减少不必要的文件检出…...

python爬虫-Selenium

一、Selenium简介 Selenium是一个用于Web应用程序测试的工具&#xff0c;Selenium 测试直接运行在浏览器中&#xff0c;就像真正的用户在操作一样。模拟浏览器功能&#xff0c;自动执行网页中的js代码&#xff0c;实现动态加载。 二、环境配置 1、查看本机电脑谷歌浏览器的版…...

flutter plugins插件【一】【FlutterJsonBeanFactory】

1、FlutterJsonBeanFactory 在Setting->Tools->FlutterJsonBeanFactory里边自定义实体类的后缀&#xff0c;默认是entity 复制json到粘贴板&#xff0c;右键自己要存放实体的目录&#xff0c;可以看到JsonToDartBeanAction Class Name是实体名字&#xff0c;会默认加上…...

系统中出现大量不可中断进程和僵尸进程(理论)

一 进程状态 当 iowait 升高时&#xff0c;进程很可能因为得不到硬件的响应&#xff0c;而长时间处于不可中断状态。从 ps 或者 top 命令的输出中&#xff0c;你可以发现它们都处于 D 状态&#xff0c;也就是不可中断状态&#xff08;Uninterruptible Sleep&#xff09;。 R …...

L1-012 计算指数(Python实现) 测试点全过

前言&#xff1a; {\color{Blue}前言&#xff1a;} 前言&#xff1a;本系列题使用的是“PTA中的团体程序设计天梯赛——练习集”的题库&#xff0c;难度有L1、L2、L3三个等级&#xff0c;分别对应团体程序设计天梯赛的三个难度&#xff0c;如有需要可以直接查看对应专栏。发布个…...

String、StringBuffer、StringBuilder的区别

String、StringBuffer、StringBuilder的区别 String的内容不可修改&#xff0c;StringBuffer与StringBuilder的内容可以修改.StringBuffer与StringBuilder&#xff08;更快&#xff09;大部分功能是相似的StringBuffer采用同步处理&#xff0c;属于线程安全操作&#xff1b;而S…...

.net基础概念

1. .NET Framework .NET Framework开发平台包含公共语言运行库(CLR)和基类库(BCL)&#xff0c;前者负载管理代码的执行&#xff0c;后者提供了丰富的类库来构建应用程序。.NET Framework仅支持Windows平台 2. Mono 由于.NET Framework支支持windows环境&#xff0c;因此社区…...

电缆工厂 3D 可视化管控系统 | 智慧工厂

近年来&#xff0c;我国各类器材制造业已经开始向数字化生产转型&#xff0c;使得生产流程变得更加精准高效。通过应用智能设备、物联网和大数据分析等技术&#xff0c;企业可以更好地监控生产线上的运行和质量情况&#xff0c;及时发现和解决问题&#xff0c;从而提高生产效率…...

bazel高效使用和调优

Bazel 为了正确性和高性能&#xff0c;做了很多优秀的设计&#xff0c;那么我们如何正确的使用这些能力&#xff0c;让我们的构建性能“起飞”呢&#xff0c; 我们将从本地研发和 CI pipeline 两种场景进行分析。 本地研发 本地研发通常采用默认的 Bazel 配置即可&#xff0c…...

【实训项目】传道学习助手APP设计

1.设计摘要 跨入21世纪以来,伴随着时代的飞速发展&#xff0c;国民对教育的重视度也有了进一步的提升。我们不难发现虽然很多学习内容有学习资料或者答案&#xff0c;但是这些内容并不能达到让所有求学的人对所需知识进行完全地理解与掌握。所以我们需要进行提问与求助。那么一…...

短信验证码服务

使用的是 阿里云 阿里云官网 1.找到 左上角侧边栏 -云通信 -短信服务 2.在快速学习测试处 &#xff0c;按照步骤完成快速学习&#xff0c;绑定要测试的手机号&#xff0c;选专用 【测试模板】&#xff0c;自定义模板需要人工审核&#xff0c;要一个工作日 3.右上角 获取 Acces…...

windows如何更改/禁用系统更新

提示&#xff1a;首先说明这属于将更新时间更改&#xff0c;不过你可以的将更新时间更改为十年一百年 废话不多说开始正文&#xff1a; 1.首先:winR打开运行&#xff0c;输入regedit&#xff0c;进入注册表编辑器 2.进入编辑器后依次点击&#xff1a;HKEY_LOCAL_MACHINE\SOFT…...

Clion 使用ffmpeg 学习1 开发环境配置

Clion 使用ffmpeg 学习1 开发环境配置 一、准备工作1. 准备环境2. 下载FFmpeg 二、操作步骤1. Clion 新建一个C项目2. 修改 CMakeLists.txt3. 修改配置4. 运行测试5. 打印rtsp 流信息的 demo 一、准备工作 在视频处理和多媒体应用程序开发中&#xff0c;FFmpeg 是一个强大的开…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

Rust 开发环境搭建

环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行&#xff1a; rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu ​ 2、Hello World fn main() { println…...

WEB3全栈开发——面试专业技能点P4数据库

一、mysql2 原生驱动及其连接机制 概念介绍 mysql2 是 Node.js 环境中广泛使用的 MySQL 客户端库&#xff0c;基于 mysql 库改进而来&#xff0c;具有更好的性能、Promise 支持、流式查询、二进制数据处理能力等。 主要特点&#xff1a; 支持 Promise / async-await&#xf…...