B080-RabbitMQ
目录
- RabbitMQ认识
- 概念
- 使用场景
- 优点
- AMQP协议
- JMS
- RabbitMQ安装
- 安装elang
- 安装RabbitMQ
- 安装管理插件
- 登录RabbitMQ
- 消息队列的工作流程
- RabbitMQ常用模型
- HelloWorld-基本消息模型
- 生产者发送消息
- 导包
- 获取链接工具类
- 消息的生产者
- 消费者消费消息
- 模拟消费者
- 手动签收消息
- Work Queues
- Sender
- Consume1
- Consume2
- 订阅模型-FANOUT-广播
- Sender
- Consume1
- Consume2
- 订阅模型-Direct-定向
- Sender
- Consume1
- Consume2
- 订阅模型-Topic-通配符
- Sender
- Consume1
- Consume2
- 总结
- SpringBoot集成RabbitMQ
- 导包
- yml
- config
- producer
- consumer
RabbitMQ认识
概念
MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/
使用场景
优点
任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)
消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)
服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)
AMQP协议
AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)
JMS
JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)
RabbitMQ安装
安装elang
otp_win64_20.2.exe
配置环境变量
安装RabbitMQ
rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务
安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)
重启MQ
登录RabbitMQ
进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
消息队列的工作流程
RabbitMQ常用模型
HelloWorld-基本消息模型
一个生产者与一个消费者
生产者发送消息
导包
<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><!--和springboot2.0.5对应--><version>5.4.1</version></dependency>
</dependencies>
获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");//集群的时候才用这个参数factory.setUsername("guest");factory.setPassword("guest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}
消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//消息的生产者
public class Sender {public static final String HELLO_QUEUE="hello_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列(hello这里用默认的交换机)/* String queue :队列的名字,可自定义, boolean durable: 持久化, boolean exclusive:是否独占;大家都能用,传false,boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,Map<String, Object> arguments:没有其他要传的属性就传false */channel.queueDeclare(HELLO_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}
消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");}};//3.监听队列/*queue :队列名字autoAck:自动签收Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息
手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
// System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收不等于消费成功,处理逻辑走完没有报错才算签收成功Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}
Work Queues
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳
Sender
//消息的生产者
/*如果有多个消费者监听同一个队列,默认轮询*/
public class Sender {public static final String WORK_QUEUE="work_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列/*String queue :队列的名字boolean durable: 持久化boolean exclusive:是否独占boolean autoDelete: 用完即删Map<String, Object> arguments*/channel.queueDeclare(WORK_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
public class Consume1 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
// channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}
Consume2
//模拟消费者
public class Consume2 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
// channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);System.out.println("消息内容:"+new String(body));
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}
订阅模型-FANOUT-广播
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
Sender
//消息的生产者
/*变化1.不创建 队列2.创建交换机3.给交换机发送消息*/
public class Sender {public static final String FANOUT_EXCHANGE="fanout_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume1 {public static final String FANOUT_QUEUE1="fanout_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume2 {public static final String FANOUT_QUEUE2="fanout_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE2,false,callback);}
}
订阅模型-Direct-定向
把消息交给符合指定routing key 的队列 一堆或一个
Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static final String DIRECT_EXCHANGE="direct_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public static final String DIRECT_QUEUE1="direct_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public static final String DIRECT_QUEUE2="direct_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE2,false,callback);}
}
订阅模型-Topic-通配符
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static final String TOPIC_EXCHANGE="topic_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public static final String TOPIC_QUEUE1="topic_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public static final String TOPIC_QUEUE2="topic_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE2,false,callback);}
}
总结
01_hello生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)02_workqueue 默认轮询 可修改(能者多劳)生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)03_fanout 广播 将消息交给所有绑定到交换机的队列(多个消费者都能收到)生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)04_direct 定向 把消息交给符合指定 routing key 的队列 一堆或一个生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)05_topic 通配符 把消息交给符合routing pattern (路由模式) 的队列 一堆或一个生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)
SpringBoot集成RabbitMQ
导包
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--spirngboot集成rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
yml
server:port: 44000
spring:application:name: test‐rabbitmq‐producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /listener:simple:acknowledge-mode: manual #手动签收prefetch: 1 #消费者的消息并发处理数量#publisher-confirms: true #消息发送到交换机失败回调#publisher-returns: true #消息发送到队列失败回调template:mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
config
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";public static final String QUEUE1_SPRINGBOOT="queue1_springboot";public static final String QUEUE2_SPRINGBOOT="queue2_springboot";//创建一个交换机@Beanpublic Exchange createExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();}//创建两个队列@Beanpublic Queue createQueue1(){return new Queue(QUEUE1_SPRINGBOOT,true);}@Beanpublic Queue createQueue2(){return new Queue(QUEUE2_SPRINGBOOT,true);}//把交换机和队列绑定到一起@Beanpublic Binding bind1(){return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();}@Beanpublic Binding bind2(){return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();}//消费者 还原对象方式(从MQ里取出json转为对象)@Bean("rabbitListenerContainerFactory")public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setPrefetchCount(1);return factory;}//放到消息队列里面的转换(转为json存进MQ)@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}
producer
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test(){/*问题:多系统之间 信息交互 传递对象解决方案:转换为json存储实现:1.fastjson 对象 - josn (作业)2.重写转换器模式*/for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT, "email.save", new User(1L,"文达"));}System.out.println("消息发送完毕");}
}
consumer
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;//消费者
@Component
public class Consu {@RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接public void user(@Payload User user, Channel channel, Message message) throws IOException {System.out.println(message);System.out.println("user队列:"+user);//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})public void email(@Payload User user,Channel channel,Message message ) throws IOException {System.out.println(message);System.out.println("email队列:"+user);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
队列内容可传string,entity序列化对象,json对象,
相关文章:

B080-RabbitMQ
目录 RabbitMQ认识概念使用场景优点AMQP协议JMS RabbitMQ安装安装elang安装RabbitMQ安装管理插件登录RabbitMQ消息队列的工作流程 RabbitMQ常用模型HelloWorld-基本消息模型生产者发送消息导包获取链接工具类消息的生产者 消费者消费消息模拟消费者手动签收消息 Work QueuesSen…...

关于岛屿的三道leetcode原题:岛屿周长、岛屿数量、统计子岛屿
题1:岛屿周长 给定一个 row x col 的二维网格地图 grid ,其中:gridi 1 表示陆地, gridi 0 表示水域。 网格中的格子 水平和垂直 方向相连(对角线方向不相连)。整个网格被水完全包围,但其中恰…...
lintcode 1081 · 贴纸拼单词【hard 递归+记忆化搜索才能通过】
题目 https://www.lintcode.com/problem/1081/ 给出N种不同类型的贴纸。 每个贴纸上都写有一个小写英文单词。 通过裁剪贴纸上的所有字母并重排序来拼出字符串target。 每种贴纸可以使用多次,假定每种贴纸数量无限。 拼出target最少需要多少张贴纸?如果…...

HarmonyOS/OpenHarmony(Stage模型)应用开发单一手势(二)
三、拖动手势(PanGesture) .PanGestureOptions(value?:{ fingers?:number; direction?:PanDirection; distance?:number}) 拖动手势用于触发拖动手势事件,滑动达到最小滑动距离(默认值为5vp)时拖动手势识别成功&am…...

计算机毕设之基于Python+django+MySQL可视化的学习系统的设计与实现
系统阐述的是使用可视化的学习系统的设计与实现,对于Python、B/S结构、MySql进行了较为深入的学习与应用。主要针对系统的设计,描述,实现和分析与测试方面来表明开发的过程。开发中使用了 django框架和MySql数据库技术搭建系统的整体架构。利…...

Kotlin inline、noinline、crossinline 深入解析
主要内容: inline 高价函数的原理分析Non-local returns noinlinecrossinline inline 如果有C语言基础的,inline 修饰一个函数表示该函数是一个内联函数。编译时,编译器会将内联函数的函数体拷贝到调用的地方。我们先看下在一个普通的 kot…...
在 CentOS 7 / RHEL 7 上安装 Python 3.11
原文链接:https://computingforgeeks.com/install-python-3-on-centos-rhel-7/ Python 是一种高级解释性编程语言,已被用于各种应用程序开发,并在近年来获得了巨大的流行。Python 可用于编写广泛的应用程序,包括 Web 开发、数据分…...

SVN基本使用笔记——广州云科
简介 SVN是什么? 代码版本管理工具 它能记住你每次的修改 查看所有的修改记录 恢复到任何历史版本 恢复己经删除的文件 SVN跟Git比,有什么优势 使用简单,上手快 目录级权限控制,企业安全必备 子目录Checkout,减少不必要的文件检出…...

python爬虫-Selenium
一、Selenium简介 Selenium是一个用于Web应用程序测试的工具,Selenium 测试直接运行在浏览器中,就像真正的用户在操作一样。模拟浏览器功能,自动执行网页中的js代码,实现动态加载。 二、环境配置 1、查看本机电脑谷歌浏览器的版…...

flutter plugins插件【一】【FlutterJsonBeanFactory】
1、FlutterJsonBeanFactory 在Setting->Tools->FlutterJsonBeanFactory里边自定义实体类的后缀,默认是entity 复制json到粘贴板,右键自己要存放实体的目录,可以看到JsonToDartBeanAction Class Name是实体名字,会默认加上…...

系统中出现大量不可中断进程和僵尸进程(理论)
一 进程状态 当 iowait 升高时,进程很可能因为得不到硬件的响应,而长时间处于不可中断状态。从 ps 或者 top 命令的输出中,你可以发现它们都处于 D 状态,也就是不可中断状态(Uninterruptible Sleep)。 R …...
L1-012 计算指数(Python实现) 测试点全过
前言: {\color{Blue}前言:} 前言:本系列题使用的是“PTA中的团体程序设计天梯赛——练习集”的题库,难度有L1、L2、L3三个等级,分别对应团体程序设计天梯赛的三个难度,如有需要可以直接查看对应专栏。发布个…...
String、StringBuffer、StringBuilder的区别
String、StringBuffer、StringBuilder的区别 String的内容不可修改,StringBuffer与StringBuilder的内容可以修改.StringBuffer与StringBuilder(更快)大部分功能是相似的StringBuffer采用同步处理,属于线程安全操作;而S…...
.net基础概念
1. .NET Framework .NET Framework开发平台包含公共语言运行库(CLR)和基类库(BCL),前者负载管理代码的执行,后者提供了丰富的类库来构建应用程序。.NET Framework仅支持Windows平台 2. Mono 由于.NET Framework支支持windows环境,因此社区…...

电缆工厂 3D 可视化管控系统 | 智慧工厂
近年来,我国各类器材制造业已经开始向数字化生产转型,使得生产流程变得更加精准高效。通过应用智能设备、物联网和大数据分析等技术,企业可以更好地监控生产线上的运行和质量情况,及时发现和解决问题,从而提高生产效率…...

bazel高效使用和调优
Bazel 为了正确性和高性能,做了很多优秀的设计,那么我们如何正确的使用这些能力,让我们的构建性能“起飞”呢, 我们将从本地研发和 CI pipeline 两种场景进行分析。 本地研发 本地研发通常采用默认的 Bazel 配置即可,…...

【实训项目】传道学习助手APP设计
1.设计摘要 跨入21世纪以来,伴随着时代的飞速发展,国民对教育的重视度也有了进一步的提升。我们不难发现虽然很多学习内容有学习资料或者答案,但是这些内容并不能达到让所有求学的人对所需知识进行完全地理解与掌握。所以我们需要进行提问与求助。那么一…...

短信验证码服务
使用的是 阿里云 阿里云官网 1.找到 左上角侧边栏 -云通信 -短信服务 2.在快速学习测试处 ,按照步骤完成快速学习,绑定要测试的手机号,选专用 【测试模板】,自定义模板需要人工审核,要一个工作日 3.右上角 获取 Acces…...

windows如何更改/禁用系统更新
提示:首先说明这属于将更新时间更改,不过你可以的将更新时间更改为十年一百年 废话不多说开始正文: 1.首先:winR打开运行,输入regedit,进入注册表编辑器 2.进入编辑器后依次点击:HKEY_LOCAL_MACHINE\SOFT…...

Clion 使用ffmpeg 学习1 开发环境配置
Clion 使用ffmpeg 学习1 开发环境配置 一、准备工作1. 准备环境2. 下载FFmpeg 二、操作步骤1. Clion 新建一个C项目2. 修改 CMakeLists.txt3. 修改配置4. 运行测试5. 打印rtsp 流信息的 demo 一、准备工作 在视频处理和多媒体应用程序开发中,FFmpeg 是一个强大的开…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...

微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...

FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...
Git常用命令完全指南:从入门到精通
Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...
为什么要创建 Vue 实例
核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...

9-Oracle 23 ai Vector Search 特性 知识准备
很多小伙伴是不是参加了 免费认证课程(限时至2025/5/15) Oracle AI Vector Search 1Z0-184-25考试,都顺利拿到certified了没。 各行各业的AI 大模型的到来,传统的数据库中的SQL还能不能打,结构化和非结构的话数据如何和…...